Source code for tools21cm.angular_coordinates

'''
Methods to convert data between physical (cMpc) coordinates
and observational (angular-frequency) coordinates.
'''

import numpy as np
from .lightcone import redshifts_at_equal_comoving_distance
from . import cosmo as cm
from . import conv
from . import helper_functions as hf
from . import smoothing
from . import const
from scipy.signal import fftconvolve
from tqdm import tqdm


[docs] def physical_lightcone_to_observational(physical_lightcone, input_z_low, output_dnu, output_dtheta, input_box_size_mpc=None, verbose=True, order=2, mode='pad'): ''' Interpolate a lightcone volume from physical (length) units to observational (angle/frequency) units. Parameters ---------- physical_lightcone : numpy.ndarray The input lightcone volume in physical coordinates with shape (N, N, M) where N is the number of cells per side in angular direction and M is the number of cells along the line-of-sight input_z_low : float or array-like The lowest redshift of the input lightcone. If array-like, should contain redshifts for each slice along the line-of-sight output_dnu : float The frequency resolution of the output volume in MHz output_dtheta : float The angular resolution of the output in arcmin input_box_size_mpc : float, optional The size of the input field of view in Mpc. If None (default), uses the value from conv.LB verbose : bool, optional Whether to show progress messages (default: True) order : int, optional The order of the spline interpolation (0-5). Default is 2. Use order=0 for ionization fraction data. mode : str, optional How to handle the field of view at different redshifts: - 'pad' (default): Fix FoV at lowest redshift (largest angular size) and pad higher redshift slices - 'crop': Fix FoV at highest redshift (smallest angular size) and crop lower redshift slices - 'full', 'extend': Same as 'pad' - 'valid': Same as 'crop' Returns ------- tuple output_volume : numpy.ndarray The transformed lightcone in observational coordinates with shape (N_theta, N_theta, N_nu) output_freqs : numpy.ndarray Array of output frequencies in MHz corresponding to the slices along the line-of-sight ''' assert mode.lower() in ['pad', 'full', 'extend', 'crop', 'valid'], "Accepted input for mode: 'pad', 'full', 'extend', 'crop', 'valid'." if input_box_size_mpc == None: input_box_size_mpc = conv.LB if isinstance(input_z_low,(float,int)): cell_size = input_box_size_mpc/physical_lightcone.shape[0] distances = cm.z_to_cdist(input_z_low) + np.arange(physical_lightcone.shape[2])*cell_size input_z_high = cm.cdist_to_z(distances).max() else: input_z_low, input_z_high = input_z_low.min(), input_z_low.max() #Calculate the FoV in degrees at lowest z (largest one) fov_deg_low = cm.angular_size_comoving(input_box_size_mpc, input_z_low) fov_deg_high = cm.angular_size_comoving(input_box_size_mpc, input_z_high) #For each output redshift: average the corresponding slices if verbose: print(f'At the lowest redshift (z={input_z_low:.3f}), the angluar scale is {fov_deg_low:.3f} deg') print(f'At the highest redshift (z={input_z_high:.3f}), the angluar scale is {fov_deg_high:.3f} deg') print('Making observational lightcone...') print('Binning in frequency...') lightcone_freq, output_freqs = bin_lightcone_in_frequency(physical_lightcone,\ input_z_low, input_box_size_mpc, output_dnu) #Calculate dimensions of output volume n_cells_theta = int(fov_deg_low*60./output_dtheta) n_cells_nu = len(output_freqs) #Go through each slice and make angular slices for each one if verbose: print('Binning in angle...') output_volume = np.zeros((n_cells_theta, n_cells_theta, n_cells_nu)) for i in tqdm(range(n_cells_nu), disable=not verbose): z = cm.nu_to_z(output_freqs[i]) output_volume[:,:,i] = physical_slice_to_angular(lightcone_freq[:,:,i], z, \ slice_size_mpc=input_box_size_mpc, fov_deg=fov_deg_low,\ dtheta=output_dtheta, order=order) if mode.lower() in ['pad', 'full', 'extend']: pass elif mode.lower() in ['crop', 'valid']: n_cells_theta_out = int(fov_deg_high*60./output_dtheta) output_volume = output_volume[(n_cells_theta-n_cells_theta_out)//2:(n_cells_theta+n_cells_theta_out)//2,(n_cells_theta-n_cells_theta_out)//2:(n_cells_theta+n_cells_theta_out)//2,:] else: print(f"mode={mode} is not implemented") return output_volume, output_freqs
[docs] def observational_lightcone_to_physical(observational_lightcone, input_freqs, input_dtheta, verbose=True, order=2): ''' Interpolate a lightcone volume measured in observational (angle/frequency) units into physical (length) units. The output resolution will be set to the coarest one, as determined either by the angular or the frequency resolution. The lightcone must have the LoS as the last index, with frequencies decreasing along the LoS. Parameters: observational_lightcone (numpy array): the input lightcone volume input_freqs (numpy array): the frequency in MHz of each slice along the line of sight of the input input_dheta (float): the angular size of a cell in arcmin verbose (bool): show progress bar order (int): The order of the spline interpolation, default is 2. The order has to be in the range 0-5. Use order=0 for ionization fraction data. Returns: * The output volume * The redshifts along the LoS of the output * The output cell size in Mpc ''' assert input_freqs[0] > input_freqs[-1] assert observational_lightcone.shape[0] == observational_lightcone.shape[1] #Determine new cell size - set either by frequency or angle. #The FoV size in Mpc is set by the lowest redshift dnu = input_freqs[0]-input_freqs[1] z_low = cm.nu_to_z(input_freqs[0]) fov_deg = observational_lightcone.shape[0]*input_dtheta/60. fov_mpc = fov_deg/cm.angular_size_comoving(1., z_low) cell_size_perp = fov_mpc/observational_lightcone.shape[0] cell_size_par = cm.nu_to_cdist(input_freqs[-1])-cm.nu_to_cdist(input_freqs[-2]) output_cell_size = max([cell_size_par, cell_size_perp]) hf.print_msg('Making physical lightcone with cell size %.2f Mpc' % output_cell_size) #Go through each slice along frequency axis. Cut off excess and #interpolate down to correct resolution n_cells_perp = int(fov_mpc/output_cell_size) output_volume_par = np.zeros((n_cells_perp, n_cells_perp, observational_lightcone.shape[2])) for i in tqdm(range(output_volume_par.shape[2]), disable=not verbose): z = cm.nu_to_z(input_freqs[i]) output_volume_par[:,:,i] = angular_slice_to_physical(observational_lightcone[:,:,i],\ z, slice_size_deg=fov_deg, output_cell_size=output_cell_size,\ output_size_mpc=fov_mpc, order=order) #Bin along frequency axis output_volume, output_redshifts = bin_lightcone_in_mpc(output_volume_par, \ input_freqs, output_cell_size) return output_volume, output_redshifts, output_cell_size
[docs] def physical_slice_to_angular(input_slice, z, slice_size_mpc, fov_deg, dtheta, order=0): ''' Interpolate a slice in physical coordinates to angular coordinates. Parameters: input_slice (numpy array): the 2D slice in physical coordinates z (float): the redshift of the input slice slice_size_Mpc (float): the size of the input slice in cMpc fov_deg (float): the field-of-view in degrees. The output will be padded to match this size dtheta (float): the target resolution in arcmin Returns: (angular_slice, size_deg) ''' #Resample fov_mpc = cm.deg_to_cdist(fov_deg, z) cell_size_mpc = fov_mpc/(fov_deg*60./dtheta) n_cells_resampled = int(slice_size_mpc/cell_size_mpc) #Avoid edge effects with even number of cells if n_cells_resampled % 2 == 0: n_cells_resampled -= 1 resampled_slice = resample_slice(input_slice, n_cells_resampled, order) #Pad the array slice_n = resampled_slice.shape[0] padded_n = int(fov_deg*60./dtheta)# np.round(slice_n*(fov_mpc/slice_size_mpc)) if padded_n < slice_n: if slice_n - padded_n > 2: print('Warning! Padded slice is significantly smaller than original!') print('This should not happen...') padded_n = slice_n padded_slice = _get_padded_slice(resampled_slice, padded_n) return padded_slice
[docs] def angular_slice_to_physical(input_slice, z, slice_size_deg, output_cell_size, output_size_mpc, order=0, prefilter=True): ''' Interpolate a slice in angular coordinates to physical Parameters: input_slice (numpy array): the 2D slice in observational coordinates z (float): the redshift of the input slice slice_size_deg (float): the size of the input slice in deg output_cell_size (float): the output cell size in cMpc output_size_mpc (float): the output size in mpc Returns: (physical_slice, size_mpc) ''' #Resample slice_size_mpc = cm.deg_to_cdist(slice_size_deg, z) n_cells_resampled = int(slice_size_mpc/output_cell_size) #Avoid edge effects with even number of cells if n_cells_resampled % 2 == 0: n_cells_resampled += 1 resampled_slice = resample_slice(input_slice, n_cells_resampled, order, prefilter) #Remove cells to get correct size n_cutout_cells = int(output_size_mpc/output_cell_size)# np.round(resampled_slice.shape[0]*output_size_mpc/slice_size_mpc) if n_cutout_cells > input_slice.shape[0]: if input_slice.shape[0] - n_cutout_cells > 2: print('Warning! Cutout slice is larger than original.') print('This should not happen') n_cutout_cells = input_slice.shape[0] slice_cutout = resampled_slice[:n_cutout_cells, :n_cutout_cells] return slice_cutout
[docs] def resample_slice(input_slice, n_output_cells, order=0, prefilter=True): ''' Resample a 2D slice to new dimensions. Parameters: input_slice (numpy array): the input slice n_output_cells (int) : the number of output cells Returns: output slice ''' tophat_width = np.round(input_slice.shape[0]/n_output_cells) if tophat_width < 1 or (not prefilter): tophat_width = 1 slice_smoothed = smoothing.smooth_tophat(input_slice, tophat_width) idx = np.linspace(0, slice_smoothed.shape[0]-1, n_output_cells) output_slice = smoothing.interpolate2d(slice_smoothed, idx, idx, order=order) return output_slice
[docs] def bin_lightcone_in_frequency(lightcone, z_low, box_size_mpc, dnu): ''' Bin a lightcone in frequency bins. Parameters: lightcone (numpy array): the lightcone in length units z_low (float): the lowest redshift of the lightcone box_size_mpc (float): the side of the lightcone in Mpc dnu (float): the width of the frequency bins in MHz Returns: * The lightcone, binned in frequencies with high frequencies first * The frequencies along the line of sight in MHz ''' #Figure out dimensions and make output volume cell_size = box_size_mpc/lightcone.shape[0] distances = cm.z_to_cdist(z_low) + np.arange(lightcone.shape[2])*cell_size input_redshifts = cm.cdist_to_z(distances) input_frequencies = cm.z_to_nu(input_redshifts) nu1 = input_frequencies[0] nu2 = input_frequencies[-1] output_frequencies = np.arange(nu1, nu2, -dnu) output_lightcone = np.zeros((lightcone.shape[0], lightcone.shape[1], \ len(output_frequencies))) #Bin in frequencies by smoothing and indexing max_cell_size = cm.nu_to_cdist(output_frequencies[-1])-cm.nu_to_cdist(output_frequencies[-2]) smooth_scale = np.round(max_cell_size/cell_size) if smooth_scale < 1: smooth_scale = 1 hf.print_msg('Smooth along LoS with scale %f' % smooth_scale) tophat3d = np.ones((1,1,int(smooth_scale))) tophat3d /= np.sum(tophat3d) lightcone_smoothed = fftconvolve(lightcone, tophat3d) for i in range(output_lightcone.shape[2]): nu = output_frequencies[i] idx = int(hf.find_idx(input_frequencies, nu)) output_lightcone[:,:,i] = lightcone_smoothed[:,:,idx] return output_lightcone, output_frequencies
[docs] def bin_lightcone_in_mpc(lightcone, frequencies, cell_size_mpc): ''' Bin a lightcone in Mpc slices along the LoS ''' distances = cm.nu_to_cdist(frequencies) n_output_cells = (distances[-1]-distances[0])/cell_size_mpc output_distances = np.arange(distances[0], distances[-1], cell_size_mpc) output_lightcone = np.zeros((lightcone.shape[0], lightcone.shape[1], int(n_output_cells))) #Bin in Mpc by smoothing and indexing smooth_scale = np.round(len(frequencies)/n_output_cells) tophat3d = np.ones((1,1,int(smooth_scale))) tophat3d /= np.sum(tophat3d) lightcone_smoothed = fftconvolve(lightcone, tophat3d, mode='same') for i in range(output_lightcone.shape[2]): idx = int(hf.find_idx(distances, output_distances[i])) output_lightcone[:,:,i] = lightcone_smoothed[:,:,int(idx)] output_redshifts = cm.cdist_to_z(output_distances) return output_lightcone, output_redshifts
def _get_padded_slice(input_slice, padded_n): slice_n = input_slice.shape[0] padded_slice = np.zeros((padded_n, padded_n)) padded_slice[:slice_n, :slice_n] = input_slice padded_slice[slice_n:, :slice_n] = input_slice[:(padded_n-slice_n),:slice_n] padded_slice[:slice_n, slice_n:] = input_slice[:,:(padded_n-slice_n)] padded_slice[slice_n:, slice_n:] = input_slice[:(padded_n-slice_n), :(padded_n-slice_n)] return padded_slice
[docs] def padding_lightcone(lc, padded_n=None, mode='wrap', verbose=True): ''' Pad lightcone in the field of view direction. Parameters: lc (numpy array): the light-cone in physical coordinates with third axis as the light-of-sight. padded_n (int): number of cells to pad. The default value is half of the number of cells. mode (str): mode used for padding (see the documentation of numpy.pad). verbose (bool): show progress bar. Returns: padded lightcone ''' if padded_n is None: padded_n = int(lc.shape[1]/2) out_lc = np.zeros((lc.shape[0]+2*padded_n,lc.shape[1]+2*padded_n,lc.shape[2])) for i in tqdm(range(lc.shape[2]), disable=not verbose): out_lc[:,:,i] = np.pad(lc[:,:,i], padded_n, mode=mode) return out_lc