Source code for tools21cm.radio_telescope_noise

'''
Methods:

* simulate the radio telescope observation strategy
* simulate telescope noise
'''

import numpy as np
import sys
from .radio_telescope_sensitivity import *
from .usefuls import *
from . import conv
from . import cosmo as cm
from . import smoothing as sm
from .power_spectrum import radial_average, _get_k, _get_dims
from .read_files import read_dictionary_data, write_dictionary_data
import scipy
from .scipy_func import *
from glob import glob
from time import time, sleep
import pickle
import astropy.units as u
from joblib import Parallel, delayed
from tqdm import tqdm

[docs] def signal_window(ncells, method, ndim=1, extra_param=30): """ Generates a 1D, 2D, or 3D window function for signal processing. Parameters ---------- ncells : int The size of the window along each dimension. method : str The name of the window function to use (e.g., 'blackmanharris', 'gaussian'). ndim : int, optional The number of dimensions for the window (1, 2, or 3). extra_param : float, optional An additional parameter required by some window functions, like the standard deviation for 'gaussian' or beta for 'kaiser'. Returns ------- np.ndarray or None The generated window array, or None if the method is not implemented or the dimension is not supported. """ if method.lower() in ['blackmanharris', 'blackman-harris']: win = windows.blackmanharris(ncells) elif method.lower() in ['blackman']: win = windows.blackman(ncells) elif method.lower() in ['barthann']: win = windows.barthann(ncells) elif method.lower() in ['bartlett']: win = windows.bartlett(ncells) elif method.lower() in ['gaussian']: win = windows.gaussian(ncells, extra_param) elif method.lower() in ['hamming']: win = windows.hamming(ncells) elif method.lower() in ['hann']: win = windows.hann(ncells) elif method.lower() in ['kaiser']: win = windows.kaiser(ncells, extra_param) else: print(f'{method} window function is not implemented') return None if ndim==1: pass elif ndim==2: win = (win[:,None] @ win[None,:]) elif ndim==3: win = (win[:,None] @ win[None,:])[:,:,None] @ win[None,:] else: print(f'window dimension = {ndim} is not supported yet') return None return win
[docs] def noise_coeval_power_spectrum_1d(ncells, z, depth_mhz, obs_time=1000, subarray_type="AA4", kbins=10, boxsize=None, binning='log', return_n_modes=False, total_int_time=6., int_time=10., declination=-30., uv_map=None, N_ant=None, uv_weighting='natural', T_sys=None, sefd_data=None, nu_data=None, D_station=40., ep_aperture=None, fft_wrap=False, verbose=True): """ Computes the 1D power spectrum of instrumental noise for a coeval observation. This function simulates the instrumental noise properties in the Fourier domain based on the telescope configuration and observation strategy, and then computes the radially averaged 1D power spectrum. Parameters ---------- ncells : int The number of grid cells in each spatial dimension. z : float The redshift of the observation. depth_mhz : float The observational bandwidth in MHz. obs_time : float, optional The total on-source observation time in hours. subarray_type : str, optional The telescope layout configuration name. kbins : int, optional The number of bins for radial averaging of the power spectrum. boxsize : float, optional The comoving size of the simulation box in Mpc. binning : {'log', 'linear'}, optional The type of binning to use for k-modes. return_n_modes : bool, optional If True, also returns the number of modes in each k-bin. total_int_time : float, optional Total observation time per day in hours for UV coverage simulation. int_time : float, optional Integration time in seconds for UV coverage simulation. declination : float, optional The pointing declination in degrees. uv_map : np.ndarray, optional A pre-computed UV coverage map. If None, it will be simulated. N_ant : int, optional The number of antennas. If None, it will be determined. uv_weighting : {'natural', 'uniform'}, optional The weighting scheme to apply in the UV plane. T_sys : float, callable, or None, optional System temperature in Kelvin. Can be a single float value or a callable function of frequency in MHz. If `None`, a default model approximating the SKA-Low sky temperature plus a receiver temperature is used. Default is `None`. sefd_data : np.ndarray or None, optional An array of known SEFD values for interpolation. If this is provided, `nu_data` must also be given. Default is `None`. nu_data : np.ndarray or None, optional An array of frequencies in MHz corresponding to `sefd_data`. Default is `None`. D_station : float, optional Diameter of the antenna station in meters. Default is 40.0. ep_aperture : float, callable, or None, optional Aperture efficiency. Can be a single float value or a callable function of frequency in MHz. If `None`, a default frequency-dependent model is used. Default is `None`. verbose : bool, optional If True, enables verbose output. Returns ------- pn : np.ndarray The 1D noise power spectrum values. kn : np.ndarray The central k-mode values for each bin. n_modes : np.ndarray, optional The number of modes in each k-bin (if `return_n_modes` is True). """ if boxsize is None: boxsize = conv.LB antxyz, N_ant = subarray_type_to_antxyz(subarray_type, verbose=verbose) if uv_map is None: uv_map, N_ant = get_uv_map(ncells, z, subarray_type=antxyz, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination) if N_ant is None: N_ant = antxyz.shape[0] sigma, rms_noi = sigma_noise_radio(z, depth_mhz, obs_time, int_time, uv_map=uv_map, N_ant=N_ant, verbose=False, T_sys=T_sys, sefd_data=sefd_data, nu_data=nu_data, D_station=D_station, ep_aperture=ep_aperture) box_dims = _get_dims(boxsize, uv_map.shape) k_nq = np.pi/boxsize*min(uv_map.shape) if uv_weighting.lower() in ['nat', 'natural', 'natural_weighting']: out = rms_noi/np.sqrt(uv_map) elif uv_weighting.lower() in ['uni', 'uniform', 'uniform_weighting']: out = rms_noi else: print(f'{uv_weighting} scheme is not known or implemented') power = np.fft.fftshift(out**2) # scale boxvol = numpy_product(box_dims) pixelsize = boxvol/(numpy_product(uv_map.shape)) power *= pixelsize**2/boxvol pn, kn, n_modes = radial_average(power, box_dims, kbins=kbins, binning=binning) if return_n_modes: return pn, kn, n_modes return pn, kn
[docs] def get_uv_map_lightcone(ncells, zs, subarray_type="AA4", total_int_time=6., int_time=10., boxsize=None, declination=-30., save_uvmap=None, n_jobs=4, verbose=True, checkpoint=16): """ Generates or loads a lightcone of UV coverage maps, one for each redshift. This function handles the creation of UV maps across a range of redshifts, using parallel processing for efficiency and providing an option to cache the results to a file to speed up subsequent runs. Parameters ---------- ncells : int The number of grid cells. zs : np.ndarray or list An array or list of redshift values for the lightcone. subarray_type, total_int_time, etc. : various, optional Observational parameters passed to `get_uv_map`. save_uvmap : str, optional File path to save or load the UV map dictionary. If the file exists, maps are loaded; otherwise, they are generated and saved. n_jobs : int, optional Number of CPUs for parallel generation of UV maps. verbose : bool, optional If True, enables progress bars and informational messages. checkpoint : int, optional If provided, the number of redshifts to process before saving the results to `save_uvmap`. This is useful for long runs to prevent data loss. Returns ------- dict A dictionary containing the UV maps for each redshift, keyed by the redshift value formatted to three decimal places, along with metadata about the simulation parameters. """ antxyz, N_ant = subarray_type_to_antxyz(subarray_type, verbose=verbose) if boxsize is None: boxsize = conv.LB if isinstance(zs, list): zs = np.array(zs) if subarray_type.upper()=='AA*': subarray_type = 'AAstar' # Define a dictionary of simulation parameters params = { 'ncells': ncells, 'boxsize': boxsize, 'total_int_time': total_int_time, 'int_time': int_time, 'declination': declination, 'subarray_type': subarray_type, 'N_ant': N_ant, } # Attempt to load existing UV maps if save_uvmap and os.path.exists(save_uvmap): if verbose: print(f"Loading existing UV maps from {save_uvmap}") uvs = read_dictionary_data(save_uvmap) # Validate using the params dictionary mismatches = [] for key, current_val in params.items(): cached_val = uvs.get(key) # Check for a mismatch if cached_val is not None and cached_val != current_val: # Add the error message to a list instead of raising immediately mismatch_msg = f" - '{key}': Cached value is '{cached_val}', but current call requested '{current_val}'." mismatches.append(mismatch_msg) if mismatches: all_errors = "\n".join(mismatches) raise ValueError( f"Parameter mismatches found in cached file '{save_uvmap}':\n" f"{all_errors}\n" f"Please use a different save_uvmap path or delete the old file." ) else: # Initialize from the defined dictionary uvs = params.copy() # Identify which redshifts need a UV map to be generated z_to_run = [zi for zi in zs if '{:.3f}'.format(zi) not in uvs] if z_to_run: if verbose: print(f'Found {len(z_to_run)} new redshifts to generate UV maps for.') # Define the worker function for a single redshift _uvmap_worker = lambda zi: get_uv_map(ncells, zi, subarray_type=antxyz, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination, verbose=False)[0] # Condition for using chunked parallel processing with checkpoints use_chunked_parallel = n_jobs > 1 and checkpoint is not None and checkpoint > 0 if use_chunked_parallel: num_chunks = (len(z_to_run) + checkpoint - 1) // checkpoint if verbose: print(f"Processing in {num_chunks} chunks of size up to {checkpoint}...") for i in tqdm(range(0, len(z_to_run), checkpoint), desc="Processing Chunks", disable=not verbose): z_chunk = z_to_run[i:i + checkpoint] # Run the parallel job on the current chunk # Set inner verbose to 0 to avoid clutter; outer tqdm handles progress results_chunk = Parallel(n_jobs=n_jobs, verbose=0)(delayed(_uvmap_worker)(zi) for zi in z_chunk) # Update the main dictionary with the results from the chunk for zi, uv_map in zip(z_chunk, results_chunk): uvs['{:.3f}'.format(zi)] = uv_map # Save after processing the chunk if save_uvmap: if verbose: print(f"\nCheckpoint: Saving results to {save_uvmap}") write_dictionary_data(uvs, save_uvmap) else: # Original behavior: either sequential or parallel without checkpoints if n_jobs > 1: # Parallel processing for all z's at once if verbose: print(f"Generating all {len(z_to_run)} maps in parallel...") results = Parallel(n_jobs=n_jobs, verbose=10 if verbose else 0)(delayed(_uvmap_worker)(i) for i in z_to_run) for zi, uv_map in zip(z_to_run, results): uvs['{:.3f}'.format(zi)] = uv_map else: # Sequential processing iterator = tqdm(z_to_run, desc="Generating UV maps sequentially", disable=not verbose) for i, zi in enumerate(iterator): uvs['{:.3f}'.format(zi)] = _uvmap_worker(zi) # Checkpoint logic for sequential mode is_checkpoint_step = checkpoint and (i + 1) % checkpoint == 0 is_not_last_step = (i + 1) < len(z_to_run) if save_uvmap and is_checkpoint_step and is_not_last_step: if verbose: print(f"\nCheckpoint: Saving results to {save_uvmap}") write_dictionary_data(uvs, save_uvmap) # Final save to ensure the last chunk or the full result is written if save_uvmap: if verbose: print(f"Saving final updated UV maps to {save_uvmap}") write_dictionary_data(uvs, save_uvmap) elif verbose: print("All requested redshift UV maps are already present.") return uvs
[docs] def noise_map(ncells, z, depth_mhz, obs_time=1000, subarray_type="AA4", boxsize=None, total_int_time=6., int_time=10., declination=-30., uv_map=None, N_ant=None, uv_weighting='natural', T_sys=None, sefd_data=None, nu_data=None, D_station=40., ep_aperture=None, fft_wrap=False, verbose=True, suppress_sharp_features_uv_map=False): """ Creates a 2D map of instrumental noise for a single observation slice. This function simulates the thermal noise from a radio interferometer, taking into account the UV coverage and weighting scheme, and produces a noise map in the image domain. Parameters ---------- ncells : int The number of grid cells in each spatial dimension. z : float The redshift of the observation. depth_mhz : float The observational bandwidth in MHz. obs_time : float, optional The total on-source observation time in hours. subarray_type : str, optional The telescope layout configuration name. boxsize : float, optional The comoving size of the simulation box in Mpc. total_int_time, int_time, declination : float, optional Parameters for `get_uv_map` if `uv_map` is not provided. uv_map, N_ant : various, optional Pre-computed UV map and number of antennas. uv_weighting : {'natural', 'uniform'}, optional The weighting scheme to apply. T_sys : float, callable, or None, optional System temperature in Kelvin. Can be a single float value or a callable function of frequency in MHz. If `None`, a default model approximating the SKA-Low sky temperature plus a receiver temperature is used. Default is `None`. sefd_data : np.ndarray or None, optional An array of known SEFD values for interpolation. If this is provided, `nu_data` must also be given. Default is `None`. nu_data : np.ndarray or None, optional An array of frequencies in MHz corresponding to `sefd_data`. Default is `None`. D_station : float, optional Diameter of the antenna station in meters. Default is 40.0. ep_aperture : float, callable, or None, optional Aperture efficiency. Can be a single float value or a callable function of frequency in MHz. If `None`, a default frequency-dependent model is used. Default is `None`. fft_wrap : bool, optional If True, use a wrapped FFT to handle periodic boundary conditions. verbose : bool, optional If True, enables verbose output. suppress_sharp_features_uv_map : bool or str, optional If a string, specifies a method to smooth the UV map to reduce artifacts. Returns ------- np.ndarray A 2D array representing the noise map in the image domain (in muJy). """ antxyz, N_ant = subarray_type_to_antxyz(subarray_type, verbose=verbose) if uv_map is None: uv_map, N_ant = get_uv_map(ncells, z, subarray_type=antxyz, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination) if N_ant is None: N_ant = antxyz.shape[0] sigma, rms_noi = sigma_noise_radio(z, depth_mhz, obs_time, int_time, uv_map=uv_map, N_ant=N_ant, verbose=False, T_sys=T_sys, sefd_data=sefd_data, nu_data=nu_data, D_station=D_station, ep_aperture=ep_aperture) noise_real = np.random.normal(loc=0.0, scale=rms_noi, size=(ncells, ncells)) noise_imag = np.random.normal(loc=0.0, scale=rms_noi, size=(ncells, ncells)) noise_arr = noise_real + 1.j*noise_imag noise_four = apply_uv_response_noise(noise_arr, uv_map, boxsize=boxsize, uv_weighting=uv_weighting, suppress_sharp_features_uv_map=suppress_sharp_features_uv_map) # noise_four = apply_uv_response_noise_briggs(noise_arr, uv_map, robust=2.0, epsilon=1e-6) win_2d = signal_window(ncells, 'blackmanharris', ndim=2) noise_four = noise_four*np.fft.fftshift(win_2d) if fft_wrap: noise_map = ifft2_wrap(noise_four)*np.sqrt(int_time/3600./obs_time) else: noise_map = np.fft.ifft2(noise_four)*np.sqrt(int_time/3600./obs_time) return np.real(noise_map)
def _suppress_sharp_features_uv_map(uv_map, boxsize=None, method='gaussian', filter_param=5.): """ (Internal) Smooths the UV coverage map to mitigate sharp features. Parameters ---------- uv_map : np.ndarray The 2D UV coverage map. boxsize : float, optional The comoving size of the box in Mpc, required for 'binned' method. method : {'gaussian', 'binned'}, optional The smoothing method to use. filter_param : float or int, optional The primary parameter for the smoothing method (e.g., sigma for Gaussian, number of bins for binned). Returns ------- np.ndarray The smoothed UV map. """ if method.lower()=='gaussian': uv_map_smooth = np.sqrt(gaussian_filter(uv_map, filter_param)) uv_map_smooth *= uv_map.max()/uv_map_smooth.max() elif method.lower() in ['binned', 'bin']: assert boxsize is not None box_dims = [boxsize,boxsize] k_nq = np.pi/boxsize*min(uv_map.shape) uv_rad_avg, k_rad_avg, n_modes = radial_average(np.fft.fftshift(uv_map), box_dims, kbins=filter_param, binning='log') k_comp, k_mag = _get_k(uv_map, [boxsize,boxsize]) xx, yy = np.log10(k_rad_avg[k_rad_avg<k_nq/2]), np.log10(uv_rad_avg[k_rad_avg<k_nq/2]) uv_map_smooth = np.fft.fftshift(10**interp1d(xx[np.isfinite(yy)], yy[np.isfinite(yy)], fill_value='extrapolate')(np.log10(k_mag))) else: print(f'{method} to suppress sharp features in the uv maps is not supported. Choose from "gaussian", "binned".') return uv_map_smooth
[docs] def apply_uv_response_noise(noise, uv_map, boxsize=None, uv_weighting='natural', uv_map_min=0.01, suppress_sharp_features_uv_map=False): """ (Internal) Applies the instrumental UV response to a noise array. This function weights the noise in the Fourier domain according to the UV coverage and the chosen weighting scheme. Parameters ---------- noise : np.ndarray The complex noise array in the Fourier domain. uv_map : np.ndarray The 2D UV coverage map. boxsize : float, optional The comoving size of the box, needed if smoothing is applied. uv_weighting : {'natural', 'uniform'}, optional The weighting scheme. uv_map_min : float, optional Threshold below which UV cells are considered un-sampled and zeroed out. suppress_sharp_features_uv_map : bool or str, optional Method to smooth the UV map before applying weights. Returns ------- np.ndarray The noise array after applying the UV response. """ if uv_weighting.lower() in ['nat', 'natural', 'natural_weighting']: if suppress_sharp_features_uv_map: uv_map_smooth = _suppress_sharp_features_uv_map(uv_map, boxsize=boxsize, method=suppress_sharp_features_uv_map, filter_param=15) else: uv_map_smooth = uv_map out = noise/np.sqrt(uv_map_smooth) elif uv_weighting.lower() in ['uni', 'uniform', 'uniform_weighting']: out = noise else: print(f'{uv_weighting} scheme is not known or implemented') out[uv_map<uv_map_min] = 0. return out
[docs] def apply_uv_response_noise_briggs(noise, uv_map, robust=2.0, epsilon=1e-6): """ Applies Briggs weighting to a noise array. Parameters ---------- noise : np.ndarray 2D array of base noise values (assumed uniform before weighting). uv_map : np.ndarray 2D array of natural weights (hit counts). robust : float, optional Briggs robustness parameter (-2 for uniform to +2 for near-natural). epsilon : float, optional A small value to prevent division by zero. Returns ------- np.ndarray Noise map after applying Briggs weighting. """ w_nat = uv_map w_mean = np.mean(w_nat[w_nat > 0]) + epsilon briggs_weight = w_nat / (1 + (w_nat / w_mean)**2 * 10**(2 * robust)) noise_out = noise / np.sqrt(briggs_weight + epsilon) noise_out[w_nat == 0] = 0.0 return noise_out
[docs] def ifft2_wrap(nn1): """ Performs a 2D inverse FFT with wrapping to handle boundaries. This is useful for avoiding artifacts when the data is not perfectly periodic. Parameters ---------- nn1 : np.ndarray The 2D array in Fourier space. Returns ------- np.ndarray The central part of the inverse transformed array. """ assert nn1.ndim==2 bla0 = np.vstack((nn1,nn1)) bla1 = np.roll(bla0, nn1.shape[0]/2, 0) bla2 = np.hstack((bla1,bla1)) bla3 = np.roll(bla2, nn1.shape[1]/2, 1) imap = np.fft.ifft2(bla3) return imap[nn1.shape[0]/2:-nn1.shape[0]/2,nn1.shape[1]/2:-nn1.shape[1]/2]
[docs] def apply_uv_response_on_image(array, uv_map): """ Simulates the effect of incomplete UV coverage on a true sky image. This function mimics the observation process by transforming an image to the Fourier (UV) domain, setting all un-sampled UV cells to zero, and transforming back to the image domain. Parameters ---------- array : np.ndarray The 2D input image array. uv_map : np.ndarray The 2D UV coverage map, where non-zero values indicate sampled cells. Returns ------- np.ndarray The resulting "dirty" image after applying the UV mask. """ assert array.shape == uv_map.shape img_arr = np.fft.fft2(array) img_arr[uv_map==0] = 0 img_map = np.fft.ifft2(img_arr) return np.real(img_map)
[docs] def max_baseline_to_max_k(redshift, max_baseline): """ Converts a maximum physical baseline length to a maximum k-mode. Parameters ---------- redshift : float The redshift of observation. max_baseline : float or astropy.units.Quantity The maximum baseline length of the telescope, assumed in km if no units are given. Returns ------- astropy.units.Quantity The corresponding maximum k-mode (spatial frequency). """ if not isinstance(max_baseline, (u.Quantity)): max_baseline *= u.km lcut = (1 + redshift) * (21 * u.cm / max_baseline).to('') * cm.z_to_cdist(redshift) kcut = np.pi / lcut return kcut
[docs] def get_uv_map(ncells, z, subarray_type="AA4", total_int_time=6., int_time=10., boxsize=None, declination=-30., include_mirror_baselines=True, verbose=True, max_baseline=None): """ Generates a 2D UV coverage map for a given observation. This function simulates a daily observation to calculate the UV coverage, optionally applying a cut based on maximum baseline length. Parameters ---------- ncells : int Number of cells in the grid. z : float Redshift of observation. subarray_type : str, optional The telescope layout configuration name. total_int_time, int_time, declination : float, optional Observation parameters. boxsize : float, optional Comoving size of the simulation box in Mpc. include_mirror_baselines : bool, optional If True, includes mirrored baselines. verbose : bool, optional If True, enables verbose output. max_baseline : float, optional The maximum baseline length in km. If provided, UV tracks beyond the corresponding k-mode are cut. Returns ------- uv_map : np.ndarray The 2D array of gridded UV coverage. N_ant : int The number of antennas. """ if boxsize is None: boxsize = conv.LB antxyz, N_ant = subarray_type_to_antxyz(subarray_type, verbose=verbose) uv_map, N_ant = get_uv_daily_observation(ncells, z, antxyz, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination, include_mirror_baselines=include_mirror_baselines, verbose=verbose) if max_baseline is not None: kcut = max_baseline_to_max_k(z, max_baseline) kk = np.linspace(-ncells/2, ncells/2, ncells) * np.pi / boxsize kx, ky = np.meshgrid(kk, kk) ksq = kx**2 + ky**2 uv_map_shift = np.fft.fftshift(uv_map) uv_map_shift[ksq>kcut**2] = 0.0 uv_map = np.fft.fftshift(uv_map_shift) return uv_map, N_ant
[docs] def make_uv_map_lightcone(ncells, zs, subarray_type="AA4", total_int_time=6., int_time=10., boxsize=None, declination=-30., verbose=True): """ Creates a 3D cube of UV maps, one for each redshift slice. Parameters ---------- ncells : int Number of cells in the grid. zs : np.ndarray An array of redshift values for the lightcone. subarray_type, total_int_time, int_time, declination, boxsize : various, optional Observational parameters passed to `get_uv_map`. verbose : bool, optional If True, enables verbose output. Returns ------- uv_lc : np.ndarray A 3D array `(ncells, ncells, n_redshifts)` containing the UV maps. N_ant : int The number of antennas. """ uv_lc = np.zeros((ncells,ncells,zs.shape[0])) percc = np.round(100./zs.shape[0],decimals=2) for i in range(zs.shape[0]): z = zs[i] uv_map, N_ant = get_uv_map(ncells, z, subarray_type=subarray_type, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination, verbose=verbose) uv_lc[:,:,i] = uv_map print("\nThe lightcone has been constructed upto %.1f per cent." %(i*percc)) return uv_lc, N_ant
[docs] def apply_uv_response_on_coeval(array, z, subarray_type="AA4", boxsize=None, total_int_time=6., int_time=10., declination=-30., uv_map=None, N_ant=None): """ Applies the UV response slice-by-slice to a 3D coeval data cube. Parameters ---------- array : np.ndarray The input 3D data cube `(nx, ny, nz)`. z, subarray_type, boxsize, etc. : various, optional Parameters passed to `get_uv_map` if `uv_map` is not provided. Returns ------- np.ndarray The observed 3D data cube after applying the UV mask to each slice. """ ncells = array.shape[-1] if boxsize is None: boxsize = conv.LB if uv_map is None: uv_map, N_ant = get_uv_map(ncells, z, subarray_type=subarray_type, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination) data3d = np.zeros(array.shape) print("Applying the uv tracks to the the coeval signal cube...") for k in tqdm(range(ncells)): data2d = apply_uv_response_on_image(array[:,:,k], uv_map=uv_map) data3d[:,:,k] = data2d return data3d
[docs] def noise_cube_coeval(ncells, z, depth_mhz=None, obs_time=1000, subarray_type="AA4", boxsize=None, total_int_time=6., int_time=10., declination=-30., uv_map=None, N_ant=None, uv_weighting='natural', verbose=True, fft_wrap=False, T_sys=None, sefd_data=None, nu_data=None, D_station=40., ep_aperture=None, suppress_sharp_features_uv_map=False): """ Generates a 3D coeval cube of instrumental noise. This function simulates a noise cube where the noise properties are constant along the line-of-sight (frequency) axis. Parameters ---------- ncells, z, obs_time, etc. : various Parameters passed to `noise_map` for each slice. depth_mhz : float, optional The bandwidth per channel. If None, it is calculated from `boxsize`. Returns ------- np.ndarray A 3D cube `(ncells, ncells, ncells)` of instrumental noise in mK. """ antxyz, N_ant = subarray_type_to_antxyz(subarray_type, verbose=verbose) if boxsize is None: boxsize = conv.LB if depth_mhz is None: depth_mhz = (cm.z_to_nu(cm.cdist_to_z(cm.z_to_cdist(z)-boxsize/2))-cm.z_to_nu(cm.cdist_to_z(cm.z_to_cdist(z)+boxsize/2)))/ncells if uv_map is None: uv_map, N_ant = get_uv_map(ncells, z, subarray_type=antxyz, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination) # ncells = int(ncells); print(ncells) noise3d = np.zeros((ncells,ncells,ncells)) if verbose: print("Creating the noise cube...") sleep(1) for k in tqdm(range(ncells), disable=not verbose): noise2d = noise_map(ncells, z, depth_mhz, obs_time=obs_time, subarray_type=antxyz, boxsize=boxsize, total_int_time=total_int_time, int_time=int_time, declination=declination, uv_map=uv_map, N_ant=N_ant, uv_weighting=uv_weighting, fft_wrap=fft_wrap, T_sys=T_sys, sefd_data=sefd_data, nu_data=nu_data, D_station=D_station, ep_aperture=ep_aperture, suppress_sharp_features_uv_map=suppress_sharp_features_uv_map) noise3d[:,:,k] = noise2d if verbose: print("...noise cube created.") return jansky_2_kelvin(noise3d, z, boxsize=boxsize)
[docs] def noise_cube_lightcone(ncells, z, obs_time=1000, subarray_type="AA4", boxsize=None, save_uvmap=None, total_int_time=6., int_time=10., declination=-30., N_ant=None, uv_weighting='natural', fft_wrap=False, verbose=True, n_jobs=4, checkpoint=64, T_sys=None, sefd_data=None, nu_data=None, D_station=40., ep_aperture=None, suppress_sharp_features_uv_map=False): """ Generates a 3D lightcone of instrumental noise around a central redshift. This function is ideal for single redshift studies with a narrow bandwidth, where each slice along the line-of-sight corresponds to a slightly different redshift and frequency. Parameters ---------- ncells : int The grid size. z : float The central redshift of the lightcone. obs_time, subarray_type, etc. : various Observational parameters. save_uvmap : str, optional File path to save or load pre-computed UV maps to speed up re-runs. n_jobs, checkpoint : int, optional Parameters for parallel processing of UV map generation. Returns ------- np.ndarray A 3D `(ncells, ncells, ncells)` lightcone of instrumental noise in mK. """ antxyz, N_ant = subarray_type_to_antxyz(subarray_type, verbose=verbose) if boxsize is None: boxsize = conv.LB zs = cm.cdist_to_z(np.linspace(cm.z_to_cdist(z)-boxsize/2, cm.z_to_cdist(z)+boxsize/2, ncells)) # This function body is very similar to `noise_lightcone`. Consider refactoring. return noise_lightcone(ncells, zs, obs_time, subarray_type, boxsize, save_uvmap, total_int_time, int_time, declination, N_ant, uv_weighting, fft_wrap, verbose, n_jobs, checkpoint, T_sys, sefd_data, nu_data, D_station, ep_aperture, suppress_sharp_features_uv_map)
[docs] def noise_lightcone(ncells, zs, obs_time=1000, subarray_type="AA4", boxsize=None, save_uvmap=None, total_int_time=6., int_time=10., declination=-30., uv_weighting='natural', fft_wrap=False, verbose=True, n_jobs=4, checkpoint=16, T_sys=None, sefd_data=None, nu_data=None, D_station=40., ep_aperture=None, suppress_sharp_features_uv_map=False): """ Generates a 3D lightcone of instrumental noise over a list of redshifts. Each slice along the line-of-sight corresponds to a different redshift, and noise properties are calculated accordingly. This function first calls `get_uv_map_lightcone` to efficiently generate or load all required UV maps. Parameters ---------- ncells : int The number of grid cells along each spatial dimension. zs : np.ndarray or list An array of redshift values defining the slices of the lightcone. obs_time : float, optional Total observation time in hours. Default is 1000. subarray_type : str, optional The type of telescope subarray (e.g., "AA4"). Default is "AA4". boxsize : float, optional The comoving size of the simulation box in Mpc. If None, a default cosmological value is used. save_uvmap : str, optional File path to save or load the UV coverage maps. Caching these maps can significantly speed up subsequent runs. total_int_time : float, optional Total integration time in hours used for generating the UV coverage. Default is 6. int_time : float, optional The integration time for a single visibility measurement in seconds. Default is 10. declination : float, optional The pointing declination of the telescope in degrees. Default is -30. uv_weighting : str, optional The UV weighting scheme to use (e.g., 'natural', 'uniform'). Default is 'natural'. fft_wrap : bool, optional If True, use `pyfft_wrap` for FFTs, which can be faster. Default is False. verbose : bool, optional If True, print progress bars and status messages. Default is True. n_jobs : int, optional The number of CPU cores to use for parallel generation of UV maps. Default is 4. checkpoint : int, optional Number of redshift slices to process before saving the UV maps to a checkpoint file. Useful for long runs. Default is 16. sefd_data : str, optional Path to a file containing System Equivalent Flux Density (SEFD) data. nu_data : str, optional Path to a file containing frequencies corresponding to the SEFD data. suppress_sharp_features_uv_map : bool, optional If True, applies a suppression filter to the UV map to mitigate sharp features. Default is False. Returns ------- np.ndarray A 3D array of shape `(ncells, ncells, len(zs))` representing the noise lightcone in units of mK. See Also -------- get_uv_map_lightcone : Generates the underlying UV coverage maps. noise_map : Generates a 2D noise map for a single redshift. """ if isinstance(zs, list): zs = np.array(zs) # Step 1: Get all UV maps efficiently. uvs = get_uv_map_lightcone( ncells, zs, subarray_type=subarray_type, total_int_time=total_int_time, int_time=int_time, boxsize=boxsize, declination=declination, save_uvmap=save_uvmap, n_jobs=n_jobs, verbose=verbose, checkpoint=checkpoint, ) N_ant = uvs.get('Nant') # Step 2: Create the noise cube using the pre-computed UV maps. if verbose: print('Creating noise lightcone...') noise3d = np.zeros((ncells, ncells, zs.size)) for k, zi in enumerate(tqdm(zs, desc="Generating noise slices", disable=not verbose)): if k + 1 < zs.size: depth_mhz = np.abs(cm.z_to_nu(zs[k+1]) - cm.z_to_nu(zs[k])) else: depth_mhz = np.abs(cm.z_to_nu(zs[k]) - cm.z_to_nu(zs[k-1])) uv_map = uvs['{:.3f}'.format(zi)] # Note: We pass `subarray_type` but `uv_map` is provided, so a new one won't be generated inside noise_map. noise2d = noise_map( ncells, zi, depth_mhz, obs_time=obs_time, subarray_type=subarray_type, boxsize=boxsize, total_int_time=total_int_time, int_time=int_time, declination=declination, uv_map=uv_map, N_ant=N_ant, uv_weighting=uv_weighting, verbose=False, fft_wrap=fft_wrap, T_sys=T_sys, sefd_data=sefd_data, nu_data=nu_data, D_station=D_station, ep_aperture=ep_aperture, suppress_sharp_features_uv_map=suppress_sharp_features_uv_map ) noise3d[:,:,k] = jansky_2_kelvin(noise2d, zi, boxsize=boxsize) return noise3d
[docs] def gauss_kernel_3d(size, sigma=1.0, fwhm=None): """ Generates a 3D normalized Gaussian kernel. Parameters ---------- size : int Width of the output array in pixels. sigma : float, optional The standard deviation of the Gaussian. fwhm : float, optional The Full Width at Half Maximum. Overrides `sigma` if provided. Returns ------- np.ndarray A 3D numpy array with the Gaussian kernel, normalized to sum to 1. """ if fwhm != None: sigma = fwhm/(2.*np.sqrt(2.*np.log(2))) if size % 2 == 0: size = int(size/2) x,y,z = np.mgrid[-size:size, -size:size, -size:size] else: size = int(size/2) x,y,z = np.mgrid[-size:size+1, -size:size+1, -size:size+1] g = np.exp(-(x**2 + y**2 + z**2)/(2.*sigma**2)) return g/g.sum()
[docs] def smooth_gauss_3d(array, fwhm): """ Smooths a 3D array using a Gaussian kernel via FFT convolution. Parameters ---------- array : np.ndarray The 3D input array to be smoothed. fwhm : float The Full Width at Half Maximum of the Gaussian kernel in pixels. Returns ------- np.ndarray The smoothed 3D array. """ gg = gauss_kernel_3d(array.shape[0],fwhm=fwhm) out = scipy.signal.fftconvolve(array, gg) return out