Source code for tools21cm.power_spectrum

'''
Contains functions to estimate various two point statistics.
'''

import numpy as np, gc
from . import const, conv, cosmo, smoothing
from .helper_functions import print_msg, get_eval
from .scipy_func import numpy_product
from scipy import fftpack, stats, interpolate

def apply_window(input_array, window):
    if window is None: return input_array
    from scipy.signal import windows
    if window.lower()=='blackmanharris':
            input_array *= windows.blackmanharris(input_array.shape[-1])[None,None,:]
    elif window.lower()=='tukey':
            input_array *= windows.tukey(input_array.shape[-1])[None,None,:]
    else:
            input_array *= window
    return input_array


[docs] def power_spectrum_nd(input_array, box_dims=None, verbose=False, **kwargs): ''' Calculate the power spectrum of input_array and return it as an n-dimensional array. Parameters: input_array (numpy array): the array to calculate the power spectrum of. Can be of any dimensions. box_dims = None (float or array-like): the dimensions of the box in Mpc. If this is None, the current box volume is used along all dimensions. If it is a float, this is taken as the box length along all dimensions. If it is an array-like, the elements are taken as the box length along each axis. Returns: The power spectrum in the same dimensions as the input array. ''' if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array.shape) if(verbose): print( 'Calculating power spectrum...') ft = fftpack.fftshift(fftpack.fftn(input_array.astype('float64'))) power_spectrum = np.abs(ft)**2 if(verbose): print( '...done') # scale boxvol = numpy_product(box_dims) pixelsize = boxvol/(numpy_product(input_array.shape)) power_spectrum *= pixelsize**2/boxvol return power_spectrum
[docs] def cross_power_spectrum_nd(input_array1, input_array2, box_dims, **kwargs): ''' Calculate the cross power spectrum two arrays and return it as an n-dimensional array. Parameters: input_array1 (numpy array): the first array to calculate the power spectrum of. Can be of any dimensions. input_array2 (numpy array): the second array. Must have same dimensions as input_array1. box_dims = None (float or array-like): the dimensions of the box in Mpc. If this is None, the current box volume is used along all dimensions. If it is a float, this is taken as the box length along all dimensions. If it is an array-like, the elements are taken as the box length along each axis. Returns: The cross power spectrum in the same dimensions as the input arrays. TODO: Also return k values. ''' assert(input_array1.shape == input_array2.shape) if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array1.shape) print_msg( 'Calculating power spectrum...') ft1 = fftpack.fftshift(fftpack.fftn(input_array1.astype('float64'))) ft2 = fftpack.fftshift(fftpack.fftn(input_array2.astype('float64'))) power_spectrum = np.real(ft1)*np.real(ft2)+np.imag(ft1)*np.imag(ft2) print_msg( '...done') # scale boxvol = numpy_product(box_dims) pixelsize = boxvol/(numpy_product(input_array1.shape)) power_spectrum *= pixelsize**2/boxvol return power_spectrum
def _adaptive_bin_edges(modes, n_bins, log=True): ''' Compute bin edges so that every bin contains at least one discrete mode. log=True : edges at geometric midpoints (suitable for log-spaced bin centres). log=False : edges at arithmetic midpoints (suitable for linear bin centres). Returns an array of length min(len(modes), n_bins) + 1. ''' n_modes = len(modes) if n_modes == 0: return None def _mid(a, b): return float(np.sqrt(a * b)) if log else float((a + b) / 2.0) if n_modes <= n_bins: edges = [modes[0] * 0.5] for i in range(n_modes - 1): edges.append(_mid(modes[i], modes[i + 1])) edges.append(modes[-1] * 2.0) else: idx = np.unique(np.round(np.linspace(0, n_modes, n_bins + 1)).astype(int)) idx = np.clip(idx, 0, n_modes) edges = [modes[0] * 0.5] for i in range(1, len(idx) - 1): edges.append(_mid(modes[idx[i] - 1], modes[idx[i]])) edges.append(modes[-1] * 2.0) return np.array(edges)
[docs] def radial_average(input_array, box_dims, kbins=10, binning='log', kmax=None, fill_empty_bins=True, **kwargs): ''' Radially average data. Mostly for internal use. Parameters: input_array (numpy array): the data array box_dims (float or array-like): box dimensions in Mpc. kbins = 10 (integer or array-like): number of bins or explicit bin edges. binning = 'log' : binning scheme ('log', 'linear', 'adaptive', 'adaptive_linear', 'kde', 'kde_linear'). kmax = None (float): upper k bin edge (from _resolve_k_limit). fill_empty_bins = True : fill zero-mode bins via 1D nearest-neighbour. Returns: (data, bin_centers, n_modes) ''' if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') k_comp, k = _get_k(input_array, box_dims) # ---- KDE path (log-k or linear-k) ---- if binning in ('kde', 'kde_linear'): use_log = (binning == 'kde') n_bins = kbins if isinstance(kbins, int) else len(kbins) - 1 valid = k.flatten() > 0 kv = np.log10(k.flatten()[valid]) if use_log else k.flatten()[valid] kmax_s = (np.log10(kmax) if use_log else kmax) if kmax is not None else kv.max() k_mid_s = np.linspace(kv.min(), kmax_s, n_bins) bw = float(np.ptp(k_mid_s)) / max(n_bins - 1, 1) pw = input_array.flatten()[valid] ps_out = np.empty(n_bins) nm_out = np.empty(n_bins) for i, kc in enumerate(k_mid_s): w = np.exp(-(kv - kc)**2 / (2.0 * bw**2)) wsum = w.sum() ps_out[i] = (pw * w).sum() / wsum if wsum > 0 else np.nan nm_out[i] = float(np.sum(np.abs(kv - kc) <= bw)) k_mid_out = np.power(10, k_mid_s) if use_log else k_mid_s return ps_out, k_mid_out, nm_out # ---- Adaptive / tophat path ---- if binning in ('adaptive', 'adaptive_linear'): k_modes = np.unique(k[k > 0]) if kmax is not None: k_modes = k_modes[k_modes <= kmax] n_bins = kbins if isinstance(kbins, int) else len(kbins) - 1 kbins_edges = _adaptive_bin_edges(k_modes, n_bins, log=(binning == 'adaptive')) else: kbins_edges = _get_kbins(kbins, box_dims, k, binning=binning, kmax=kmax) dk = (kbins_edges[1:] - kbins_edges[:-1]) / 2. outdata = np.histogram(k.flatten(), bins=kbins_edges, weights=input_array.flatten())[0].astype(float) n_modes = np.histogram(k.flatten(), bins=kbins_edges)[0].astype(float) outdata /= n_modes if fill_empty_bins: nan_mask = np.isnan(outdata) if nan_mask.any() and not nan_mask.all(): non_nan = np.where(~nan_mask)[0] for ni in np.where(nan_mask)[0]: nearest = non_nan[np.argmin(np.abs(non_nan - ni))] outdata[ni] = outdata[nearest] if binning == 'adaptive': k_mid = np.sqrt(kbins_edges[:-1] * kbins_edges[1:]) elif binning == 'adaptive_linear': k_mid = (kbins_edges[:-1] + kbins_edges[1:]) / 2.0 else: k_mid = kbins_edges[:-1] + dk return outdata, k_mid, n_modes
[docs] def power_spectrum_1d(input_array_nd, kbins=100, box_dims=None, return_n_modes=False, binning='log', window=None, k_limit=None, fill_empty_bins=True, **kwargs): ''' Calculate the spherically averaged power spectrum of an array and return it as a one-dimensional array. Parameters: input_array_nd (numpy array): the data array kbins = 100 (integer or array-like): number of bins or explicit bin edges. box_dims = None (float or array-like): box dimensions in Mpc. return_n_modes = False (bool): if True, also return n_modes array. binning = 'log' : binning scheme. Options: 'log' — uniform log-spaced bins (default) 'linear' — uniform linear-spaced bins 'adaptive' — edges at geometric midpoints between discrete k-modes, centers at geometric means; no empty bins 'adaptive_linear' — same but arithmetic midpoints/means (linear-k) 'kde' — Gaussian kernel in log-k space (bandwidth = 1 bin spacing); avoids empty bins, mild cross-bin mixing 'kde_linear' — Gaussian kernel in linear-k space window = None : taper ('blackmanharris', 'tukey', or array). k_limit = None : upper k limit (None, 'nyquist', or 'aliasing'). fill_empty_bins = True : fill zero-mode bins via 1D nearest-neighbour after tophat/adaptive binning. No effect for kde variants. Returns: (Pk, bins) or (Pk, bins, n_modes) if return_n_modes is True. ''' input_array_nd = apply_window(input_array_nd, window) if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array_nd.shape) kmax = _resolve_k_limit(k_limit, box_dims, input_array_nd.shape) input_array = power_spectrum_nd(input_array_nd, box_dims=box_dims) ps, bins, n_modes = radial_average(input_array, kbins=kbins, box_dims=box_dims, binning=binning, kmax=kmax, fill_empty_bins=fill_empty_bins) if return_n_modes: return ps, bins, n_modes return ps, bins
[docs] def power_spectrum_2d(input_array, kbins=10, binning='log', box_dims=244/.7, return_modes=False, nu_axis=2, window=None, k_limit=None, fill_empty_bins=True, **kwargs): ''' Calculate the power spectrum and bin it in kper and kpar. Parameters: input_array (numpy array): the data array nu_axis = 2 (integer): the line-of-sight axis kbins = 10 (integer or [n_kper, n_kpar]): number of bins per direction. box_dims = 244/.7 (float or array-like): box dimensions in Mpc. return_modes = False (bool): if True, also return n_modes array. binning = 'log' : binning scheme. Options: 'log' — uniform log-spaced bins (default) 'linear' — uniform linear-spaced bins 'adaptive' — edges at geometric midpoints between discrete k-modes, centers at geometric means; no empty bins 'adaptive_linear' — same but arithmetic midpoints/means (linear-k) 'kde' — Gaussian kernel in log-k space (bandwidth = 1 bin spacing); avoids empty bins, mild cross-bin mixing 'kde_linear' — Gaussian kernel in linear-k space window = None : taper ('blackmanharris', 'tukey', or array). k_limit = None : upper k limit (None, 'nyquist', or 'aliasing'). fill_empty_bins = True : after tophat/adaptive binning, fill any remaining NaN (zero-mode) bins via 2D nearest-neighbour in index space. Set False to keep NaN. Ignored for kde variants. Returns: (Pk, kper_mid, kpar_mid) or (Pk, kper_mid, kpar_mid, n_modes). ''' input_array = apply_window(input_array, window) if type(kbins) == list: binning = None elif np.array(kbins).size == 1: kbins = [kbins, kbins] elif not isinstance(kbins[0], int): binning = None if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array.shape) power = power_spectrum_nd(input_array, box_dims) k_xyz, k = _get_k(input_array, box_dims) xy_axis = [0, 1, 2] xy_axis.remove(nu_axis) kz = np.abs(k_xyz[nu_axis]) kp = np.sqrt(k_xyz[xy_axis[0]]**2 + k_xyz[xy_axis[1]]**2) del k_xyz, k gc.collect() # Per-direction Nyquist limits if k_limit is not None: shape = np.array(input_array.shape) bdims = np.array(box_dims, dtype=float) k_ny_all = np.pi * shape / bdims kl = k_limit.lower() if kl == 'nyquist': factor = 1.0 elif kl == 'aliasing': factor = 0.5 else: raise ValueError(f"k_limit must be None, 'nyquist', or 'aliasing'; got {k_limit!r}") kmax_per = float(k_ny_all[xy_axis].min()) * factor kmax_par = float(k_ny_all[nu_axis]) * factor kmax_per = min(kmax_per, kp.max()) kmax_par = min(kmax_par, kz.max()) else: kmax_per = kp.max() kmax_par = kz.max() del xy_axis # ---- KDE path (log-k or linear-k) ---- if binning in ('kde', 'kde_linear'): use_log = (binning == 'kde') valid = (kp.flatten() > 0) & (kz.flatten() > 0) kpv = np.log10(kp.flatten()[valid]) if use_log else kp.flatten()[valid] kzv = np.log10(kz.flatten()[valid]) if use_log else kz.flatten()[valid] kper_max_s = np.log10(kmax_per) if use_log else kmax_per kpar_max_s = np.log10(kmax_par) if use_log else kmax_par kper_mid_s = np.linspace(kpv.min(), kper_max_s, kbins[0]) kpar_mid_s = np.linspace(kzv.min(), kpar_max_s, kbins[1]) bw_per = float(np.ptp(kper_mid_s)) / max(kbins[0] - 1, 1) bw_par = float(np.ptp(kpar_mid_s)) / max(kbins[1] - 1, 1) pw = power.flatten()[valid] ps_out = np.empty((kbins[0], kbins[1])) for i, kpc in enumerate(kper_mid_s): w_per = np.exp(-(kpv - kpc)**2 / (2.0 * bw_per**2)) for j, kzc in enumerate(kpar_mid_s): w = w_per * np.exp(-(kzv - kzc)**2 / (2.0 * bw_par**2)) wsum = w.sum() ps_out[i, j] = (pw * w).sum() / wsum if wsum > 0 else np.nan kper_mid = np.power(10, kper_mid_s) if use_log else kper_mid_s kpar_mid = np.power(10, kpar_mid_s) if use_log else kpar_mid_s if return_modes: edges_per = np.linspace(kper_mid_s[0] - bw_per / 2, kper_mid_s[-1] + bw_per / 2, kbins[0] + 1) edges_par = np.linspace(kpar_mid_s[0] - bw_par / 2, kpar_mid_s[-1] + bw_par / 2, kbins[1] + 1) nm = stats.binned_statistic_2d(kpv, kzv, None, statistic='count', bins=[edges_per, edges_par]) return ps_out, kper_mid, kpar_mid, nm.statistic return ps_out, kper_mid, kpar_mid # ---- Tophat / adaptive path ---- if binning is None: kper = np.array(kbins[0]) kpar = np.array(kbins[1]) elif binning == 'log': kper = np.linspace(np.log10(kp[kp != 0].min()), np.log10(kmax_per), kbins[0] + 1) kpar = np.linspace(np.log10(kz[kz != 0].min()), np.log10(kmax_par), kbins[1] + 1) kp, kz = np.log10(kp), np.log10(kz) elif binning == 'linear': kper = np.linspace(kp[kp != 0].min(), kmax_per, kbins[0] + 1) kpar = np.linspace(kz[kz != 0].min(), kmax_par, kbins[1] + 1) elif binning in ('adaptive', 'adaptive_linear'): _log = (binning == 'adaptive') kper = _adaptive_bin_edges(np.unique(kp[(kp > 0) & (kp <= kmax_per)]), kbins[0], log=_log) kpar = _adaptive_bin_edges(np.unique(kz[(kz > 0) & (kz <= kmax_par)]), kbins[1], log=_log) kp_f, kz_f, pw_f = kp.flatten(), kz.flatten(), power.flatten() ps = stats.binned_statistic_2d(x=kp_f, y=kz_f, values=pw_f, statistic='mean', bins=[kper, kpar]) if binning == 'log': kper_mid = np.power(10, 0.5 * (kper[:-1] + kper[1:])) kpar_mid = np.power(10, 0.5 * (kpar[:-1] + kpar[1:])) elif binning == 'adaptive': kper_mid = np.sqrt(kper[:-1] * kper[1:]) kpar_mid = np.sqrt(kpar[:-1] * kpar[1:]) elif binning == 'adaptive_linear': kper_mid = (kper[:-1] + kper[1:]) / 2.0 kpar_mid = (kpar[:-1] + kpar[1:]) / 2.0 else: kper_mid = (kper[:-1] + kper[1:]) / 2. kpar_mid = (kpar[:-1] + kpar[1:]) / 2. ps_out = ps.statistic if fill_empty_bins: nan_mask = np.isnan(ps_out) if nan_mask.any() and not nan_mask.all(): from scipy.ndimage import distance_transform_edt _, (ri, ci) = distance_transform_edt(nan_mask, return_indices=True) ps_out = ps_out.copy() ps_out[nan_mask] = ps_out[ri[nan_mask], ci[nan_mask]] if return_modes: n_modes = stats.binned_statistic_2d(x=kp_f, y=kz_f, values=None, statistic='count', bins=[kper, kpar]) return ps_out, kper_mid, kpar_mid, n_modes.statistic return ps_out, kper_mid, kpar_mid
[docs] def cross_power_spectrum_1d(input_array1_nd, input_array2_nd, kbins=100, box_dims=None, return_n_modes=False, binning='log', k_limit=None, fill_empty_bins=True, **kwargs): ''' Calculate the spherically averaged cross power spectrum of two arrays and return it as a one-dimensional array. Parameters: input_array1_nd (numpy array): the first data array input_array2_nd (numpy array): the second data array kbins = 100 (integer or array-like): number of bins or explicit bin edges. box_dims = None (float or array-like): box dimensions in Mpc. return_n_modes = False (bool): if True, also return n_modes array. binning = 'log' : binning scheme ('log', 'linear', 'adaptive', 'kde'). k_limit = None : upper k limit (None, 'nyquist', or 'aliasing'). fill_empty_bins = True : fill zero-mode bins via 1D nearest-neighbour. Returns: (Pk, bins) or (Pk, bins, n_modes) if return_n_modes is True. ''' if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array1_nd.shape) kmax = _resolve_k_limit(k_limit, box_dims, input_array1_nd.shape) input_array = cross_power_spectrum_nd(input_array1_nd, input_array2_nd, box_dims=box_dims) ps, bins, n_modes = radial_average(input_array, kbins=kbins, box_dims=box_dims, binning=binning, kmax=kmax, fill_empty_bins=fill_empty_bins) if return_n_modes: return ps, bins, n_modes return ps, bins
[docs] def power_spectrum_mu(input_array, los_axis=0, mubins=20, kbins=10, box_dims=None, weights=None, exclude_zero_modes=True, return_n_modes=False, absolute_mus=True, k_limit=None, **kwargs): ''' Calculate the power spectrum and bin it in mu=cos(theta) and k. Parameters: input_array (numpy array): the data array los_axis = 0 (integer): the line-of-sight axis mubins = 20 (integer): the number of mu bins kbins = 10 (integer or array-like): The number of bins, or a list containing the bin edges. If an integer is given, the bins are logarithmically spaced. box_dims = None (float or array-like): the dimensions of the box in Mpc. If this is None, the current box volume is used along all dimensions. If it is a float, this is taken as the box length along all dimensions. If it is an array-like, the elements are taken as the box length along each axis. return_n_modes = False (bool): if true, also return the number of modes in each bin exlude_zero_modes = True (bool): if true, modes with any components of k equal to zero will be excluded. absolute_mus = True (boolean): if true, use the absolute values of mu, range [0,1]. If false, use the range [-1,1] k_limit = None : Upper k limit for the bins. Options: None — use the grid maximum (default, no restriction) 'nyquist' — cap at k_Nyquist = π·min(N_i/L_i) 'aliasing' — cap at k_Nyquist/2 (alias-free limit) Returns: A tuple with (Pk, mubins, kbins), where Pk is an array with the power spectrum of dimensions (n_mubins x n_kbins), mubins is an array with the mu bin centers and kbins is an array with the k bin centers. ''' if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array.shape) kmax = _resolve_k_limit(k_limit, box_dims, input_array.shape) powerspectrum = power_spectrum_nd(input_array, box_dims=box_dims) ps, mu_bins, k_bins, n_modes = mu_binning(powerspectrum, los_axis, mubins, kbins, box_dims, weights, exclude_zero_modes, absolute_mus=absolute_mus, kmax=kmax) if return_n_modes: return ps, mu_bins, k_bins, n_modes return ps, mu_bins, k_bins
[docs] def cross_power_spectrum_mu(input_array1, input_array2, los_axis=0, mubins=20, kbins=10, box_dims=None, weights=None, exclude_zero_modes=True, return_n_modes=False, absolute_mus=True, k_limit=None, **kwargs): ''' Calculate the cross power spectrum and bin it in mu=cos(theta) and k. Parameters: input_array1 (numpy array): the first data array input_array2 (numpy array): the second data array los_axis = 0 (integer): the line-of-sight axis mubins = 20 (integer): the number of mu bins kbins = 10 (integer or array-like): The number of bins, or a list containing the bin edges. If an integer is given, the bins are logarithmically spaced. box_dims = None (float or array-like): the dimensions of the box in Mpc. If this is None, the current box volume is used along all dimensions. If it is a float, this is taken as the box length along all dimensions. If it is an array-like, the elements are taken as the box length along each axis. return_n_modes = False (bool): if true, also return the number of modes in each bin exlude_zero_modes = True (bool): if true, modes with any components of k equal to zero will be excluded. absolute_mus = True (boolean): if true, use the absolute values of mu, range [0,1]. If false, use the range [-1,1] Returns: A tuple with (Pk, mubins, kbins), where Pk is an array with the cross power spectrum of dimensions (n_mubins x n_kbins), mubins is an array with the mu bin centers and kbins is an array with the k bin centers. TODO: Add support for (non-numpy) lists for the bins ''' if kwargs.get('boxsize') is not None: box_dims = kwargs.get('boxsize') box_dims = _get_dims(box_dims, input_array1.shape) kmax = _resolve_k_limit(k_limit, box_dims, input_array1.shape) powerspectrum = cross_power_spectrum_nd(input_array1, input_array2, box_dims=box_dims) ps, mu_bins, k_bins, n_modes = mu_binning(powerspectrum, los_axis, mubins, kbins, box_dims, weights, exclude_zero_modes, absolute_mus=absolute_mus, kmax=kmax) if return_n_modes: return ps, mu_bins, k_bins, n_modes return ps, mu_bins, k_bins
[docs] def mu_binning(powerspectrum, los_axis = 0, mubins=20, kbins=10, box_dims=None, weights=None, exclude_zero_modes=True, binning='log', absolute_mus=True, kmax=None): ''' This function is for internal use only. ''' if weights != None: powerspectrum *= weights assert(len(powerspectrum.shape)==3) k_comp, k = _get_k(powerspectrum, box_dims) mu = _get_mu(k_comp, k, los_axis, absolute_mus) #Calculate k values, and make k bins kbins = _get_kbins(kbins, box_dims, k, binning=binning, kmax=kmax) dk = (kbins[1:]-kbins[:-1])/2. n_kbins = len(kbins)-1 #Exclude k_perp = 0 modes if exclude_zero_modes: good_idx = _get_nonzero_idx(powerspectrum.shape, los_axis) else: good_idx = np.ones_like(powerspectrum) #Make mu bins min_mu=0.0 if absolute_mus else -1.0 if isinstance(mubins,int): mubins = np.linspace(min_mu, 1.0 , mubins+1) dmu = (mubins[1:]-mubins[:-1])/2. n_mubins = len(mubins)-1 #Remove the zero component from the power spectrum. mu is undefined here powerspectrum[tuple((np.array(powerspectrum.shape)/2).astype(int))] = 0. #Bin the data print_msg('Binning data...') outdata = np.zeros((n_mubins,n_kbins)) n_modes = np.zeros((n_mubins,n_kbins)) for ki in range(n_kbins): print_msg('Bin %d of %d' % (ki, n_kbins)) kmin_bin = kbins[ki] kmax_bin = kbins[ki+1] kidx = (k >= kmin_bin) & (k < kmax_bin) kidx = kidx*good_idx for i in range(n_mubins): mu_min = mubins[i] mu_max = mubins[i+1] idx = (mu >= mu_min) & (mu < mu_max) & kidx.astype(bool) outdata[i,ki] = np.mean(powerspectrum[idx]) n_modes[i,ki] = np.size(powerspectrum[idx]) if weights != None: outdata[i,ki] /= weights[idx].mean() return outdata, mubins[:-1]+dmu, kbins[:-1]+dk, n_modes
def get_k(input_array, box_dims): box_dims = _get_dims(box_dims, input_array.shape) return _get_k(input_array, box_dims) def get_kbins(kbins, box_dims, k=None, array=None, binning='log'): assert k is not None or array is not None box_dims = _get_dims(box_dims, array.shape) if k is None: k_comp, k = _get_k(array, box_dims) return _get_kbins(kbins, box_dims, k, binning=binning) #Some methods for internal use def _get_k(input_array, box_dims): ''' Get the k values for input array with given dimensions. Return k components and magnitudes. For internal use. ''' dim = len(input_array.shape) if dim == 1: x = np.arange(len(input_array)) center = x.max()/2. kx = 2.*np.pi*(x-center)/box_dims[0] return [kx], kx elif dim == 2: x,y = np.indices(input_array.shape, dtype='int32') center = np.array([(x.max()-x.min())/2, (y.max()-y.min())/2]) kx = 2.*np.pi * (x-center[0])/box_dims[0] ky = 2.*np.pi * (y-center[1])/box_dims[1] k = np.sqrt(kx**2 + ky**2) return [kx, ky], k elif dim == 3: nx,ny,nz = input_array.shape x,y,z = np.indices(input_array.shape, dtype='int32') center = np.array([nx/2 if nx%2==0 else (nx-1)/2, ny/2 if ny%2==0 else (ny-1)/2, \ nz/2 if nz%2==0 else (nz-1)/2]) kx = 2.*np.pi * (x-center[0])/box_dims[0] ky = 2.*np.pi * (y-center[1])/box_dims[1] kz = 2.*np.pi * (z-center[2])/box_dims[2] k = np.sqrt(kx**2 + ky**2 + kz**2 ) return [kx,ky,kz], k def _get_mu(k_comp, k, los_axis, absolute_mus): ''' Get the mu values for given k values and a line-of-sight axis. For internal use ''' #Line-of-sight distance from center if los_axis == 0: los_dist = k_comp[0] elif los_axis == 1: los_dist = k_comp[1] elif los_axis == 2: los_dist = k_comp[2] else: raise Exception('Your space is not %d-dimensional!' % los_axis) #mu=cos(theta) = k_par/k mu = np.abs(los_dist/k) if absolute_mus else los_dist/np.abs(k) mu[np.where(k < 0.001)] = np.nan return mu def _get_kbins(kbins, box_dims, k, binning='log', kmin=None, kmax=None): ''' Make a list of bin edges if kbins is an integer, otherwise return it as it is. ''' if isinstance(kbins,int): kmin = 2.*np.pi/min(box_dims) if kmin is None else kmin kmax = k.max() if kmax is None else kmax if binning=='linear': kbins = np.linspace(kmin, kmax, kbins+1) else: kbins = 10**np.linspace(np.log10(kmin), np.log10(kmax), kbins+1) return kbins def _get_dims(box_dims, ashape): ''' If box dims is a scalar, assume that dimensions are cubic and make a list If it's not given, assume it's the default value of the box size Otherwise, return as it is ''' if box_dims == None: return [conv.LB]*len(ashape) if not hasattr(box_dims, '__iter__'): return [box_dims]*len(ashape) return box_dims def _resolve_k_limit(k_limit, box_dims, array_shape): ''' Convert a k_limit string to a kmax value. Parameters: k_limit : None, 'nyquist', or 'aliasing' None → return None (use the grid maximum) 'nyquist' → π * min(N_i / L_i) (Nyquist wavenumber) 'aliasing' → k_nyquist / 2 (alias-free limit) box_dims : list of box lengths [Mpc] per axis array_shape : shape of the data array Returns: float or None ''' if k_limit is None: return None shape = np.array(array_shape[:len(box_dims)]) k_ny_axes = np.pi * shape / np.array(box_dims, dtype=float) k_nyquist = k_ny_axes.min() kl = k_limit.lower() if kl == 'nyquist': return float(k_nyquist) elif kl == 'aliasing': return float(k_nyquist) / 2.0 raise ValueError(f"k_limit must be None, 'nyquist', or 'aliasing'; got {k_limit!r}")
[docs] def dimensionless_ps(data, kbins=100, box_dims=None, binning='log', factor=10, k_limit=None): r''' Dimensionless power spectrum is P(k)*k^3/(2pi^2) Parameters ---------- data : ndarray The numpy data whose power spectrum is to be determined. kbins : int Number of bins for in the k-space (Default: 100). box_dims: float The size of the box in Mpc (Default: Takes the value from the set_sim_constants). binning : str The type of binning to be used for the k-space (Default: 'log'). factor : int The factor multiplied to the given kbins to smooth the spectrum from (Default: 10). k_limit : None, 'nyquist', or 'aliasing' Upper k limit passed to power_spectrum_1d (Default: None). Returns ------- (\Delta^2, ks) ''' from scipy.interpolate import interp1d Pk, ks = power_spectrum_1d(data, kbins=kbins*factor, box_dims=box_dims, binning=binning, k_limit=k_limit) f_Dlta = interp1d(ks, Pk*ks**3/2/np.pi**2) knew = 10**np.linspace(np.log10(ks[1]),np.log10(ks[-1]), kbins) if binning=='log' else np.linspace(ks[1],ks[-1], kbins) return f_Dlta(knew), knew
def _get_nonzero_idx(ps_shape, los_axis): ''' Get the indices where k_perp != 0 ''' x,y,z = np.indices(ps_shape) if los_axis == 0: zero_idx = (y == ps_shape[1]/2)*(z == ps_shape[2]/2) elif los_axis == 1: zero_idx = (x == ps_shape[0]/2)*(z == ps_shape[2]/2) else: zero_idx = (x == ps_shape[0]/2)*(y == ps_shape[1]/2) good_idx = np.invert(zero_idx) return good_idx
[docs] def anisotropy_ratio_r_mu(input_array, los_axis=0, mu_cut=0.5, mubins=20, kbins=10, box_dims=None, weights=None, exclude_zero_modes=True, return_n_modes=False, absolute_mus=True, k_limit=None, **kwargs): ''' Calculate the anisotropy ratio r_mu(k). Parameters: input_array (numpy array): the data array los_axis = 0 (integer): the line-of-sight axis mu_cut = 0.5 (float): the value of mu to split the P(k,mu) data. mubins = 20 (integer): the number of mu bins kbins = 10 (integer or array-like): The number of bins, or a list containing the bin edges. If an integer is given, the bins are logarithmically spaced. box_dims = None (float or array-like): the dimensions of the box in Mpc. If this is None, the current box volume is used along all dimensions. If it is a float, this is taken as the box length along all dimensions. If it is an array-like, the elements are taken as the box length along each axis. return_n_modes = False (bool): if true, also return the number of modes in each bin exlude_zero_modes = True (bool): if true, modes with any components of k equal to zero will be excluded. absolute_mus = True (boolean): if true, use the absolute values of mu, range [0,1]. If false, use the range [-1,1] k_limit = None : Upper k limit for the bins. Options: None — use the grid maximum (default, no restriction) 'nyquist' — cap at k_Nyquist = π·min(N_i/L_i) 'aliasing' — cap at k_Nyquist/2 (alias-free limit) Returns: A tuple with (rmu, kbins). ''' Pk, mubins, kbins = power_spectrum_mu(input_array, los_axis=los_axis, mubins=mubins, kbins=kbins, box_dims=box_dims, weights=weights, exclude_zero_modes=exclude_zero_modes, return_n_modes=return_n_modes, absolute_mus=absolute_mus, k_limit=k_limit, **kwargs) Pup = Pk[mubins>=mu_cut,:] Pdn = Pk[mubins<mu_cut,:] rup = np.array([pp[np.isfinite(pp)].mean() for pp in Pup.T]) rdn = np.array([pp[np.isfinite(pp)].mean() for pp in Pdn.T]) rmu = rup/rdn-1.0 return rmu, kbins
[docs] def horizon_wedge_equation(z, fov_deg=90.0): ''' Horizon wedge boundary in the cylindrical power spectrum. Equation 14 in arXiv:2201.10798. Parameters: z (float): redshift fov_deg (float): field of view in degrees (default 90) Returns: A lambda function k_parallel(k_perpendicular). ''' f_kpar = lambda kper: kper * np.sin(fov_deg * np.pi / 180) / (1 + z) * \ cosmo.hubble_parameter(z) / const.c * cosmo.z_to_cdist(z) return f_kpar
[docs] def plot_2d_power(ps, xlabel=r'$k_\perp$', ylabel=r'$k_\parallel$', ps_label=r'$P(k_\perp,k_\parallel)$', fig=None, ax=None, plotting_scale=None, draw_wedge=None, **kwargs): ''' Plot the 2D cylindrical power spectrum. Parameters: ps (tuple or dict): (Pk, kper_bins, kpar_bins) or {'Pk': Pk, 'kper': kper_bins, 'kpar': kpar_bins} xlabel, ylabel, ps_label: axis and colorbar labels fig, ax: matplotlib figure and axes (created if None) plotting_scale (dict): axis scales, e.g. {'x': 'log', 'y': 'log', 'z': 'log'} draw_wedge (dict or None): wedge params e.g. {'z': 9.0, 'fov_deg': 90.0, 'ls': '--', 'color': 'k'} Returns: matplotlib figure ''' import matplotlib.pyplot as plt import matplotlib.colors as colors if plotting_scale is None: plotting_scale = {'x': 'log', 'y': 'log', 'z': 'log'} if draw_wedge is None: draw_wedge = {'z': 9.0, 'fov_deg': 90.0, 'ls': '--', 'color': 'k'} try: pp, kper, kpar = (ps[ke] for ke in ['Pk', 'kper', 'kpar']) except TypeError: pp, kper, kpar = ps fp = interpolate.RegularGridInterpolator( (kper, kpar), pp, method='linear', bounds_error=False, fill_value=None ) if fig is None and ax is None: fig, ax = plt.subplots(1, 1, figsize=(7, 5)) elif fig is None: pass else: ax = fig.axes[0] X, Y = kper, kpar Xg, Yg = np.meshgrid(X, Y, indexing='ij') C = fp(np.stack([Xg, Yg], axis=-1)).T norm = colors.LogNorm(vmin=C[np.isfinite(C)].min(), vmax=C[np.isfinite(C)].max()) \ if plotting_scale.get('z') == 'log' else None pcm = ax.pcolormesh(X, Y, C, norm=norm, **kwargs) if draw_wedge is not None: f_kpar = horizon_wedge_equation(draw_wedge['z'], fov_deg=draw_wedge.get('fov_deg', 90.0)) ax.plot(X, f_kpar(X), ls=draw_wedge.get('ls', '--'), color=draw_wedge.get('color', 'k')) ax.axis([X.min(), X.max(), Y.min(), Y.max()]) plt.colorbar(pcm, ax=ax, label=ps_label, pad=0.01) ax.set_xlabel(xlabel) ax.set_ylabel(ylabel) ax.set_xscale(plotting_scale.get('x', 'log')) ax.set_yscale(plotting_scale.get('y', 'log')) return fig