Source code for pyklip.klip

import numpy as np
import numpy.fft as fft
import scipy.linalg as la
import scipy.ndimage as ndimage
import scipy.interpolate as sinterp
from scipy.stats import t
import warnings

[docs] def make_polar_coordinates(x, y, center=[0,0]): ''' Args: x: meshgrid of x coordinates y: meshgrid of y coordinates center: new location of origin Returns: polar coordinates centered at the specified origin ''' r = np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) phi = np.arctan2(y - center[1], x - center[0]) # TODO: np.arctan2 already ensures return value to be in range [-pi, pi) # So the following line of code is not necessary # However, since the following line introduces an offset of -pi, if eliminating the next line of code, # will need to make sure all of pyklip's reduction is unaffected by eliminating this offset # make sure phi is in range [-pi, pi) phi = (phi % (2 * np.pi)) - np.pi return r, phi
[docs] def collapse_data(data, pixel_weights=None, axis=1, collapse_method='mean'): """ Function to collapse multi-dimensional data along axis using collapse_method Args: data: (multi-dimension)arrays of 2D images or 3D cubes. pixel_weights: ones if collapse method is not weighted collapse axis: axis index along which to collapse collapse_method: currently support 'median', 'mean', 'weighted-mean', 'trimmed-mean', 'weighted-median' Returns: Collapsed data """ with warnings.catch_warnings(): warnings.simplefilter("ignore", category=RuntimeWarning) if collapse_method.lower() == 'median': return np.nanmedian(data, axis=axis) elif collapse_method.lower() == 'mean': return np.nanmean(data, axis=axis) elif ('weighted' in collapse_method.lower()) and ('mean' in collapse_method.lower()): if pixel_weights is None: pixel_weights = np.ones(data.shape) collapsed_data = np.nanmean(pixel_weights * data, axis=axis) collapsed_data /= np.nanmean(pixel_weights, axis=axis) return collapsed_data elif ('trimmed' in collapse_method.lower()) and ('mean' in collapse_method.lower()): collapsed = np.sort(data, axis=axis) collapsed = np.take(collapsed, range(2, collapsed.shape[axis] - 2), axis=axis) collapsed = np.nanmean(collapsed, axis=axis) return collapsed elif ('weighted' in collapse_method.lower()) and ('median' in collapse_method.lower()): if pixel_weights is None: pixel_weights = np.ones(data.shape) collapsed_data = np.nanmedian(pixel_weights * data, axis=axis) collapsed_data /= np.nanmedian(pixel_weights, axis=axis) return collapsed_data else: # default to mean collapse if input does not match any supported pattern print('{} collapse not supported, using mean collapse instead...'.format(collapse_method)) return np.nanmean(data, axis=axis)
[docs] def klip_math(sci, ref_psfs, numbasis, covar_psfs=None, return_basis=False, return_basis_and_eig=False): """ Helper function for KLIP that does the linear algebra Args: sci: array of length p containing the science data ref_psfs: N x p array of the N reference PSFs that characterizes the PSF of the p pixels numbasis: number of KLIP basis vectors to use (can be an int or an array of ints of length b) covar_psfs: covariance matrix of reference psfs passed in so you don't have to calculate it here return_basis: If true, return KL basis vectors (used when onesegment==True) return_basis_and_eig: If true, return KL basis vectors as well as the eigenvalues and eigenvectors of the covariance matrix. Used for KLIP Forward Modelling of Laurent Pueyo. Returns: sub_img_rows_selected: array of shape (p,b) that is the PSF subtracted data for each of the b KLIP basis cutoffs. If numbasis was an int, then sub_img_row_selected is just an array of length p KL_basis: array of shape (max(numbasis),p). Only if return_basis or return_basis_and_eig is True. evals: Eigenvalues of the covariance matrix. The covariance matrix is assumed NOT to be normalized by (p-1). Only if return_basis_and_eig is True. evecs: Eigenvectors of the covariance matrix. The covariance matrix is assumed NOT to be normalized by (p-1). Only if return_basis_and_eig is True. """ # for the science image, subtract the mean and mask bad pixels sci_mean_sub = sci - np.nanmean(sci) # sci_nanpix = np.where(np.isnan(sci_mean_sub)) # sci_mean_sub[sci_nanpix] = 0 # do the same for the reference PSFs # playing some tricks to vectorize the subtraction ref_psfs_mean_sub = ref_psfs - np.nanmean(ref_psfs, axis=1)[:, None] ref_psfs_mean_sub[np.where(np.isnan(ref_psfs_mean_sub))] = 0 # calculate the covariance matrix for the reference PSFs # note that numpy.cov normalizes by p-1 to get the NxN covariance matrix # we have to correct for that a few lines down when consturcting the KL # vectors since that's not part of the equation in the KLIP paper if covar_psfs is None: covar_psfs = np.cov(ref_psfs_mean_sub) # maximum number of KL modes tot_basis = covar_psfs.shape[0] # only pick numbasis requested that are valid. We can't compute more KL basis than there are reference PSFs # do numbasis - 1 for ease of indexing since index 0 is using 1 KL basis vector numbasis = np.clip(numbasis - 1, 0, tot_basis-1) # clip values, for output consistency we'll keep duplicates max_basis = np.max(numbasis) + 1 # maximum number of eigenvectors/KL basis we actually need to use/calculate # calculate eigenvalues and eigenvectors of covariance matrix, but only the ones we need (up to max basis) evals, evecs = la.eigh(covar_psfs, eigvals=(tot_basis-max_basis, tot_basis-1)) # check if there are negative eignevalues as they will cause NaNs later that we have to remove # the eigenvalues are ordered smallest to largest #check_nans = evals[-1] < 0 # currently this checks that *all* the evals are neg, but we want just one. # also, include 0 because that is a bad value too check_nans = np.any(evals <= 0) # alternatively, check_nans = evals[0] <= 0 # scipy.linalg.eigh spits out the eigenvalues/vectors smallest first so we need to reverse # we're going to recopy them to hopefully improve caching when doing matrix multiplication evals = np.copy(evals[::-1]) evecs = np.copy(evecs[:,::-1], order='F') #fortran order to improve memory caching in matrix multiplication # keep an index of the negative eignevalues for future reference if there are any if check_nans: neg_evals = (np.where(evals <= 0))[0] # calculate the KL basis vectors kl_basis =, evecs) # JB question: Why is there this [None, :]? (It adds an empty first dimension) kl_basis = kl_basis * (1. / np.sqrt(evals * (np.size(sci) - 1)))[None, :] #multiply a value for each row # sort to KL basis in descending order (largest first) # kl_basis = kl_basis[:,eig_args_all] # duplicate science image by the max_basis to do simultaneous calculation for different k_KLIP sci_mean_sub_rows = np.tile(sci_mean_sub, (max_basis, 1)) sci_rows_selected = np.tile(sci_mean_sub, (np.size(numbasis), 1)) # this is the output image which has less rows # bad pixel mask # do it first for the image we're just doing computations on but don't care about the output sci_nanpix = np.where(np.isnan(sci_mean_sub_rows)) sci_mean_sub_rows[sci_nanpix] = 0 # now do it for the output image sci_nanpix = np.where(np.isnan(sci_rows_selected)) sci_rows_selected[sci_nanpix] = 0 # do the KLIP equation, but now all the different k_KLIP simultaneously # calculate the inner product of science image with each of the different kl_basis vectors # TODO: can we optimize this so it doesn't have to multiply all the rows because in the next lines we only select some of them inner_products =, np.require(kl_basis, requirements=['F'])) # select the KLIP modes we want for each level of KLIP by multiplying by lower diagonal matrix lower_tri = np.tril(np.ones([max_basis, max_basis])) inner_products = inner_products * lower_tri # if there are NaNs due to negative eigenvalues, make sure they don't mess up the matrix multiplicatoin # by setting the appropriate values to zero if check_nans: needs_to_be_zeroed = np.where(lower_tri == 0) inner_products[needs_to_be_zeroed] = 0 # make a KLIP PSF for each amount of klip basis, but only for the amounts of klip basis we actually output kl_basis[:, neg_evals] = 0 klip_psf =[numbasis,:], kl_basis.T) # for KLIP PSFs that use so many KL modes that they become nans, we have to put nan's back in those badbasis = np.where(numbasis >= np.min(neg_evals)) #use basis with negative eignevalues klip_psf[badbasis[0], :] = np.nan else: # make a KLIP PSF for each amount of klip basis, but only for the amounts of klip basis we actually output klip_psf =[numbasis,:], kl_basis.T) # make subtracted image for each number of klip basis sub_img_rows_selected = sci_rows_selected - klip_psf # restore NaNs sub_img_rows_selected[sci_nanpix] = np.nan if return_basis is True: return sub_img_rows_selected.transpose(), kl_basis.transpose() elif return_basis_and_eig is True: return sub_img_rows_selected.transpose(), kl_basis.transpose(),evals*(np.size(sci)-1), evecs else: return sub_img_rows_selected.transpose()
# old code that only did one number of KL basis for truncation # #truncation either based on user input or maximum number of PSFs # trunc_basis = np.min([numbasis, tot_basis]) # #eigenvalues are ordered largest first now # eig_args = eig_args_all[0: trunc_basis] # kl_basis = kl_basis[:, eig_args] # # #project KL vectors onto science image to construct model PSF # inner_products =, kl_basis) # klip_psf =, kl_basis.T) # # #subtract from original image to get final image # sub_img = sci_mean_sub - klip_psf # # #restore NANs # sub_img[sci_nanpix] = np.nan # # #pdb.set_trace() # # return sub_img
[docs] def estimate_movement(radius, parang0=None, parangs=None, wavelength0=None, wavelengths=None, mode=None): """ Estimates the movement of a hypothetical astrophysical source in ADI and/or SDI at the given radius and given reference parallactic angle (parang0) and reference wavelegnth (wavelength0) Args: radius: the radius from the star of the hypothetical astrophysical source parang0: the parallactic angle of the reference image (in degrees) parangs: array of length N of the parallactic angle of all N images (in degrees) wavelength0: the wavelength of the reference image wavelengths: array of length N of the wavelengths of all N images NOTE: we expect parang0 and parangs to be either both defined or both None. Same with wavelength0 and wavelengths mode: one of ['ADI', 'SDI', 'ADI+SDI'] for ADI, SDI, or ADI+SDI Returns: moves: array of length N of the distance an astrophysical source would have moved from the reference image """ #default no movement parameters dtheta = 0 # how much the images moved in theta (polar coordinate) scale_fac = 1 # how much the image moved radially (r/radius) if (parang0 is not None): dtheta = np.radians(parang0 - parangs) if (wavelength0 is not None): scale_fac = (wavelength0/wavelengths) #define cartesean coordinate system where astrophysical source is at (x,y) = (r,0) x0 = radius y0 = 0. #find x,y location of astrophysical source for the rest of the images r = radius * scale_fac x = r * np.cos(dtheta) y = r * np.sin(dtheta) moves = np.sqrt((x-x0)**2 + (y-y0)**2) return moves
[docs] def calc_scaling(sats, refwv=18): """ Helper function that calculates the wavelength scaling factor from the satellite spot locations. Uses the movement of spots diagonally across from each other, to calculate the scaling in a (hopefully? tbd.) centering-independent way. This method is definitely temporary and will be replaced by better scaling strategies as we come up with them. Scaling is calculated as the average of (1/2 * sqrt((x_1-x_2)**2+(y_1-y_2))), over the two pairs of spots. Args: sats: [4 x Nlambda x 2] array of x and y positions for the 4 satellite spots refwv: reference wavelength for scaling (optional, default = 20) Returns: scaling_factors: Nlambda array of scaling factors """ pairs = [(0,3), (1,2)] # diagonally-located spots (spot_num - 1 for indexing) separations = np.mean([0.5*np.sqrt(np.diff(sats[p,:,0], axis=0)[0]**2 + np.diff(sats[p,:,1], axis=0)[0]**2) for p in pairs], axis=0) # average over each pair, the first axis scaling_factors = separations/separations[refwv] return scaling_factors
[docs] def nan_map_coordinates_2d(img, yp, xp, mc_kwargs=None): """ scipy.ndimage.map_coordinates() that handles nans for 2-D transformations. Only works in 2-D! Do NaN detection by defining any pixel in the new coordiante system (xp, yp) as a nan If any one of the neighboring pixels in the original image is a nan (e.g. (xp, yp) = (120.1, 200.1) is nan if either (120, 200), (121, 200), (120, 201), (121, 201) is a nan) Args: img (np.array): 2-D image that is looking to be transformed yp (np.array): 2-D array of y-coordinates that the image is evaluated out xp (np.array): 2-D array of x-coordinates that the image is evaluated out mc_kwargs (dict): other parameters to pass into the map_coordinates function. Returns: transformed_img (np.array): 2-D transformed image. Each pixel is evaluated at the (yp, xp) specified by xp and yp. """ # check if optional parameters are passed in if mc_kwargs is None: mc_kwargs = {} # if nothing specified, we will pad transformations with np.nan if "cval" not in mc_kwargs: mc_kwargs["cval"] = np.nan # check all four pixels around each pixel and see whether they are nans xp_floor = np.clip(np.floor(xp).astype(int), 0, img.shape[1]-1) xp_ceil = np.clip(np.ceil(xp).astype(int), 0, img.shape[1]-1) yp_floor = np.clip(np.floor(yp).astype(int), 0, img.shape[0]-1) yp_ceil = np.clip(np.ceil(yp).astype(int), 0, img.shape[0]-1) rotnans = np.where(np.isnan(img[yp_floor.ravel(), xp_floor.ravel()]) | np.isnan(img[yp_floor.ravel(), xp_ceil.ravel()]) | np.isnan(img[yp_ceil.ravel(), xp_floor.ravel()]) | np.isnan(img[yp_ceil.ravel(), xp_ceil.ravel()])) # resample image based on new coordinates, set nan values as median nanpix = np.where(np.isnan(img)) medval = np.nanmedian(img) img_copy = np.copy(img) img_copy[nanpix] = medval transformed_img = ndimage.map_coordinates(img_copy, [yp, xp], **mc_kwargs) # mask nans img_shape = transformed_img.shape transformed_img.shape = [img_shape[0] * img_shape[1]] transformed_img[rotnans] = np.nan transformed_img.shape = img_shape return transformed_img
[docs] def align_and_scale(img, new_center, old_center=None, scale_factor=1, dtype=float): """ Helper function that realigns and/or scales the image Args: img: 2D image to perform manipulation on new_center: 2 element tuple (xpos, ypos) of new image center old_center: 2 element tuple (xpos, ypos) of old image center scale_factor: how much the stretch/contract the image. Will we scaled w.r.t the new_center (done after relaignment). We will adopt the convention >1: stretch image (shorter to longer wavelengths) <1: contract the image (longer to shorter wvs) This means scale factor should be lambda_0/lambda where lambda_0 is the wavelength you want to scale to Returns: resampled_img: shifted and/or scaled 2D image """ #create the coordinate system of the image to manipulate for the transform dims = img.shape x, y = np.meshgrid(np.arange(dims[1], dtype=dtype), np.arange(dims[0], dtype=dtype)) mod_flag = 0 #check how many modifications we are making #if old_center is specified, realign the images if ((old_center is not None) & ~(np.array_equal(new_center, old_center))): dx = new_center[0] - old_center[0] dy = new_center[1] - old_center[1] x -= dx y -= dy mod_flag += 1 #6/10/2016: Next line is a Bug fix new_center = old_center #if scale_factor is specified, scale the images if scale_factor != 1: #conver to polar for scaling r = np.sqrt((x - new_center[0]) ** 2 + (y - new_center[1]) ** 2) theta = np.arctan2(y - new_center[1], x - new_center[0]) #theta range is [-pi,pi] #Because x and y are the coordinates where we want to interpolate in the original image. See the following lines r /= scale_factor #convert back to cartesian x = r * np.cos(theta) + new_center[0] y = r * np.sin(theta) + new_center[1] mod_flag += 1 #if nothing is to be changed, return a copy of the image if mod_flag == 0: return np.copy(img) resampled_img = nan_map_coordinates_2d(img, y, x) return resampled_img
[docs] def rotate(img, angle, center, new_center=None, flipx=False, astr_hdr=None): """ Rotate an image by the given angle about the given center. Optional: can shift the image to a new image center after rotation. Also can reverse x axis for those left handed astronomy coordinate systems Args: img: a 2D image angle: angle CCW to rotate by (degrees) center: 2 element list [x,y] that defines the center to rotate the image to respect to new_center: 2 element list [x,y] that defines the new image center after rotation flipx: reverses x axis after rotation astr_hdr: wcs astrometry header for the image Returns: resampled_img: new 2D image """ # skip this step if img is all nans if np.size(np.where(~np.isnan(img))) == 0: return np.copy(img) #convert angle to radians angle_rad = np.radians(angle) #create the coordinate system of the image to manipulate for the transform dims = img.shape x, y = np.meshgrid(np.arange(dims[1], dtype=float), np.arange(dims[0], dtype=float)) #if necessary, move coordinates to new center if new_center is not None: dx = new_center[0] - center[0] dy = new_center[1] - center[1] x -= dx y -= dy #flip x if needed to get East left of North if flipx is True: x = center[0] - (x - center[0]) #do rotation. CW rotation formula to get a CCW of the image xp = (x-center[0])*np.cos(angle_rad) + (y-center[1])*np.sin(angle_rad) + center[0] yp = -(x-center[0])*np.sin(angle_rad) + (y-center[1])*np.cos(angle_rad) + center[1] resampled_img = nan_map_coordinates_2d(img, yp, xp) #edit the astrometry header if given to compensate for orientation if astr_hdr is not None: _rotate_wcs_hdr(astr_hdr, angle, flipx=flipx) return resampled_img
def _rotate_wcs_hdr(wcs_header, rot_angle, flipx=False, flipy=False): """ Modifies the wcs header when rotating/flipping an image. Args: wcs_header: wcs astrometry header rot_angle: in degrees CCW, the specified rotation desired flipx: after the rotation, reverse x axis? Yes if True flipy: after the rotation, reverse y axis? Yes if True """ # rotate WCS header by a rotation matrix rot_angle_rad = np.radians(rot_angle) cos_rot = np.cos(rot_angle_rad) sin_rot = np.sin(rot_angle_rad) rot_matrix = np.array([[cos_rot, sin_rot], [-sin_rot, cos_rot]]) =, rot_matrix) # flip RA if true to be North up East left if flipx is True:[:,0] *= -1 if flipy is True:[:,1] *= -1
[docs] def meas_contrast(dat, iwa, owa, resolution, center=None, low_pass_filter=True): """ Measures the contrast in the image. Image must already be in contrast units and should be corrected for algorithm thoughput. Args: dat: 2D image - already flux calibrated iwa: inner working angle owa: outer working angle resolution: size of noise resolution element in pixels (for speckle noise ~ FWHM or lambda/D) but it can be 1 pixel if limited by pixel-to-pixel noise. center: location of star (x,y). If None, defaults the image size // 2. low_pass_filter: if True, run a low pass filter. Can also be a float which specifices the width of the Gaussian filter (sigma). If False, no Gaussian filter is run Returns: (seps, contrast): tuple of separations in pixels and corresponding 5 sigma FPF """ if center is None: starx = dat.shape[1]//2 stary = dat.shape[0]//2 else: starx, stary = center # figure out how finely to sample the radial profile dr = resolution/2.0 numseps = int((owa-iwa)/dr) # don't want to start right at the edge of the occulting mask # but also want to well sample the contrast curve so go at twice the resolution seps = np.arange(numseps) * dr + iwa + resolution/2.0 dsep = resolution # find equivalent Gaussian PSF for this resolution # run a low pass filter on the data, check if input is boolean or a number if not isinstance(low_pass_filter, bool): # manually passed in low pass filter size sigma = low_pass_filter filtered = nan_gaussian_filter(dat, sigma) elif low_pass_filter: # set low pass filter size to be same as resolution element sigma = dsep / 2.355 # assume resolution element size corresponds to FWHM filtered = nan_gaussian_filter(dat, sigma) else: # no filtering filtered = dat contrast = [] # create a coordinate grid x,y = np.meshgrid(np.arange(float(dat.shape[1])), np.arange(float(dat.shape[0]))) r = np.sqrt((x-starx)**2 + (y-stary)**2) theta = np.arctan2(y-stary, x-starx) % 2*np.pi for sep in seps: # calculate noise in an annulus with width of the resolution element annulus = np.where((r < sep + resolution/2) & (r > sep - resolution/2)) noise_mean = np.nanmean(filtered[annulus]) noise_std = np.nanstd(filtered[annulus], ddof=1) # account for small sample statistics num_samples = int(np.floor(2*np.pi*sep/resolution)) # find 5 sigma flux using student-t statistics # Correction based on Mawet et al. 2014 fpf_flux = t.ppf(0.99999971334, num_samples-1, scale=noise_std) * np.sqrt(1 + 1./num_samples) + noise_mean contrast.append(fpf_flux) return seps, np.array(contrast)
[docs] def nan_gaussian_filter(img, sigma, ivar=None): """ Gaussian low-pass filter that handles nans Args: img: 2-D image sigma: float specifiying width of Gaussian ivar: inverse variance frame for the image, optional Returns: filtered: 2-D image that has been smoothed with a Gaussian """ if ivar is None: ivar = np.ones(img.shape) if img.shape != ivar.shape or len(img.shape) != 2: raise ValueError("image and ivar must be 2D ndarrays of the same shape") # make a copy to mask with nans masked = np.copy(img) masked_ivar = np.copy(ivar) nan_locs = np.where(np.isnan(img) | np.isnan(ivar)) masked[nan_locs] = 0 masked_ivar[nan_locs] = 0 # filter the image filtered = ndimage.gaussian_filter(masked * masked_ivar, sigma=sigma, truncate=4) # because of NaNs, we need to renormalize the gaussian filter, since NaNs shouldn't contribute filter_norm = ndimage.gaussian_filter(masked_ivar, sigma=sigma, truncate=4) filtered /= filter_norm filtered[nan_locs] = np.nan # for some reason, the fitlered image peak pixel fluxes get decreased by 2 # (2020-09-10: checking the gain for each image, it seems this is no longer a problem, next line is commented out) #filtered *= 2 return filtered
[docs] def high_pass_filter(img, filtersize=10): """ A FFT implmentation of high pass filter. Args: img: a 2D image filtersize: size in Fourier space of the size of the space. In image space, size=img_size/filtersize Returns: filtered: the filtered image """ # mask NaNs if there are any nan_index = np.where(np.isnan(img)) if np.size(nan_index) > 0: good_index = np.where(~np.isnan(img)) y, x = np.indices(img.shape) good_coords = np.array([x[good_index], y[good_index]]).T # shape of Npix, ndimage nan_fixer = sinterp.NearestNDInterpolator(good_coords, img[good_index]) fixed_dat = nan_fixer(x[nan_index], y[nan_index]) img[nan_index] = fixed_dat transform = fft.fft2(img) # coordinate system in FFT image u,v = np.meshgrid(fft.fftfreq(transform.shape[1]), fft.fftfreq(transform.shape[0])) # scale u,v so it has units of pixels in FFT space rho = np.sqrt((u*transform.shape[1])**2 + (v*transform.shape[0])**2) # scale rho up so that it has units of pixels in FFT space # rho *= transform.shape[0] # create the filter filt = 1. - np.exp(-(rho**2/filtersize**2)) filtered = np.real(fft.ifft2(transform*filt)) # restore NaNs filtered[nan_index] = np.nan img[nan_index] = np.nan return filtered
[docs] def define_annuli_bounds(annuli, IWA, OWA, annuli_spacing='constant'): """ Defines the annuli boundaries radially Args: annuli: number of annuli IWA: inner working angle (pixels) OWA: outer working anglue (pixels) annuli_spacing: how to distribute the annuli radially. Currently three options. Constant (equally spaced), log (logarithmical expansion with r), and linear (linearly expansion with r) Returns: rad_bounds: array of 2-element tuples that specify the beginning and end radius of that annulus """ #calculate the annuli ranges if annuli_spacing.lower() == "constant": dr = float(OWA - IWA) / annuli rad_bounds = [(dr * rad + IWA, dr * (rad + 1) + IWA) for rad in range(annuli)] elif annuli_spacing.lower() == "log": # calculate normalization of log scaling unnormalized_log_scaling = np.log(np.arange(annuli) + 1) + 1 log_coeff = float(OWA - IWA)/np.sum(unnormalized_log_scaling) # construct the radial spacing rad_bounds = [] for i in range(annuli): # lower bound is either mask or end of previous annulus if i == 0: lower_bound = IWA else: lower_bound = rad_bounds[-1][1] upper_bound = lower_bound + log_coeff * unnormalized_log_scaling[i] rad_bounds.append((lower_bound, upper_bound)) elif annuli_spacing.lower() == "linear": # scale linaer scaling to OWA-IWA linear_coeff = float(OWA - IWA)/np.sum(np.arange(annuli) + 1) rad_bounds = [(IWA + linear_coeff * rad, IWA + linear_coeff * (rad + 1)) for rad in range(annuli)] else: raise ValueError("annuli_spacing currently only supports 'constant', 'log', or 'linear'") # check to make sure the annuli are all greater than 1 pixel min_width = np.min(np.diff(rad_bounds, axis=1)) if min_width < 1: raise ValueError("Too many annuli, some annuli are less than 1 pixel") return rad_bounds