Source code for tools21cm.bubble_stats

'''
Methods to calcluate the sizes of the regions of interest 
and estimate the size distributions.
'''

import numpy as np
from .Friends_of_Friends import FoF_search
from scipy import ndimage
import os,sys
import datetime, time
from . import mfp_np, spa_np, conv, morph 
from .scipy_func import *
from tqdm import tqdm
from joblib import Parallel, delayed
from .usefuls import loading_msg

[docs] def fof(data, xth=0.5, connectivity=1): """ Determines the sizes using the friends-of-friends approach. It assumes the length of the grid as the linking length. Parameters ---------- data: ndarray The array containing the input data xth: float The threshold value (Default: 0.5) Returns ------- map: ndarray array with each regions of interest label sizes: list all the sizes """ use_skimage=True t1 = datetime.datetime.now() data = (data>=xth) if 'skimage' in sys.modules and use_skimage: from skimage import morphology out_map = morphology.label(data, connectivity=connectivity) elements, size_list = np.unique(out_map, return_counts=True) size_list = size_list[1:] else: out_map, size_list = FoF_search(data, xth) t2 = datetime.datetime.now() runtime = (t2-t1).total_seconds()/60 print("Program runtime: %f minutes." %runtime) print("The output is a tuple containing output-map and volume-list array respectively.") return out_map, size_list
[docs] def spa(data, xth=0.95, boxsize=None, nscales=20, upper_lim=False, binning='log', verbose=True, use_fftconvolve=True): """ Determines the sizes using the Spherical-Averege (SPA) approach. Parameters ---------- input : ndarray 3D array of ionization fraction. xth : float The threshold value (Default: 0.5). boxsize : float The boxsize in cMpc can be given (Default: conv.LB). nscales : int The number of different radii to consider (Default: 20). upper_lim : bool It decides if the threshold is the upper limit or the lower limit (Default: True). verbose : bool Print verbose (Default: True). use_fftconvolve : bool Use FFT-based convolution (Default: True). Returns ------- r : ndarray sizes of the regions dn : ndarray probability of finding the corresponding size """ if boxsize is None: boxsize = conv.LB if (upper_lim): data = -1.*data xth = -1.*xth rs, ni = spa_np.spa_np(data, xth=xth, binning=binning, nscales=nscales, verbose=verbose, use_fftconvolve=use_fftconvolve) rs_ = rs*boxsize/data.shape[0] ni_ = ni/np.sum(ni) return rs_, ni_
[docs] def mfp(data, xth=0.5, boxsize=None, iterations=10000000, verbose=True, upper_lim=False, bins=None, r_min=None, r_max=None): """ Determines the sizes using the Mean-Free-Path (MFP) approach. Parameters ---------- input : ndarray 2D/3D array of ionization fraction/brightness temperature. xth : float The threshold value (Default: 0.5). boxsize : float The boxsize in cMpc can be given (Default: conv.LB). iterations: int Number of iterations (Default: 10_000_000). verbose : bool It prints the progress of the program (Default: True). upper_lim : bool It decides if the threshold is the upper limit or the lower limit (Default: False). bins : int Give number of bins or an array of sizes to re-bin into (Default: None). r_min : float Minimum size after rebinning (Default: None). r_max : float Maximum size after rebinning (Default: None). Returns ------- r : ndarray sizes of the regions dn : ndarray probability of finding the corresponding size """ iterations = int(iterations) if boxsize is None: boxsize = conv.LB print('Boxsize is set to %.2f Mpc.'%boxsize) dim = len(data.shape) t1 = datetime.datetime.now() if (upper_lim): data = -1.*data xth = -1.*xth check_box = (data>=xth).sum() if verbose: print(f'{check_box}/{data.size} cells are marked as region of interest (ROI).') if check_box==0: data = np.ones(data.shape) iterations = 3 if dim == 2: if verbose: print("MFP method applied on 2D data.") out = mfp_np.mfp2d(data, xth, iterations=iterations, verbose=verbose) elif dim == 3: if verbose: print("MFP method applied on 3D data.") out = mfp_np.mfp3d(data, xth, iterations=iterations, verbose=verbose) else: if verbose: print("The data doesn't have the correct dimension") return 0 nn = out[0]/iterations rr = out[1] t2 = datetime.datetime.now() runtime = (t2-t1).total_seconds()/60 if verbose: print("\nProgram runtime: %.2f minutes." %runtime) if check_box==0: if verbose: print("There is no ROI in the data. Therefore, the number density of all the sizes are zero.") # return rr*boxsize/data.shape[0], np.zeros(rr.shape) nn = np.zeros(rr.shape) if verbose: print("The output contains a tuple with three values: r, rdP/dr") if verbose: print("The curve has been normalized.") r0,p0 = rr*boxsize/data.shape[0], rr*nn #rr[nn.argmax()]*boxsize/data.shape[0] if bins is not None: r0,p0 = rebin_bsd(r0, p0, bins=bins, r_min=r_min, r_max=r_max) return r0, p0
[docs] def averaged_mfp(data, xth=0.5, boxsize=None, iterations=10000000, rays_per_point=10, verbose=True, upper_lim=False, bins=None, r_min=None, r_max=None, n_jobs=-1): """ Determines the sizes using the averaged Mean-Free-Path (MFP) approach. Parameters ---------- input : ndarray 2D/3D array of ionization fraction/brightness temperature. xth : float The threshold value (Default: 0.5). boxsize : float The boxsize in cMpc can be given (Default: conv.LB). iterations: int Number of iterations (Default: 10_000_000). iterations: int Number of iterations (Default: 10). verbose : bool It prints the progress of the program (Default: True). upper_lim : bool It decides if the threshold is the upper limit or the lower limit (Default: False). bins : int Give number of bins or an array of sizes to re-bin into (Default: None). r_min : float Minimum size after rebinning (Default: None). r_max : float Maximum size after rebinning (Default: None). Returns ------- r : ndarray sizes of the regions dn : ndarray probability of finding the corresponding size """ iterations = int(iterations) rays_per_point = int(rays_per_point) if boxsize is None: boxsize = conv.LB print('Boxsize is set to %.2f Mpc.'%boxsize) dim = len(data.shape) t1 = datetime.datetime.now() if (upper_lim): data = -1.*data xth = -1.*xth check_box = (data>=xth).sum() if check_box==0: data = np.ones(data.shape) iterations = 3 if dim == 3: ar = np.zeros_like(data) ar[data >= xth] = 1 loc = np.argwhere(ar == 1) rand_loc = np.random.randint(0, high=loc.shape[0], size=iterations) xs, ys, zs = loc[rand_loc, 0], loc[rand_loc, 1], loc[rand_loc, 2] print("MFP method applied on 3D data") def compute_ray_length(j): point = [xs[j], ys[j], zs[j]] num_szj, size_pxj = mfp_np.mfp3d(data, xth, iterations=rays_per_point, verbose=False, point=point) return np.sum(num_szj * size_pxj) / np.sum(num_szj) ray_list = Parallel(n_jobs=n_jobs)(delayed(compute_ray_length)(j) for j in tqdm(range(iterations), disable=not verbose)) ray_list = np.array(ray_list) size_px, num_sz = np.unique(ray_list, return_counts=1) out = [num_sz, size_px] else: print("The data doesn't have the correct dimension") return 0 if out is None: return None # nn = out[0]/iterations # rr = out[1] ht = np.histogram(np.log(ray_list[ray_list>0]), density=True, bins=50) nn, rr = ht[0], np.exp(ht[1][1:]/2+ht[1][:-1]/2) t2 = datetime.datetime.now() runtime = (t2-t1).total_seconds()/60 print("Program runtime: %f minutes." %runtime) if check_box==0: print("There is no ROI in the data. Therefore, the number density of all the sizes are zero.") # return rr*boxsize/data.shape[0], np.zeros(rr.shape) nn = np.zeros(rr.shape) print("The output contains a tuple with three values: r, rdP/dr") print("The curve has been normalized.") # Return radii (in physical units) and fraction of side lines which # have this side line. This is different from a bubble size distribution! r0,p0 = rr*boxsize/data.shape[0], nn #rr[nn.argmax()]*boxsize/data.shape[0] if bins is not None: r0,p0 = rebin_bsd(r0, p0, bins=bins, r_min=r_min, r_max=r_max) return r0, p0
[docs] def mfp_from_point(data, point, xth=0.5, boxsize=None, iterations=10000000, verbose=True, upper_lim=False, bins=None, r_min=None, r_max=None): """ Determines the sizes using the Mean-Free-Path (MFP) approach. Parameters ---------- input : ndarray 2D/3D array of ionization fraction/brightness temperature. point : ndarray or list [x, y, z] of the point. xth : float The threshold value (Default: 0.5). boxsize : float The boxsize in cMpc can be given (Default: conv.LB). iterations: float Number of iterations (Default: 1e7). verbose : bool It prints the progress of the program (Default: True). upper_lim : bool It decides if the threshold is the upper limit or the lower limit (Default: False). bins : int Give number of bins or an array of sizes to re-bin into (Default: None). r_min : float Minimum size after rebinning (Default: None). r_max : float Maximum size after rebinning (Default: None). Returns ------- r : ndarray sizes of the regions dn : ndarray probability of finding the corresponding size """ if boxsize is None: boxsize = conv.LB print('Boxsize is set to %.2f Mpc.'%boxsize) dim = len(data.shape) t1 = datetime.datetime.now() if (upper_lim): data = -1.*data xth = -1.*xth check_box = (data>=xth).sum() if check_box==0: data = np.ones(data.shape) iterations = 3 if dim == 2: print("MFP method applied on 2D data") out = mfp_np.mfp2d(data, xth, iterations=iterations, verbose=verbose, point=point) elif dim == 3: print("MFP method applied on 3D data") out = mfp_np.mfp3d(data, xth, iterations=iterations, verbose=verbose, point=point) else: print("The data doesn't have the correct dimension") return 0 if out is None: return None nn = out[0]/iterations rr = out[1] t2 = datetime.datetime.now() runtime = (t2-t1).total_seconds()/60 print("Program runtime: %f minutes." %runtime) if check_box==0: print("There is no ROI in the data. Therefore, the number density of all the sizes are zero.") # return rr*boxsize/data.shape[0], np.zeros(rr.shape) nn = np.zeros(rr.shape) print("The output contains a tuple with three values: r, rdP/dr") print("The curve has been normalized.") # Return radii (in physical units) and fraction of side lines which # have this side line. This is different from a bubble size distribution! r0,p0 = rr*boxsize/data.shape[0], nn #rr[nn.argmax()]*boxsize/data.shape[0] if bins is not None: r0,p0 = rebin_bsd(r0, p0, bins=bins, r_min=r_min, r_max=r_max) return r0, p0
def rebin_bsd(rr, pp, bins=10, r_min=None, r_max=None): fp = interp1d(rr, pp, kind='cubic') if np.array(bins).size == 1: if r_min is None: r_min = rr.min()+1 if r_max is None: r_max = rr.max()-10 rs = 10**np.linspace(np.log10(r_min), np.log10(r_max), bins) else: rs = np.array(bins) return rs, fp(rs)
[docs] def dist_from_volumes(sizes, resolution=1., bins=100, null_factor=None): """ Volume distribution and effective radius distribution. Parameters ---------- sizes : list List of volumes in pixel**3 units. resolution : float Distance between two pixels in cMpc (Default: 1). bins : int Number of bins of volumes in log space. Returns ------- v : ndarray volumes of the regions Pv : ndarray probability of finding the corresponding volume r : ndarray effective radii of the regions Pr : ndarray probability of finding the corresponding effective radius """ vols = np.array(sizes) radii = (vols*3./4./np.pi)**(1./3.) ht_v = np.histogram(np.log10(vols), bins=bins) ht_r = np.histogram(np.log10(radii), bins=bins) vs, d_v = np.zeros(len(ht_v[0])+1), np.zeros(len(ht_v[0])+1) vs = 10.**ht_v[1]*resolution**3 d_v[:-1] = 1.*ht_v[0]/np.sum(ht_v[0]) if null_factor is None: dummy = d_v[d_v!=0].min()/1000. else: dummy = null_factor d_v[d_v==0] = dummy rs, d_r = np.zeros(len(ht_r[0])+1), np.zeros(len(ht_r[0])+1) rs = 10.**ht_r[1]*resolution d_r[1:] = 1.*ht_r[0]/np.sum(ht_r[0]) if null_factor is None: d_r[0] = d_r[d_r!=0].min()/1000. else: d_r[0] = null_factor print("The output is a tuple conatining 4 numpy array: V, VdP/dV, r, rdp/dr.") return vs, d_v, rs, d_r
def get_distribution(array, resolution=1., bins=100, sizes=False): if sizes: sizes = array else: mn, mx = array.min(), array.max() sizes = np.arange(mx)+1. for i in range(int(mx)): label = i+1 sizes[i] = len(array[array==label]) print(label,) ht = np.histogram(np.log(sizes), bins=bins) vols, dist = np.zeros(len(ht[0])+1), np.zeros(len(ht[0])+1) vols = np.exp(ht[1])*resolution dist[:-1] = ht[0] return sizes, dist/np.sum(dist), vols def plot_fof_sizes(sizes, bins=100, boxsize=None, normalize='box'): lg = np.log10(np.array(sizes)) ht = np.histogram(lg, bins=bins) xx = 10**ht[1] yy = ht[0]*xx[:-1] if boxsize is None: boxsize = conv.LB if normalize=='ionized': zz = yy/np.sum(yy) else: zz = yy/boxsize**3 dummy = zz[zz!=0].min()/10. zz[zz==0] = dummy zz = np.hstack((zz,dummy)) print("The output is Size, Size**2 dP/d(Size), lowest value") return xx, zz, dummy def disk_structure(n): struct = np.zeros((2*n+1, 2*n+1, 2*n+1)) x, y, z = np.indices((2*n+1, 2*n+1, 2*n+1)) mask = (x - n)**2 + (y - n)**2 + (z - n)**2 <= n**2 struct[mask] = 1 return struct.astype(np.bool) def granulometry_CDF(data, sizes=None, verbose=True, n_jobs=1): s = max(data.shape) if sizes is None: sizes = np.arange(1, s/2, 2) sizes = sizes.astype(int) # granulo = np.zeros((len(sizes))) np.random.shuffle(sizes) verbose = True if verbose: # func = lambda n: ndimage.binary_opening(data, structure=disk_structure(sizes[n])).sum() func = lambda n: morph.binary_opening(data, structure=disk_structure(sizes[n])).sum() granulo = np.array(Parallel(n_jobs=n_jobs)(delayed(func)(i) for i in tqdm(range(len(sizes))) )) # for n in tqdm(range(len(sizes))): granulo[n] = ndimage.binary_opening(data, structure=disk_structure(sizes[n])).sum() print("Completed.") arg_sort = np.argsort(sizes) return granulo[arg_sort]
[docs] def granulometry_bsd(data, xth=0.5, boxsize=None, verbose=True, upper_lim=False, sampling=2, log_bins=None, n_jobs=1): """ Determined the sizes using the Granulometry (Gran) approach. It is based on Kakiichi et al. (2017) Parameters ---------- input : ndarray 2D/3D array of ionization fraction/brightness temperature. xth : float The threshold value (Default: 0.5). boxsize : float The boxsize in cMpc can be given (Default: conv.LB). verbose : bool It prints the progress of the program (Default: True). upper_lim : bool It decides if the threshold is the upper limit or the lower limit (Default: False). sampling : int Give the resolution of the radii in the pixel units (Default: 2). n_jobs : int Give the number of CPUs. Returns ------- r : ndarray sizes of the regions dn : ndarray probability of finding the corresponding size """ t1 = datetime.datetime.now() if boxsize is None: boxsize = conv.LB if (upper_lim): data = -1.*data xth = -1.*xth mask = data > xth # if log_bins is not None: sz = np.unique((10**np.linspace(0, np.log10(data.shape[0]/4), log_bins)).astype(int)) # else: sz = np.arange(1, data.shape[0]/4, sampling) # granulo = granulometry_CDF(mask, sizes=sz, verbose=verbose, n_jobs=n_jobs) # rr = (sz*boxsize/data.shape[0])[:-1] # nn = np.array([(granulo[i]-granulo[i+1])/np.abs(sz[i]-sz[i+1]) for i in range(len(granulo)-1)]) if n_jobs>1: print('Parallelization not implemented yet.') area, dFdR, R = _granulometry(mask, verbose=verbose) Rs = (R*boxsize/data.shape[0]) dFdlnR = R*dFdR t2 = datetime.datetime.now() runtime = (t2-t1).total_seconds()/60 if verbose: # print("\nProgram runtime: %f minutes." %runtime) print("The output contains a tuple with three values: r, rdP/dr, dP/dr") print("The curve has been normalized.") return Rs, dFdlnR, dFdR
def _granulometry(data, n_jobs=1, verbose=True): def disk(n): struct = np.zeros((2 * n + 1, 2 * n + 1)) x, y = np.indices((2 * n + 1, 2 * n + 1)) mask = (x - n)**2 + (y - n)**2 <= n**2 struct[mask] = 1 return struct.astype(np.bool) def ball(n): struct = np.zeros((2*n+1, 2*n+1, 2*n+1)) x, y, z = np.indices((2*n+1, 2*n+1, 2*n+1)) mask = (x - n)**2 + (y - n)**2 + (z - n)**2 <= n**2 struct[mask] = 1 return struct.astype(np.bool) s = max(data.shape) dim = data.ndim pixel = range(np.int(s/2)) area0 = np.float(data.sum()) area = np.zeros_like(pixel) def func(n): if dim == 2: opened_data = morph.binary_opening(data,structure=disk(n)) if dim == 3: opened_data = morph.binary_opening(data,structure=ball(n)) return float(opened_data.sum()) if n_jobs>1: area = np.array(Parallel(n_jobs=n_jobs)(delayed(func)(i) for i in tqdm(pixel) )) else: t1 = time.time() for n in pixel: # print 'binary opening the data with radius =',n,' [pixels]' if verbose: loading_msg('R = {} pixels | time elapsed {:.1f} s'.format(n, (time.time()-t1))) area[n] = func(n) if area[n] == 0: break area = area.astype(float) # print(area) pattern_spectrum = np.append((area[:-1]-area[1:]).astype(float)/area[0], 0) # print(pattern_spectrum) return area, pattern_spectrum, np.array(pixel) from skimage.segmentation import watershed from skimage.feature import peak_local_max from skimage.morphology import reconstruction def h_transform(image, h, verbose=True): if verbose: print('Applying h-transform...') seed = np.copy(image) if image.ndim==3: seed[1:-1, 1:-1, 1:-1] = image.min() else: seed[1:-1, 1:-1] = image.min() mask = image seed = image - h dilated = reconstruction(seed, mask, method='dilation') image = image - dilated if verbose: print('...done') return image def _watershed(image, h=None, threshold_rel=None, footprint=None, verbose=True): distance = ndimage.distance_transform_edt(image) if h is not None: distance1 = h_transform(distance, h, verbose=verbose) if footprint is None: footprint = np.ones((3, 3)) if image.ndim==2 else np.ones((3,3,3)) if verbose: print('Finding watersheds...') coords = peak_local_max(distance if h is None else distance1, footprint=footprint, labels=image, threshold_rel=threshold_rel) mask = np.zeros(distance.shape, dtype=bool) mask[tuple(coords.T)] = True markers, _ = ndimage.label(mask) labels = watershed(-distance if h is None else -distance1, markers, mask=image) if verbose: print('...done') return labels
[docs] def watershed_bsd(data, xth=0.5, boxsize=None, verbose=True, upper_lim=False, h=None, n_jobs=1): """ Still needs testing. Determined the sizes using the Watershed approach. It is based on Lin et al. (2016) Parameters ---------- input : ndarray 2D/3D array of ionization fraction/brightness temperature. xth : float The threshold value (Default: 0.5). boxsize : float The boxsize in cMpc can be given (Default: conv.LB). verbose : bool It prints the progress of the program (Default: True). upper_lim : bool It decides if the threshold is the upper limit or the lower limit (Default: False). h : float Parameter for performing h-transform to smooth out local peaks. n_jobs : int Give the number of CPUs. Returns ------- r : ndarray sizes of the regions dn : ndarray probability of finding the corresponding size """ t1 = datetime.datetime.now() if boxsize is None: boxsize = conv.LB if (upper_lim): data = -1.*data xth = -1.*xth mask = data > xth # if log_bins is not None: sz = np.unique((10**np.linspace(0, np.log10(data.shape[0]/4), log_bins)).astype(int)) # else: sz = np.arange(1, data.shape[0]/4, sampling) # granulo = granulometry_CDF(mask, sizes=sz, verbose=verbose, n_jobs=n_jobs) # rr = (sz*boxsize/data.shape[0])[:-1] # nn = np.array([(granulo[i]-granulo[i+1])/np.abs(sz[i]-sz[i+1]) for i in range(len(granulo)-1)]) if n_jobs>1: print('Parallelization not implemented yet.') labels = _watershed(mask, h=h) uniq = np.unique(labels[labels>0], return_counts=1) r_list = np.cbrt(3*uniq[0]/4/np.pi) r_pixl = np.arange(0, np.int(r_list.max()+2), 1)-0.5 ht = np.histogram(r_list, bins=r_pixl) dFdR, R = ht[0]/ht[0].sum(), ht[1][1:]/2+ht[1][:-1]/2 Rs = (R*boxsize/data.shape[0]) dFdlnR = R*dFdR t2 = datetime.datetime.now() runtime = (t2-t1).total_seconds()/60 # print("\nProgram runtime: %f minutes." %runtime) print("The output contains a tuple with three values: r, rdP/dr, dP/dr") print("The curve has been normalized.") return Rs, dFdlnR, dFdR