Source code for democratic_detrender.poly_AM

""" This module contains functions to implement the Polynomial Autocorrelation Minimization (polyAM) detrending method. """

import numpy as np
from scipy.interpolate import interp1d
from numpy.polynomial import Polynomial

from democratic_detrender.helper_functions import durbin_watson, get_detrended_lc
from democratic_detrender.manipulate_data import split_around_transits


[docs] def polyAM_function(times, fluxes, degree): #poly_coeffs = np.polyfit(times, fluxes, degree) #model = np.polyval(poly_coeffs, times) # Fit a polynomial in a numerically stable way (x normalized to [-1,1]) p = Polynomial.fit(times, fluxes, deg=degree) # Evaluate the fitted model at the original x points model = p(times) return model
[docs] def polyAM_iterative( times, fluxes, mask, mask_fitted_planet, local_start_x, local_end_x, max_degree=30, min_degree=1, ): ### this function utilizes polyAM_function above, iterates it up to max_degree. no_pre_transit = False no_post_transit = False vals_to_minimize = [] degs_to_try = np.arange(min_degree, max_degree + 1, 1) DWstats = [] models = [] in_transit = False out_transit = True for index in range(0, len(mask_fitted_planet)): mask_val = mask_fitted_planet[index] if out_transit: if mask_val: in_transit_index = index in_transit = True out_transit = False if in_transit: if not mask_val: out_transit_index = index in_transit = False out_transit = True # this was added in to solve the case of only an ingress try: out_transit_index except NameError: out_transit_index = len(times) if in_transit_index == 0: no_pre_transit = True if out_transit_index == len(times): no_post_transit = True for deg in degs_to_try: model = polyAM_function(times[~mask], fluxes[~mask], deg) if no_pre_transit: DWstat_pre_transit = 2.0 local_start_index = 0 # just for plotting else: # print(np.where(times == local_start_x)) local_start_index = np.where(times == local_start_x)[0][0] residuals_pre_transit = ( (fluxes[local_start_index:in_transit_index] + 1) / (model[local_start_index:in_transit_index] + 1) ) - 1 DWstat_pre_transit = durbin_watson(residuals_pre_transit) if no_post_transit: DWstat_post_transit = 2.0 local_end_index = len(times) - 1 # just for plotting else: # print('') # print('times') # print(times) # print('') # print('local_end_x') # print(local_end_x) # print('') local_end_index = np.where(times == local_end_x)[0][0] npoints_missing_from_model = out_transit_index - in_transit_index residuals_post_transit = ( (fluxes[out_transit_index:local_end_index] + 1) / ( model[ out_transit_index - npoints_missing_from_model : local_end_index - npoints_missing_from_model ] + 1 ) ) - 1 DWstat_post_transit = durbin_watson(residuals_post_transit) val_to_minimize = np.sqrt( (DWstat_pre_transit - 2.0) ** 2.0 + (DWstat_post_transit - 2.0) ** 2.0 ) vals_to_minimize.append(val_to_minimize) models.append(model) best_degree = degs_to_try[np.argmin(np.array(vals_to_minimize))] best_DW_val = vals_to_minimize[np.argmin(np.array(vals_to_minimize))] best_model = models[np.argmin(np.array(vals_to_minimize))] return best_model, best_degree, best_DW_val, max_degree
[docs] def polynomial_method( x, y, yerr, mask, mask_fitted_planet, t0s, duration, period, local_x ): poly_mod = [] poly_mod_all = [] x_all = [] y_all = [] yerr_all = [] mask_all = [] mask_fitted_planet_all = [] DWs = [] for ii in range(0, len(x)): x_ii = x[ii] y_ii = y[ii] yerr_ii = yerr[ii] mask_ii = mask[ii] mask_fitted_planet_ii = mask_fitted_planet[ii] local_start_x_ii = local_x[ii][0] local_end_x_ii = local_x[ii][len(local_x[ii]) - 1] """ print('') print('') print('') print('look here:') print(local_x[ii]) print(local_start_x_ii) print(local_end_x_ii) print('') print('') print('') print('') """ try: """ print('in try:') print(x_ii) print('--------------') """ poly = polyAM_iterative( x_ii, y_ii, mask_ii, mask_fitted_planet_ii, local_start_x_ii, local_end_x_ii, ) poly_interp = interp1d( x_ii[~mask_ii], poly[0], bounds_error=False, fill_value="extrapolate" ) best_model = poly_interp(x_ii) DWs.append(poly[2]) poly_mod.append(best_model) poly_mod_all.extend(best_model) except Exception as e: print(f"polyAM failed for the {ii+1}th epoch: {e}") # gp failed for this epoch, just add nans of the same size nan_array = np.empty(np.shape(y_ii)) nan_array[:] = np.nan poly_mod.append(nan_array) poly_mod_all.extend(nan_array) x_all.extend(x_ii) y_all.extend(y_ii) yerr_all.extend(yerr_ii) mask_all.extend(mask_ii) mask_fitted_planet_all.extend(mask_fitted_planet_ii) # zoom into local window ( x_out, y_out, yerr_out, mask_out, mask_fitted_planet_out, model_out, ) = split_around_transits( np.array(x_all), np.array(y_all), np.array(yerr_all), np.array(mask_all), np.array(mask_fitted_planet_all), t0s, float(6 * duration / (24.0)) / period, period, model=np.array(poly_mod_all), ) # add a linear polynomial fit at the end model_linear = [] y_out_detrended = [] x_out_detrended = [] for ii in range(0, len(model_out)): x_ii = np.array(x_out[ii], dtype=float) y_ii = np.array(y_out[ii], dtype=float) mask_ii = np.array(mask_out[ii], dtype=bool) model_ii = np.array(model_out[ii], dtype=float) y_ii_detrended = get_detrended_lc(y_ii, model_ii) try: linear_ii = polyAM_function(x_ii[~mask_ii], y_ii_detrended[~mask_ii], 1) poly_interp = interp1d( x_ii[~mask_ii], linear_ii, bounds_error=False, fill_value="extrapolate" ) model_ii_linear = poly_interp(x_ii) model_linear.append(model_ii_linear) y_ii_linear_detrended = get_detrended_lc(y_ii_detrended, model_ii_linear) y_out_detrended.append(y_ii_linear_detrended) x_out_detrended.append(x_ii) except Exception as e: print(f"polyAM failed for the {ii+1}th epoch at the linear step: {e}") # polyAM failed for this epoch, just add nans of the same size nan_array = np.empty(np.shape(y_ii)) nan_array[:] = np.nan y_out_detrended.append(nan_array) detrended_lc = np.concatenate(y_out_detrended, axis=0) return detrended_lc, DWs