Source code for democratic_detrender.gp

""" This module contains functions to implement the Gaussian Process (GP) detrending method. """

import numpy as np
from scipy.interpolate import interp1d

import pymc as pm
import pymc_ext as pmx
from celerite2.pymc import GaussianProcess
import celerite2.pymc.terms as terms

from democratic_detrender.manipulate_data import split_around_transits
from democratic_detrender.helper_functions import get_detrended_lc
from democratic_detrender.poly_AM import polyAM_function

[docs] def gp_new(time_star, lc_star, lc_err_star, time_model): """ A function that fits a GP to the out-of-transit data of a single epoch and returns the maximum a posteriori solution. Parameters ---------- time_star : array The time array of the light curve. lc_star : array The flux array of the light curve. lc_err_star : array The flux error array of the light curve. time_model : array The time array of the full light curve (including in-transit data). This is used to compute the GP prediction. Returns ------- map_soln : dict A dictionary containing the maximum a posteriori solution of the GP fit. """ # Set up the model with pm.Model() as model: rho_gp = pm.InverseGamma( "rho_gp", initval=2.0, **pmx.utils.estimate_inverse_gamma_parameters(1.0, 5.0) ) # The flux zero point mean = pm.Normal("mean", mu=0.0, sigma=10.0) # Noise parameters med_yerr = np.median(lc_err_star) std_y = np.std(lc_star) sigma_gp = pm.InverseGamma( "sigma_gp", initval=0.5 * std_y, **pmx.utils.estimate_inverse_gamma_parameters(med_yerr, std_y), ) # The Gaussian Process noise model kernel = terms.SHOTerm(sigma=sigma_gp, rho=rho_gp, Q=1.0 / 3) gp = GaussianProcess(kernel, t=time_star, diag=lc_err_star ** 2, mean=mean) gp.marginal("gp", observed=lc_star) # Compute the GP model prediction for plotting purposes pm.Deterministic("pred", gp.predict(lc_star, t=time_model)) # Optimize the model map_soln = pmx.optimize() return map_soln
[docs] def gp_method(x, y, yerr, mask, mask_fitted_planet, t0s, duration, period): """ A function that applies the GP detrending method to a list of light curve epochs. It fits a GP to the out-of-transit data of each epoch and returns the detrended light curve. Parameters ---------- x : list of arrays A list of time arrays for each epoch. y : list of arrays A list of flux arrays for each epoch. yerr : list of arrays A list of flux error arrays for each epoch. mask : list of arrays A list of boolean arrays indicating the in-transit data points for each epoch. mask_fitted_planet : list of arrays A list of boolean arrays indicating the data points used to fit the planet model for each epoch. t0s : array The transit midpoints of the planet. duration : float The duration of the planet transit in hours. period : float The orbital period of the planet in days. Returns ------- detrended_lc : array The detrended light curve obtained by applying the GP method. """ gp_mod = [] gp_mod_all = [] x_all = [] y_all = [] yerr_all = [] mask_all = [] mask_fitted_planet_all = [] for ii in range(0, len(x)): x_ii = x[ii] y_ii = y[ii] yerr_ii = yerr[ii] mask_ii = mask[ii] mask_fitted_planet_ii = mask_fitted_planet[ii] try: gp_model = gp_new(x_ii[~mask_ii], y_ii[~mask_ii], yerr_ii[~mask_ii], x_ii) gp_mod.append(gp_model["pred"]) gp_mod_all.extend(gp_model["pred"]) except Exception as e: print(f"GP failed for the {ii+1}th epoch: {e}") # gp failed for this epoch, just add nans of the same size nan_array = np.empty(np.shape(y_ii)) nan_array[:] = np.nan gp_mod.append(nan_array) gp_mod_all.extend(nan_array) x_all.extend(x_ii) y_all.extend(y_ii) yerr_all.extend(yerr_ii) mask_all.extend(mask_ii) mask_fitted_planet_all.extend(mask_fitted_planet_ii) # zoom into local window ( x_out, y_out, yerr_out, mask_out, mask_fitted_planet_out, model_out, ) = split_around_transits( np.array(x_all), np.array(y_all), np.array(yerr_all), np.array(mask_all), np.array(mask_fitted_planet_all), t0s, float(6 * duration / (24.0)) / period, period, model=np.array(gp_mod_all), ) # add a linear polynomial fit at the end model_linear = [] y_out_detrended = [] for ii in range(0, len(model_out)): x_ii = np.array(x_out[ii], dtype=float) y_ii = np.array(y_out[ii], dtype=float) mask_ii = np.array(mask_out[ii], dtype=bool) model_ii = np.array(model_out[ii], dtype=float) try: y_ii_detrended = get_detrended_lc(y_ii, model_ii) linear_ii = polyAM_function(x_ii[~mask_ii], y_ii_detrended[~mask_ii], 1) poly_interp = interp1d( x_ii[~mask_ii], linear_ii, bounds_error=False, fill_value="extrapolate" ) model_ii_linear = poly_interp(x_ii) model_linear.append(model_ii_linear) y_ii_linear_detrended = get_detrended_lc(y_ii_detrended, model_ii_linear) y_out_detrended.append(y_ii_linear_detrended) except Exception as e: print(f"GP failed for the {ii+1}th epoch at the linear step: {e}") # GP failed for this epoch, just add nans of the same size nan_array = np.empty(np.shape(y_ii)) nan_array[:] = np.nan y_out_detrended.append(nan_array) detrended_lc = np.concatenate(y_out_detrended, axis=0) return detrended_lc