# Source code for pyklip.fitpsf

import warnings
import pickle
import math
import sys

import numpy as np
import scipy.linalg as linalg
import scipy.ndimage as ndi
import scipy.ndimage.interpolation as sinterp
import scipy.optimize as optimize

import pyklip.covars as covars
import astropy.stats.circstats as circstats

# emcee for MCMC sampling
import emcee

# Check python version. Kept for backwards compatibility with any code that
# inspects this module-level flag.
v2 = sys.version_info < (3, 0)

# import pymultinest if the user has it installed.
# ModuleNotFoundError (Python 3.6+) is a subclass of ImportError, so a single
# ImportError handler covers both the Python 2 and Python 3 cases and the old
# version-dependent branching is unnecessary.
try:
    import pymultinest
    nomultinest = False
except ImportError:
    nomultinest = True

class FitPSF(object):
    """
    Base class to perform astrometry on direct imaging data_stamp using GP regression.
    Can utilize a Bayesian framework with MCMC or a frequentist framework with least squares.

    Args:
        fitboxsize: fitting box side length (pixels)
        method (str): either 'mcmc' or 'maxl' depending on framework you want. Defaults to 'mcmc'.

    Attributes:
        guess_x (float): (initialization) guess x position [pixels]
        guess_y (float): (initialization) guess y position [pixels]
        guess_flux (float): guess scale factor between model and data
        fit_x (:py:class:`pyklip.fitpsf.ParamRange`): (result) the result from the fit for the planet's x location [pixels]
        fit_y (:py:class:`pyklip.fitpsf.ParamRange`): (result) the result from the fit for the planet's y location [pixels]
        fit_flux (:py:class:`pyklip.fitpsf.ParamRange`): (result) factor to scale the FM to match the flux of the data
        covar_params (list of :py:class:`pyklip.fitpsf.ParamRange`): (result) hyperparameters for the Gaussian process
        fm_stamp (np.array): (fitting) The 2-D stamp of the forward model (centered at the nearest pixel to the guessed location)
        data_stamp (np.array): (fitting) The stamp of the data (centered at the nearest pixel to the guessed location)
        noise_map (np.array): (fitting) The stamp of the noise for each pixel of the data computed assuming azimuthally similar
            noise (same dim as data_stamp)
        padding (int): amount of pixels on one side to pad the data/forward model stamp
        sampler (emcee.EnsembleSampler): an instance of the emcee EnsembleSampler. Only for Bayesian fit.
            See emcee docs for more details.
    """
    def __init__(self, fitboxsize, method='mcmc'):
        """
        Initializes the FitPSF class
        """
        # framework selection: 'mcmc' -> Bayesian sampling, 'maxl' -> frequentist optimizer
        framework = method.lower()
        if framework not in ("maxl", "mcmc"):
            raise ValueError("method needs to be either 'maxl' or 'mcmc'. Received {0}.".format(method))
        self.isbayesian = (framework == "mcmc")

        # fitting box size in pixels
        self.fitboxsize = fitboxsize

        # forward model stamp, and padding around it so it can be shifted during the fit
        self.fm_stamp = None
        self.padding = 0

        # data stamp, its per-pixel noise, and the coordinate grids of the stamp
        self.data_stamp = None
        self.noise_map = None                # same shape as self.data_stamp
        self.data_stamp_x = None             # x (RA-axis) coordinate of each stamp pixel
        self.data_stamp_y = None             # y (Dec-axis) coordinate of each stamp pixel
        self.data_stamp_x_center = None      # x coordinate of the center pixel (stampsize // 2)
        self.data_stamp_y_center = None      # y coordinate of the center pixel (stampsize // 2)

        # guess flux (a hyperparameter); lazily derived by the guess_flux property if unset
        self._guess_flux = None

        # covariance function and its hyperparameters (see set_kernel)
        self.covar = None
        self.covar_param_guesses = None
        self.covar_param_labels = None
        self.include_readnoise = False

        # MCMC fit state
        self.bounds = None
        self.sampler = None

        # max-likelihood fit state
        self.hess_inv = None

        # best-fit results
        self.fit_x = None
        self.fit_y = None
        self.fit_flux = None
        self.covar_params = None

    @property
    def guess_flux(self):
        """Guess flux scaling; automatically derived from the stamps if not set explicitly."""
        if self._guess_flux is not None:
            # the user set it explicitly
            return self._guess_flux
        if self.data_stamp is None or self.fm_stamp is None:
            # can't estimate anything until both stamps exist
            return None
        # estimate from the ratio of the peak values of the two stamps
        self._guess_flux = np.nanmax(self.data_stamp) / np.nanmax(self.fm_stamp)
        return self._guess_flux

    @guess_flux.setter
    def guess_flux(self, newval):
        self._guess_flux = newval
[docs] def generate_fm_stamp(self, fm_image, fm_pos=None, fm_wcs=None, extract=True, padding=5): """ Generates a stamp of the forward model and stores it in self.fm_stamp Args: fm_image: full image containing the fm_stamp fm_pos: [x,y] location of the forwrd model in the fm_image fm_wcs: if not None, specifies the sky angles in the image. If None, assume image is North up East left extract: if True, need to extract the forward model from the image. Otherwise, assume the fm_stamp is already centered in the frame (fm_image.shape // 2) padding: number of pixels on each side in addition to the fitboxsize to extract to pad the fm_stamp (should be >= 1) Returns: """ # cheeck the padding to make sure it's valid if not isinstance(padding, int): raise TypeError("padding must be an integer") if padding < 1: warnings.warn("Padding really should be >= 1 pixel so we can shift the FM around", RuntimeWarning) self.padding = padding if extract: if fm_wcs is not None: raise NotImplementedError("Have not implemented rotation using WCS") # image is now rotated North up east left # find the location of the FM psf_xpos = fm_pos[0] psf_ypos = fm_pos[1] else: # PSf is already cenetered psf_xpos = fm_image.shape[1]//2 psf_ypos = fm_image.shape[0]//2 # now we found the FM in the image, extract out a centered stamp of it # grab the coordinates of the image stampsize = 2 * self.padding + self.fitboxsize # full stamp needs padding around all sides x_stamp, y_stamp = np.meshgrid(np.arange(stampsize * 1.) - stampsize //2, np.arange(stampsize * 1.) - stampsize// 2) x_stamp += psf_xpos y_stamp += psf_ypos # zero nans because it messes with interpolation fm_image[np.where(np.isnan(fm_image))] = 0 fm_stamp = ndi.map_coordinates(fm_image, [y_stamp, x_stamp]) self.fm_stamp = fm_stamp
    def generate_data_stamp(self, data, guess_loc, noise_map, radial_noise_center=None, dr=4, exclusion_radius=10):
        """
        Generate a stamp of the data_stamp ~centered on planet and also corresponding noise map

        Args:
            data: the final collapsed data_stamp (2-D)
            guess_loc: [x,y] guess location of where to fit the model in the data
            noise_map: if not None, noise map for each pixel (either same shape as input data, or shape of data stamp)
                       if None, one will be generated assuming azimuthal noise using an annulus width of dr.
                       radial_noise_center MUST be defined in that case.
            radial_noise_center: if we assume the noise is azimuthally symmetric and changes radially, this is the
                                 [x,y] center for it
            dr: width of annulus in pixels from which the noise map will be generated
            exclusion_radius: radius around the guess planet location which doesn't get factored into the radial
                              noise estimate

        Returns:

        """
        if noise_map is None and radial_noise_center is None:
            raise ValueError("radial_noise_center needs to be specified if noise map is not passed in")

        # store the guess location for later use (set_bounds, lnlike)
        self.guess_x = guess_loc[0]
        self.guess_y = guess_loc[1]

        # round to nearest pixel so the stamp is aligned with the pixel grid
        xguess_round = int(np.round(self.guess_x))
        yguess_round = int(np.round(self.guess_y))

        # get index bounds for grabbing pixels from the data
        ymin = yguess_round - self.fitboxsize//2
        xmin = xguess_round - self.fitboxsize//2
        ymax = yguess_round + self.fitboxsize//2 + 1
        xmax = xguess_round + self.fitboxsize//2 + 1
        if self.fitboxsize % 2 == 0:
            # for even fitbox sizes, need to truncate ymax/xmax by 1
            ymax -= 1
            xmax -= 1

        data_stamp = data[ymin:ymax, xmin:xmax]
        self.data_stamp = data_stamp

        # store the image coordinates of each stamp pixel too (used by the covariance function)
        y_img, x_img = np.indices(data.shape, dtype=float)
        x_data_stamp = x_img[ymin:ymax, xmin:xmax]
        y_data_stamp = y_img[ymin:ymax, xmin:xmax]
        self.data_stamp_x = x_data_stamp
        self.data_stamp_y = y_data_stamp
        # coordinates of the central pixel of the stamp; the FM shift in lnlike is measured from here
        self.data_stamp_x_center = self.data_stamp_x[0, self.fitboxsize // 2]
        self.data_stamp_y_center = self.data_stamp_y[self.fitboxsize // 2, 0]

        if noise_map is not None:
            # check size of noise map:
            if noise_map.shape[0] == self.fitboxsize and noise_map.shape[1] == self.fitboxsize:
                # already stamp-sized; use as-is
                noise_stamp = noise_map
            else:
                # assume it is the whole image in size; cut out the matching stamp
                noise_stamp = noise_map[ymin:ymax, xmin:xmax]
        else:
            # need to generate the noise map assuming noise is azimuthally symmetric about a center
            # blank map
            noise_stamp = np.zeros(data_stamp.shape)

            # define exclusion region around the planet so it doesn't bias the noise estimate
            distance_from_planet = np.sqrt((x_img - self.guess_x)**2 + (y_img - self.guess_y)**2)
            # define radial coordinate relative to the noise center (typically the star)
            r_img = np.sqrt((x_img - radial_noise_center[0])**2 + (y_img - radial_noise_center[1])**2)
            r_stamp = np.sqrt((x_data_stamp - radial_noise_center[0])**2 + (y_data_stamp - radial_noise_center[1])**2)

            # calculate noise for each pixel in the data stamp as the std-dev of an annulus at
            # that pixel's radius, excluding pixels near the planet
            # NOTE(review): this recomputes a whole-image mask per stamp pixel (O(stamp * image));
            # fine for small stamps, but worth vectorizing if it ever shows up in profiles.
            for y_index, x_index in np.ndindex(data_stamp.shape):
                r_pix = r_stamp[y_index, x_index]
                pixels_for_noise = np.where((np.abs(r_img - r_pix) <= dr/2.) & (distance_from_planet > exclusion_radius))
                noise_stamp[y_index, x_index] = np.nanstd(data[pixels_for_noise])

        self.noise_map = noise_stamp

        # if there are NaNs in the stamp, record the mask of good pixels; lnlike uses it to
        # unravel the data to 1-D and drop the NaNs at fit time
        nanpix = np.where(np.isnan(self.data_stamp))
        if np.size(nanpix) > 0:
            goodpix = np.where(~np.isnan(self.data_stamp))
            # self.data_stamp = self.data_stamp[goodpix]
            # self.data_stamp_RA_offset = self.data_stamp_RA_offset[goodpix]
            # self.data_stamp_Dec_offset = self.data_stamp_Dec_offset[goodpix]
            # self.noise_map = self.noise_map[goodpix]
            self._usegoodpix = goodpix
        else:
            self._usegoodpix = None
[docs] def set_kernel(self, covar, covar_param_guesses, covar_param_labels, include_readnoise=False, read_noise_fraction=0.01): """ Set the Gaussian process kernel used in our fit Args: covar: Covariance kernel for GP regression. If string, can be "matern32" or "sqexp" or "diag" Can also be a function: cov = cov_function(x_indices, y_indices, sigmas, cov_params) covar_param_guesses: a list of guesses on the hyperparmeteres (size of N_hyperparams). This can be an empty list for 'diag'. covar_param_labels: a list of strings labelling each covariance parameter include_readnoise: if True, part of the noise is a purely diagonal term (i.e. read/photon noise) read_noise_fraction: fraction of the total measured noise is read noise (between 0 and 1) Returns: """ if isinstance(covar, str): if covar.lower() == "matern32": self.covar = covars.matern32 elif covar.lower() == "sqexp": self.covar = covars.sq_exp elif covar.lower() == "diag": self.covar = covars.delta else: raise ValueError("Covariance matricies currently supported are 'matern32', 'sqexp', and diag") else: # this better be a covariance function. We're trusting you self.covar = covar self.covar_param_guesses = covar_param_guesses self.covar_param_labels = covar_param_labels if include_readnoise: self.include_readnoise = True self.covar_param_guesses.append(read_noise_fraction) self.covar_param_labels.append(r"K_{\delta}")
[docs] def set_bounds(self, dx, dy, df, covar_param_bounds, read_noise_bounds=None): """ Set bounds on Bayesian priors. All paramters can be a 2 element tuple/list/array that specifies the lower and upper bounds x_min < x < x_max. Or a single value whose interpretation is specified below If you are passing in both lower and upper bounds, both should be in linear scale! Args: dx: Distance from initial guess position in pixels. For a single value, this specifies the largest distance form the initial guess (i.e. x_guess - dx < x < x_guess + dx) dy: Same as dx except with y df: Flux range. If single value, specifies how many orders of 10 the flux factor can span in one direction (i.e. log_10(guess_flux) - df < log_10(guess_flux) < log_10(guess_flux) + df covar_param_bounds: Params for covariance matrix. Like df, single value specifies how many orders of magnitude parameter can span. Otherwise, should be a list of 2-elem touples read_noise_bounds: Param for read noise term. If single value, specifies how close to 0 it can go based on powers of 10 (i.e. 
log_10(-read_noise_bound) < read_noise < 1 ) Returns: """ self.bounds = [] # x/RA bounds if np.size(dx) == 2: self.bounds.append(dx) else: self.bounds.append([self.guess_x - dx, self.guess_x + dx]) # y/Dec bounds if np.size(dy) == 2: self.bounds.append(dy) else: self.bounds.append([self.guess_y - dy, self.guess_y + dy]) if np.size(df) == 2: self.bounds.append(df) else: self.bounds.append([self.guess_flux / (10.**df), self.guess_flux * (10**df)]) # hyperparam bounds if np.ndim(covar_param_bounds) == 2: for covar_param_bound in covar_param_bounds: self.bounds.append(covar_param_bound) else: # this is a 1-D list, with each param specified by one paramter for covar_param_bound, covar_param_guess in zip(covar_param_bounds, self.covar_param_guesses): self.bounds.append([covar_param_guess / (10.**covar_param_bound), covar_param_guess * (10**covar_param_bound)]) if read_noise_bounds is not None: # read noise if np.size(read_noise_bounds) == 2: self.bounds.append(read_noise_bounds) else: self.bounds.append([self.covar_param_guesses[-1]/10**read_noise_bounds, 1])
[docs] def fit_psf(self, nwalkers=100, nburn=200, nsteps=800, save_chain=True, chain_output="bka-chain.pkl", numthreads=None): """ Fits the PSF to the data in either a frequentist or Bayesian way depending on initialization. Args: nwalkers: number of walkers (mcmc-only) nburn: numbe of samples of burn-in for each walker (mcmc-only) nsteps: number of samples each walker takes (mcmc-only) save_chain: if True, save the output in a pickled file (mcmc-only) chain_output: filename to output the chain to (mcmc-only) numthreads: number of threads to use (mcmc-only) Returns: """ if self.isbayesian: return self._mcmc_fit_psf(nwalkers=nwalkers, nburn=nburn, nsteps=nsteps, save_chain=save_chain, chain_output=chain_output, numthreads=numthreads) else: return self._lsqr_fit_psf()
    def _mcmc_fit_psf(self, nwalkers=100, nburn=200, nsteps=800, save_chain=True, chain_output="bka-chain.pkl",
                      numthreads=None):
        """
        Run a Bayesian fit of the astrometry using MCMC. Stores the sampler in self.sampler and the
        best-fit values in self.fit_x/fit_y/fit_flux/covar_params.

        Args:
            nwalkers: number of walkers
            nburn: number of samples of burn-in for each walker
            nsteps: number of samples each walker takes
            save_chain: if True, save the output in a pickled file
            chain_output: filename to output the chain to
            numthreads: number of threads to use

        Returns:

        """
        # create array of initial guesses
        # array of guess RA, Dec, and flux
        # for everything that's not RA/Dec offset, should be converted to log space for MCMC sampling
        init_guess = np.array([self.guess_x, self.guess_y, math.log(self.guess_flux)])
        # append hyperparams for covariance matrix, which also need to be converted to log space
        init_guess = np.append(init_guess, np.log(self.covar_param_guesses))
        # number of dimensions of MCMC fit
        ndim = np.size(init_guess)

        # initialize walkers in a small ball around the initial guess
        pos = [init_guess + 1e-4 * np.random.randn(ndim) for i in range(nwalkers)]

        # prior bounds also need to be put in log space (flux + hyperparams only; x/y stay linear)
        sampler_bounds = np.copy(self.bounds)
        sampler_bounds[2:] = np.log(sampler_bounds[2:])

        # NOTE(review): `global lnprob` is a no-op for a module-level function; presumably kept so
        # lnprob is picklable when emcee spawns threads — confirm before removing.
        global lnprob
        sampler = emcee.EnsembleSampler(nwalkers, ndim, lnprob, args=(self, sampler_bounds, self.covar),
                                        kwargs={'readnoise': self.include_readnoise}, threads=numthreads)

        # burn in
        print("Running burn in")
        pos, _, _ = sampler.run_mcmc(pos, nburn)
        # reset sampler so the stored chain contains only post-burn-in samples
        sampler.reset()

        # chains should hopefully have converged. Now run MCMC
        print("Burn in finished. Now sampling posterior")
        sampler.run_mcmc(pos, nsteps)
        print("MCMC sampler has finished")

        # convert chains in log space back in linear space (flux + hyperparams)
        sampler.chain[:,:,2:] = np.exp(sampler.chain[:,:,2:])

        # save state
        self.sampler = sampler

        # save best fit values as median with 16/84 percentile errors
        # percentiles has shape [ndims, 3]
        percentiles = np.swapaxes(np.percentile(sampler.flatchain, [16, 50, 84], axis=0), 0, 1)
        self.fit_x = ParamRange(percentiles[0][1], np.array([percentiles[0][2], percentiles[0][0]]) - percentiles[0][1])
        self.fit_y = ParamRange(percentiles[1][1], np.array([percentiles[1][2], percentiles[1][0]]) - percentiles[1][1])
        self.fit_flux = ParamRange(percentiles[2][1], np.array([percentiles[2][2], percentiles[2][0]]) - percentiles[2][1])
        self.covar_params = [ParamRange(thispercentile[1], np.array([thispercentile[2], thispercentile[0]]) - thispercentile[1]) for thispercentile in percentiles[3:]]

        if save_chain:
            # dump the raw chain, log-probabilities, and acceptance fractions for later inspection
            pickle_file = open(chain_output, 'wb')
            pickle.dump(sampler.chain, pickle_file)
            pickle.dump(sampler.lnprobability, pickle_file)
            pickle.dump(sampler.acceptance_fraction, pickle_file)
            #pickle.dump(sampler.acor, pickle_file)
            pickle_file.close()

    def _lsqr_fit_psf(self):
        """
        Do a frequentist maximum likelihood fit to the data. Approximate errors using the Hessian
        of the likelihood function. Stores results in self.fit_x/fit_y/fit_flux/covar_params/hess_inv.
        """
        # create array of initial guesses
        # array of guess RA, Dec, and flux
        # for everything that's not RA/Dec offset, should be converted to log space for MCMC sampling
        # init_guess = np.array([self.guess_x, self.guess_y, math.log(self.guess_flux)])
        # # append hyperparams for covariance matrix, which also need to be converted to log space
        # init_guess = np.append(init_guess, np.log(self.covar_param_guesses))
        # NOTE(review): the flux/hyperparams are fed in LINEAR space here, but lnlike exponentiates
        # them and the best fit below is exp()'d back out — i.e. they are effectively being treated
        # as log-space values. This looks inconsistent with the guess; confirm against upstream pyklip.
        init_guess = np.array([self.guess_x, self.guess_y, (self.guess_flux)])
        # append hyperparams for covariance matrix, which also need to be converted to log space
        init_guess = np.append(init_guess, (self.covar_param_guesses))

        if self.bounds is None:
            cost_function = lnprob
            # construct some bounds, just very loose
            bounds = [[self.guess_x - self.fitboxsize/2, self.guess_x + self.fitboxsize/2],
                      [self.guess_y - self.fitboxsize/2, self.guess_y + self.fitboxsize/2],
                      [0, np.inf]]
            for _ in self.covar_param_guesses:
                bounds += [[0, np.inf]]
            cost_function_args = (self, bounds, self.covar)
            self.bounds = bounds
        else:
            # prior bounds also need to be put in log space
            sampler_bounds = np.copy(self.bounds)
            #sampler_bounds[2:] = np.log(sampler_bounds[2:])
            cost_function = lnprob
            cost_function_args = (self, sampler_bounds, self.covar)

        # last two args: readnoise flag and negate=True (we minimize the negative log-posterior)
        cost_function_args += (self.include_readnoise, True)

        #global cost_function
        # first pass: Nelder-Mead is robust to the rough likelihood surface
        nm_result = optimize.minimize(cost_function, init_guess, args=cost_function_args, method="Nelder-Mead")

        # BFGS will only fit for position and flux, and their uncertainties.
        new_init_guess = nm_result.x[:3]
        new_init_guess = np.append(new_init_guess, nm_result.x[3:])
        #if cost_function_args[1] is not None:
        #    cost_function_args[1] = cost_function_args[1][:3] # modify limits to not include hyperparameters

        # second pass: BFGS gives us an inverse-Hessian estimate for the uncertainties
        result = optimize.minimize(cost_function, new_init_guess, args=cost_function_args, method="BFGS")

        if not result.success:
            warnings.warn("Optimizer did not converge! Estimated uncertainties are likely unreliable. Msg: {0}".format(result.message))

        # best fit values, and use the Hessian to approximate the uncertainties in the parameters
        ra_best = result.x[0]
        dec_best = result.x[1]
        flux_best = (result.x[2])
        # hyperparameters are taken from the Nelder-Mead pass, not the BFGS pass
        covar_params_best = [nm_result.x[i] for i in range(3, np.size(nm_result.x))]

        ra_err = np.sqrt(np.abs(result.hess_inv[0,0]))
        dec_err = np.sqrt(np.abs(result.hess_inv[1,1]))
        flux_err = np.sqrt(np.abs(result.hess_inv[2,2]))

        # convert to linear space the flux and covariance parameters
        covar_params_best = np.exp(covar_params_best)
        # flux error is approx symmetric in log space
        flux_err_two_sided = np.array([np.exp(flux_best - flux_err), np.exp(flux_best + flux_err)])
        flux_best = math.exp(flux_best)

        # save best fit values
        self.fit_x = ParamRange(ra_best, ra_err)
        self.fit_y = ParamRange(dec_best, dec_err)
        self.fit_flux = ParamRange(flux_best, flux_err_two_sided)
        self.covar_params = [ParamRange(param_best, 0) for param_best in covar_params_best]
        self.hess_inv = result.hess_inv
[docs] def make_corner_plot(self, fig=None): """ Generate a corner plot of the posteriors from the MCMC Args: fig: if not None, a matplotlib Figure object Returns: fig: the Figure object. If input fig is None, function will make a new one """ if not self.isbayesian: raise AttributeError("Corner plot is only available if using Bayesian MCMC framework") import corner all_labels = [r"x", r"y", r"$\alpha$"] all_labels = np.append(all_labels, self.covar_param_labels) fig = corner.corner(self.sampler.flatchain, labels=all_labels, quantiles=[0.16, 0.5, 0.84], fig=fig) return fig
    def best_fit_and_residuals(self, fig=None):
        """
        Generate a plot of the best fit FM compared with the data_stamp and also the residuals

        Args:
            fig (matplotlib.Figure): if not None, a matplotlib Figure object

        Returns:
            fig (matplotlib.Figure): the Figure object. If input fig is None, function will make a new one

        """
        import matplotlib
        import matplotlib.pylab as plt

        if fig is None:
            fig = plt.figure(figsize=(12, 4))

        # create best fit FM: shift the FM stamp to the fitted sub-pixel position and scale it
        dx = self.fit_x.bestfit - self.data_stamp_x_center
        dy = self.fit_y.bestfit - self.data_stamp_y_center

        fm_bestfit = self.fit_flux.bestfit * sinterp.shift(self.fm_stamp, [dy, dx])
        if self.padding > 0:
            # trim the padding so the FM stamp matches the data stamp size
            fm_bestfit = fm_bestfit[self.padding:-self.padding, self.padding:-self.padding]

        # make residual map
        residual_map = self.data_stamp - fm_bestfit

        # normalize all three panels to the same color scale, clipped to the data's 0.03-99.7 percentiles
        colornorm = matplotlib.colors.Normalize(vmin=np.nanpercentile(self.data_stamp, 0.03),
                                                vmax=np.nanpercentile(self.data_stamp, 99.7))

        # plot the data_stamp
        ax1 = fig.add_subplot(131)
        im1 = ax1.imshow(self.data_stamp, interpolation='nearest', cmap='cubehelix', norm=colornorm)
        ax1.invert_yaxis()
        ax1.set_title("Data")
        ax1.set_xlabel("X (pixels)")
        ax1.set_ylabel("Y (pixels)")

        # plot the best-fit model
        ax2 = fig.add_subplot(132)
        im2 = ax2.imshow(fm_bestfit, interpolation='nearest', cmap='cubehelix', norm=colornorm)
        ax2.invert_yaxis()
        ax2.set_title("Best-fit Model")
        ax2.set_xlabel("X (pixels)")

        # plot the residuals
        ax3 = fig.add_subplot(133)
        im3 = ax3.imshow(residual_map, interpolation='nearest', cmap='cubehelix', norm=colornorm)
        ax3.invert_yaxis()
        ax3.set_title("Residuals")
        ax3.set_xlabel("X (pixels)")

        # make room on the right for a single shared colorbar
        fig.subplots_adjust(right=0.82)
        fig.subplots_adjust(hspace=0.4)
        ax_pos = ax3.get_position()

        cbar_ax = fig.add_axes([0.84, ax_pos.y0, 0.02, ax_pos.height])
        cb = fig.colorbar(im1, cax=cbar_ax)
        cb.set_label("Counts (DN)")

        return fig
def lnprior(fitparams, bounds, readnoise=False, negate=False):
    """
    Bayesian prior

    Args:
        fitparams: array of params (size N)
        bounds: array of (N,2) with corresponding lower and upper bound of params
                bounds[i,0] <= fitparams[i] < bounds[i,1]
        readnoise (bool): If True, the last fitparam fits for diagonal noise
        negate (bool): if True, negates the probability (used for minimization algos)

    Returns:
        prior: 0 if inside bound ranges, -inf if outside
    """
    prior = 0.0

    # uniform prior: any parameter outside its bounds zeroes out the probability
    for param, bound in zip(fitparams, bounds):
        if (param >= bound[1]) | (param < bound[0]):
            # bug fix: the old `prior *= -np.inf` multiplied 0.0 by -inf, which is NaN,
            # not the -inf promised above. Assign directly instead.
            prior = -np.inf
            break

    if negate:
        prior *= -1

    return prior
def lnlike(fitparams, fma, cov_func, readnoise=False, negate=False):
    """
    Likelihood function

    Args:
        fitparams: array of params (size N). First three are [dRA,dDec,f]. Additional parameters are GP hyperparams
                    dRA,dDec: RA,Dec offsets from star. Also coordinates in self.data_{RA,Dec}_offset
                    f: flux scale factor to normalize the flux of the data_stamp to the model
        fma (FMAstrometry): a FMAstrometry object that has been fully set up to run
        cov_func (function): function that given an input [x,y] coordinate array returns the covariance matrix
                  e.g. cov = cov_function(x_indices, y_indices, sigmas, cov_params)
        readnoise (bool): If True, the last fitparam fits for diagonal noise
        negate (bool): if True, negates the probability (used for minimization algos)

    Returns:
        likeli: log of likelihood function (minus a constant factor)
    """
    x_trial = fitparams[0]
    y_trial = fitparams[1]
    f_trial = fitparams[2]
    hyperparms_trial = fitparams[3:]

    if readnoise:
        # last hyperparameter is a diagonal noise term. Separate it out
        readnoise_amp = np.exp(hyperparms_trial[-1])
        hyperparms_trial = hyperparms_trial[:-1]

    # get trial parameters out of log space
    f_trial = math.exp(f_trial)
    hyperparms_trial = np.exp(hyperparms_trial)

    # shift the FM stamp to the trial position (offsets measured from the stamp center)
    dx = x_trial - fma.data_stamp_x_center
    dy = y_trial - fma.data_stamp_y_center

    # ndi.shift replaces sinterp.shift: scipy.ndimage.interpolation is a deprecated alias
    # of scipy.ndimage and the file already imports scipy.ndimage as ndi
    fm_shifted = ndi.shift(fma.fm_stamp, [dy, dx])

    if fma.padding > 0:
        # trim the interpolation padding so the FM matches the data stamp
        fm_shifted = fm_shifted[fma.padding:-fma.padding, fma.padding:-fma.padding]

    if fma._usegoodpix is not None:
        # data stamp had NaNs: compare only the good pixels, unraveled to 1-D
        fm_shifted = fm_shifted[fma._usegoodpix]
        data_stamp = fma.data_stamp[fma._usegoodpix]
        x_offsets = fma.data_stamp_x[fma._usegoodpix]
        y_offsets = fma.data_stamp_y[fma._usegoodpix]
        noise_map = fma.noise_map[fma._usegoodpix]
    else:
        data_stamp = fma.data_stamp
        x_offsets = fma.data_stamp_x
        y_offsets = fma.data_stamp_y
        noise_map = fma.noise_map

    diff_ravel = data_stamp.ravel() - f_trial * fm_shifted.ravel()

    cov = cov_func(x_offsets.ravel(), y_offsets.ravel(), noise_map.ravel(), hyperparms_trial)

    if readnoise:
        # add a diagonal term: a readnoise_amp fraction of the variance is uncorrelated
        cov = (1 - readnoise_amp) * cov + readnoise_amp * np.diagflat(noise_map.ravel()**2)

    # solve Cov * x = diff for x = Cov^-1 diff. Numerically more stable than inverse.
    # to make it faster, we compute the Cholesky factor and use it to also compute the determinant
    try:
        (L_cov, lower_cov) = linalg.cho_factor(cov)
        cov_inv_dot_diff = linalg.cho_solve((L_cov, lower_cov), diff_ravel)  # solve Cov x = diff for x
        logdet = 2 * np.sum(np.log(np.diag(L_cov)))
    except (linalg.LinAlgError, ValueError):
        # bug fix: was a bare `except:` which also swallowed KeyboardInterrupt/SystemExit.
        # fall back to an explicit inverse when cov is not positive definite (or malformed)
        cov_inv = np.linalg.inv(cov)
        cov_inv_dot_diff = np.dot(cov_inv, diff_ravel)
        logdet = np.linalg.slogdet(cov)[1]

    residuals = diff_ravel.dot(cov_inv_dot_diff)
    constant = logdet

    loglikelihood = -0.5 * (residuals + constant)

    if negate:
        loglikelihood *= -1

    return loglikelihood
def lnprob(fitparams, fma, bounds, cov_func, readnoise=False, negate=False):
    """
    Function to compute the relative posterior probability. Product of likelihood and prior

    Args:
        fitparams: array of params (size N). First three are [dRA,dDec,f]. Additional parameters are GP hyperparams
                    dRA,dDec: RA,Dec offsets from star. Also coordinates in self.data_{RA,Dec}_offset
                    f: flux scale factor to normalize the flux of the data_stamp to the model
        fma: a FMAstrometry object that has been fully set up to run
        bounds: array of (N,2) with corresponding lower and upper bound of params
                bounds[i,0] <= fitparams[i] < bounds[i,1]
        cov_func: function that given an input [x,y] coordinate array returns the covariance matrix
                  e.g. cov = cov_function(x_indices, y_indices, sigmas, cov_params)
        readnoise (bool): If True, the last fitparam fits for diagonal noise
        negate (bool): if True, negates the probability (used for minimization algos)

    Returns:
        log of the posterior probability (up to a constant), or +/- infinity when out of bounds
    """
    lp = lnprior(fitparams, bounds, readnoise=readnoise, negate=negate)

    # out-of-bounds parameters short-circuit with the appropriate infinity:
    # -inf for samplers that maximize, +inf for optimizers that minimize
    if not np.isfinite(lp):
        return np.inf if negate else -np.inf

    return lp + lnlike(fitparams, fma, cov_func, readnoise=readnoise, negate=negate)
class ParamRange(object):
    """
    Stores the best fit value and uncertainties for a parameter in a neat fashion

    Args:
        bestfit (float): the bestfit value
        err_range: either a scalar or a 2-element tuple (+val1, -val2) giving the 1-sigma range

    Attributes:
        bestfit (float): the bestfit value
        error (float): the average 1-sigma error
        error_2sided (np.array): [+error1, -error2] 2-element array with asymmetric errors

    Raises:
        ValueError: if err_range is neither a scalar nor a 2-element sequence
    """
    def __init__(self, bestfit, err_range):
        self.bestfit = bestfit

        # np.ndim(...) == 0 accepts python ints/floats AND numpy scalars; the old
        # isinstance(err_range, (int, float)) check missed numpy scalar types like np.float32
        if np.ndim(err_range) == 0:
            # symmetric error
            self.error = err_range
            self.error_2sided = np.array([err_range, -err_range])
        elif len(err_range) == 2:
            # asymmetric [+err, -err] pair; average magnitude becomes the scalar error
            self.error_2sided = np.array(err_range)
            self.error = np.mean(np.abs(err_range))
        else:
            # bug fix: previously a bad err_range fell through silently, leaving
            # self.error and self.error_2sided unset
            raise ValueError("err_range must be a scalar or a 2-element sequence")
class FMAstrometry(FitPSF):
    """
    Specifically for fitting astrometry of a directly imaged companion relative to its star.
    Extension of :py:class:`pyklip.fitpsf.FitPSF`.

    Args:
        guess_sep: the guessed separation (pixels)
        guess_pa: the guessed position angle (degrees)
        fitboxsize: fitting box side length (pixels)
        method (str): either 'mcmc' or 'maxl' depending on framework you want. Defaults to 'mcmc'.

    Attributes:
        guess_sep (float): (initialization) guess separation for planet [pixels]
        guess_pa (float): (initialization) guess PA for planet [degrees]
        guess_RA_offset (float): (initialization) guess RA offset [pixels]
        guess_Dec_offset (float): (initialization) guess Dec offset [pixels]
        raw_RA_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the raw result from the fit for the planet's RA offset [pixels]
        raw_Dec_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the raw result from the fit for the planet's Dec offset [pixels]
        raw_flux (:py:class:`pyklip.fitpsf.ParamRange`): (result) factor to scale the FM to match the flux of the data
        covar_params (list of :py:class:`pyklip.fitpsf.ParamRange`): (result) hyperparameters for the Gaussian process
        raw_sep (:py:class:`pyklip.fitpsf.ParamRange`): (result) the inferred raw separation of the planet [pixels]
        raw_PA (:py:class:`pyklip.fitpsf.ParamRange`): (result) the inferred raw PA of the planet [degrees]
        RA_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the RA offset of the planet that includes all astrometric errors [pixels or mas]
        Dec_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the Dec offset of the planet that includes all astrometric errors [pixels or mas]
        sep (:py:class:`pyklip.fitpsf.ParamRange`): (result) the separation of the planet that includes all astrometric errors [pixels or mas]
        PA (:py:class:`pyklip.fitpsf.ParamRange`): (result) the PA of the planet that includes all astrometric errors [degrees]
        fm_stamp (np.array): (fitting) The 2-D stamp of the forward model (centered at the nearest pixel to the guessed location)
        data_stamp (np.array): (fitting) The 2-D stamp of the data (centered at the nearest pixel to the guessed location)
        noise_map (np.array): (fitting) The 2-D stamp of the noise for each pixel of the data computed assuming azimuthally similar noise
        padding (int): amount of pixels on one side to pad the data/forward model stamp
        sampler (emcee.EnsembleSampler): an instance of the emcee EnsembleSampler. Only for Bayesian fit.
            See emcee docs for more details.
    """
    def __init__(self, guess_sep, guess_pa, fitboxsize, method='mcmc'):
        # store the sep/PA guess and convert it into RA/Dec offsets in pixels
        # (PA measured East of North, so RA ~ sin and Dec ~ cos)
        self.guess_sep = guess_sep
        self.guess_pa = guess_pa
        pa_rad = np.radians(guess_pa)
        self.guess_RA_offset = guess_sep * np.sin(pa_rad)
        self.guess_Dec_offset = guess_sep * np.cos(pa_rad)

        # raw best-fit quantities (filled in by fit_astrometry)
        self.raw_RA_offset = None
        self.raw_Dec_offset = None
        self.raw_flux = None

        # inferred quantities including calibration errors (filled in by propogate_errs)
        self.raw_sep = None
        self.raw_PA = None
        self.RA_offset = None
        self.Dec_offset = None
        self.sep = None
        self.PA = None

        super(FMAstrometry, self).__init__(fitboxsize, method)
[docs] def generate_fm_stamp(self, fm_image, fm_center, fm_wcs=None, extract=True, padding=5): """ Generates a stamp of the forward model and stores it in self.fm_stamp Args: fm_image: full imgae containing the fm_stamp fm_center: [x,y] center of image (assuing fm_stamp is located at sep/PA) corresponding to guess_sep and guess_pa fm_wcs: if not None, specifies the sky angles in the image. If None, assume image is North up East left extract: if True, need to extract the forward model from the image. Otherwise, assume the fm_stamp is already centered in the frame (fm_image.shape // 2) padding: number of pixels on each side in addition to the fitboxsize to extract to pad the fm_stamp (should be >= 1) Returns: """ fm_x = -self.guess_RA_offset + fm_center[0] fm_y = self.guess_Dec_offset + fm_center[1] self.fm_center = fm_center super(FMAstrometry, self).generate_fm_stamp(fm_image, fm_pos=[fm_x, fm_y], fm_wcs=fm_wcs, extract=extract, padding=padding)
[docs] def generate_data_stamp(self, data, data_center, data_wcs=None, noise_map=None, dr=4, exclusion_radius=10): """ Generate a stamp of the data_stamp ~centered on planet and also corresponding noise map Args: data: the final collapsed data_stamp (2-D) data_center: location of star in the data_stamp. data_wcs: sky angles WCS object. To rotate the image properly [NOT YET IMPLMETNED] if None, data_stamp is already rotated North up East left noise_map: if not None, noise map for each pixel in the data_stamp (2-D). if None, one will be generated assuming azimuthal noise using an annulus widthh of dr dr: width of annulus in pixels from which the noise map will be generated exclusion_radius: radius around the guess planet location which doens't get factored into noise estimate Returns: """ # rotate image North up east left if necessary if data_wcs is not None: # rotate raise NotImplementedError("Rotating based on WCS is not currently implemented yet") xguess = -self.guess_RA_offset + data_center[0] yguess = self.guess_Dec_offset + data_center[1] self.data_center = data_center super(FMAstrometry, self).generate_data_stamp(data, [xguess, yguess], None, radial_noise_center=data_center, dr=dr, exclusion_radius=exclusion_radius)
[docs] def fit_astrometry(self, nwalkers=100, nburn=200, nsteps=800, save_chain=True, chain_output="bka-chain.pkl", numthreads=None): """ Fits the PSF of the planet in either a frequentist or Bayesian way depending on initialization. Args: nwalkers: number of walkers (mcmc-only) nburn: numbe of samples of burn-in for each walker (mcmc-only) nsteps: number of samples each walker takes (mcmc-only) save_chain: if True, save the output in a pickled file (mcmc-only) chain_output: filename to output the chain to (mcmc-only) numthreads: number of threads to use (mcmc-only) Returns: """ self.fit_psf(nwalkers, nburn, nsteps, save_chain, chain_output, numthreads) # convert chains to relative separation self.sampler.chain[:,:,0] -= self.data_center[0] self.sampler.chain[:,:,0] *= -1 self.sampler.chain[:,:,1] -= self.data_center[1] # save RA/dec offsets self.raw_RA_offset = ParamRange(-(self.fit_x.bestfit - self.data_center[0]), self.fit_x.error_2sided[::-1]) self.raw_Dec_offset = ParamRange(self.fit_y.bestfit - self.data_center[1], self.fit_y.error_2sided[::-1]) self.raw_flux = self.fit_flux
    def propogate_errs(self, star_center_err=None, platescale=None, platescale_err=None, pa_offset=None, pa_uncertainty=None):
        """
        Propagate astrometric errors. Stores results in its own fields
        (raw_sep, raw_PA, sep, PA, RA_offset, Dec_offset).

        Args:
            star_center_err (float): uncertainty of the star location (pixels)
            platescale (float): mas/pix conversion to angular coordinates
            platescale_err (float): mas/pix error on the platescale
            pa_offset (float): Offset, in the same direction as position angle, to set North up (degrees)
            pa_uncertainty (float): Error on position angle/true North calibration (degrees)
        """
        if self.isbayesian:
            # use the full flattened MCMC chains so uncertainties come from posterior samples
            x_fit = self.sampler.chain[:,:,0].flatten()
            y_fit = self.sampler.chain[:,:,1].flatten()
        else:
            # maximum likelihood fit only gives point estimates
            x_fit = self.raw_RA_offset.bestfit
            y_fit = self.raw_Dec_offset.bestfit

        # statistical errors in x and y from the raw fit
        x_best = self.raw_RA_offset.bestfit
        y_best = self.raw_Dec_offset.bestfit
        x_1sigma_raw = self.raw_RA_offset.error_2sided
        y_1sigma_raw = self.raw_Dec_offset.error_2sided

        print("Raw X/Y Centroid = ({0}, {1}) with statistical error of {2} pix in X and {3} pix in Y".format(x_best, y_best, x_1sigma_raw, y_1sigma_raw))

        # calculate sep and pa from x/y separation
        sep_fit = np.sqrt((x_fit)**2 + (y_fit)**2)
        # For PA compute mean using circstats package, find delta_pa between all points and the mean,
        # then compute median/percentiles. PA is measured East of North, hence -x.
        pa_fit = (np.arctan2(y_fit, -x_fit) - (np.pi/2.0)) % (2.0*np.pi) # Radians!

        if self.isbayesian:
            # for Bayesian, convert the chains to sep/pa to get uncertainty.
            # wrap angles around the circular mean so the percentiles are meaningful
            pa_mean = circstats.circmean(pa_fit - np.pi) + np.pi # Circmean [-pi, pi]
            d_pa = np.arctan2(np.sin(pa_fit-pa_mean), np.cos(pa_fit-pa_mean))
            pa_median = np.median(d_pa) + pa_mean
            pa_percentile = np.nanpercentile(d_pa, [84,16]) - np.median(d_pa) # median of d_pa should be small
            pa_fit = np.degrees(pa_fit) # Convert to degrees

            # sep and pa statistical errors (1-sigma, two-sided, from 84th/16th percentiles)
            sep_best = np.median(sep_fit)
            pa_best = np.degrees(pa_median)
            sep_1sigma_raw = (np.percentile(sep_fit, [84,16]) - sep_best)
            pa_1sigma_raw = np.degrees(pa_percentile)
        else:
            # since we just have point estimates, use analytical error propagation
            sep_best = sep_fit
            pa_fit = np.degrees(pa_fit)
            pa_best = pa_fit
            # standard first-order propagation of x/y errors into sep and PA
            sep_1sigma_raw = (x_fit/sep_fit)**2 * x_1sigma_raw**2 + (y_fit/sep_fit)**2 * y_1sigma_raw**2
            sep_1sigma_raw = np.sqrt(sep_1sigma_raw)
            pa_1sigma_raw = (y_fit/sep_fit**2)**2 * x_1sigma_raw**2 + (x_fit/sep_fit**2)**2 * y_1sigma_raw**2
            pa_1sigma_raw = np.sqrt(pa_1sigma_raw)
            pa_1sigma_raw = np.degrees(pa_1sigma_raw)

        print("Raw Sep/PA Centroid = ({0}, {1}) with statistical error of {2} pix in Sep and {3} pix in PA".format(sep_best, pa_best, sep_1sigma_raw, pa_1sigma_raw))

        # store the raw sep and PA values
        self.raw_sep = ParamRange(sep_best, sep_1sigma_raw)
        self.raw_PA = ParamRange(pa_best, pa_1sigma_raw)

        # Now let's start propagating error terms if they are supplied.
        # We do them in Sep/PA space first since it's more natural here

        # star center error (added in quadrature to the separation error)
        if star_center_err is None:
            print("Skipping star center uncertainity...")
            star_center_err = 0
        else:
            print("Adding in star center uncertainity")

        sep_err_pix = (sep_1sigma_raw**2) + star_center_err**2
        sep_err_pix = np.sqrt(sep_err_pix)

        # plate scale error: converts sep to mas and adds the platescale uncertainty term
        if platescale is not None:
            print("Converting pixels to milliarcseconds")
            if platescale_err is None:
                print("Skipping plate scale uncertainity...")
                platescale_err = 0
            else:
                print("Adding in plate scale error")
            sep_err_mas = np.sqrt((sep_err_pix * platescale)**2 + (platescale_err * sep_best)**2)

        # PA Offset (true North calibration rotation)
        if pa_offset is not None:
            print("Adding in a PA/North angle offset")

            # Have to repeat wrapping procedure above in case pa_offset is large
            pa_fit = np.radians((pa_fit + pa_offset) % 360) # Convert back to radians for circstats
            if self.isbayesian:
                pa_mean = circstats.circmean(pa_fit - np.pi) + np.pi # Circmean [-pi, pi]
                d_pa = np.arctan2(np.sin(pa_fit-pa_mean), np.cos(pa_fit-pa_mean))
                pa_median = np.median(d_pa) + pa_mean
                pa_best = np.degrees(pa_median)
            else:
                pa_best = np.degrees(pa_fit)
            pa_fit = np.degrees(pa_fit) # Convert back to degrees

        # PA Uncertainty: statistical + star-center (converted to angle) + true North terms
        if pa_uncertainty is None:
            print("Skipping PA/North uncertainity...")
            pa_uncertainty = 0
        else:
            print("Adding in PA uncertainity")

        pa_err = np.radians(pa_1sigma_raw)**2 + (star_center_err/sep_best)**2 + np.radians(pa_uncertainty)**2
        pa_err = np.sqrt(pa_err)
        pa_err_deg = np.degrees(pa_err)

        # averages of the (possibly two-sided) errors for a single-number summary
        sep_err_pix_avg = np.mean(np.abs(sep_err_pix))
        pa_err_deg_avg = np.mean(np.abs(pa_err_deg))

        print("Sep = {0} +/- {1} ({2}) pix, PA = {3} +/- {4} ({5}) degrees".format(sep_best, sep_err_pix_avg, sep_err_pix, pa_best, pa_err_deg_avg, pa_err_deg))

        # Store sep/PA (excluding platescale) values
        self.sep = ParamRange(sep_best, sep_err_pix)
        self.PA = ParamRange(pa_best, pa_err_deg)

        if platescale is not None:
            sep_err_mas_avg = np.mean(np.abs(sep_err_mas))
            print("Sep = {0} +/- {1} ({2}) mas, PA = {3} +/- {4} ({5}) degrees".format(sep_best*platescale, sep_err_mas_avg, sep_err_mas, pa_best, pa_err_deg_avg, pa_err_deg))
            # overwrite sep values with values converted to milliarcseconds
            self.sep = ParamRange(sep_best*platescale, sep_err_mas)

        # convert PA errors back into x y (RA/Dec)
        ra_fit = -sep_fit * np.cos(np.radians(pa_fit+90))
        dec_fit = sep_fit * np.sin(np.radians(pa_fit+90))

        # ra/dec statistical errors. This is after the rotation from pa_offset is applied
        if self.isbayesian:
            ra_best = np.median(ra_fit)
            dec_best = np.median(dec_fit)
            ra_1sigma_raw = np.percentile(ra_fit, [84,16]) - ra_best
            dec_1sigma_raw = np.percentile(dec_fit, [84,16]) - dec_best
        else:
            ra_best = ra_fit
            dec_best = dec_fit
            # this should depend on sine and cosine of pa_offset
            # NOTE(review): np.radians(pa_offset) raises a TypeError if pa_offset is None
            # (i.e. the frequentist path requires pa_offset to be supplied) — confirm intended
            pa_offset_rad = np.radians(pa_offset)
            ra_1sigma_raw = np.sqrt(math.cos(pa_offset_rad)**2 * x_1sigma_raw**2 + math.sin(pa_offset_rad)**2 * y_1sigma_raw**2)
            dec_1sigma_raw = np.sqrt(math.sin(pa_offset_rad)**2 * x_1sigma_raw**2 + math.cos(pa_offset_rad)**2 * y_1sigma_raw**2)

        # full errors: statistical + star center + true North term (the North term moves a
        # point perpendicular to its radius vector, hence the cross terms dec_best/ra_best)
        ra_err_full_pix = np.sqrt((ra_1sigma_raw**2) + (star_center_err)**2 + (dec_best * np.radians(pa_uncertainty))**2 )
        dec_err_full_pix = np.sqrt((dec_1sigma_raw**2) + (star_center_err)**2 + (ra_best * np.radians(pa_uncertainty))**2 )

        # Store error propagated RA/Dec values (excluding platescale)
        self.RA_offset = ParamRange(ra_best, ra_err_full_pix)
        self.Dec_offset = ParamRange(dec_best, dec_err_full_pix)

        print("RA offset = {0} +/- {1} ({2}) pix".format(self.RA_offset.bestfit, self.RA_offset.error, self.RA_offset.error_2sided))
        print("Dec offset = {0} +/- {1} ({2}) pix".format(self.Dec_offset.bestfit, self.Dec_offset.error, self.Dec_offset.error_2sided))

        if platescale is not None:
            ra_err_full_mas = np.sqrt((ra_err_full_pix*platescale)**2 + (platescale_err * ra_best)**2)
            dec_err_full_mas = np.sqrt((dec_err_full_pix*platescale)**2 + (platescale_err * dec_best)**2)

            # Overwrite with calibrated RA/Dec converted to milliarcsecs
            self.RA_offset = ParamRange(ra_best*platescale, ra_err_full_mas)
            self.Dec_offset = ParamRange(dec_best*platescale, dec_err_full_mas)

            print("RA offset = {0} +/- {1} ({2}) mas".format(self.RA_offset.bestfit, self.RA_offset.error, self.RA_offset.error_2sided))
            print("Dec offset = {0} +/- {1} ({2}) mas".format(self.Dec_offset.bestfit, self.Dec_offset.error, self.Dec_offset.error_2sided))
def quick_psf_fit(data, psf, x_guess, y_guess, fitboxsize):
    """
    A wrapper for a quick maximum likelihood fit of a PSF to the data.

    Args:
        data (np.array): 2-D data frame
        psf (np.array): 2-D PSF template. This should be smaller than the size of data and
                        larger than the fitboxsize
        x_guess (float): approximate x position of the location you are fitting the psf to
        y_guess (float): approximate y position of the location you are fitting the psf to
        fitboxsize (int): fitting region is a square. This is the length of one side of the square

    Returns:
        x_fit, y_fit, flux_fit
        x_fit (float): x position
        y_fit (float): y position
        flux_fit (float): multiplicative scale factor for the psf to match the data
    """
    # whatever room the PSF template leaves beyond the fit box becomes stamp padding
    stamp_padding = int((np.min(psf.shape) - fitboxsize) // 2)

    fitter = FitPSF(fitboxsize, method='maxl')
    # PSF template is already centered in its frame, so no extraction needed
    fitter.generate_fm_stamp(psf, extract=False, padding=stamp_padding)
    # unit noise map: every pixel weighted equally
    fitter.generate_data_stamp(data, [x_guess, y_guess], np.ones(data.shape))
    # purely diagonal covariance (no correlated noise model)
    fitter.set_kernel("diag", [], [], False)

    fitter.fit_psf()

    return fitter.fit_x.bestfit, fitter.fit_y.bestfit, fitter.fit_flux.bestfit
class PlanetEvidence(FMAstrometry):
    """
    Specifically for nested sampling of the parameter space of a directly imaged companion relative to its star.
    Extension of :py:class:`pyklip.fitpsf.FitPSF`.

    Args:
        guess_sep: the guessed separation (pixels)
        guess_pa: the guessed position angle (degrees)
        fitboxsize: fitting box side length (pixels)
        fm_basename: Prefix of the forward model sampling files multinest saves in /chains/
        null_basename: Prefix of the null hypothesis model sampling files multinest saves in /chains/

    Attributes:
        guess_sep (float): (initialization) guess separation for planet [pixels]
        guess_pa (float): (initialization) guess PA for planet [degrees]
        guess_RA_offset (float): (initialization) guess RA offset [pixels]
        guess_Dec_offset (float): (initialization) guess Dec offset [pixels]
        raw_RA_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the raw result from the MCMC fit for the planet's location [pixels]
        raw_Dec_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the raw result from the MCMC fit for the planet's location [pixels]
        raw_flux (:py:class:`pyklip.fitpsf.ParamRange`): (result) factor to scale the FM to match the flux of the data
        covar_params (list of :py:class:`pyklip.fitpsf.ParamRange`): (result) hyperparameters for the Gaussian process
        raw_sep (:py:class:`pyklip.fitpsf.ParamRange`): (result) the inferred raw result from the MCMC fit for the planet's location [pixels]
        raw_PA (:py:class:`pyklip.fitpsf.ParamRange`): (result) the inferred raw result from the MCMC fit for the planet's location [degrees]
        RA_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the RA offset of the planet that includes all astrometric errors [pixels or mas]
        Dec_offset (:py:class:`pyklip.fitpsf.ParamRange`): (result) the Dec offset of the planet that includes all astrometric errors [pixels or mas]
        sep (:py:class:`pyklip.fitpsf.ParamRange`): (result) the separation of the planet that includes all astrometric errors [pixels or mas]
        PA (:py:class:`pyklip.fitpsf.ParamRange`): (result) the PA of the planet that includes all astrometric errors [degrees]
        fm_stamp (np.array): (fitting) The 2-D stamp of the forward model (centered at the nearest pixel to the guessed location)
        data_stamp (np.array): (fitting) The 2-D stamp of the data (centered at the nearest pixel to the guessed location)
        noise_map (np.array): (fitting) The 2-D stamp of the noise for each pixel the data computed assuming azimuthally similar noise
        padding (int): amount of pixels on one side to pad the data/forward model stamp
        sampler (pymultinest.run): function that runs the pymultinest sampling for both hypotheses
    """
    def __init__(self, guess_sep, guess_pa, fitboxsize, sampling_outputdir, l_only = False, fm_basename = 'Planet', null_basename = 'Null'):
        # Check if pymultinest is not installed and imported.
        # ModuleNotFoundError only exists in Python 3, hence the version split.
        if nomultinest and v2 == False:
            raise ModuleNotFoundError('Pymultinest is not installed')
        elif nomultinest and v2 == True:
            raise ImportError('Pymultinest is not installed')

        import os

        # derive delta RA and delta Dec
        # in pixels
        self.guess_sep = guess_sep
        self.guess_pa = guess_pa
        # Set where samples get stored (MultiNest output file prefixes)
        self.fm_basename = str(sampling_outputdir) + str(fm_basename) + '-'
        self.null_basename = str(sampling_outputdir) + str(null_basename) + '-'
        # Set which null hypothesis model to use: if True, the null model fits only the
        # covariance hyperparameter; otherwise it also fits x/y
        self.l_only = l_only
        # make sure the MultiNest output directory exists
        if not os.path.exists(str(sampling_outputdir)):
            os.mkdir(str(sampling_outputdir))
        super(PlanetEvidence, self).__init__(self.guess_sep, self.guess_pa, fitboxsize)

    def multifit(self):
        """
        Nested sampling parameter estimation and evidence calculation for the forward model
        and correlated noise.
        """
        # Copy bounds to use for priors; flux and hyperparameters (index >= 2) are sampled
        # in log space
        sampler_bounds = np.copy(self.bounds)
        sampler_bounds[2:] = np.log(sampler_bounds[2:])

        # MultiNest prior transforms: map the unit cube onto the bounded parameter ranges
        def nested_prior_fm(params, ndim, nparams):
            # 4 params: x, y, log(flux), log(hyperparameter)
            params[0] = sampler_bounds[0][0] + params[0]*(sampler_bounds[0][1] - sampler_bounds[0][0])
            params[1] = sampler_bounds[1][0] + params[1]*(sampler_bounds[1][1] - sampler_bounds[1][0])
            params[2] = sampler_bounds[2][0] + params[2]*(sampler_bounds[2][1] - sampler_bounds[2][0])
            params[3] = sampler_bounds[3][0] + params[3]*(sampler_bounds[3][1] - sampler_bounds[3][0])

        def nested_prior_null3(params, ndim, nparams):
            # 3-param null hypothesis: x, y, log(hyperparameter) — no planet flux
            params[0] = sampler_bounds[0][0] + params[0]*(sampler_bounds[0][1] - sampler_bounds[0][0])
            params[1] = sampler_bounds[1][0] + params[1]*(sampler_bounds[1][1] - sampler_bounds[1][0])
            params[2] = sampler_bounds[3][0] + params[2]*(sampler_bounds[3][1] - sampler_bounds[3][0])

        def nested_prior_null1(params, ndim, nparams):
            # 1-param null hypothesis: log(hyperparameter) only
            params[0] = sampler_bounds[3][0] + params[0]*(sampler_bounds[3][1] - sampler_bounds[3][0])

        # lnlike is defined elsewhere in this module; the global statement makes that
        # module-level function resolvable from inside the closures below
        global lnlike

        def nested_lnlike_fm(fitparams, ndim, nparams, readnoise=False):
            """Log-likelihood of the forward model hypothesis (x, y, log flux, log hyperparam)."""
            x_trial = fitparams[0]
            y_trial = fitparams[1]
            f_trial = fitparams[2]
            hyperparms_trial = fitparams[3]
            if readnoise:
                # last hyperparameter is a diagonal noise term. Separate it out
                # NOTE(review): the assignment below is a no-op and readnoise_amp is never
                # used — it looks like the intent was to strip the readnoise term off
                # hyperparms_trial; confirm against the analogous code in FitPSF
                readnoise_amp = np.exp(hyperparms_trial)
                hyperparms_trial = hyperparms_trial

            newparams = [x_trial, y_trial, f_trial, hyperparms_trial]
            return lnlike(newparams, self, self.covar)

        def nested_lnlike_null3(fitparams, ndim, nparams, readnoise=False):
            """Log-likelihood of the 3-param null hypothesis (flux pinned to zero via log f = -inf)."""
            x_trial = fitparams[0]
            y_trial = fitparams[1]
            f_trial = -np.inf
            hyperparms_trial = fitparams[2]
            if readnoise:
                # last hyperparameter is a diagonal noise term. Separate it out
                # NOTE(review): same no-op as in nested_lnlike_fm — confirm
                readnoise_amp = np.exp(hyperparms_trial)
                hyperparms_trial = hyperparms_trial

            newparams = [x_trial, y_trial, f_trial, hyperparms_trial]
            return lnlike(newparams, self, self.covar)

        def nested_lnlike_null1(fitparams, ndim, nparams, readnoise=False):
            """Log-likelihood of the 1-param null hypothesis (position fixed at the guess, no flux)."""
            x_trial = self.guess_x
            y_trial = self.guess_y
            f_trial = -np.inf
            hyperparms_trial = fitparams[0]
            if readnoise:
                # last hyperparameter is a diagonal noise term. Separate it out
                # NOTE(review): same no-op as in nested_lnlike_fm — confirm
                readnoise_amp = np.exp(hyperparms_trial)
                hyperparms_trial = hyperparms_trial

            newparams = [x_trial, y_trial, f_trial, hyperparms_trial]
            return lnlike(newparams, self, self.covar)

        # Run MultiNest for the Forward Model hypothesis
        pymultinest.run(nested_lnlike_fm, nested_prior_fm, n_dims=4, outputfiles_basename=self.fm_basename, write_output=True, resume=False, init_MPI=False)
        # Run MultiNest for the null hypothesis (3-param or 1-param depending on l_only)
        if self.l_only == False:
            pymultinest.run(nested_lnlike_null3, nested_prior_null3, n_dims=3, outputfiles_basename=self.null_basename, write_output=True, resume=False, init_MPI=False)
        else:
            pymultinest.run(nested_lnlike_null1, nested_prior_null1, n_dims=1, outputfiles_basename=self.null_basename, write_output=True, resume=False, init_MPI=False)

        # attach analyzers so posterior samples and evidences can be retrieved later
        self.fm_data = pymultinest.Analyzer(4, outputfiles_basename=self.fm_basename)
        if self.l_only == False:
            self.null_data = pymultinest.Analyzer(3, outputfiles_basename=self.null_basename)
        elif self.l_only == True:
            self.null_data = pymultinest.Analyzer(1, outputfiles_basename=self.null_basename)

    def nested_corner_plots(self, posts, n_dim):
        """
        Make a corner plot of the posterior samples.

        Args:
            posts (np.array): equal-weighted posterior samples (one column per parameter,
                with flux and hyperparameter columns in log space)
            n_dim (int): number of fitted parameters (4 = forward model, 3 or 1 = null hypothesis)

        Returns:
            fig: the corner plot figure
        """
        import corner

        if n_dim == 4:
            # forward model: x, y, flux, hyperparameter (last two exponentiated out of log space)
            x_data = np.ndarray.flatten(posts[:,0])
            y_data = np.ndarray.flatten(posts[:,1])
            f_data = np.ndarray.flatten(np.exp(posts[:,2]))
            hyperparam_data = np.ndarray.flatten(np.exp(posts[:,3]))
            data = np.vstack([x_data,y_data,f_data,hyperparam_data])
            all_labels = [r"x",r"y",r"$\alpha$",r"l"]
        elif n_dim == 3:
            # 3-param null hypothesis: x, y, hyperparameter
            x_data = np.ndarray.flatten(posts[:,0])
            y_data = np.ndarray.flatten(posts[:,1])
            hyperparam_data = np.ndarray.flatten(np.exp(posts[:,2]))
            data = np.vstack([x_data,y_data,hyperparam_data])
            all_labels = [r"x", r"y", r"l"]
        elif n_dim == 1:
            # 1-param null hypothesis: hyperparameter only
            hyperparam_data = np.ndarray.flatten(np.exp(posts[:,0]))
            data = np.vstack([hyperparam_data])
            all_labels = [r"l"]

        fig = corner.corner(data.T, labels = all_labels, quantiles=[0.16, 0.5, 0.84])
        return fig

    def fit_plots(self):
        """
        Make corner plots for both hypotheses from the saved MultiNest output.

        Returns:
            fm_corner: corner plot figure for the forward model hypothesis
            null_corner: corner plot figure for the null hypothesis
        """
        fm_data = pymultinest.Analyzer(4, outputfiles_basename=self.fm_basename)
        fm_posts = fm_data.get_equal_weighted_posterior()
        fm_corner = self.nested_corner_plots(fm_posts, n_dim=4)

        # null hypothesis dimensionality depends on the l_only setting used in multifit
        if self.l_only == False:
            null_data = pymultinest.Analyzer(3, outputfiles_basename=self.null_basename)
            null_posts = null_data.get_equal_weighted_posterior()
            null_corner = self.nested_corner_plots(null_posts, n_dim=3)
        elif self.l_only == True:
            null_data = pymultinest.Analyzer(1, outputfiles_basename=self.null_basename)
            null_posts = null_data.get_equal_weighted_posterior()
            null_corner = self.nested_corner_plots(null_posts, n_dim=1)
        return fm_corner, null_corner

    def fit_stats(self):
        """
        Return the MultiNest statistics (including evidences) for both hypotheses.

        Returns:
            fm_stats: pymultinest stats dict for the forward model hypothesis
            null_stats: pymultinest stats dict for the null hypothesis
        """
        fm_stats = self.fm_data.get_stats()
        null_stats = self.null_data.get_stats()
        return fm_stats, null_stats

    def fm_residuals(self):
        """
        Plot the best-fit forward model against the data and compute a residual-based SNR.

        Returns:
            residual_fig: figure from best_fit_and_residuals()
            snr_stamp (float): SNR defined as max of the best-fit FM over the std of the residuals
        """
        fm_stats = self.fm_data.get_stats()
        # adopt the first MultiNest mode's mean/sigma as the best fit
        # (flux and hyperparameter are exponentiated out of log space)
        self.fit_x = ParamRange(fm_stats['modes'][0]['mean'][0], fm_stats['modes'][0]['sigma'][0])
        self.fit_y = ParamRange(fm_stats['modes'][0]['mean'][1], fm_stats['modes'][0]['sigma'][1])
        self.fit_flux = ParamRange(np.exp(fm_stats['modes'][0]['mean'][2]), np.exp(fm_stats['modes'][0]['sigma'][2]))
        self.covar_params = ParamRange(np.exp(fm_stats['modes'][0]['mean'][3]), np.exp(fm_stats['modes'][0]['sigma'][3]))

        residual_fig = self.best_fit_and_residuals()

        # create best fit FM: shift to the fitted sub-pixel position and scale by the fitted flux
        dx = self.fit_x.bestfit - self.data_stamp_x_center
        dy = self.fit_y.bestfit - self.data_stamp_y_center
        fm_bestfit = self.fit_flux.bestfit * sinterp.shift(self.fm_stamp, [dy, dx])
        if self.padding > 0:
            # trim the padding so the FM stamp matches the data stamp footprint
            fm_bestfit = fm_bestfit[self.padding:-self.padding, self.padding:-self.padding]

        # make residual map
        residual_map = self.data_stamp - fm_bestfit
        # Compute SNR as defined by maximum of FM best fit and the standard deviation of residuals
        snr_stamp = np.max(fm_bestfit)/np.nanstd(residual_map)
        print('SNR from data stamp residuals: ' + str(snr_stamp))
        return residual_fig, snr_stamp