Source code for nispace.nulls

import nibabel as nib
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from nilearn.image import resample_img, coord_transform
from neuromaps.images import load_gifti, load_nifti, load_data, PARCIGNORE
from neuromaps.nulls.nulls import batch_surrogates
from neuromaps.nulls.spins import gen_spinsamples, get_parcel_centroids
from collections import namedtuple
from neuromaps.points import make_surf_graph
from scipy.sparse.csgraph import dijkstra
from scipy.spatial.distance import cdist
from sklearn.preprocessing import minmax_scale
from tqdm.auto import tqdm

_SurfPair = namedtuple("_SurfPair", ["L", "R"])

# import MoranRandomization function, copied from brainspace, as our default null model
# brainspace was removed as an dependency because it installs vtk, which is a large 3d rendering
# library that NiSpace does not use. 
from ._brainspace_moran import MoranRandomization
    
# brainsmash is optional dependency. Moran
try:
    from brainsmash.mapgen import Base
    _BRAINSMASH_AVAILABLE = True
except ImportError:
    _BRAINSMASH_AVAILABLE = False

import logging
lgr = logging.getLogger(__name__)
from .stats.coloc import corr
from .utils.utils import set_log


def _dist_mat_from_coords(coords, dtype=np.float32):
    dist_mat = np.zeros((coords.shape[0], coords.shape[0]), dtype=dtype)
    for i, row in enumerate(coords):
        dist_mat[i] = cdist(row[None], coords).astype(dtype)
    return dist_mat


def _surf_geodesic_row(i, parcel_verts, graph, n_parcels, centroids, dtype):
    """Compute one row of the geodesic parcel-parcel distance matrix (upper triangle).

    Default (centroids=False): multi-source Dijkstra from all vertices of parcel i,
    then average distances to all vertices of each parcel j >= i.
    Centroids (centroids=True): single-source Dijkstra from the centroid vertex of
    parcel i, then read off distances to centroid vertices of parcels j >= i.
    """
    src = parcel_verts[i] if not centroids else parcel_verts[i][:1]
    dists = dijkstra(graph, directed=False, indices=src)  # (n_src, n_verts)
    if dists.ndim == 1:
        dists = dists[np.newaxis, :]
    row = np.zeros(n_parcels, dtype=dtype)
    for j in range(i, n_parcels):
        tgt = parcel_verts[j] if not centroids else parcel_verts[j][:1]
        row[j] = dists[:, tgt].mean()
    return row


def _surf_dist_hemi(gifti_surf, gifti_parc, medial_gifti, centroids, n_proc, dtype, verbose, hemi=""):
    """Geodesic parcel-parcel distance matrix for one hemisphere.

    Mirrors the volumetric voxel-to-voxel path:
    - build graph once
    - one Parallel job per parcel (upper triangle only)
    - mirror to fill lower triangle
    """
    vert, faces = load_gifti(gifti_surf).agg_data()
    labels = load_gifti(gifti_parc).agg_data()
    if labels.ndim > 1:
        labels = labels.squeeze()
    labels = labels.astype(int)

    # medial-wall mask: True = exclude vertex
    medial_mask = np.zeros(len(vert), dtype=bool)
    if medial_gifti is not None:
        mw = load_gifti(medial_gifti).agg_data()
        if mw.ndim > 1:
            mw = mw.squeeze()
        medial_mask = ~mw.astype(bool)

    # build graph with medial wall excluded
    graph = make_surf_graph(vert, faces, mask=medial_mask)

    # collect vertex indices per parcel (all parcels; medial-wall parcels fall back to all vertices)
    parc_ids = list(np.trim_zeros(np.unique(labels)))
    n_parcels = len(parc_ids)

    def _parcel_verts(pid, require_non_medial=True):
        verts = np.where((labels == pid) & ~medial_mask)[0] if require_non_medial \
                else np.where(labels == pid)[0]
        if len(verts) == 0:
            verts = np.where(labels == pid)[0]
            lgr.warning(f"Parcel {pid} has no non-medial-wall vertices; "
                        "it will be kept but distances may be unreliable.")
        return verts

    if centroids:
        # snap centroid: vertex within parcel closest to coordinate mean
        parcel_verts = []
        for pid in parc_ids:
            verts_idx = _parcel_verts(pid)
            mean_coord = vert[verts_idx].mean(axis=0)
            snap = verts_idx[np.argmin(np.linalg.norm(vert[verts_idx] - mean_coord, axis=1))]
            parcel_verts.append(np.array([snap]))
    else:
        parcel_verts = [_parcel_verts(pid) for pid in parc_ids]

    hemi_tag = f" {hemi}" if hemi else ""
    mode_tag = "centroid" if centroids else "vertex-to-vertex"
    lgr.info(f"Estimating geodesic distance matrix: {hemi_tag + ', ' if hemi_tag else ''}{n_parcels} surface parcels, "
             f"{mode_tag} mode, {n_proc} proc.")

    dist_rows = Parallel(n_jobs=n_proc)(
        delayed(_surf_geodesic_row)(i, parcel_verts, graph, n_parcels, centroids, dtype)
        for i in tqdm(range(n_parcels), desc=f"Distance matrix{hemi_tag} ({n_proc} proc)", disable=not verbose)
    )

    dist = np.array(dist_rows, dtype=dtype)
    dist = dist + dist.T
    np.fill_diagonal(dist, 0)
    return dist

def _img_density_for_neuromaps(img):
    if isinstance(img, nib.GiftiImage):
        img = (img,)
    if isinstance(img, nib.Nifti1Image):
        return f"{np.round((img.affine[0,0])):.0f}mm"
    elif isinstance(img, tuple):
        return f"{np.round((img[0].agg_data().shape[0]/1000)):.0f}k"
    else:
        raise ValueError(f"Provide input of type nib.Nifti1Image or (tuple of) nib.GiftiImage(s)!")
    
def _img_space_for_neuromaps(img):
    if isinstance(img, nib.GiftiImage):
        img = (img,)
    if isinstance(img, nib.Nifti1Image):
        return "mni152"
    elif isinstance(img, tuple):
        density = _img_density_for_neuromaps(img[0])
        if density in ['3k', '10k', '41k']:
            return "fsaverage"
        elif density in ['4k', '8k', '32k']:
            return "fslr"
        elif density == '164k':
            lgr.warning("Identified surface image with 164k density, assuming fsLR but could be fsaverage!")
            return "fslr"
        else:
            lgr.critical_raise(f"Identified surface image with unknown density {density}!")
    else:
        raise ValueError(f"Provide input of type nib.Nifti1Image or (tuple of) nib.GiftiImage(s)!")

def _get_null_data_mask(data_1d, dist_mat):
    med = np.isinf(dist_mat + np.diag([np.inf] * len(dist_mat))).all(axis=1)
    mask = np.logical_not(np.logical_or(np.isnan(data_1d), med))
    return mask

def _symmetrize_nans(data_1d, idc):
    # check dtype
    if not isinstance(data_1d, (np.ndarray, pd.Series)) or not isinstance(idc, (list, tuple)):
        raise ValueError("'data_1d' must be a numpy array or pandas Series and 'idc' must be a list or tuple!")
    # check length
    if len(data_1d) != len(np.concatenate(idc)):
        raise ValueError("Length of 'data_1d' must match sum of number of elements in 'idc'!")
    # check if all idc have the same length
    if not all(len(i_idc) == len(idc[0]) for i_idc in idc):
        raise ValueError("All elements in 'idc' must have the same length!")
    # symmetrize
    data_1d = np.array(data_1d)
    isnan = np.full(len(idc[0]), False)
    for i_idc in idc:
        isnan = np.logical_or(isnan, np.isnan(data_1d[i_idc]))
    for i_idc in idc:
        data_1d[i_idc] = np.where(isnan, np.nan, data_1d[i_idc])
    # return
    return data_1d

[docs]def correlate_hemis_parc(data, parc_idc_lh=None, parc_idc_rh=None, l2rmap=None, rank=False): data = np.atleast_2d(np.array(data)) n = data.shape[1] n_hemi = n // 2 if parc_idc_lh is None: parc_idc_lh = np.arange(n_hemi) if parc_idc_rh is None: parc_idc_rh = np.arange(n_hemi, n) parc_idc_lh = np.array(parc_idc_lh) parc_idc_rh = np.array(parc_idc_rh) data_lh = data[:, parc_idc_lh] data_rh = data[:, parc_idc_rh] if l2rmap is not None: l2rmap = np.nan_to_num(np.array(l2rmap)).astype(data.dtype) for i in range(data.shape[0]): lh_1d = data[i, parc_idc_lh] notnan = ~np.isnan(lh_1d) if notnan.any(): data_lh[i, :] = np.dot(lh_1d[notnan], l2rmap[notnan, :]) else: data_lh[i, :] = np.nan r = [] for i in range(data.shape[0]): lh, rh = data_lh[i,:], data_rh[i,:] notnan = ~(np.isnan(lh) | np.isnan(rh)) lh_sel, rh_sel = lh[notnan], rh[notnan] if len(lh_sel) < 2 or np.std(lh_sel) == 0 or np.std(rh_sel) == 0: r.append(np.nan) else: r.append(corr(lh_sel, rh_sel, rank=rank)) return np.array(r)
[docs]def find_parcel_hemispheres(parcellation): # easy: surface if isinstance(parcellation, tuple): # load data data_lh, data_rh = load_data(parcellation[0]), load_data(parcellation[1]) # get labels and indices labels_lh, labels_rh = np.trim_zeros(np.unique(data_lh)), np.trim_zeros(np.unique(data_rh)) labels_all = np.concatenate([labels_lh, labels_rh]) # get indices idc_all = np.arange(len(labels_all)) idc_lh = idc_all[np.isin(labels_all, labels_lh)] idc_rh = idc_all[np.isin(labels_all, labels_rh)] # complicated: volume elif isinstance(parcellation, nib.Nifti1Image): # load data data = load_data(parcellation) data_flat = data[data != 0].flatten() # get labels labels_all = np.trim_zeros(np.unique(data)) # get MNI coordinates ijk = np.argwhere(data != 0) x, y, z = coord_transform(ijk[:,0], ijk[:,1], ijk[:,2], parcellation.affine) # check for every label if the majority of voxels is in the left or right hemisphere labels_lh, labels_rh = [], [] idc_lh, idc_rh = [], [] for idx, lab in enumerate(labels_all): lr = (x[data_flat == lab] > 0).mean() if lr < 0.5: labels_lh.append(lab) idc_lh.append(idx) else: labels_rh.append(lab) idc_rh.append(idx) # return indices and labels idc_lh, idc_rh = np.array(idc_lh, dtype=int), np.array(idc_rh, dtype=int) labels_lh, labels_rh = np.array(labels_lh, dtype=int), np.array(labels_rh, dtype=int) # one gifti elif isinstance(parcellation, nib.GiftiImage): idc_lh, idc_rh, labels_lh, labels_rh = [None] * 4 else: raise ValueError(f"Parcellation type {type(parcellation)} not supported.") return (idc_lh, idc_rh), (labels_lh, labels_rh)
def _avg_dist_mats(D1, D2): return (D1 + D2) / 2
[docs]def nulls_burt2020(data_1d, dist_mat, n_nulls=1000, seed=None, **kwargs): data_1d = np.array(data_1d).flatten() # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = _get_null_data_mask(data_1d, dist_mat) data_1d = data_1d[mask] dist_mat = dist_mat[np.ix_(mask, mask)] # null maps null_data[:, mask] = Base( x=data_1d, D=dist_mat, seed=seed, **kwargs )(n_nulls, 50) # return return null_data.astype(data_1d.dtype)
[docs]def nulls_burt2018(data_1d, dist_mat, n_nulls=1000, seed=None, **kwargs): data_1d = np.array(data_1d).flatten() # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = _get_null_data_mask(data_1d, dist_mat) data_1d = data_1d[mask] dist_mat = dist_mat[np.ix_(mask, mask)] # data adjustment data_1d += np.abs(np.nanmin(data_1d)) + 0.1 # null maps null_data[:, mask] = batch_surrogates(dist_mat, data_1d, n_surr=n_nulls, seed=seed, **kwargs).T # return return null_data.astype(data_1d.dtype)
[docs]def nulls_moran(data_1d, dist_mat, n_nulls=1000, seed=None, **kwargs): data_1d = np.array(data_1d).flatten() # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = _get_null_data_mask(data_1d, dist_mat) data_1d = data_1d[mask] dist_mat = dist_mat[np.ix_(mask, mask)] # distance matrix adjustment np.fill_diagonal(dist_mat, 1) dist_mat **= -1 # null maps null_data[:, mask] = MoranRandomization( procedure=kwargs.pop("procedure", "singleton"), joint=kwargs.pop("joint", True), seed=seed, n_nulls=n_nulls, **kwargs ).fit(dist_mat).randomize(data_1d) # return return null_data.astype(data_1d.dtype)
[docs]def nulls_random(data_1d, dist_mat=None, n_nulls=1000, seed=None): # results array with shape (n_nulls, n_parcels) null_data = np.full((n_nulls, len(data_1d)), np.nan) # mask mask = ~np.isnan(data_1d) data_1d = data_1d[mask] # null maps rng = np.random.default_rng(seed) null_data[:, mask] = np.stack([rng.permutation(data_1d) for _ in range(n_nulls)], axis=0) # return return null_data.astype(data_1d.dtype)
_SPIN_METHODS = {"alexander_bloch", "spin", "vasa", "hungarian"} _SPIN_METHOD_MAP = { "alexander_bloch": "original", "spin": "original", "vasa": "vasa", "hungarian": "hungarian", } _NULL_METHODS = { # Random "random": nulls_random, # Moran's I implemented via BrainSpace -> volumetric and surface "moran": nulls_moran, "brainspace": nulls_moran, # Variogram-method implemented via Brainsmash -> volumetric and surface "burt2020": nulls_burt2020, "brainsmash": nulls_burt2020, "variogram": nulls_burt2020, # Smoothing-method from Burt2018 -> volumetric and surface "burt2018": nulls_burt2018, # Spin tests -> surface only "alexander_bloch": None, # handled via spin code path "spin": None, "vasa": None, "hungarian": None, } def _get_surface_atlas(parc_space, density): """Return (atlas_dict, surf_key) for a supported surface space.""" from .datasets import fetch_template if "fsa" in parc_space.lower(): space, surf_key = "fsaverage", "pial" elif "fslr" in parc_space.lower(): space, surf_key = "fsLR", "midthickness" else: lgr.critical_raise( f"Surface space '{parc_space}' not supported. Use 'fsaverage' or 'fsLR'.", ValueError, ) atlas = {} for desc in [surf_key, "sphere", "medial"]: L, R = fetch_template(space, desc=desc, res=density, check_file_hash=False, verbose=False) atlas[desc] = _SurfPair(L, R) return atlas, surf_key
[docs]def generate_spins(parc, parc_space, n_perm=1000, method="original", seed=None, parc_hemi=None): """Generate spin resampling indices for a surface parcellation. Supports bilateral (tuple of two GiftiImages) and single-hemisphere (single GiftiImage) parcellations. Returns a tuple (spins_lh, spins_rh) of int32 arrays. For bilateral parcellations both have shape (n_parcels_hemi, n_perm). For a single-hemisphere parcellation the unused hemisphere gets shape (0, n_perm). RH indices are local to [0, n_rh). """ is_bilateral = isinstance(parc, tuple) is_unilateral = isinstance(parc, nib.GiftiImage) if not (is_bilateral or is_unilateral): lgr.critical_raise( "Spin tests require a surface parcellation: either a bilateral " "(lh_img, rh_img) tuple or a single GiftiImage.", ValueError, ) density = _img_density_for_neuromaps(parc) atlas, _ = _get_surface_atlas(parc_space, density) spheres = atlas["sphere"] if is_bilateral: centroids, hemiid = get_parcel_centroids( surfaces=(spheres[0], spheres[1]), parcellation=(parc[0], parc[1]), method="surface", ) else: # single hemisphere — determine which side keep_hemi = None if parc_hemi is not None: h = parc_hemi[0] if isinstance(parc_hemi, (list, tuple)) else parc_hemi keep_hemi = h if h in ("L", "R") else None if keep_hemi is None: keep_hemi = "L" lgr.warning("generate_spins: parc_hemi not specified for unilateral image; assuming 'L'.") centroids = find_surf_parc_centroids( parc, parc_space=parc_space, parc_hemi=[keep_hemi], parc_density=density, ) # hemiid: 0 = LH, 1 = RH — tells gen_spinsamples the geometry of the rotation hemiid = np.zeros(len(centroids), dtype=int) if keep_hemi == "L" \ else np.ones(len(centroids), dtype=int) spins = gen_spinsamples( coords=centroids, hemiid=hemiid, n_rotate=n_perm, method=method, seed=seed, verbose=False, ) n_lh = int((hemiid == 0).sum()) spins_lh = spins[:n_lh, :].astype(np.int32) spins_rh = (spins[n_lh:, :] - n_lh).astype(np.int32) return spins_lh, spins_rh
[docs]def apply_spins(data_1d, spins_lh, spins_rh, idc_lh, idc_rh, n_perm=None): """Apply precomputed spin indices to a 1D data array. Returns null_data of shape (n_perm, n_parcels). """ if n_perm is None: n_perm = spins_lh.shape[1] n_parcels = len(data_1d) null_data = np.full((n_perm, n_parcels), np.nan, dtype=data_1d.dtype) idc_lh = np.asarray(idc_lh, dtype=int) idc_rh = np.asarray(idc_rh, dtype=int) data_lh = data_1d[idc_lh] data_rh = data_1d[idc_rh] for k in range(n_perm): null_data[k, idc_lh] = data_lh[spins_lh[:, k]] null_data[k, idc_rh] = data_rh[spins_rh[:, k]] return null_data
[docs]def get_distance_matrix(parc, parc_space, parc_hemi=["L", "R"], parc_resample=2, centroids=False, surf_euclidean=False, n_proc=1, verbose=True, dtype=np.float32): verbose = set_log(lgr, verbose) ## generate distance matrix # case volumetric if "mni" in parc_space.lower(): # get parcellation data parc = load_nifti(parc) if parc_resample and not isinstance(parc_resample, str): if parc_resample is True: parc_resample = 3 current_voxsize = abs(round(parc.affine[0, 0])) lgr.info(f"Resampling volumetric parcellation from {current_voxsize}mm to {parc_resample}mm " "for distance matrix generation.") ids_before = set(np.trim_zeros(np.unique(parc.get_fdata()))) parc_resampled = resample_img( parc, target_affine=np.diag([parc_resample] * 3), interpolation="nearest", force_resample=True, copy_header=True ) lost = ids_before - set(np.trim_zeros(np.unique(parc_resampled.get_fdata()))) if lost: lgr.warning( f"Resampling to {parc_resample}mm voxels would drop {len(lost)} parcel(s) " f"(IDs: {sorted(int(i) for i in lost)}). Skipping downsampling and using " f"original {current_voxsize}mm resolution." ) else: parc = parc_resampled parc_data = parc.get_fdata() parc_affine = parc.affine parcels = np.trim_zeros(np.unique(parc_data)) n_parcels = len(parcels) mask = np.logical_not(np.logical_or(np.isclose(parc_data, 0), np.isnan(parc_data))) parc_data_m = parc_data * mask # case distances between volumetric parcel centroids if centroids: lgr.info(f"Estimating euclidean distance matrix: {n_parcels} volumetric parcels, centroid mode.") # get centroids ijk = find_vol_parc_centroids(parc_data_m, affine=parc_affine, parcel_idc=parcels) # get distances dist = _dist_mat_from_coords(ijk, dtype) # case mean distances between parcel-to-parcel voxels else: # get parcel-wise coordinates in world space ijk_parcels = dict() for i_parcel in parcels: xyz_parcel = np.column_stack(np.where(parc_data_m==i_parcel)) ijk_parcels[i_parcel] = nib.affines.apply_affine(parc_affine, xyz_parcel) def mni_dist(i, i_parcel): dist_i = np.zeros(n_parcels, dtype=dtype) j = i for _ in range(n_parcels - j): dist_i[j] = \ cdist(ijk_parcels[i_parcel], ijk_parcels[parcels[j]]).mean().astype(dtype) j += 1 return dist_i lgr.info(f"Estimating euclidean distance matrix: {n_parcels} volumetric parcels, " f"voxel-to-voxel mode, {n_proc} proc.") dist_list = Parallel(n_jobs=n_proc)( delayed(mni_dist)(i, i_parcel) for i, i_parcel in enumerate(tqdm( parcels, desc=f"Distance matrix ({n_proc} proc)", disable=not verbose )) ) dist = np.r_[dist_list] # mirror to lower triangle dist = dist + dist.T # zero diagonal np.fill_diagonal(dist, 0) # case surface elif parc_space in ["fsaverage", "fsLR", "fsa", "fslr"]: if parc_resample and isinstance(parc_resample, str): from neuromaps.transforms import fsaverage_to_fsaverage, fslr_to_fslr current_density = _img_density_for_neuromaps(parc[0] if isinstance(parc, tuple) else parc) if current_density != parc_resample: lgr.info(f"Resampling surface parcellation from {current_density} to {parc_resample} density " "for distance matrix generation.") resample_fn = fsaverage_to_fsaverage if "fsa" in parc_space.lower() else fslr_to_fslr def _parc_ids(p): imgs = p if isinstance(p, tuple) else (p,) return set(np.trim_zeros(np.unique( np.concatenate([load_data(img).astype(int).ravel() for img in imgs]) ))) ids_before = _parc_ids(parc) parc_resampled = resample_fn(parc, parc_resample, method="nearest") lost = ids_before - _parc_ids(parc_resampled) if lost: lgr.warning( f"Resampling to {parc_resample} would drop {len(lost)} parcel(s) " f"(IDs: {sorted(lost)}). Skipping downsampling and using original " f"{current_density} density." ) else: parc = parc_resampled if surf_euclidean: density = _img_density_for_neuromaps(parc[0] if isinstance(parc, tuple) else parc) lgr.info(f"Estimating euclidean distance matrix: {parc_space} {density} surface parcels, " f"centroid mode.") _parc_centroids = find_surf_parc_centroids( parc=parc, parc_space=parc_space, parc_hemi=parc_hemi, parc_density=density, ) dist = _dist_mat_from_coords(_parc_centroids, dtype=dtype) else: density = _img_density_for_neuromaps(parc[0] if isinstance(parc, tuple) else parc) atlas, surf_key = _get_surface_atlas(parc_space, density) hemis = parc_hemi if isinstance(parc_hemi, (list, tuple)) else [parc_hemi] dist_hemis = [] for i_hemi, hemi in enumerate(hemis): surf_path = getattr(atlas[surf_key], hemi) medial_path = getattr(atlas["medial"], hemi) parc_h = parc[i_hemi] if isinstance(parc, tuple) else parc dist_hemis.append( _surf_dist_hemi(surf_path, parc_h, medial_path, centroids, n_proc, dtype, verbose, hemi=hemi) ) dist = tuple(dist_hemis) if isinstance(parc, tuple) else dist_hemis[0] # case other else: lgr.error(f"Distance matrix generation not supported for space {parc_space}!") ## return return dist
[docs]def find_vol_parc_centroids(parc, affine=None, parcel_idc=None, return_data_space=False, snap=True): # get parcellation data if isinstance(parc, np.ndarray): parc_data = parc else: parc = load_nifti(parc) parc_data = parc.get_fdata() # get affine matrix if affine is None: if not isinstance(parc, nib.Nifti1Image): lgr.critical_raise("If 'affine' is not provided, 'parc' must be a Nifti image!", TypeError) affine = parc.affine # get parcel indices if parcel_idc is None: parcel_idc = np.trim_zeros(np.unique(parc_data)) # get centroid coordinates in world space xyz = np.zeros((len(parcel_idc), 3), float) for i, i_parcel in enumerate(parcel_idc): voxel_ijk = np.column_stack(np.where(parc_data == i_parcel)).astype(float) mean_ijk = voxel_ijk.mean(axis=0) if snap: # snap to nearest voxel actually within the parcel mean_ijk = voxel_ijk[cdist(mean_ijk[None], voxel_ijk)[0].argmin()] xyz[i, :] = mean_ijk mni = nib.affines.apply_affine(affine, xyz) return mni if not return_data_space else (mni, xyz)
[docs]def find_surf_parc_centroids(parc, parc_space="fsaverage", parc_hemi=None, parc_density=None, snap=True): # get parcellation if isinstance(parc_hemi, str): parc_hemi = [parc_hemi] if isinstance(parc, (tuple, list)) & (len(parc)==2): parc = (load_gifti(parc[0]), load_gifti(parc[1])) parc_hemi = ["L", "R"] lgr.info("Two-hemispheric parcellation provided, assuming order ['L', 'R'].") elif isinstance(parc, (str, nib.GiftiImage)): parc = (load_gifti(parc),) if len(parc_hemi)>1: lgr.warning("Provided parcellation is one hemisphere but parc_label indicated both hemispheres. " "Setting parc_hemi to ['L']!") parc_hemi = ["L"] else: lgr.info(f"One-hemispheric parcellation provided (hemisphere: {parc_hemi})") else: lgr.critical_raise(f"Parcellation must be provided as (tuple/list of) path(s) or Gifti image(s), " f"not {type(parc)}!", TypeError) # guess parc_density if None if parc_density is None: parc_density = _img_density_for_neuromaps(parc) # get standard surface atlas, surf_key = _get_surface_atlas(parc_space, parc_density) surfaces = atlas[surf_key] if (len(parc_hemi)==1) & (parc_hemi[0]=="L"): surfaces = load_gifti(surfaces[0]), elif (len(parc_hemi)==1) & (parc_hemi[0]=="R"): surfaces = load_gifti(surfaces[1]), elif len(parc_hemi)==2: surfaces = (load_gifti(surfaces[0]), load_gifti(surfaces[1])) else: lgr.critical_raise("Problem with 'parc_hemi'. Provide ['L'], ['R'], or ['L', 'R']", ValueError) centroids = [] # iterate hemispheres for parc_h, surf_h in zip(parc, surfaces): labels = parc_h.darrays[0].data coords = surf_h.darrays[0].data # iterate parcels ("labels") and collect mean coordinates for idx in np.trim_zeros(np.unique(labels)): parcel_coords = coords[labels == idx] mean_coord = parcel_coords.mean(axis=0) if snap: # snap to nearest vertex within the parcel (guaranteed to be on the surface) parcel = parcel_coords[cdist(mean_coord[None], parcel_coords)[0].argmin()] else: parcel = mean_coord centroids.append(parcel) return np.row_stack(centroids)
[docs]def generate_null_maps(method, data, parcellation, dist_mat=None, spin_mat=None, parc_space=None, parc_hemi=None, parc_symmetric=False, n_nulls=1000, parc_resample=2, centroids=False, parc_idc_lh=None, parc_idc_rh=None, parc_idc_sc=None, lr_mirror_dist_mat=False, split_hemi=None, split_cxsc=False, cx_sc_minmax_scale=False, parc_name=None, dtype=float, n_proc=1, seed=None, verbose=True, **kwargs): verbose = set_log(lgr, verbose) ## Checks # null method if method not in _NULL_METHODS: lgr.critical_raise(f"Null method {method} not implemented!", ValueError) null_fun = _NULL_METHODS[method] random_nulls = False if null_fun is not None and null_fun.__name__ == "nulls_burt2020" and not _BRAINSMASH_AVAILABLE: lgr.critical_raise("Null method 'burt2020' requires brainsmash! Run 'pip install brainsmash'!", ImportError) elif null_fun is not None and null_fun.__name__ == "nulls_random": random_nulls = True # input data if not isinstance(data, (pd.DataFrame, pd.Series, np.ndarray)): lgr.critical_raise(f"Input data not array-like! Type: {type(data)}", ValueError) if isinstance(data, pd.DataFrame): data_labs = list(data.index) elif isinstance(data, pd.Series): data_labs = [data.name] data = np.array(data) if len(data.shape) == 1: data = data[np.newaxis, :] n_data = data.shape[0] if "data_labs" not in locals(): data_labs = list(range(n_data)) # print lgr.info(f"Null map generation: Assuming n = {n_data} data vector(s) for " f"n = {data.shape[1]} parcels.") ## spin nulls -> separate code path, bypass dist_mat entirely if method in _SPIN_METHODS: spin_method = _SPIN_METHOD_MAP[method] # validate: surface parcellation required (bilateral tuple or unilateral GiftiImage) if not isinstance(parcellation, (tuple, nib.GiftiImage)): lgr.critical_raise( f"Null method '{method}' requires a surface parcellation. " f"Volumetric parcellations are not supported (use a distance-based null instead).", ValueError, ) # validate: hemisphere indices required if parc_idc_lh is None or parc_idc_rh is None: lgr.critical_raise( f"Null method '{method}' requires 'parc_idc_lh' and 'parc_idc_rh'.", ValueError ) idc_lh = np.array(parc_idc_lh) idc_rh = np.array(parc_idc_rh) # use precomputed spins only for alexander_bloch/spin; vasa/hungarian always generate if spin_mat is not None and spin_method == "original": if isinstance(spin_mat, tuple) and len(spin_mat) == 2: spins_lh, spins_rh = spin_mat if spins_lh.shape[1] < n_nulls: lgr.warning(f"Precomputed spin matrix has {spins_lh.shape[1]} spins but " f"n_perm={n_nulls} requested. Regenerating.") spin_mat = None else: lgr.info("Using provided precomputed spin matrix.") else: lgr.warning("Provided 'spin_mat' must be a tuple (spins_lh, spins_rh). Regenerating.") spin_mat = None if spin_mat is None or spin_method != "original": lgr.info(f"Generating spin samples (method='{spin_method}', n={n_nulls}).") spins_lh, spins_rh = generate_spins( parc=parcellation, parc_space=parc_space, n_perm=n_nulls, method=spin_method, seed=seed, parc_hemi=parc_hemi, ) spin_mat = (spins_lh, spins_rh) # apply spins to each data row nulls = {} for i, lab in enumerate(tqdm(data_labs, desc="Spin null maps", disable=not verbose)): nulls[lab] = apply_spins( data_1d=data[i, :].astype(dtype), spins_lh=spins_lh, spins_rh=spins_rh, idc_lh=idc_lh, idc_rh=idc_rh, n_perm=n_nulls, ) lgr.info("Null data generation finished.") # TODO: combined spin+moran: instead of returning here, continue to the moran # path for parc_idc_sc parcels, merge spin nulls (cx) + moran nulls (sc), then return. return nulls, spin_mat ## random nulls -> no distmat if random_nulls: dist_mat = (None, None) if isinstance(dist_mat, tuple) else None ## distance matrix provided -> we dont need parcellation if dist_mat is not None and not random_nulls: lgr.info(f"Using provided distance matrix/matrices.") if isinstance(dist_mat, (np.ndarray, pd.DataFrame)): n_parcels = dist_mat.shape[0] dist_mat = np.array(dist_mat, dtype=dtype) if parc_space is None: lgr.warning("Distance matrix provided as array but 'parc_space' is None: " "Assuming 'mni152'! Define 'parc_space' if one surface hemisphere!") parc_space = "mni152" elif isinstance(dist_mat, tuple): n_parcels = (dist_mat[0].shape[0], dist_mat[1].shape[0]) dist_mat = tuple(np.array(dm, dtype=dtype) for dm in dist_mat) if parc_space is None: lgr.warning("Distance matrix provided as tuple but 'parc_space' is None: " "Assuming 'fsaverage'!") parc_space = "fsaverage" else: lgr.critical("Distance matrix is wrong data type, should be array or tuple of arrays, " f"is: {type(dist_mat)}! Setting 'dist_mat' to None!") dist_mat = None ## get dist mat -> we need parcellation if dist_mat is None and not random_nulls: # load function def load_parc(parc, parc_type, parc_space): if parc_type=="nifti": parc = load_nifti(parc) parc_space = "MNI152" if parc_space is None else parc_space n_parcels = len(np.trim_zeros(np.unique(parc.get_fdata()))) elif parc_type=="gifti": parc = load_gifti(parc) parc_space = "fsaverage" if parc_space is None else parc_space n_parcels = len(np.trim_zeros(np.unique(parc.darrays[0].data))) elif parc_type=="giftituple": parc = (load_gifti(parc[0]), load_gifti(parc[1])) parc_space = "fsaverage" if parc_space is None else parc_space n_parcels = (len(np.trim_zeros(np.unique(parc[0].darrays[0].data))), len(np.trim_zeros(np.unique(parc[1].darrays[0].data)))) return parc, parc_space, n_parcels # recognize parcellation type if isinstance(parcellation, nib.Nifti1Image): parc_type = "nifti" elif isinstance(parcellation, nib.GiftiImage): parc_type = "gifti" elif isinstance(parcellation, tuple): parc_type = "giftituple" elif isinstance(parcellation, str): if parcellation.endswith(".nii") | parcellation.endswith(".nii.gz"): parc_type = "nifti" elif parcellation.endswith(".gii") | parcellation.endswith(".gii.gz"): parc_type = "gifti" else: lgr.critical_raise(f"'parcellation' is string ({parcellation}) " "but ending was not recognized!", ValueError) else: lgr.critical_raise(f"'parcellation' data type ({type(parcellation)}) not defined!", TypeError) # load parcellation parc, parc_space, n_parcels = load_parc(parcellation, parc_type, parc_space) # check for problems if isinstance(parc, nib.GiftiImage): if parc_hemi is None: lgr.warning("If only one gifti parcellation image is supplied, 'parc_hemi' must " "be one of: ['L'], ['R']! Assuming left hemisphere!" ) parc_hemi = ["L"] elif len(parc_hemi) > 1: lgr.warning("If only one gifti parcellation image is supplied, 'parc_hemi' can " "only be one of: ['L'], ['R']! Assuming left hemisphere!" ) parc_hemi = ["L"] if isinstance(parc, tuple): if parc_hemi is None: parc_hemi = ["L", "R"] elif len(parc_hemi) == 1: lgr.warning("If 'parc_hemi' is ['L'] or ['R'], only one gifti parcellation image " "should be supplied as string or gifti! Assuming both hemispheres!") parc_hemi = ["L", "R"] if np.sum(n_parcels) != data.shape[1]: lgr.error(f"Number of parcels in data (1. dimension, {data.shape[1]}) " f"does not match number of parcels in parcellation ({n_parcels})!") # print temp = f", parc_hemi = {parc_hemi}" lgr.info(f"Loaded parcellation (parc_space = '{parc_space}'" f"{temp if parc_space in ['fsaverage', 'fsLR', 'fsa', 'fslr'] else ''}).") ## calculate distance matrix # lgr.info("Calculating distance matrix/matrices ({d}).".format( # d='euclidean' if parc_space in ['mni','MNI','mni152','MNI152'] else 'geodesic')) dist_mat = get_distance_matrix( parc=parc, parc_space=parc_space, parc_hemi=parc_hemi, parc_resample=parc_resample, centroids=centroids, n_proc=n_proc, verbose=verbose ) ## generate null data # check symmetry settings if lr_mirror_dist_mat and not parc_symmetric: lgr.warning("Left-right mirroring of distance matrix (lr_mirror_dist_mat) requested, but " "parcellation may not be symmetric. Set 'parc_symmetric=True' to enable this. " "Disabling lr_mirror_dist_mat.") lr_mirror_dist_mat = False # check if separate indices for hemispheres are provided as tuple of arrays if parc_idc_lh is not None and parc_idc_rh is not None: if not isinstance(parc_idc_lh, (list, np.ndarray)) or not isinstance(parc_idc_rh, (list, np.ndarray)): lgr.warning("'parc_idc_lh' and 'parc_idc_rh' must be lists or arrays! Setting both to None!") parc_idc_lh, parc_idc_rh = None, None elif parc_idc_lh is None and parc_idc_rh is None: pass else: for var, idc in [("parc_idc_lh", parc_idc_lh), ("parc_idc_rh", parc_idc_rh)]: if idc is not None: if not isinstance(idc, (list, np.ndarray)): lgr.warning(f"'{var}' must be a list or array! Setting '{var}' to None!") locals()[var] = None else: lgr.warning(f"Only indices of {var} provided, inferring indices of other hemisphere!") if var == "parc_idc_lh": parc_idc_rh = np.setdiff1d(np.arange(data.shape[1]), idc) else: parc_idc_lh = np.setdiff1d(np.arange(data.shape[1]), idc) # check if separate indices for subcortex are provided as array if parc_idc_sc is not None: if not isinstance(parc_idc_sc, (list, np.ndarray)): lgr.warning("'parc_idc_sc' must be a list or array! Setting 'parc_idc_sc' to None!") parc_idc_sc = None # create indices for cortex if parc_idc_sc is not None: parc_idc_cx = np.setdiff1d(np.arange(data.shape[1]), parc_idc_sc) if len(parc_idc_cx) == 0: parc_idc_sc, parc_idc_cx = None, None # auto-detect split_hemi: True for surface (tuple dist_mat), False for volumetric if split_hemi is None: split_hemi = isinstance(dist_mat, tuple) # if split_hemi=False and dist_mat is a surface tuple, flatten to block-diagonal single matrix if not split_hemi and isinstance(dist_mat, tuple): n_blocks = [d.shape[0] for d in dist_mat] n_total = sum(n_blocks) D_full = np.zeros((n_total, n_total), dtype=dtype) offset = 0 for d in dist_mat: n = d.shape[0] D_full[offset:offset+n, offset:offset+n] = d offset += n dist_mat = D_full # get all index lists according to which we want to split the data and distance matrix if isinstance(dist_mat, tuple): # surface input with split_hemi=True split_by_idc = ( np.arange(dist_mat[0].shape[0]), # left hemisphere np.arange(dist_mat[1].shape[0]) + dist_mat[0].shape[0], # right hemisphere ) elif split_hemi and split_cxsc and parc_idc_lh is not None and parc_idc_sc is not None: lgr.info("Generating null data separately for left and right cortex and subcortex.") split_by_idc = ( np.intersect1d(parc_idc_lh, parc_idc_cx), # left cortex np.intersect1d(parc_idc_rh, parc_idc_cx), # right cortex np.intersect1d(parc_idc_lh, parc_idc_sc), # left subcortex np.intersect1d(parc_idc_rh, parc_idc_sc), # right subcortex ) elif split_hemi and parc_idc_lh is not None: lgr.info("Generating null data separately for left and right hemisphere.") split_by_idc = ( parc_idc_lh, # whole left hemisphere parc_idc_rh, # whole right hemisphere ) elif split_cxsc and parc_idc_sc is not None: lgr.info("Generating null data separately for cortex and subcortex.") split_by_idc = ( parc_idc_cx, # whole cortex parc_idc_sc, # whole subcortex ) lr_mirror_dist_mat = False else: split_by_idc = (np.arange(data.shape[1]),) # whole-brain (default) # check if indices are missing missing_idc = np.setdiff1d(np.arange(data.shape[1]), np.concatenate(split_by_idc)) if len(missing_idc) == data.shape[1]: lgr.critical_raise("No parcel indices are present in the processed data! Check the provided " "'parc_idc_lh', 'parc_idc_rh', and 'parc_idc_sc' variables.", ValueError) elif len(missing_idc) > 0: lgr.warning(f"Some parcel indices are missing in the processed data! You might want to check " f"the provided 'parc_idc_lh', 'parc_idc_rh', and 'parc_idc_sc' variables. " f"Missing indices: {missing_idc}") # check duplicate indices if np.unique(np.concatenate(split_by_idc)).size != np.concatenate(split_by_idc).size: lgr.critical_raise("Duplicate indices found in 'parc_idc_lh' and 'parc_idc_rh'! " "Check if 'parc_idc_lh' and 'parc_idc_rh' are correctly defined.", ValueError) # check if any index set is empty if any(len(idc) == 0 for idc in split_by_idc): lgr.critical_raise(f"Empty index set found! {[len(idc) for idc in split_by_idc]}", ValueError) # split distance matrix to align with surface hemisphere distance matrices if not isinstance(dist_mat, tuple): if dist_mat is not None: dist_mat_split = tuple([dist_mat[np.ix_(i, i)] for i in split_by_idc]) else: dist_mat_split = tuple([None] * len(split_by_idc)) else: dist_mat_split = dist_mat # check distance matrices if not random_nulls: if any(dist is None or dist.shape[0] != dist.shape[1] for dist in dist_mat_split): lgr.critical_raise("Distance matrix is not square or None! Check the provided distance matrix.", ValueError) # mirror distance matrix if requested if lr_mirror_dist_mat and dist_mat is not None: lgr.info("Left-right averaging distance matrices to generate symmetrized null maps.") if len(dist_mat_split) == 1 and parc_idc_lh is not None and parc_idc_rh is not None: # split_hemi=False: patch the LH and RH diagonal blocks of the full dist_mat lh, rh = np.array(parc_idc_lh), np.array(parc_idc_rh) d = dist_mat_split[0].copy() avg_cx = _avg_dist_mats(d[np.ix_(lh, lh)], d[np.ix_(rh, rh)]) d[np.ix_(lh, lh)] = avg_cx d[np.ix_(rh, rh)] = avg_cx dist_mat_split = (d,) elif len(dist_mat_split) == 2: avg = _avg_dist_mats(dist_mat_split[0], dist_mat_split[1]) dist_mat_split = (avg, avg) if not np.allclose(dist_mat_split[0], dist_mat_split[1]): lgr.critical_raise("Left-right averaged whole-hemisphere distance matrices are not equal! " "Check if 'parc_idc_lh' and 'parc_idc_rh' are correctly defined.", ValueError) elif len(dist_mat_split) == 4: avg_cx = _avg_dist_mats(dist_mat_split[0], dist_mat_split[1]) avg_sc = _avg_dist_mats(dist_mat_split[2], dist_mat_split[3]) dist_mat_split = (avg_cx, avg_cx, avg_sc, avg_sc) if not (np.allclose(dist_mat_split[0], dist_mat_split[1]) and np.allclose(dist_mat_split[2], dist_mat_split[3])): lgr.critical_raise("Left-right averaged cortical and subcortical distance matrices are not equal! " "Check if 'parc_idc_lh' and 'parc_idc_rh' are correctly defined.", ValueError) if np.isnan(data).any(): lgr.info("Symmetrizing NaNs in data.") for i in range(data.shape[0]): data[i, :] = _symmetrize_nans(data[i, :], [parc_idc_lh, parc_idc_rh]) # define function to generate null data def par_fun(data_1d, seed): null_data = np.full((n_nulls, len(data_1d)), np.nan) for idc, dist in zip(split_by_idc, dist_mat_split): data_1d_sel = data_1d[idc] if np.isnan(data_1d_sel).all(): null_data[:, idc] = np.nan else: null_data[:, idc] = null_fun(data_1d=data_1d_sel, dist_mat=dist, n_nulls=n_nulls, seed=seed, **kwargs) return null_data # run null data generation if seed is None: seed = np.random.randint(0, 2**32 - 1) nulls = Parallel(n_jobs=n_proc)( delayed(par_fun)(data[i, :], seed + i) for i in tqdm( range(n_data), desc=f"{null_fun.__name__.split('_')[1].capitalize()} null maps ({n_proc} proc)", disable=not verbose ) ) nulls = {l: n.astype(dtype) for l, n in zip(data_labs, nulls)} # adjust scaling if cx_sc_minmax_scale: if parc_idc_sc is None: lgr.warning("To perform subcortical and cortical scaling adjustment, provide 'parc_idc_sc'!") else: lgr.info("Matching min-max range of subcortical and cortical null data to observed data.") for i, (l, n) in enumerate(nulls.items()): # get min, max, and mean of original subcortical data scale_sc = (np.nanmin(data[i, parc_idc_sc]), np.nanmax(data[i, parc_idc_sc])) # get min, max, and mean of original cortical data scale_cx = (np.nanmin(data[i, parc_idc_cx]), np.nanmax(data[i, parc_idc_cx])) # scale subcortical null data nulls[l][:, parc_idc_sc] = minmax_scale(n[:, parc_idc_sc], feature_range=scale_sc, axis=1) # scale cortical null data nulls[l][:, parc_idc_cx] = minmax_scale(n[:, parc_idc_cx], feature_range=scale_cx, axis=1) ## return lgr.info("Null data generation finished.") return nulls, dist_mat