Source code for nispace.nulls

import nibabel as nib
import numpy as np
import pandas as pd
import warnings
from joblib import Parallel, delayed
from nilearn.image import resample_img
from neuromaps.images import load_gifti, load_nifti
from neuromaps.nulls.nulls import batch_surrogates
from neuromaps.nulls.nulls import _get_distmat
from neuromaps.datasets import fetch_fsaverage
from scipy.spatial.distance import cdist
from scipy.stats import zscore
from sklearn.preprocessing import minmax_scale
from tqdm.auto import tqdm
from numba import njit

# import MoranRandomization function, copied from brainspace, as our default null model
# brainspace was removed as an dependency because it installs vtk, which is a large 3d rendering
# library that NiSpace does not use. 
from .modules.brainspace_moran import MoranRandomization
    
# brainsmash is optional dependency. Moran
try:
    from brainsmash.mapgen import Base
    _BRAINSMASH_AVAILABLE = True
except ImportError:
    _BRAINSMASH_AVAILABLE = False

from . import lgr
from .stats.coloc import corr
from .utils.utils import (set_log, mirror_nifti, mirror_gifti, vect_to_vol_arr, vol_to_vect_arr,
                          _corr_vector)


def _dist_mat_from_coords(coords, dtype=np.float32):
    dist_mat = np.zeros((coords.shape[0], coords.shape[0]), dtype=dtype)
    for i, row in enumerate(coords):
        dist_mat[i] = cdist(row[None], coords).astype(dtype)   
    return dist_mat

def _img_density_for_neuromaps(img):
    if isinstance(img, nib.GiftiImage):
        img = (img,)
    if isinstance(img, nib.Nifti1Image):
        return f"{np.round((img.affine[0,0])):.0f}mm"
    elif isinstance(img, tuple):
        return f"{np.round((img[0].agg_data().shape[0]/1000)):.0f}k"
    else:
        raise ValueError(f"Provide input of type nib.Nifti1Image or (tuple of) nib.GiftiImage(s)!")
    
def _img_space_for_neuromaps(img):
    if isinstance(img, nib.GiftiImage):
        img = (img,)
    if isinstance(img, nib.Nifti1Image):
        return "mni152"
    elif isinstance(img, tuple):
        return "fsaverage"
    else:
        raise ValueError(f"Provide input of type nib.Nifti1Image or (tuple of) nib.GiftiImage(s)!")

def _get_null_data_mask(data_1d, dist_mat):
    med = np.isinf(dist_mat + np.diag([np.inf] * len(dist_mat))).all(axis=1)
    mask = np.logical_not(np.logical_or(np.isnan(data_1d), med))
    return mask

def _symmetrize_nans(data_1d, idc):
    # check dtype
    if not isinstance(data_1d, (np.ndarray, pd.Series)) or not isinstance(idc, (list, tuple)):
        raise ValueError("'data_1d' must be a numpy array or pandas Series and 'idc' must be a list or tuple!")
    # check length
    if len(data_1d) != len(np.concatenate(idc)):
        raise ValueError("Length of 'data_1d' must match sum of number of elements in 'idc'!")
    # check if all idc have the same length
    if not all(len(i_idc) == len(idc[0]) for i_idc in idc):
        raise ValueError("All elements in 'idc' must have the same length!")
    # symmetrize
    data_1d = np.array(data_1d)
    isnan = np.full(len(idc[0]), False)
    for i_idc in idc:
        isnan = np.logical_or(isnan, np.isnan(data_1d[i_idc]))
    for i_idc in idc:
        data_1d[i_idc] = np.where(isnan, np.nan, data_1d[i_idc])
    # return
    return data_1d

[docs]def correlate_hemis_parc(data, parc_idc_lh=None, parc_idc_rh=None, l2rmap=None, rank=False): data = np.atleast_2d(np.array(data)) parc_idc_lh = np.array(parc_idc_lh) parc_idc_rh = np.array(parc_idc_rh) n = data.shape[1] n_hemi = n // 2 if n / 2 != n_hemi: raise ValueError("Data must have an even number of parcels") if parc_idc_lh is None: parc_idc_lh = np.arange(n_hemi) if parc_idc_rh is None: parc_idc_rh = np.arange(n_hemi, n) data_lh = data[:, parc_idc_lh] data_rh = data[:, parc_idc_rh] if l2rmap is not None: l2rmap = np.nan_to_num(np.array(l2rmap)).astype(data.dtype) for i in range(data.shape[0]): data_lh[i, :] = _apply_l2rmap(data[i, :], l2rmap, parc_idc_lh, parc_idc_rh)[parc_idc_rh] r = [] for i in range(data.shape[0]): lh, rh = data_lh[i,:], data_rh[i,:] notnan = ~(np.isnan(lh) | np.isnan(rh)) r.append(corr(lh[notnan], rh[notnan], rank=rank)) return np.array(r)
# @njit(cache=True) # def _apply_l2rmap(data_1d, l2rmap, parc_idc_lh, parc_idc_rh): # data_lh_1d = data_1d[parc_idc_lh] # data_mirrored_1d = np.full_like(data_1d, np.nan) # data_mirrored_1d[parc_idc_lh] = data_lh_1d # for i_parcel, idx_parcel in enumerate(parc_idc_rh): # data_mirrored_1d[idx_parcel] = np.nansum( data_lh_1d * l2rmap[:, i_parcel] ) # return data_mirrored_1d @njit(cache=True) def _apply_l2rmap(data_1d, l2rmap, parc_idc_lh, parc_idc_rh): """Note: l2rmap must be without nans, but replacing them with 0s should not be a problem""" data_mirrored_1d = data_1d.copy() data_lh_1d = data_1d[parc_idc_lh] lh_notnan = ~np.isnan(data_lh_1d) data_mirrored_1d[parc_idc_rh] = np.dot(data_lh_1d[lh_notnan], l2rmap[lh_notnan, :]) return data_mirrored_1d def _mirror_parc_maps(data, parc_idc_lh, parc_idc_rh, l2rmap=None, interhemi_correlation=1, seed=None, parc=None, project_to_volume=False, resample_vol=2, n_proc=1): data = np.array(data) if data.ndim == 1: data = data[None, :] data_lh = data[:, parc_idc_lh] data_mirrored = np.full_like(data, np.nan) data_mirrored[:, parc_idc_lh] = data_lh # simple copy of left to right indices if not project_to_volume and l2rmap is None: data_mirrored[:, parc_idc_rh] = data_lh # introduce interhemispheric correlation if interhemi_correlation != 1: # apply correlated vector function for i in range(data.shape[0]): data_mirrored[i, parc_idc_rh] = _corr_vector( data_mirrored[i, parc_idc_rh], interhemi_correlation, seed) # use left-to-right mapping elif not project_to_volume and l2rmap is not None: l2rmap = np.nan_to_num(np.array(l2rmap)).astype(data.dtype) parc_idc_lh = np.array(parc_idc_lh) parc_idc_rh = np.array(parc_idc_rh) if interhemi_correlation == 1: mirror_fun = lambda x, seed=None: _apply_l2rmap(x, l2rmap, parc_idc_lh, parc_idc_rh) else: def mirror_fun(data_1d, seed=None): data_1d = _apply_l2rmap(data_1d, l2rmap, parc_idc_lh, parc_idc_rh) data_1d[parc_idc_rh] = _corr_vector( data_1d[parc_idc_rh], interhemi_correlation, seed) return data_1d for i in range(data.shape[0]): data_mirrored[i, :] = mirror_fun(data[i, :], seed=i**2+i if seed is not None else None) # else: complicated approach for cases in which the parcellation is not bilaterally symmetric # 1. take left-hemisphere parcels and mirror them across the x-axis # 2. project the right hemisphere data into volume space using the original parcellation # 3. re-parcellate the right-hemisphere data using the original parcellation elif project_to_volume: # TODO: test and debug raise NotImplementedError("Projecting to volume not at all tested!") if parc is None: raise ValueError("'parc' must be provided if 'project_to_volume' is True!") # parcellation data if resample_vol is not None: parc = resample_img(parc, target_affine=np.eye(3) * resample_vol, interpolation="nearest") parc_data = parc.get_fdata() idc_all = np.trim_zeros(np.unique(parc_data)) idc_left = idc_all[parc_idc_lh] idc_right = idc_all[parc_idc_rh] # 1. parc_left = mirror_nifti(parc_data, affine=parc.affine, direction="drop_right") parc_right = mirror_nifti(parc_data, affine=parc.affine, direction="drop_left") parc_right_mirrored = mirror_nifti(parc_left, affine=parc.affine, direction="switch") # 2. and 3. def par_fun(vect_left): vol_right_mirrored = vect_to_vol_arr(vect_left, parc_right_mirrored, idc_left) vect_right_mirrored = vol_to_vect_arr(vol_right_mirrored, parc_right, idc_right) return vect_right_mirrored data_mirrored_rh = Parallel(n_jobs=n_proc)( delayed(par_fun)(data[i, :]) for i in range(data.shape[0]) ) data_mirrored[:, parc_idc_rh] = np.stack(data_mirrored_rh, axis=0) # for i in range(data.shape[0]): # vect_left = data[i, parc_idc_lh] # vol_right_mirrored = vect_to_vol_arr(vect_left, parc_right_mirrored, idc_left) # data_mirrored[i, parc_idc_rh] = vol_to_vect_arr(vol_right_mirrored, parc_right, idc_right) else: raise ValueError("Invalid input arguments, no mirroring method selected!") return data_mirrored
[docs]def nulls_burt2020(data_1d, dist_mat, n_nulls=1000, seed=None, **kwargs): data_1d = np.array(data_1d).flatten() # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = _get_null_data_mask(data_1d, dist_mat) data_1d = data_1d[mask] dist_mat = dist_mat[np.ix_(mask, mask)] # null maps null_data[:, mask] = Base( x=data_1d, D=dist_mat, seed=seed, **kwargs )(n_nulls, 50) # return return null_data.astype(data_1d.dtype)
[docs]def nulls_burt2018(data_1d, dist_mat, n_nulls=1000, seed=None, **kwargs): data_1d = np.array(data_1d).flatten() # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = _get_null_data_mask(data_1d, dist_mat) data_1d = data_1d[mask] dist_mat = dist_mat[np.ix_(mask, mask)] # data adjustment data_1d += np.abs(np.nanmin(data_1d)) + 0.1 # null maps null_data[:, mask] = batch_surrogates(dist_mat, data_1d, n_surr=n_nulls, seed=seed, **kwargs).T # return return null_data.astype(data_1d.dtype)
[docs]def nulls_moran(data_1d, dist_mat, n_nulls=1000, seed=None, **kwargs): data_1d = np.array(data_1d).flatten() # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = _get_null_data_mask(data_1d, dist_mat) data_1d = data_1d[mask] dist_mat = dist_mat[np.ix_(mask, mask)] # distance matrix adjustment np.fill_diagonal(dist_mat, 1) dist_mat **= -1 # null maps null_data[:, mask] = MoranRandomization( joint=True, tol=1e-6, n_rep=n_nulls, random_state=seed, **kwargs ).fit(dist_mat).randomize(data_1d) # return return null_data.astype(data_1d.dtype)
[docs]def nulls_random(data_1d, dist_mat=None, n_nulls=1000, seed=None): # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = ~np.isnan(data_1d) data_1d = data_1d[mask] # null maps rng = np.random.default_rng(seed) null_data[:, mask] = np.stack([rng.permutation(data_1d) for _ in range(n_nulls)], axis=0) # return return null_data.astype(data_1d.dtype)
_NULL_METHODS = { # Random "random": nulls_random, # Moran's I implemented via BrainSpace -> volumetric and surface "moran": nulls_moran, "brainspace": nulls_moran, # Variogram-method implemented via Brainsmash -> volumetric and surface "burt2020": nulls_burt2020, "brainsmash": nulls_burt2020, "variogram": nulls_burt2020, # Smoothing-method from Burt2018 -> volumetric and surface "burt2018": nulls_burt2018, # TODO: add spin methods }
[docs]def get_distance_matrix(parc, parc_space, parc_hemi=["L", "R"], parc_resample=2, centroids=False, surf_euclidean=False, n_proc=1, verbose=True, dtype=np.float32): verbose = set_log(lgr, verbose) ## generate distance matrix # case volumetric if "mni" in parc_space.lower(): # get parcellation data parc = load_nifti(parc) if parc_resample: if parc_resample is True: parc_resample = 3 lgr.info(f"Downsampling volumetric parcellation to voxelsize of {parc_resample} " "for distance matrix generation.") parc = resample_img( parc, target_affine=np.diag([parc_resample] * 3), interpolation="nearest" ) parc_data = parc.get_fdata() parc_affine = parc.affine parcels = np.trim_zeros(np.unique(parc_data)) n_parcels = len(parcels) mask = np.logical_not(np.logical_or(np.isclose(parc_data, 0), np.isnan(parc_data))) parc_data_m = parc_data * mask # case distances between volumetric parcel centroids if centroids: # get centroids ijk = find_vol_parc_centroids(parc_data_m, affine=parc_affine, parcel_idc=parcels) # get distances dist = _dist_mat_from_coords(ijk, dtype) # case mean distances between parcel-wise voxels else: # get parcel-wise coordinates in world space ijk_parcels = dict() for i_parcel in parcels: xyz_parcel = np.column_stack(np.where(parc_data_m==i_parcel)) ijk_parcels[i_parcel] = nib.affines.apply_affine(parc_affine, xyz_parcel) def mni_dist(i, i_parcel): dist_i = np.zeros(n_parcels, dtype=dtype) j = i for _ in range(n_parcels - j): dist_i[j] = cdist(ijk_parcels[i_parcel], ijk_parcels[parcels[j]]) \ .mean().astype(dtype) j += 1 return dist_i lgr.info(f"Estimating euclidean distance matrix between {n_parcels} volumetric parcels.") dist_list = Parallel(n_jobs=n_proc)( delayed(mni_dist)(i, i_parcel) for i, i_parcel in enumerate(tqdm( parcels, desc=f"Running ({n_proc} proc)", disable=not verbose )) ) dist = np.r_[dist_list] # mirror to lower triangle dist = dist + dist.T # zero diagonal np.fill_diagonal(dist, 0) # case surface elif parc_space in ["fsaverage", "fsLR", "fsa", "fslr"]: if surf_euclidean & ("fsa" in parc_space): lgr.info(f"Estimating euclidean distance matrix between surface parcels.") _parc_centroids = find_surf_parc_centroids( parc=parc, parc_hemi=parc_hemi, parc_density=_img_density_for_neuromaps(parc), ) dist = _dist_mat_from_coords(_parc_centroids, dtype=dtype) elif surf_euclidean & ("fsa" not in parc_space): lgr.warning("Distance matrix generation currently not implemented for surface " "spaces other than fsaverage. Will use random splits!") # TODO: implement fsLR distance matrix elif "fsLR" in parc_space.lower(): lgr.critical_raise( "Distance matrix generation currently not implemented for fsLR space. " "Use 'fsaverage' instead!", ValueError ) else: lgr.info(f"Estimating geodesic distance matrix between surface parcels.") def surf_dist(i_hemi, hemi): dist = _get_distmat( hemi, atlas=parc_space, density=_img_density_for_neuromaps(parc[i_hemi]), parcellation=parc[i_hemi] if len(parc_hemi) > 1 else parc, n_proc=n_proc ) return(dist) dist = Parallel(n_jobs=n_proc)( delayed(surf_dist)(i, h) for i, h in enumerate(tqdm( parc_hemi, desc=f"Calculating distance matrix ({n_proc} proc)", disable=not verbose )) ) if isinstance(parc, tuple): dist = tuple(dist) else: dist = dist[0] # case other else: lgr.error(f"Distance matrix generation not supported for space {parc_space}!") ## return return dist
[docs]def find_vol_parc_centroids(parc, affine=None, parcel_idc=None, return_data_space=False): # get parcellation data if isinstance(parc, np.ndarray): parc_data = parc else: parc = load_nifti(parc) parc_data = parc.get_fdata() # get affine matrix if affine is None: if not isinstance(parc, nib.Nifti1Image): lgr.critical_raise("If 'affine' is not provided, 'parc' must be a Nifti image!", TypeError) affine = parc.affine # get parcel indices if parcel_idc is None: parcel_idc = np.trim_zeros(np.unique(parc_data)) # get centroid coordinates in world space xyz = np.zeros((len(parcel_idc), 3), float) for i, i_parcel in enumerate(parcel_idc): xyz[i, :] = np.column_stack(np.where(parc_data==i_parcel)).mean(axis=0) mni = nib.affines.apply_affine(affine, xyz) return mni if not return_data_space else (mni, xyz)
[docs]def find_surf_parc_centroids(parc, parc_hemi, parc_density="10k"): # get parcellation if isinstance(parc_hemi, str): parc_hemi = [parc_hemi] if isinstance(parc, (tuple, list)) & (len(parc)==2): parc = (load_gifti(parc[0]), load_gifti(parc[1])) parc_hemi = ["L", "R"] lgr.info("Two-hemispheric parcellation provided, assuming order ['L', 'R'].") elif isinstance(parc, (str, nib.GiftiImage)): parc = (load_gifti(parc),) if len(parc_hemi)>1: lgr.warning("Provided parcellation is one hemisphere but parc_label indicated both hemispheres. " "Setting parc_hemi to ['L']!") parc_hemi = ["L"] else: lgr.info(f"One-hemispheric parcellation provided (hemisphere: {parc_hemi})") else: lgr.critical_raise(f"Parcellation must be provided as (tuple/list of) path(s) or Gifti image(s), " f"not {type(parc)}!", TypeError) # get standard surface surfaces = fetch_fsaverage(parc_density)["pial"] if (len(parc_hemi)==1) & (parc_hemi[0]=="L"): surfaces = load_gifti(surfaces[0]), elif (len(parc_hemi)==1) & (parc_hemi[0]=="R"): surfaces = load_gifti(surfaces[1]), elif len(parc_hemi)==2: surfaces = (load_gifti(surfaces[0]), load_gifti(surfaces[1])) else: lgr.critical_raise("Problem with 'parc_hemi'. Provide ['L'], ['R'], or ['L', 'R']", ValueError) centroids = [] # iterate hemispheres for parc_h, surf_h in zip(parc, surfaces): labels = parc_h.darrays[0].data coords = surf_h.darrays[0].data # iterate parcels ("labels") and collect mean coordinates for idx in np.trim_zeros(np.unique(labels)): parcel = np.atleast_2d(coords[labels == idx].mean(axis=0)) parcel = coords[np.argmin(cdist(coords, parcel), axis=0)[0]] centroids.append(parcel) return np.row_stack(centroids)
[docs]def generate_null_maps(method, data, parcellation, dist_mat=None, parc_space=None, parc_hemi=None, parc_symmetric=False, n_nulls=1000, parc_resample=2, centroids=False, parc_idc_lh=None, parc_idc_rh=None, parc_idc_sc=None, lr_mirror_dist_mat=False, lr_mirror_null_maps=False, l2rmap=None, match_interhemi_correlation=True, report_interhemi_correlation=False, cx_sc_minmax_scale=False, dtype=float, n_proc=1, seed=None, verbose=True, **kwargs): if verbose is False: set_log(lgr, verbose) ## Checks # null method if method not in _NULL_METHODS: lgr.critical_raise(f"Null method {method} not implemented!", ValueError) null_fun = _NULL_METHODS[method] random_nulls = False if null_fun.__name__ == "nulls_burt2020" and not _BRAINSMASH_AVAILABLE: lgr.critical_raise("Null method 'burt2020' requires brainsmash! Run 'pip install brainsmash'!", ImportError) elif null_fun.__name__ == "nulls_random": random_nulls = True # input data if not isinstance(data, (pd.DataFrame, pd.Series, np.ndarray)): lgr.critical_raise(f"Input data not array-like! Type: {type(data)}", ValueError) if isinstance(data, pd.DataFrame): data_labs = list(data.index) elif isinstance(data, pd.Series): data_labs = [data.name] data = np.array(data) if len(data.shape) == 1: data = data[np.newaxis, :] n_data = data.shape[0] if "data_labs" not in locals(): data_labs = list(range(n_data)) # print lgr.info(f"Null map generation: Assuming n = {n_data} data vector(s) for " f"n = {data.shape[1]} parcels.") ## random nulls -> no distmat if random_nulls: dist_mat = (None, None) if isinstance(dist_mat, tuple) else None ## distance matrix provided -> we dont need parcellation if dist_mat is not None and not random_nulls: lgr.info(f"Using provided distance matrix/matrices.") if isinstance(dist_mat, (np.ndarray, pd.DataFrame)): n_parcels = dist_mat.shape[0] if parc_space is None: lgr.warning("Distance matrix provided as array but 'parc_space' is None: " "Assuming 'mni152'! Define 'parc_space' if one surface hemisphere!") parc_space = "mni152" elif isinstance(dist_mat, tuple): n_parcels = (dist_mat[0].shape[0], dist_mat[1].shape[0]) if parc_space is None: lgr.warning("Distance matrix provided as tuple but 'parc_space' is None: " "Assuming 'fsaverage'!") parc_space = "fsaverage" else: lgr.critical("Distance matrix is wrong data type, should be array or tuple of arrays, " f"is: {type(dist_mat)}! Setting 'dist_mat' to None!") dist_mat = None ## get dist mat -> we need parcellation if dist_mat is None and not random_nulls: # load function def load_parc(parc, parc_type, parc_space): if parc_type=="nifti": parc = load_nifti(parc) parc_space = "MNI152" if parc_space is None else parc_space n_parcels = len(np.trim_zeros(np.unique(parc.get_fdata()))) elif parc_type=="gifti": parc = load_gifti(parc) parc_space = "fsaverage" if parc_space is None else parc_space n_parcels = len(np.trim_zeros(np.unique(parc.darrays[0].data))) elif parc_type=="giftituple": parc = (load_gifti(parc[0]), load_gifti(parc[1])) parc_space = "fsaverage" if parc_space is None else parc_space n_parcels = (len(np.trim_zeros(np.unique(parc[0].darrays[0].data))), len(np.trim_zeros(np.unique(parc[1].darrays[0].data)))) return parc, parc_space, n_parcels # recognize parcellation type if isinstance(parcellation, nib.Nifti1Image): parc_type = "nifti" elif isinstance(parcellation, nib.GiftiImage): parc_type = "gifti" elif isinstance(parcellation, tuple): parc_type = "giftituple" elif isinstance(parcellation, str): if parcellation.endswith(".nii") | parcellation.endswith(".nii.gz"): parc_type = "nifti" elif parcellation.endswith(".gii") | parcellation.endswith(".gii.gz"): parc_type = "gifti" else: lgr.critical_raise(f"'parcellation' is string ({parcellation}) " "but ending was not recognized!", ValueError) else: lgr.critical_raise(f"'parcellation' data type ({type(parcellation)}) not defined!", TypeError) # load parcellation parc, parc_space, n_parcels = load_parc(parcellation, parc_type, parc_space) # check for problems if isinstance(parc, nib.GiftiImage): if parc_hemi is None: lgr.warning("If only one gifti parcellation image is supplied, 'parc_hemi' must " "be one of: ['L'], ['R']! Assuming left hemisphere!" ) parc_hemi = ["L"] elif len(parc_hemi) > 1: lgr.warning("If only one gifti parcellation image is supplied, 'parc_hemi' can " "only be one of: ['L'], ['R']! Assuming left hemisphere!" ) parc_hemi = ["L"] if isinstance(parc, tuple): if parc_hemi is None: parc_hemi = ["L", "R"] elif len(parc_hemi) == 1: lgr.warning("If 'parc_hemi' is ['L'] or ['R'], only one gifti parcellation image " "should be supplied as string or gifti! Assuming both hemispheres!") parc_hemi = ["L", "R"] if np.sum(n_parcels) != data.shape[1]: lgr.error(f"Number of parcels in data (1. dimension, {data.shape[1]}) " f"does not match number of parcels in parcellation ({n_parcels})!") # print temp = f", parc_hemi = {parc_hemi}" lgr.info(f"Loaded parcellation (parc_space = '{parc_space}'" f"{temp if parc_space in ['fsaverage', 'fsLR', 'fsa', 'fslr'] else ''}).") ## calculate distance matrix # lgr.info("Calculating distance matrix/matrices ({d}).".format( # d='euclidean' if parc_space in ['mni','MNI','mni152','MNI152'] else 'geodesic')) dist_mat = get_distance_matrix( parc=parc, parc_space=parc_space, parc_hemi=parc_hemi, parc_resample=parc_resample, centroids=centroids, n_proc=n_proc, verbose=verbose ) ## generate null data # check symmetry settings if lr_mirror_dist_mat and not parc_symmetric: lgr.warning("Left-right mirroring of distance matrix (lr_mirror_dist_mat) requested, but " "parcellation may not be symmetric. Check if your parcellation is symmetric and " "set 'parc_symmetric' to True. Be careful, this might lead to unexpected results!") lr_mirror_dist_mat = False if lr_mirror_null_maps and not parc_symmetric and l2rmap is None: lgr.warning("Left-right mirroring of null maps ('lr_mirror_null_maps') requested, but " "'parc_symmetric' is False and no left-to-right mapping df ('l2rmap') provided. " "Check if your parcellation is symmetric and set 'parc_symmetric' to True. " "Be careful, this might lead to unexpected results!") lr_mirror_null_maps = False # check if separate indices for hemispheres are provided as tuple of arrays if parc_idc_lh is not None and parc_idc_rh is not None: if not isinstance(parc_idc_lh, (list, np.ndarray)) or not isinstance(parc_idc_rh, (list, np.ndarray)): lgr.warning("'parc_idc_lh' and 'parc_idc_rh' must be lists or arrays! Setting both to None!") parc_idc_lh, parc_idc_rh = None, None elif parc_idc_lh is None and parc_idc_rh is None: pass else: for var, idc in [("parc_idc_lh", parc_idc_lh), ("parc_idc_rh", parc_idc_rh)]: if idc is not None: if not isinstance(idc, (list, np.ndarray)): lgr.warning(f"'{var}' must be a list or array! Setting '{var}' to None!") locals()[var] = None else: lgr.warning(f"Only indices of {var} provided, inferring indices of other hemisphere!") if var == "parc_idc_lh": parc_idc_rh = np.setdiff1d(np.arange(data.shape[1]), idc) else: parc_idc_lh = np.setdiff1d(np.arange(data.shape[1]), idc) # check if separate indices for subcortex are provided as array if parc_idc_sc is not None: if not isinstance(parc_idc_sc, (list, np.ndarray)): lgr.warning("'parc_idc_sc' must be a list or array! Setting 'parc_idc_sc' to None!") parc_idc_sc = None # create indices for cortex if parc_idc_sc is not None: parc_idc_cx = np.setdiff1d(np.arange(data.shape[1]), parc_idc_sc) if len(parc_idc_cx) == 0: parc_idc_sc, parc_idc_cx = None, None # get all index lists according to which we want to split the data and distance matrix if isinstance(dist_mat, tuple): # surface input split_by_idc = ( np.arange(dist_mat[0].shape[0]), # left hemisphere np.arange(dist_mat[1].shape[0]) + dist_mat[0].shape[0], # right hemisphere ) elif parc_idc_lh is None and parc_idc_sc is None: # none given split_by_idc = ( np.arange(data.shape[1]), # whole dataset ) lr_mirror_dist_mat = False elif parc_idc_lh is not None and parc_idc_sc is not None: # hemis + sc given lgr.info("Generating null data separately for left and right cortex and subcortex.") split_by_idc = ( np.intersect1d(parc_idc_lh, parc_idc_cx), # left cortex np.intersect1d(parc_idc_rh, parc_idc_cx), # right cortex np.intersect1d(parc_idc_lh, parc_idc_sc), # left subcortex np.intersect1d(parc_idc_rh, parc_idc_sc), # right subcortex ) elif parc_idc_lh is not None: # hemi but not sc given lgr.info("Generating null data separately for left and right hemisphere.") split_by_idc = ( parc_idc_lh, # whole left hemisphere parc_idc_rh, # whole right hemisphere ) elif parc_idc_sc is not None: # sc but not hemi given lgr.info("Generating null data separately for cortex and subcortex.") split_by_idc = ( parc_idc_cx, # whole cortex parc_idc_sc, # whole subcortex ) # set lr_mirror_dist_mat to False as we apparently dont have hemisphere-indices available lr_mirror_dist_mat = False else: lgr.critical_raise("Problem with 'split_by_idc': No indices generated/provided!", ValueError) # check if indices are missing missing_idc = np.setdiff1d(np.arange(data.shape[1]), np.concatenate(split_by_idc)) if len(missing_idc) == data.shape[1]: lgr.critical_raise("No parcel indices are present in the processed data! Check the provided " "'parc_idc_lh', 'parc_idc_rh', and 'parc_idc_sc' variables.", ValueError) elif len(missing_idc) > 0: lgr.warning(f"Some parcel indices are missing in the processed data! You might want to check " f"the provided 'parc_idc_lh', 'parc_idc_rh', and 'parc_idc_sc' variables. " f"Missing indices: {missing_idc}") # check duplicate indices if np.unique(np.concatenate(split_by_idc)).size != np.concatenate(split_by_idc).size: lgr.critical_raise("Duplicate indices found in 'parc_idc_lh' and 'parc_idc_rh'! " "Check if 'parc_idc_lh' and 'parc_idc_rh' are correctly defined.", ValueError) # check if any index set is empty if any(len(idc) == 0 for idc in split_by_idc): lgr.critical_raise(f"Empty index set found! {[len(idc) for idc in split_by_idc]}", ValueError) # split distance matrix to align with surface hemisphere distance matrices if not isinstance(dist_mat, tuple): if dist_mat is not None: dist_mat_split = tuple([dist_mat[np.ix_(i, i)] for i in split_by_idc]) else: dist_mat_split = tuple([None] * len(split_by_idc)) else: dist_mat_split = dist_mat # check distance matrices if not random_nulls: if any(dist is None or dist.shape[0] != dist.shape[1] for dist in dist_mat_split): lgr.critical_raise("Distance matrix is not square or None! Check the provided distance matrix.", ValueError) # mirror distance matrix if requested if lr_mirror_dist_mat and dist_mat is not None: lgr.info("Left-right averaging distance matrices to generate symmetrized null maps.") if len(dist_mat_split) == 2: dist_mat_split = tuple([np.mean(dist_mat_split, axis=0)] * 2) if not np.allclose(dist_mat_split[0], dist_mat_split[1]): lgr.critical_raise("Left-right averaged whole-hemisphere distance matrices are not equal! " "Check if 'parc_idc_lh' and 'parc_idc_rh' are correctly defined.", ValueError) elif len(dist_mat_split) == 4: dist_mat_split = tuple([np.mean(dist_mat_split[:2], axis=0)] * 2 + [np.mean(dist_mat_split[2:], axis=0)] * 2) if not (np.allclose(dist_mat_split[0], dist_mat_split[1]) and np.allclose(dist_mat_split[2], dist_mat_split[3])): lgr.critical_raise("Left-right averaged cortical and subcortical distance matrices are not equal! " "Check if 'parc_idc_lh' and 'parc_idc_rh' are correctly defined.", ValueError) if np.isnan(data).any(): lgr.info("Symmetrizing NaNs in data.") for i in range(data.shape[0]): data[i, :] = _symmetrize_nans(data[i, :], [parc_idc_lh, parc_idc_rh]) # define function to generate null data def par_fun(data_1d, seed): null_data = np.full((n_nulls, len(data_1d)), np.nan) for idc, dist in zip(split_by_idc, dist_mat_split): data_1d_sel = data_1d[idc] if np.isnan(data_1d_sel).all(): null_data[:, idc] = np.nan else: null_data[:, idc] = null_fun(data_1d=data_1d_sel, dist_mat=dist, n_nulls=n_nulls, seed=seed, **kwargs) return null_data # run null data generation if seed is None: seed = np.random.randint(0, 2**32 - 1) nulls = Parallel(n_jobs=n_proc)( delayed(par_fun)(data[i, :], seed + i) for i in tqdm( range(n_data), desc=f"{null_fun.__name__.split('_')[1].capitalize()} null maps ({n_proc} proc)", disable=not verbose ) ) nulls = {l: n.astype(dtype) for l, n in zip(data_labs, nulls)} # mirror null maps if requested if lr_mirror_null_maps and parc_idc_lh is not None and parc_idc_rh is not None: # checks if (parc_idc_lh is None or parc_idc_rh is None): lgr.warning("Left-to-right mirroring of null maps requested but 'parc_idc_lh' and/or " "'parc_idc_rh' are not defined! Skipping mirroring.") elif len(parc_idc_lh) != len(parc_idc_rh): lgr.warning("Left-to-right mirroring of null maps requested but left/right hemisphere " "parcel indices have different lengths! Skipping mirroring.") else: lgr.info("Left-to-right mirroring null maps " f"({'simple mirroring' if parc_symmetric else 'using left-to-right mapping'}).") # get interhemispheric correlation if necessary if match_interhemi_correlation: lgr_msg = "Matching interhemispheric correlation. " if match_interhemi_correlation or report_interhemi_correlation: interhemi_corr_obs = correlate_hemis_parc(data, parc_idc_lh, parc_idc_rh, l2rmap) lgr_msg += f"Observed correlation: {interhemi_corr_obs.mean():.2f}" if len(nulls) > 1: lgr_msg += f" (mean), {interhemi_corr_obs.min():.2f} - {interhemi_corr_obs.max():.2f} (range)" lgr.info(lgr_msg) else: interhemi_corr_obs = np.ones(len(nulls)) # run for each input map / set of null maps def par_fun(n, r, seed): return _mirror_parc_maps( n, parc_idc_lh, parc_idc_rh, l2rmap=None if parc_symmetric else l2rmap, interhemi_correlation=r, seed=seed ) nulls_list = Parallel(n_jobs=n_proc)( delayed(par_fun)(n, interhemi_corr_obs[i], seed + i ** 2) for i, n in enumerate(tqdm(nulls.values(), desc="Mirroring null maps", disable=not verbose)) ) nulls = {l: n for l, n in zip(nulls.keys(), nulls_list)} # get interhemispheric correlation of null maps if report_interhemi_correlation: interhemi_corr_nulls = np.zeros(len(nulls)) for i, (l, n) in enumerate(nulls.items()): interhemi_corr_nulls[i] = correlate_hemis_parc(n, parc_idc_lh, parc_idc_rh, l2rmap).mean() lgr.info(f"Interhemispheric correlation of null maps:\n" + \ "\n".join([f"{l}: obs: {interhemi_corr_obs[i]:.3f} -> null (mean): {interhemi_corr_nulls[i]:.3f}" for i, l in enumerate(nulls)])) # adjust scaling if cx_sc_minmax_scale: if parc_idc_sc is None: lgr.warning("To perform subcortical and cortical scaling adjustment, provide 'parc_idc_sc'!") else: lgr.info("Matching min-max range of subcortical and cortical null data to observed data.") for i, (l, n) in enumerate(nulls.items()): # get min, max, and mean of original subcortical data scale_sc = (np.nanmin(data[i, parc_idc_sc]), np.nanmax(data[i, parc_idc_sc])) # get min, max, and mean of original cortical data scale_cx = (np.nanmin(data[i, parc_idc_cx]), np.nanmax(data[i, parc_idc_cx])) # scale subcortical null data nulls[l][:, parc_idc_sc] = minmax_scale(n[:, parc_idc_sc], feature_range=scale_sc, axis=1) # scale cortical null data nulls[l][:, parc_idc_cx] = minmax_scale(n[:, parc_idc_cx], feature_range=scale_cx, axis=1) ## return lgr.info("Null data generation finished.") return nulls, dist_mat