# Source code for pyklip.empca

#!/usr/bin/env python

import numpy as np
import numpy.fft as fft
import scipy.linalg as la
import scipy.ndimage as ndimage
import scipy.interpolate as sinterp
import scipy.signal as signal
from scipy.stats import t
import sys
import multiprocessing
import time

"""
Weighted Principal Component Analysis using Expectation Maximization

Original: Stephen Bailey, Spring 2012
Rewritten by Timothy Brandt, Spring 2016
"""

[docs] def np_calc_chisq(data, b, w, coef): """ Calculate chi squared Args: im: nim x npix, single-precision numpy.ndarray. Data to be fit by the basis images b: nvec x npts, double precision numpy.ndarray. The nvec basis images. w: nim x npts, single-precision numpy.ndarray. Weights (inverse variances) of the data. coef: nvec x npts, double precision numpy.ndarray. The coefficients of the basis image fits. Returns: chisq, the total chi squared summed over all points and all images """ chisq = 0 nim = data.shape[0] for i in range(nim): chisq += np.sum((data[i] - np.sum(coef[i] * b.T, axis=1)) ** 2 * w[i]) return chisq
[docs] def set_pixel_weights(imflat, rflat, ivar=None, mode='standard', inner_sup=17, outer_sup=66): ''' Args: imflat: array of flattend images, shape (N, number of section indices) rflat: radial component of the polar coordinates flattened to 1D, length = number of section indices mode: 'standard': assume poission statistics to calculate variance as sqrt(photon count) use inverse sqrt(variance) as pixel weights and multiply by a radial weighting inner_sup: radius within which to supress weights outer_sup: radius beyond which to supress weights Returns: pixel weights for empca ''' #default weights are ones weights = np.ones(imflat.shape) if mode.lower() == 'standard': # this is simply using sqrt(pixel value) as the standard deviation, hence inverse weights weights = 1. / (np.sqrt(np.abs(imflat)) + 10) weights *= imflat != 0 # suppress contribution from pixels beyond inner and outer working angles weights *= 1 / (1 + np.exp((inner_sup - rflat) / 1.)) weights *= 1 / (1 + np.exp((rflat - outer_sup) / 1.)) if mode.lower() == 'standard_ivar': #TODO: add ivar to CHARISData class and implement this method pass return weights
def _random_orthonormal(nvec, nvar, seed=1): ''' Generate random orthonormal vectors as initial guess model Doesn't protect against rare duplicate vectors leading to 0s Args: nvec: rank of model nvar: number of parameters (e.g. number of pixels in an image for psf fitting) seed: Returns: array of random orthonormal vectors A[nvec, nvar] ''' if seed is not None: np.random.seed(seed) A = np.random.normal(size=(nvec, nvar)) for i in range(nvec): A[i] /= np.linalg.norm(A[i]) for i in range(1, nvec): for j in range(0, i): A[i] -= np.dot(A[j], A[i])*A[j] A[i] /= np.linalg.norm(A[i]) if np.any(np.isnan(A)): raise ValueError("random orthonormal is nan") return A
[docs] def weighted_empca(data, weights=None, niter=25, nvec=5, randseed=1, maxcpus=1, silent=True): ''' Perform iterative low-rank matrix approximation of data using weights. Generated model vectors are not orthonormal and are not rotated/ranked by ability to model the data, but as a set they are good at describing the data. Args: data: images to model weights: weights for every pixel niter: maximum number of iterations to perform nvec: number of vectors to solve (rank of the approximation) randseed: rand num generator seed; if None, don't re-initialize maxcpus: maximum cpus to use for parallel programming silent: bool, whether to show chi_squared for each iteration Returns: returns the best low-rank approximation to the data in a weighted least-squares sense (dot product of coefficients and basis vectors). ''' if weights is None: weights = np.ones(data.shape, float) ################################################################## # The following code makes sure that there are two copies each # of data and weights, one in C format (last axis fast) and one # in Fortran format (first axis fast). This costs a factor of # two in memory usage but speeds up access later. ################################################################## if not (isinstance(data, np.ndarray) and isinstance(weights, np.ndarray)): raise TypeError("'data' and 'weights' must be numpy ndarrays.") if not (data.shape == weights.shape and len(data.shape) == 2): raise ValueError("'data' and 'weights' must be 2D arrays of the same shape.") if data.flags['C']: dataC = data.astype(float) dataF = (dataC.T).copy(order='C') elif data.flags['F']: dataC = data.copy(order='C').astype(float) dataF = data.T.astype(float) else: raise AttributeError("Attribute 'flags' missing from data.") if weights.flags['C']: weightsC = weights.astype(float) weightsF = (weightsC.T).copy(order='C') elif weights.flags['F']: weightsC = weights.copy(order='C').astype(float) weightsF = weights.T.astype(float) else: raise AttributeError("Attribute 'flags' missing from weights.") ################################################################## # Random initial guess for the low-rank approximation, zero # for the initial fit/approximation coefficients. ################################################################## nobs, nvar = data.shape P = _random_orthonormal(nvec, nvar, seed=randseed) C = np.zeros((nobs, nvec)) if not silent: print('iter dchi2 R2 time (s)') ncpus = multiprocessing.cpu_count() if maxcpus is not None: ncpus = min(ncpus, maxcpus) chisq_orig = np_calc_chisq(dataC, P*0, weightsC, C) chisq_last = chisq_orig datwgt = dataC*weightsC singular_matrix = 0 for itr in range(1, niter + 1): tstart = time.time() ############################################################## # Solve for best-fit coefficients with the previous/first # low-rank approximation. ############################################################## P3D = np.empty((P.shape[0], P.shape[0], P.shape[1])) for i in range(P.shape[0]): P3D[i] = P*P[i] A = np.tensordot(weights, P3D.T, axes=1) b = np.dot(datwgt, P.T) try: C = np.linalg.solve(A, b).T except: singular_matrix += 1 Ainv = np.linalg.pinv(A) C = np.einsum('nmp,np->nm', Ainv, b).T ############################################################## # Compute the weighted residual (chi squared) value from the # previous fit. ############################################################## if not silent: chisq = np_calc_chisq(dataC, P, weightsC, C.T) print('%3d %9.3g %12.6f %11.3f' % (itr, chisq - chisq_last, 1 - chisq / chisq_orig, time.time() - tstart)) chisq_last = chisq if itr == niter: ########################################################## # Compute the low-rank approximation to the data. ########################################################## model = np.dot(C.T, P) else: ########################################################## # Update the low-rank approximation. ########################################################## C3D = np.empty((C.shape[0], C.shape[0], C.shape[1])) for i in range(C.shape[0]): C3D[i] = C*C[i] A = np.tensordot(weights.T, C3D.T, axes=1) b = np.dot(datwgt.T, C.T) try: P = np.linalg.solve(A, b).T except: singular_matrix += 1 Ainv = np.linalg.pinv(A) P = np.einsum('nmp,np->nm', Ainv, b).T ################################################################## # Normalize the low-rank approximation. ################################################################## for k in range(nvec): P[k] /= np.linalg.norm(P[k]) if singular_matrix > 0: print('number of singular matrices encountered:{}'.format(singular_matrix)) return model