import copy
from typing import List, Union, Sequence, Literal, Dict
from pathlib import Path
import numpy as np
import pandas as pd
import nibabel as nib
import matplotlib.pyplot as plt
from joblib import Parallel, delayed
from tqdm import tqdm
import logging
lgr = logging.getLogger(__name__)
from .io import parcellate_data, to_pickle, from_pickle
from .core.parcellation import Parcellation
from .core.reduce_x import _reduce_dimensions
from .core.transform_y import _dummy_code_groups, _num_code_subjects, _get_transform_fun
from .core.colocalize import _get_colocalize_fun, _sort_colocs, _get_coloc_stats, _rank_regress
from .core.permute import (_get_null_maps, _get_exact_p_values, _get_correct_mc_method,
_EMPIRICAL_MC_METHODS)
from .core.plot import _plot_categorical
from .core.constants import _PARCS_DEFAULT, _COLOC_METHODS
from .datasets import fetch_parcellation, fetch_reference, _check_parcellation
from .nulls import get_distance_matrix, _SPIN_METHODS
from .stats.coloc import beta, elasticnet, lasso, mlr, partialpearson, pearson, ridge
from .stats.misc import (mc_correction, residuals_nan, zscore_df, permute_groups,
compute_meff, meff_sidak_correction,
maxT_correction, step_maxT_correction, _null_stats_to_array)
from .stats.effectsize import rzscore_nan, zscore_nan
from .cv import _get_dist_dep_splits, _get_rand_splits
from .plotting import nice_stats_labels, brainplot
from .utils.utils import (set_log, _quiet, fill_nan, _get_df_string, _lower_strip_ws, mean_by_set_df,
get_column_names, lower, print_arg_pairs,
_parse_df_string, _parse_bool)
def _match_maps(index, queries):
"""Return sorted unique integer positions in *index* matching any of *queries*.
Matching priority:
1. Exact equality with the index value
2. For tuple index values: exact element match or partial string match on any element
3. For scalar index values: exact or partial string match
"""
if isinstance(queries, (str, int)):
queries = [queries]
keep = []
for i, val in enumerate(index):
for q in queries:
q_str = str(q)
if q == val:
keep.append(i)
break
if isinstance(val, tuple):
parts = [str(v) for v in val]
if q in val or any(q_str == p or q_str in p for p in parts):
keep.append(i)
break
else:
s = str(val)
if q_str == s or q_str in s:
keep.append(i)
break
return sorted(set(keep))
# ==================================================================================================
# DEPRECATION MESSAGE STRINGS
# ==================================================================================================
_DEPR_RETURN_SELF = (
"In the first non-dev release, all NiSpace object methods will return the "
"object itself by default. Set NiSpace(return_self=True) to disable this warning."
)
_DEPR_COMBAT_KEEP = (
"'combat_keep' is deprecated and will be ignored. All regression covariates "
"are now automatically protected during ComBat harmonization."
)
_DEPR_P_FROM_AVERAGE_Y_COLOC = (
"'p_from_average_y_coloc' is deprecated and will be removed in the first "
"non-dev release. Use 'pooled_p' instead."
)
_DEPR_SORT_COLOCS = (
"'sort_colocs' is deprecated and will be removed in the first non-dev release. "
"Use sort_by='coloc' instead."
)
# ==================================================================================================
# DEFINE CLASS
# ==================================================================================================
[docs]class NiSpace:
"""
The NiSpace class. To be imported via `from nispace import NiSpace`.
Docs under construction.
"""
def __init__(self,
x: Union[np.ndarray, pd.DataFrame, pd.Series,
List[Union[str, Path, nib.Nifti1Image, nib.GiftiImage]],
Dict[str, Union[str, Path, nib.Nifti1Image, nib.GiftiImage]]],
y: Union[np.ndarray, pd.DataFrame, pd.Series,
List[Union[str, Path, nib.Nifti1Image, nib.GiftiImage]],
Dict[str, Union[str, Path, nib.Nifti1Image, nib.GiftiImage]]] = None,
z: Union[Literal["gm", "wm", "csf", "veins", "arteries"],
List[Literal["gm", "wm", "csf", "veins", "arteries"]],
np.ndarray, pd.DataFrame, pd.Series,
List[Union[str, Path, nib.Nifti1Image, nib.GiftiImage]],
Dict[str, Union[str, Path, nib.Nifti1Image, nib.GiftiImage]]] = None,
x_labels: Sequence[str] = None,
y_labels: Sequence[str] = None,
z_labels: Sequence[str] = None,
data_space: Literal["MNI152NLin6Asym", "MNI152NLin2009cAsym", "fsaverage", "fsLR"] = "MNI152NLin6Asym",
standardize: Union[Literal["x", "y", "z", "xy", "xz", "yz", "xyz"], bool] = "xz",
drop_nan: bool = False,
parcellation: Union[str, Path, nib.Nifti1Image, nib.GiftiImage] = None,
parcellation_labels: Sequence[str] = None,
parcellation_space: Literal["MNI152NLin6Asym", "MNI152NLin2009cAsym", "fsaverage", "fsLR"] = "MNI152NLin6Asym",
parcellation_hemi: Union[Literal["R", "L"], Sequence[Literal["L", "R"]]] = ["L", "R"],
parcellation_symmetric: bool = False,
parcellation_l2rmap: pd.DataFrame = None,
parcellation_idc_lh: Sequence[int] = None,
parcellation_idc_rh: Sequence[int] = None,
parcellation_idc_sc: Sequence[int] = None,
parcellation_dist_mat: Union[np.ndarray, pd.DataFrame] = None,
parcellation_spin_mat: np.ndarray = None,
load_dist_mat: bool = True,
load_spin_mat: bool = True,
resampling_target: Literal["data", "parcellation"] = "data",
n_proc: int = 1,
verbose: bool = True,
dtype: Union[type, str] = np.float32,
return_self: bool = True,
**kwargs):
"""
Initialize the NiSpace object.
On initialization, the parameters are only stored. Processing is done with NiSpace.fit().
Parameters
----------
x : array-like of shape(n_reference, n_parcels) or shape(n_parcels) or len(n_reference) list-like of image data
The reference maps (e.g., pet or mRNA data). Can be a numpy array, pandas DataFrame,
pandas Series, a list or a dictionary containing (paths to) image objects. If a
dictionary, the keys are the names of the maps, but will be overridden by x_labels.
y : array-like of shape(n_target, n_parcels) or shape(n_parcels) or len(n_target) list-like of image data, optional
The target data (i.e., usually your maps of interest). Data types can be the same as
for x. Default is None. If None, NiSpace will create a copy of the reference maps to
evaluate reference map-to-map intercorrelations.
z : str, list of str, or array-like, optional
Maps to regress from reference/target maps across parcels. Can be one of the shortcut
strings "gm", "wm", "csf", "veins", or "arteries" (or a list of several) to
automatically fetch the corresponding tissue probability map (TPM) from the NiSpace
data library. Alternatively, accepts a numpy array, pandas DataFrame/Series, or a list
of image paths/objects. Default is None. If only one map is provided it is regressed
from every Y map; if multiple maps are provided they are used jointly as predictors.
x_labels : sequence of str, optional
Labels for the x data. Default is None. If None and x is DataFrame or Series, the
labels are taken from x's index (DataFrame) or name (Series).
y_labels : sequence of str, optional
Labels for the y data. Default is None. If None and y is DataFrame or Series, the
labels are taken from y's index (DataFrame) or name (Series).
z_labels : sequence of str, optional
Labels for the z data. Default is None. If None and len(z) == len(y) any y is DataFrame
or Series, the labels are taken from y's (not z's) index (DataFrame) or name (Series).
data_space : str, optional
The space in which the (x,y,z) data is defined; passed to neuromaps. Should be one of
"MNI152NLin6Asym", "MNI152NLin2009cAsym", "fsaverage" or "fsLR".
standardize : str or bool, optional
Whether to standardize the parcellated (x,y,z) data within each map across parcels.
Default is "xz". If True, will standardize all data. If (combination of) "x", "y",
or "z", will standardize only the respective data array.
drop_nan : bool, optional
Whether to drop NaN values. Default is False. NaN values should be handled case-by-case
in all following analyses, so False is a good choice.
parcellation : str, Path, Nifti1Image, or GiftiImage, optional
The parcellation image to use. Default is None. Required in following cases:
1) if image (paths) are passed to x, y, or z; 2) if z is a TPM shortcut string; 3) if "map" permutation
is used in NiSpace.permute(); 4) if distance-based cross-validation is used. Cases 3/4
apply even if initial data is passed pre-parcellated in arrays.
parcellation_labels : sequence of str, optional
Labels for the parcellation. Default is None. If None and input (x,y) data is DataFrame or
Series,
parcellation_space : str, optional
The space in which the parcellation is defined. See data_space for details.
parcellation_hemi : sequence of str, optional
The hemispheres in which the parcellation if defined if parcellation_space is
"fsaverage" or "fslr". Default is ["L", "R"].
parcellation_dist_mat : array-like of shape(n_parcels, n_parcels), optional
The distance matrix for the parcellation. Default is None. Required as described in
cases 3) and 4) for parcellation.
resampling_target : str, optional
The target for resampling. Options: "data" (default) or "parcellation". If "data", the
parcellation is resampled to the data space before application (nearest neighbor).
If "parcellation", the data is resampled to the parcellation space (linear).
Resampling only works in MNI -> MNI and MNI -> mni/fsaverage/fslr direction; if, e.g.,
input data is in MNI space and parcellation is in fsaverage, resampling_target will be
forced to "parcellation", as MNI -> fsaverage/fslr transformation is not supported.
n_proc : int, optional
The number of processes to use in joblib parallelization. Default is 1. -1 will use as
many processes as cores are detected.
verbose : bool, optional
Whether to print (a lot of) verbose output. Default is True.
dtype : data-type, optional
The data type to use for the arrays. Default is np.float32 to save memory.
Returns
-------
None
"""
self._x = x
self._x_with_self = False
self._y = y
self._z = z
self._x_lab = x_labels
self._y_lab = y_labels
self._z_lab = z_labels
if isinstance(data_space, str):
data_space = [data_space] * 3
elif isinstance(data_space, (list, tuple)) & len(data_space)==1:
data_space = data_space * 3
elif isinstance(data_space, (list, tuple)) & len(data_space)==3:
pass
else:
lgr.critical_raise("'data_space' must be a string, a list with len==1 or a list with "
f"len==3! Is {type(data_space)}.",
ValueError)
self._data_space = data_space
if "parc" in kwargs and parcellation is None:
parcellation = kwargs.pop("parc")
# custom image/path parcellations are built immediately; integrated strings
# and already-constructed Parcellation objects are handled in fit()
if parcellation is None:
self._parc = None
elif not isinstance(parcellation, (str, Parcellation)):
# image or path → build Parcellation immediately
self._parc = Parcellation.from_path(
source=parcellation,
space=parcellation_space,
labels=parcellation_labels,
dist_mat=parcellation_dist_mat,
spin_mat=parcellation_spin_mat,
symmetric=parcellation_symmetric,
l2rmap=parcellation_l2rmap,
hemi=parcellation_hemi,
)
else:
# string name or existing Parcellation object → resolved in fit()
self._parc = {
"parc": parcellation,
"labels": parcellation_labels,
"space": parcellation_space,
"hemi": parcellation_hemi,
"symmetric": parcellation_symmetric,
"l2rmap": parcellation_l2rmap,
"idc_lh": parcellation_idc_lh,
"idc_rh": parcellation_idc_rh,
"idc_sc": parcellation_idc_sc,
}
self._parc_dist_mat = {
"null_maps": parcellation_dist_mat
}
if not isinstance(parcellation_dist_mat, tuple):
self._parc_dist_mat["cv"] = parcellation_dist_mat
self._parc_spin_mat = parcellation_spin_mat
self._load_dist_mat = load_dist_mat
self._load_spin_mat = load_spin_mat
self._resampl_target = resampling_target
self._n_proc = n_proc
self._drop_nan = drop_nan
self._dtype = dtype
self._verbose = verbose
if standardize == True:
self._zscore = "xyz"
elif standardize in [None, False]:
self._zscore = ""
else:
self._zscore = standardize
self._transform_count = 0
# empty data storage dicts
self._X_dimred = {}
self._dimred = {}
self._Y_trans = {}
self._colocs = {}
self._colocs_fun = {}
self._nulls = {
"_colocs": {}
}
self._p_colocs = {}
self._z_colocs = {}
# defaults for get functions (IMPORTANT: this determines what coloc and get function will do!)
self._last_settings = {
"method": None,
"X_reduction": False,
"Y_transform": False,
"xsea": False,
"rank": False,
"zy_matched": False,
"regress_z": None,
"mc_method": None,
"z_method": "robust",
"pooled_p": False,
}
# deprecation adjustment
self._return_self = return_self
# FIT ==========================================================================================
[docs] def fit(self, **kwargs):
"""
"Fit" the NiSpace class instance, i.e., check input and apply parcellation if necessary.
Input and parameters are set on initialization.
Parameters
----------
**kwargs
Any keyword argument accepted by :func:`parcellate_data` can be
passed here and will override that function's defaults. The three
most commonly needed ones are:
ignore_background_data : bool
Whether to exclude background voxels from parcel-mean
computation. Default: True
background_value : float, list, set, array, or 'auto'
Value(s) treated as background. Scalar, ``'auto'`` (border-voxel
auto-detection), ``None`` (same as ``'auto'``), or any
collection of the above. Default: ``['auto', 0.0]``
drop_background_parcels : bool
Whether to set all-background parcels to NaN after
aggregation. Redundant when ``ignore_background_data=True``
because those parcels already return NaN from empty-mean
aggregation; only useful with ``ignore_background_data=False``.
Default: False
Returns
-------
self : object
Returns the instance itself.
"""
verbose = set_log(lgr, self._verbose)
lgr.info("*** NiSpace.fit() - Data extraction and preparation. ***")
# TODO (first non-dev release): remove return_self parameter and all _return_self branches
# throughout api.py; methods should unconditionally return self
if not self._return_self:
lgr.warning(_DEPR_RETURN_SELF)
## handle parcellation
if self._parc is not None:
# integrated parcellation
if isinstance(self._parc, dict) and isinstance(self._parc["parc"], str):
# check if parcellation is an integrated parcellation
parc_integrated = _check_parcellation(self._parc["parc"], force_str=True, raise_not_found=False)
if parc_integrated is not None:
# fetch full multi-space Parcellation (space=None → Parcellation object)
parc_obj = fetch_parcellation(
parcellation=parc_integrated,
hemi=self._parc["hemi"],
return_dist_mat=self._load_dist_mat,
return_spin_mat=self._load_spin_mat,
verbose=verbose,
)
# activate space only when raw image data needs parcellating;
# DataFrames/Series/ndarrays are already parcellated — everything
# else (lists, paths, image objects, "gm" string, …) is raw
needs_parcellating = any(
not isinstance(d, (pd.DataFrame, pd.Series, np.ndarray))
for d in [self._x, self._y, self._z] if d is not None
)
if needs_parcellating:
active_space = parc_obj.get_image_for_dataspace(self._parc["space"])
parc_obj.set_active_space(active_space)
self._parc = parc_obj
# populate dist_mat dict from Parcellation for backward compat
dm = parc_obj.get_dist_mat(compute_if_missing=False)
self._parc_dist_mat["null_maps"] = dm
if not isinstance(dm, tuple):
self._parc_dist_mat["cv"] = dm
if self._parc_spin_mat is None:
self._parc_spin_mat = parc_obj.get_spin_mat()
# custom parcellation (string file path not matched as integrated)
if not isinstance(self._parc, Parcellation):
self._parc = Parcellation.from_path(
source=self._parc["parc"],
space=self._parc["space"],
labels=self._parc["labels"],
dist_mat=self._parc_dist_mat["null_maps"],
symmetric=self._parc["symmetric"],
l2rmap=self._parc["l2rmap"],
hemi=self._parc["hemi"],
)
## extract input data
_input_kwargs = dict(
parcellation=self._parc,
resampling_target=self._resampl_target,
n_proc=self._n_proc,
verbose=verbose,
dtype=self._dtype,
) | kwargs
# reference data -> usually e.g. PET atlases
lgr.info("Checking input data for 'x' (should be, e.g., PET data):")
self._X = parcellate_data(
self._x,
data_labels=self._x_lab,
data_space=self._data_space[0],
**_input_kwargs
)
lgr.info(f"Got 'x' data for {self._X.shape[0]} x {self._X.shape[1]} parcels.")
# target data -> usually e.g. subject data or group-level outcome data
if self._y is None:
lgr.warning("No 'y' data detected. Will use 'X' as both reference and target data!")
self._Y = self._X.copy()
self._x_with_self = True
if isinstance(self._zscore, str):
if "x" in self._zscore and not "y" in self._zscore:
self._zscore += "y"
else:
lgr.info("Checking input data for 'y' (should be, e.g., subject data):")
self._Y = parcellate_data(
self._y,
data_labels=self._y_lab,
data_space=self._data_space[1],
**_input_kwargs
)
lgr.info(f"Got 'y' data for {self._Y.shape[0]} x {self._Y.shape[1]} parcels.")
# data to control correlations for
if self._z is not None:
lgr.info("Checking input data for z (should be, e.g., grey matter probability):")
_TPM_SHORTCUTS = {"gm", "wm", "csf", "veins", "arteries"}
_z_list = [self._z] if isinstance(self._z, str) else (
list(self._z) if isinstance(self._z, list) else None)
if _z_list is not None and all(
isinstance(s, str) and s.lower() in _TPM_SHORTCUTS for s in _z_list):
_z_list = [s.lower() for s in _z_list]
lgr.info(f"Fetching TPM reference map(s) for z: {_z_list}.")
self._z = fetch_reference("tpm", maps=_z_list, space=self._data_space[2],
print_references=False, verbose=verbose)
if self._z_lab is None:
self._z_lab = _z_list
self._Z = parcellate_data(
self._z,
data_labels=self._z_lab,
data_space=self._data_space[2],
**_input_kwargs
)
lgr.info(f"Got 'z' data for {self._Z.shape[0]} x {self._Z.shape[1]} parcels.")
else:
self._Z = None
## check parcel number
if self._X.shape[1] != self._Y.shape[1]:
lgr.critical_raise("Got differing numbers of parcels in 'x' & 'y' data!",
ValueError)
if self._Z is not None:
if self._X.shape[1] != self._Z.shape[1]:
lgr.critical_raise("Got differing numbers of parcels in 'x'/'y' & 'z' data!",
ValueError)
# ## check distance matrix
# if self._parc_dist_mat["null_maps"] is not None:
# dist_mat = load_distmat(self._parc_dist_mat["null_maps"])
# if not isinstance(dist_mat, tuple):
# dist_mat = dist_mat,
# for d in dist_mat:
# if d.shape[0] != d.shape[1] != (self._X.shape[1] if len(dist_mat) == 1
# else self._X.shape[1] / 2):
# lgr.warning(f"Provided distance matrix shape {d.shape} is not symmetric or "
# f"does not fit with number of parcels in data ({self._X.shape[1]})!"
# " Ignoring provided matrix.")
# dist_mat = None
# break
# self._parc_dist_mat["null_maps"] = dist_mat[0] if len(dist_mat) == 1 else dist_mat
## check data indices
if all(self._X.columns != self._Y.columns):
lgr.warning("Parcel labels (column names) differ between 'x' & 'y' dataframes! "
"Using 'x' labels for both.")
self._Y.columns = self._X.columns.copy()
if self._Z is not None:
if all(self._X.columns != self._Z.columns):
lgr.warning("Parcel labels (column names) differ between 'x'/'y' & 'z' dataframes! "
"Using 'x' labels for both.")
self._Z.columns = self._X.columns.copy()
## deal with nan's
self._nan_bool = pd.concat([self._X, self._Y, self._Z], axis=0).isnull().any(axis=0)
self._no_nan = np.array(~self._nan_bool)
# case remove nan parcels completely
if self._drop_nan==True:
lgr.warning(f"Dropping {np.sum(self._nan_bool)} parcels with nan's. "
"This might lead to problems with null map generation!")
self._X = self._X.loc[:, self._no_nan]
self._Y = self._Y.loc[:, self._no_nan]
if self._Z is not None:
self._Z = self._Z.loc[:, self._no_nan]
self._nan_bool = pd.concat([self._X, self._Y, self._Z], axis=0).isnull().any(axis=0)
self._no_nan = np.array(~self._nan_bool)
# get column (parcel) indices and labels with nan's
self._nan_cols = list(np.where(self._nan_bool==True)[0])
self._nan_labs = list(self._nan_bool[self._nan_bool].index)
## parcel number
self._n_parcels = self._X.shape[1]
## update data labels
self._x_lab = self._X.index
self._y_lab = self._Y.index
## z-standardization
if self._zscore:
self._zscore = self._zscore.lower()
if "x" in self._zscore:
lgr.info("Z-standardizing 'X' data.")
self._X = zscore_df(self._X, along="rows")
if "y" in self._zscore:
lgr.info("Z-standardizing 'Y' data.")
self._Y = zscore_df(self._Y, along="rows")
if ("z" in self._zscore) & (self._Z is not None):
lgr.info("Z-standardizing 'Z' data.")
self._Z = zscore_df(self._Z, along="rows")
## return complete object
return self
# REDUCE DIMENSIONS ============================================================================
[docs] def reduce_x(self, reduction,
mean_by_set=False, weighted_mean=False,
n_components=None, min_ev=None, fa_method="minres", fa_rotation="promax",
seed=None, store=True, verbose=None):
"""
Performs dimensionality reduction on X data.
Under construction.
"""
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.reduce_x() - X dimensionality reduction. ***")
## check if fit was run
self._check_fit()
if self._X.shape[0] <= 1:
lgr.critical_raise(f"For X dimensionality reduction, X data has to be more than "
f"one map ({self._X.shape[0]})!",
ValueError)
## get X data (so this function can be run on direct X input data)
_X = self._X
## case mean or median
if reduction.lower() in ["mean", "median"]:
lgr.info(f"Calculating parcelwise{' weighted' if weighted_mean else ''} "
f"{reduction}{' by set' if mean_by_set else ''} of X data.")
if weighted_mean & ("weight" not in _X.index.names):
lgr.error("DataFrame must have 'weight' in its MultiIndex for weighted calculations")
weighted_mean = False
if mean_by_set & ("set" not in _X.index.names):
lgr.error("DataFrame must have 'set' in its MultiIndex for set-wise calculations")
mean_by_set = False
_X_reduced = mean_by_set_df(_X, mean_by_set, weighted_mean, reduction)
## case PCA / case ICA / case FA
elif reduction.lower() in ["pca", "ica", "fa"]:
lgr.info(f"Calculating {reduction.upper()} on X data.")
_X_reduced, ev, loadings = _reduce_dimensions(
data=_X.values[:, self._no_nan].T,
method=reduction,
n_components=n_components,
min_ev=min_ev,
fa_method=fa_method,
fa_rotation=fa_rotation,
seed=seed
)
# save
_X_reduced = fill_nan(
data=pd.DataFrame(
data=_X_reduced.T,
index=[f"c{i}" for i in range(_X_reduced.shape[1])],
columns=_X.iloc[:, self._no_nan].columns,
dtype=self._dtype
),
idx=self._nan_cols,
idx_label=self._nan_labs,
which="col"
)
loadings = pd.DataFrame(
data=loadings,
columns=_X_reduced.index,
index=_X.index,
dtype=self._dtype
)
ev = pd.Series(
data=ev,
name="ev",
index=[f"c{i}" for i in range(_X_reduced.shape[0])],
dtype=self._dtype
)
## case not defined
else:
lgr.error(f"Dimensionality reduction '{reduction}' not defined!",
ValueError)
return None
## save and return
if store:
self._X_dimred[_get_df_string(kind="xdimred", xdimred=reduction)] = _X_reduced
if reduction in ["pca", "ica", "fa"]:
self._dimred[reduction] = dict(
method=reduction,
n_components=_X_reduced.shape[0],
min_ev=min_ev,
loadings=loadings
)
if reduction in ["pca", "fa"]:
self._dimred[reduction]["ev"] = ev
if reduction=="fa":
self._dimred[reduction]["fa_method"] = fa_method
self._dimred[reduction]["fa_rotation"] = fa_rotation
self._set_last(X_reduction=reduction)
## return
if self._return_self:
return self
if reduction in ["pca", "ica", "fa"]:
return _X_reduced, ev, loadings
else:
return _X_reduced
# CLEAN ========================================================================================
[docs] def clean_y(self, how,
covariates_within=None,
covariates_between=None,
protect=None,
within_y_specific=False,
combat=False, combat_protect=None, combat_keep=None,
combat_train=None, combat_model=None, combat_kwargs=None,
plot_design_between=False,
n_proc=None, replace=True, verbose=None):
from .core.clean_y import _clean_y_within, _clean_y_between
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.clean_y() - Y covariate regression. ***")
self._check_fit()
# TODO (first non-dev release): remove combat_keep parameter entirely
if combat_keep is not None:
lgr.warning(_DEPR_COMBAT_KEEP)
n_proc = self._n_proc if n_proc is None else n_proc
combat_kwargs = {} if combat_kwargs is None else combat_kwargs
if isinstance(how, str):
how = [how]
if not isinstance(how, (list, tuple, set)) or \
not all(h in ["within", "between"] for h in how):
lgr.critical_raise(
f"'how' must be (list of) 'within' and/or 'between', got {how!r}.", ValueError)
Y = self._Y
Y_arr = np.array(Y)
did_within = did_between = False
# within: regression across parcels per map
if "within" in how and covariates_within is not None:
lgr.info("Performing covariate regression within map/subjects (e.g., grey matter maps).")
Y_arr, used_z, did_within = _clean_y_within(
Y_arr=Y_arr,
covariates_within=covariates_within,
Z=self._Z,
n_maps=Y.shape[0],
n_parcels=Y.shape[1],
within_y_specific=within_y_specific,
n_proc=n_proc,
dtype=self._dtype,
verbose=verbose,
)
if used_z:
self._clean_y_z = True
# between: ComBat harmonization and/or regression across subjects
if "between" in how and covariates_between is not None:
lgr.info("Performing covariate regression between maps/subjects (e.g., age, sex, site).")
Y_arr, combat_model, combat_covariates = _clean_y_between(
Y_arr=Y_arr,
covariates_between=covariates_between,
n_subjects=Y.shape[0],
protect=protect,
combat=combat,
combat_protect=combat_protect,
combat_train=combat_train,
combat_model=combat_model,
combat_kwargs=combat_kwargs,
plot_design_between=plot_design_between,
n_proc=n_proc,
dtype=self._dtype,
verbose=verbose,
)
did_between = True
if combat_model is not None:
self._clean_y_combat_model = combat_model
self._clean_y_combat_cov = combat_covariates
if not did_within and not did_between:
lgr.warning("No covariate regression performed! Set 'how' to 'between' and/or 'within' "
"and provide covariate arrays through 'covariates_{within|between}'!")
Y = pd.DataFrame(Y_arr, columns=Y.columns, index=Y.index, dtype=self._dtype)
if replace:
self._Y = Y
if self._return_self:
return self
return Y
# TRANSFORM Y ==================================================================================
# TRANSFORM Z ==================================================================================
# COLOCALIZE ===================================================================================
[docs] def colocalize(self, method=None, X_reduction=None, Y_transform=None, xsea=None,
xsea_aggregation_method="mean",
regress_z=True, zy_matched=False,
X=None, Y=None, Z=None,
store=True, n_proc=None, seed=None, verbose=None,
dist_mat_kwargs=None,
force_dict=False,
**kwargs):
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.colocalize() - Estimating X & Y colocalizations. ***")
# kwargs
dist_mat_kwargs = {} if dist_mat_kwargs is None else dist_mat_kwargs
## check if fit was run
self._check_fit()
## settings
n_proc = self._n_proc if n_proc is None else n_proc
dtype = self._dtype
## settings
method, X_reduction, Y_transform, xsea, rank, zy_matched, regress_z = self._get_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
rank=kwargs.pop("rank", None),
zy_matched=zy_matched,
regress_z=regress_z,
)
if method is None:
coloc_methods = ", ".join(list(_COLOC_METHODS.keys()))
lgr.critical_raise(f"No colocalization method defined! Supported:\n{coloc_methods}",
ValueError)
else:
lgr.info(f"Running '{method}' colocalization" + \
(f" on '{X_reduction}'-reduced X data" if X_reduction else "") + \
(f" with '{Y_transform}' transform" if Y_transform else "") + ".")
if "spearman" in method:
rank = True
## get X and Y data (so this function can be run on direct X & Y input data)
# X
if not X:
if not X_reduction:
X = self._X
else:
with _quiet():
X = self.get_x(X_reduction=X_reduction)
X_arr = np.array(X, dtype=dtype)
X_weights = None
if xsea:
lgr.info("Will perform X-set enrichment analysis (XSEA).")
if not isinstance(X, pd.DataFrame):
lgr.critical_raise("XSEA requires X data to be a pandas DataFrame!",
TypeError)
if not isinstance(X.index, pd.MultiIndex):
lgr.critical_raise("XSEA requires X data to have a MultiIndex!",
TypeError)
if "set" not in X.index.names:
lgr.critical_raise("XSEA requires X data to have a MultiIndex with a 'set' level!",
ValueError)
if "weighted" in xsea_aggregation_method and "weight" not in X.index.names:
lgr.warning("Weighted XSEA requires X data to have a MultiIndex with a 'weight' level! "
"Will not use weights.")
xsea_aggregation_method = xsea_aggregation_method.replace("weighted", "")
X_arr = {set_name: np.array(set_X, dtype=self._dtype)
for set_name, set_X in X.groupby(level="set", sort=False)}
if "weighted" in xsea_aggregation_method:
X_weights = {set_name: np.array(set_X.index.get_level_values("weight"), dtype=self._dtype)
for set_name, set_X in X.groupby(level="set", sort=False)}
lgr.info(f"Using {len(X_arr)} sets with between "
f"{X.index.get_level_values('set').value_counts().min()} and "
f"{X.index.get_level_values('set').value_counts().max()} samples. "
f"Aggregating within-set colocalizations with: {xsea_aggregation_method}.")
if ("spearman" in method or "pearson" in method):
if hasattr(kwargs, "r_to_z"):
if kwargs["r_to_z"] is False:
lgr.warning("XSEA with correlation colocalization requires Fisher's Z "
"transform! Will set 'r_to_z' = True.")
kwargs["r_to_z"] = True
self._xsea = True
self._xsea_aggregation_method = xsea_aggregation_method
# Y
groups = kwargs.pop("groups", None)
subjects = kwargs.pop("subjects", None)
if not Y:
if not Y_transform:
Y = self._Y
else:
if not self._check_transform(ytrans=Y_transform, raise_error=True):
lgr.warning(f"Y transform '{Y_transform}' was not run before. Running now.")
self.transform_y(Y_transform, groups, subjects)
else:
with _quiet():
Y = self.get_y(Y_transform=Y_transform)
Y_arr = np.array(Y, dtype=dtype)
# Z
if not Z:
Z = self._Z
# if regress_z is True, we regress Z from X and Y, if None or False, we don't regress Z
if Z is None or regress_z is None or regress_z == False:
regress_z = ""
if regress_z == True:
regress_z = "xy"
# partialspearman and partialpearson entail full Z regression
if regress_z and "partial" in method:
if Z is None:
lgr.error(f"Provide Z data for method '{method}'! Using method "
f"'{method.replace('partial', '')}' instead.")
method = method.replace("partial", "")
regress_z = ""
elif hasattr(self, "_clean_y_z"):
lgr.warning("It seems, Z-from-Y-regression was performed before. Will add X-from-Z-regression.")
regress_z = "x"
else:
regress_z = "xy"
# if zy_matched is True, we regress each Z from each Y, we cannot regress from X in that case
if regress_z and zy_matched:
if Z.shape[0] != Y_arr.shape[0]:
lgr.error(f"Number of Z maps ({Z.shape[0]}) must equal number of Y maps " + \
f"({Y_arr.shape[0]}) if 'zy_matched' is True! Will not perform Z regression.")
zy_matched, regress_z = False, ""
elif "partial" in method:
lgr.warning(f"Method '{method}' is not compatible with matched Z->Y regression. "
f"Will perfom semi-partial '{method.replace('partial', '')}' correlation.")
method = method.replace("partial", "")
regress_z = "y"
else:
regress_z = "y"
# if regression was performed in .clean_y(), we avoid to touch y again
if hasattr(self, "_clean_y_z"):
lgr.warning(f"It seems, Z-from-Y-regression was performed before. "
f"{'Will only perform Z-from-X-regression.' if regress_z else 'Skipping regression.'}")
regress_z = regress_z.replace("y", "")
# if regress_z is still not False, we proceed
Z_arr = np.array(Z, dtype=dtype) if regress_z else None
## Preranking and regression
if rank or regress_z:
if rank:
lgr.info("Pre-ranking X and Y data.")
if regress_z:
lgr.info(f"Regressing {Z_arr.shape[0]} {'Y-matched ' if zy_matched else ''}Z maps from "
f"{regress_z.upper()} data.")
# X
X_arr = _rank_regress(
arr=X_arr,
rank=rank,
regress="x" in regress_z,
z=Z_arr,
zy_matched=zy_matched,
verbose=verbose
)
# Y
Y_arr = _rank_regress(
arr=Y_arr,
rank=rank,
regress="y" in regress_z,
z=Z_arr,
zy_matched=zy_matched,
verbose=verbose
)
## special case regularized regression: we need euclidean distance matrices
parcel_tr_te_splits, parcel_train_pct = None, None
if (method in ["lasso", "ridge", "elasticnet"]):
parcel_tr_te_splits = dist_mat_kwargs.pop("parcel_tr_te_splits", None)
euclidean_dist_mat = dist_mat_kwargs.pop("euclidean_dist_mat", None)
parcel_train_pct = dist_mat_kwargs.pop("parcel_train_pct", 0.75)
if parcel_tr_te_splits is None:
lgr.info("Fetching euclidean distance matrix for regularized regression "
"colocalization with n(parcel)-fold CV.")
if self._zscore:
lgr.warning("Input data was Z-standardized, which might lead to leakage in CV!")
if euclidean_dist_mat is None:
euclidean_dist_mat = self._get_dist_mat(
dist_mat_type="cv",
n_proc=n_proc,
**dist_mat_kwargs
)
if any([s in (self._parc._space or "").lower() for s in ["mni", "fsa"]]):
lgr.info("Calculating distance-dependent parcel splits.")
self.parcel_tr_te_splits_works = _get_dist_dep_splits(
dist_mat=euclidean_dist_mat[np.ix_(self._no_nan, self._no_nan)],
train_pct=parcel_train_pct
)
else:
lgr.warning("Calculating random parcel splits as parcellation space not supported.")
parcel_tr_te_splits = _get_rand_splits(
train_pct=parcel_train_pct,
seed=seed
)
# save colocalization settings
self._coloc_kwargs = dict(
xsea=xsea,
xsea_method=xsea_aggregation_method,
parcel_train_pct=None,
parcel_tr_te_splits=None,
parcel_mask_regularized=self._no_nan.copy(),
**kwargs
)
if self._x_with_self:
self._coloc_kwargs["r_equal_one"] = np.nan
## get function to perform colocalization for one y vector/row (= per subject), needed for parallelization
_y_colocalize = _get_colocalize_fun(
method=method,
seed=seed,
verbose=verbose,
dtype=dtype,
**self._coloc_kwargs
)
## run actual prediction using joblib.Parallel
_colocs_list = Parallel(n_jobs=n_proc)(
delayed(_y_colocalize)(X_arr, Y_arr[i_y, :], X_weights) \
for i_y in tqdm(
range(Y.shape[0]),
desc=f"Colocalizing ({method}, {n_proc} proc)",
disable=not verbose
)
)
## sort output with helper function, return as df
_colocs = _sort_colocs(
method=method,
xsea=xsea,
y_colocs_list=_colocs_list,
n_X=len(X_arr),
n_Y=Y.shape[0],
return_df=True,
labs_X=X.index if not xsea else X_arr.keys(),
labs_Y=Y.index,
#n_components=n_components,
dtype=dtype
)
## store & return
if store:
# save output
for stat in _colocs:
df_str = _get_df_string("coloc", xdimred=X_reduction, ytrans=Y_transform,
method=method, stat=stat, xsea=xsea)
self._colocs[df_str] = _colocs[stat]
# save coloc. function
self._colocs_fun[method] = _y_colocalize
# save last settings
self._set_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
rank=rank,
zy_matched=zy_matched,
regress_z=regress_z,
)
## return
if self._return_self:
return self
# return dict of dfs
if force_dict or len(_colocs) > 1:
return _colocs
else:
return _colocs[list(_colocs.keys())[0]]
# PERMUTE ======================================================================================
[docs] def permute(self, what, method=None, X_reduction=None, Y_transform=None, xsea=None,
n_perm=10000,
maps_which="X", maps_nulls=None, maps_method=None, dist_mat=None,
sets_X_background=None,
p_tails=None, pooled_p="auto", p_from_average_y_coloc=None,
n_proc=None, seed=None, store=True, verbose=None, force_dict=False,
**kwargs):
"""
Estimate exact non-parametric p-values via permutation testing.
Parameters
----------
what : str or list of str
What to permute. One or more of:
``"maps"`` — spatially constrained null maps for X and/or Y brain maps;
``"groups"`` — Y group labels (requires ``Y_transform``);
``"sets"`` — X set membership labels (requires XSEA).
Allowed combinations: ``["maps", "groups"]``, ``["maps", "sets"]``,
``["groups", "sets"]``. Three-way simultaneous permutation is not
supported and falls back to ``["groups", "sets"]``.
method : str, optional
Colocalization method. Defaults to the method used in the last
:meth:`colocalize` call.
X_reduction : str, optional
X dimensionality-reduction label. Defaults to last used.
Y_transform : str, optional
Y transformation label. Defaults to last used.
xsea : bool, optional
Whether to run in XSEA mode. Defaults to last used.
n_perm : int, optional
Number of permutations. Default is 10000.
maps_which : str or list of str, optional
Which data to generate null maps for: ``"X"``, ``"Y"``, or
``["X", "Y"]``. Default is ``"X"``.
maps_nulls : dict, optional
Pre-computed null maps as ``{map_name: array(n_perm, n_parcels)}``.
Bypasses null map generation entirely when provided and valid.
maps_method : str, optional
Null map generation method. Auto-selected from the parcellation when
not set. Options: ``"moran"`` (default for volumetric),
``"alexander_bloch"`` / ``"spin"`` (surface), ``"burt2018"``,
``"burt2020"``, ``"random"``.
dist_mat : array-like of shape (n_parcels, n_parcels), optional
Pre-computed geodesic distance matrix. Generated from the
parcellation if not provided (and required by the null method).
sets_X_background : array-like of shape (n_maps, n_parcels), optional
Background X map pool for set permutation. If not provided, the
unique observed X maps are used as the background.
p_tails : str or dict, optional
P-value tail(s). ``"two"``, ``"upper"``, or ``"lower"``. Can be a
dict keyed by statistic name (e.g. ``{"rho": "two"}``). Defaults to
method-appropriate tails.
pooled_p : str or bool, optional
How to aggregate across Y maps before computing p-values:
``"mean"`` or ``"median"`` (average first, one p-value per X map),
``False`` (one p-value per Y×X pair),
``"auto"`` (default) — ``False`` for single-Y, ``"mean"`` otherwise.
p_from_average_y_coloc : str or bool, optional
Deprecated. Use ``pooled_p`` instead.
n_proc : int, optional
Number of parallel processes. Defaults to the value set at init.
seed : int, optional
Random seed for reproducibility.
store : bool, optional
Store p-values and z-scores in the object. Default is True.
verbose : bool, optional
Print progress messages. Defaults to the value set at init.
force_dict : bool, optional
Always return a dict even when the result has a single statistic.
Other Parameters
----------------
maps_centroids : bool
``maps_*`` kwargs → null map generation (:func:`generate_null_maps`).
Use parcel centroids for geodesic distance matrix. Default False.
maps_parc_resample : int
Voxel size (mm) to resample parcellation before distance-matrix
computation. Default 2.
maps_lr_mirror_dist_mat : bool
Mirror left-hemisphere distance matrix to the right. Default False.
maps_split_hemi : bool or None
Generate null maps separately per hemisphere. Default None.
maps_split_cxsc : bool
Generate null maps separately for cortex and subcortex. Default False.
maps_cx_sc_minmax_scale : bool
Min–max scale cortex and subcortex null maps before merging.
Default False.
maps_procedure : str
Moran randomisation procedure: ``"singleton"`` (default) or ``"all"``.
maps_joint : bool
Moran joint randomisation. Default True.
distmat_centroids : bool
``distmat_*`` kwargs → distance-matrix generation (``_get_dist_mat``).
Use centroids for CV distance matrix. Default False.
distmat_parc_resample : int
Resampling voxel size for CV distance matrix. Default 2.
groups_paired : bool or "auto"
``groups_*`` kwargs → group-label permutation (:func:`permute_groups`).
Paired permutation (requires subjects vector). ``"auto"`` infers
pairing from the Y transform. Default ``"auto"``.
groups_strategy : str
Permutation strategy: ``"proportional"`` (default) or ``"random"``.
Remaining kwargs (no prefix) are forwarded to :meth:`colocalize`.
Returns
-------
p_values : DataFrame or dict of DataFrames
P-values indexed by Y labels × X labels. A dict is returned when
the colocalization method produces multiple statistics or when
``force_dict=True``.
"""
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.permute() - Estimate exact non-parametric p values. ***")
## check if fit was run
self._check_fit()
## map permutation: raise early only when parc is genuinely needed
if "maps" in ([what] if isinstance(what, str) else what) and self._parc is None:
from .nulls import _SPIN_METHODS as _sm
_has_nulls = maps_nulls is not None or (self._nulls.get("maps_null") is not None)
_has_dist = dist_mat is not None
_is_spin = maps_method in _sm if maps_method else False
if not (_has_nulls or (_has_dist and not _is_spin)):
lgr.critical_raise(
"Map null map generation requires a parcellation. Provide one via "
"NiSpace(parcellation=...), or supply pre-computed null maps (maps_nulls=) "
"or a distance matrix (dist_mat=) with a non-spin null method.",
ValueError,
)
## check for allowed permutation combinations
# check what variable
if isinstance(what, str):
what = [what]
elif isinstance(what, (list, tuple)):
pass
else:
lgr.critical_raise(f"'what' must be list, tuple, or string, not {type(what)}",
ValueError)
what = sorted(what)
# check maps_which variable
if maps_which:
if isinstance(maps_which, str):
maps_which = [m for m in maps_which]
maps_which = sorted(maps_which)
if maps_which not in [["X"], ["Y"], ["X", "Y"]]:
lgr.critical_raise(f"'maps_which' has to be 'X', 'Y', or ['X', 'Y'] not '{maps_which}'",
ValueError)
# case X/Y maps
if what == ["maps"]:
perm_info = f"{' & '.join(maps_which)} maps"
# case Y groups
elif what == ["groups"]:
perm_info = "Y groups"
# case X sets
elif what == ["sets"]:
perm_info = "X sets"
# case X/Y maps and Y groups
elif what == ["groups", "maps"]:
if maps_which != ["X"]:
lgr.warning("Y map permutation not allowed in combination with Y group permutation. "
"Will set 'maps_which' = 'X' and permute X maps instead.")
maps_which = ["X"]
perm_info = "X maps and Y groups"
# case X/Y maps and X sets
elif what == ["maps", "sets"]:
if maps_which != ["Y"]:
lgr.warning("X set permutation not allowed in combination with X map permutation. "
"Will set 'maps_which' = 'Y' and permute Y maps instead.")
maps_which = ["Y"]
perm_info = "X sets and Y maps"
# case X sets and Y groups
elif what == ["groups", "sets"]:
perm_info = "X sets and Y groups"
# case X/Y maps, X sets, and Y groups
elif what == ["groups", "maps", "sets"]:
lgr.warning("Cannot perform simultaneous permutation of sets, maps, and groups. "
"Will run permutation of X sets and Y groups instead.")
what = ["groups", "sets"]
# case other
else:
lgr.critical_raise(f"'what' = '{what}' not defined!",
ValueError)
lgr.info(f"Permutation of: {perm_info}.")
## settings
_rank_kwarg = kwargs.pop("rank", None)
method, X_reduction, Y_transform, xsea, rank, zy_matched, regress_z = self._get_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
rank=None,
zy_matched=None,
regress_z=None,
)
if _rank_kwarg is not None:
rank = _rank_kwarg
if "spearman" in method:
rank = True
# specific settings via kwargs
# distance matrix generation
dist_mat_kwargs = {
"dist_mat_type": "null_maps",
"centroids": False,
"parc_resample": 2
}
for k in [k for k in kwargs.keys() if k.startswith("distmat_")]:
dist_mat_kwargs[k.removeprefix("distmat_")] = kwargs.pop(k)
# null maps generation
maps_separate_sc = kwargs.pop("maps_separate_sc", False)
maps_kwargs = {
"nispace_nulls": self._nulls,
"use_existing_maps": True,
"null_maps": maps_nulls,
"null_method": maps_method,
"spin_mat": None,
"lr_mirror_dist_mat": False,
"cx_sc_minmax_scale": False,
"parc_resample": 2,
"parc_name": self._parc._name if self._parc else None,
}
for k in [k for k in kwargs.keys() if k.startswith("maps_")]:
maps_kwargs[k.removeprefix("maps_")] = kwargs.pop(k)
# resolve null_method and null_space from parcellation when not explicitly set
# TODO: combined spin+moran: get_null_space() currently returns a single (space, method).
# For combined parcellations it should return a split strategy and permute() should
# pass a sc-only dist_mat to _get_null_maps alongside the cx spin path.
if self._parc is not None:
if maps_kwargs["null_method"] is None:
null_space, null_method = self._parc.get_null_space()
maps_kwargs["null_method"] = null_method
lgr.info(f"Using default null method '{null_method}' "
f"(parcellation null space: '{null_space}').")
else:
null_space, _ = self._parc.get_null_space()
# ensure null_space is loaded and fitted so backward-compat properties work in _get_null_maps
self._parc._ensure_image_loaded(null_space)
if null_space not in self._parc._hemi_dict:
self._parc._fit_space(null_space)
if self._parc._space is None:
self._parc._space = null_space
# pass precomputed spin matrix for the resolved null space
if maps_kwargs["null_method"] in {"alexander_bloch", "spin"}:
maps_kwargs["spin_mat"] = (
self._parc_spin_mat
or self._parc.get_spin_mat(null_space)
)
else:
if maps_kwargs["null_method"] is None:
maps_kwargs["null_method"] = "moran"
lgr.info("No parcellation set; defaulting null method to 'moran'.")
# groups permutation
groups_kwargs = {
"paired": "auto",
"strategy": "proportional",
}
for k in [k for k in kwargs.keys() if k.startswith("groups_")]:
groups_kwargs[k.removeprefix("groups_")] = kwargs.pop(k)
# colocalization
coloc_kwargs = {
"xsea": xsea,
"n_proc": n_proc,
"seed": seed,
} | kwargs
## merge with settings from current NiSpace object
n_proc = n_proc if n_proc is not None else self._n_proc
dtype = self._dtype
## check if colocalize was run
xsea = True if ("sets" in what) or (xsea == True) else False
if not self._check_colocalize(method, None, X_reduction, Y_transform, xsea,
raise_error=False):
lgr.warning(f"'{method}' colocalization was not run before. Running now.")
self.colocalize(method, X_reduction, Y_transform, **coloc_kwargs)
# colocalize() resolves rank/regress_z/zy_matched through its own conditional
# logic (partial methods, zy_matched, clean_y interactions, etc.). Refresh
# local variables so the null-map pre-ranking/regression block below uses
# exactly the same settings as the observed colocalization just computed.
rank = self._last_settings["rank"]
regress_z = self._last_settings["regress_z"] or ""
zy_matched = self._last_settings["zy_matched"]
if "spearman" in method:
rank = True
## get observed data
# X
if not X_reduction:
_X_obs = self._X
else:
lgr.info(f"Loading dimensionality-reduced X data, reduction method = '{X_reduction}'.")
with _quiet():
_X_obs = self.get_x(X_reduction=X_reduction)
_X_obs_arr = np.array(_X_obs, dtype=dtype)
if xsea:
if self._xsea:
_X_obs_arr = {set_name: np.array(set_X, dtype=dtype)
for set_name, set_X in _X_obs.groupby(level="set", sort=False)}
else:
lgr.warning("Input 'what' contains 'sets' or 'xsea' was set to True but it seems "
"XSEA was not run before. Will not perform XSEA (permutation).")
what.remove("sets")
xsea = False
# Y
_Y_obs = self._Y
_Y_obs_arr = np.array(_Y_obs, dtype=dtype)
if Y_transform:
lgr.info(f"Loading transformed Y data, transform = '{Y_transform}'.")
with _quiet():
_Y_trans_obs = self.get_y(Y_transform=Y_transform)
_Y_trans_obs_arr = np.array(_Y_trans_obs, dtype=dtype)
# Z
_Z_obs = self._Z
_Z_obs_arr = np.array(_Z_obs, dtype=dtype)
# TODO (first non-dev release): remove p_from_average_y_coloc parameter
if p_from_average_y_coloc is not None:
lgr.warning(_DEPR_P_FROM_AVERAGE_Y_COLOC)
pooled_p = p_from_average_y_coloc
## resolve pooled_p: "auto" -> decide based on number of Y maps,
# "median"/"mean" -> calculate p based on mean/median colocalization across Y maps,
# False -> calculate p for every Y map, anything else -> defaults to mean
if pooled_p:
if pooled_p == "auto":
if _Y_obs.shape[0] == 1 or self._x_with_self:
pooled_p = False
elif _Y_obs.shape[0] > 1:
pooled_p = "mean"
elif pooled_p not in ["mean", "median"]:
pooled_p = "mean"
if pooled_p:
lgr.info("Will calculate p values for mean colocalization across Y maps. Set "
"'pooled_p=False' to compute p values for each Y map individually.")
self._nulls["pooled_p"] = pooled_p
## get observed colocalizations as numpy arrays
lgr.info(f"Loading observed colocalizations (method = '{method}').")
with _quiet():
_colocs_obs = self.get_colocalizations(
method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
force_dict=True,
)
_colocs_obs = {stat: np.array(df, dtype=dtype) for stat, df in _colocs_obs.items()}
# get average prediction values of all y if requested
if pooled_p:
for stat in _colocs_obs.keys():
if pooled_p == "median":
_colocs_obs[stat] = np.nanmedian(_colocs_obs[stat], axis=0)[np.newaxis, :]
else:
_colocs_obs[stat] = np.nanmean(_colocs_obs[stat], axis=0)[np.newaxis, :]
## prepare permuted data as prerequisite for null colocalization runs
_X_null, _Y_null, _Z_null = None, None, None
# case permute X/Y brain maps
if "maps" in what:
# iterate map datasets to permute
for XY in maps_which:
lgr.info(f"Generating permuted {XY} maps.")
# if no null maps & also no distance matrix given, generate distance matrix
# (spin methods don't use a dist_mat — skip generation)
if maps_nulls is None and dist_mat is None \
and maps_kwargs["null_method"] not in _SPIN_METHODS:
dist_mat = self._get_dist_mat(**dist_mat_kwargs)
# get null maps, will not generate new maps if already existing and use of
# existing is requested
if XY=="X":
data_obs = _X_obs
standardize_nulls = True if "x" in self._zscore else False
elif XY=="Y":
if Y_transform:
data_obs = _Y_trans_obs
else:
data_obs = _Y_obs
standardize_nulls = True if "y" in self._zscore else False
maps_nulls = _get_null_maps(
data_obs=data_obs,
dist_mat=dist_mat,
parc=self._parc,
#parc_kwargs=self._parc_info,
#standardize=False,
standardize=standardize_nulls,
n_perm=n_perm,
seed=seed,
n_proc=n_proc,
dtype=dtype,
verbose=verbose,
**maps_kwargs
)
# store null maps
self._nulls["maps_null_method"] = maps_kwargs["null_method"]
self._nulls["maps_null"] = maps_nulls
self._nulls["maps_null_which"] = XY
# cache newly generated spin matrix for reuse
if maps_kwargs["null_method"] in {"alexander_bloch", "spin"} \
and self._parc_spin_mat is None \
and self._nulls.get("maps_spin") is not None:
self._parc_spin_mat = self._nulls["maps_spin"]
# sort null map data into lists of length n_perm, each element being one
# permuted array of observed values
lgr.debug("Sorting null map data into arrays.")
if XY=="X":
_X_null = [np.c_[[maps[i, :] for maps in maps_nulls.values()]].astype(self._dtype)
for i in range(n_perm)]
# case: xsea requested: re-sort into a list of dicts of set-wise arrays
if isinstance(_X_obs_arr, dict):
idc_set = np.array(_X_obs.index.get_level_values("set"))
_X_null = [{set_name: null[idc_set == set_name, :]
for set_name in _X_obs_arr.keys()}
for null in _X_null]
elif XY=="Y":
_Y_null = [np.c_[[maps[i, :] for maps in maps_nulls.values()]].astype(self._dtype)
for i in range(n_perm)]
# case permute Y groups
if ("groups" in what) and Y_transform:
lgr.info(f"Generating permuted Y groups.")
# get groups without nan values
groups = self._groups_no_nan
if hasattr(self, "_subjects_no_nan"):
subjects = self._subjects_no_nan
else:
subjects = None
# Y values without nan values in group vector
_Y_obs_arr_nonan = _Y_obs_arr[~self._groups_nan_idc, :]
## prepare formula & transform function
# TODO: get stored transform function
Y_transform = _lower_strip_ws(Y_transform)
apply_transform, paired = _get_transform_fun(Y_transform, return_df=False,
return_paired=True, dtype=dtype,
ignore_nan_warnings=True)
# paired permutations?
if groups_kwargs["paired"] not in ["auto", True, False]:
lgr.warning("Argument 'groups_paired' must be of boolean type or 'auto' not "
f"'{groups_kwargs['paired']}'! Setting to 'auto'.")
groups_kwargs["paired"] = "auto"
if groups_kwargs["paired"] == "auto":
groups_kwargs["paired"] = paired
# get list of permuted group labels
lgr.info(f"Permuting groups/sessions vector, strategy: "
f"{'paired' if groups_kwargs['paired'] else 'unpaired'}, {groups_kwargs['strategy']}.")
groups_null = permute_groups(
groups=groups,
subjects=subjects,
n_perm=n_perm,
n_proc=n_proc,
seed=seed,
verbose=verbose,
**groups_kwargs
)
# get permuted group comparison results
# parallelization function
def par_fun(group_null):
# apply transform with random groups
Y_null = apply_transform(y=_Y_obs_arr_nonan, groups=group_null, subjects=subjects)
return Y_null
# run in parallel
_Y_null = Parallel(n_jobs=n_proc)(
delayed(par_fun)(g) for g in tqdm(
groups_null,
desc=f"Null transformations ({method}, {n_proc} proc)", disable=not verbose
)
)
# case permute Y groups but no comparison is provided
elif ("groups" in what) & (not Y_transform):
lgr.critical_raise("Provide a comparison ('Y_transform') to perform group permutation!",
ValueError)
# case X Set Enrichment Analysis: permute X sets
if "sets" in what:
lgr.info("Generating permuted X sets.")
if sets_X_background is None:
sets_X_background = _X_obs.drop_duplicates(ignore_index=True).values
lgr.warning(f"No X background dataset provided. Will use "
f"{sets_X_background.shape[0]} unique X maps as background.")
else:
if not isinstance(sets_X_background, (np.ndarray, pd.DataFrame)):
lgr.critical_raise(f"X background maps must be of type np.ndarray or "
f"pd.DataFrame, not {type(sets_X_background)}!",
TypeError)
if sets_X_background.shape[1] != _X_obs.shape[1]:
lgr.critical_raise(f"X background maps of wrong shape {sets_X_background.shape}!",
ValueError)
lgr.info(f"Will use {sets_X_background.shape[0]} provided background maps.")
sets_X_background = np.array(sets_X_background, dtype=dtype)
if "x" in self._zscore:
lgr.info("Z-standardizing X background maps.")
sets_X_background = zscore_df(sets_X_background, along="rows", force_df=False)
# get permuted X sets
set_sizes = [set_X.shape[0] for set_X in _X_obs_arr.values()]
set_names = list(_X_obs_arr.keys())
bg_size = sets_X_background.shape[0]
# get permuted indices
rng = np.random.default_rng(seed)
_X_null = [
{name: rng.choice(bg_size, size=size, replace=False)
for name, size in zip(set_names, set_sizes)}
for _ in tqdm(range(n_perm), desc="Permuting X set indices", disable=not verbose)
]
# function to get permuted data from indices. necessary to handle large X set arrays
def _xsea_perm_data(i):
return {name: sets_X_background[idc, :] for name, idc in _X_null[i].items()}
# catch case in which xsea is performed (i.e., x array is dict) but sets are not permuted
elif "sets" not in what and isinstance(_X_obs_arr, dict):
lgr.info("Running X Set Enrichment Analysis (XSEA) without set permutation.")
# function to get _X_null data, only necessary for compatibility with the above
def _xsea_perm_data(i):
return _X_null[i]
# handle weighted XSEA
X_weights = None
if isinstance(_X_obs_arr, dict):
if "weighted" in self._xsea_aggregation_method:
if isinstance(_X_obs_arr, dict):
X_weights = {set_name: np.array(set_X.index.get_level_values("weight"), dtype=self._dtype)
for set_name, set_X in _X_obs.groupby(level="set", sort=False)}
## pre-rank and regress
if rank or regress_z:
if rank:
lgr.info("Pre-ranking X and Y (null) data.")
if regress_z:
lgr.info(f"Regressing {_Z_obs_arr.shape[0]} {'Y-matched ' if zy_matched else ''}Z "
f"maps from (null) {regress_z.upper()} data.")
# X observed
_X_obs_arr = _rank_regress(
arr=_X_obs_arr,
rank=rank,
regress="x" in regress_z,
z=_Z_obs_arr,
zy_matched=zy_matched,
verbose=verbose
)
# X null
if not ("sets" in what and isinstance(_X_obs_arr, dict)):
_X_null = _rank_regress(
arr=_X_null,
rank=rank,
regress="x" in regress_z,
z=_Z_obs_arr,
zy_matched=zy_matched,
verbose=verbose,
n_proc=n_proc
)
# XSEA background
sets_X_background = _rank_regress(
arr=sets_X_background,
rank=rank,
regress="x" in regress_z,
z=_Z_obs_arr,
zy_matched=zy_matched,
verbose=verbose,
)
# Y observed
_Y_obs_arr = _rank_regress(
arr=_Y_obs_arr,
rank=rank,
regress="y" in regress_z,
z=_Z_obs_arr,
zy_matched=zy_matched,
verbose=verbose,
)
# Y null
_Y_null = _rank_regress(
arr=_Y_null,
rank=rank,
regress="y" in regress_z,
z=_Z_obs_arr,
zy_matched=zy_matched,
verbose=verbose,
n_proc=n_proc
)
## check what permuted dataframes we have, if we dont have them, copy observed data (!)
if (not _X_null) & (not _Y_null) & (not _Z_null):
lgr.critical_raise("No permuted data generated. Supported permutations ('what') are: "
"'maps', 'groups', and 'sets'.",
ValueError)
# X
if not _X_null:
_X_null = [_X_obs_arr] * n_perm
# Y
if not _Y_null:
_Y_null = [_Y_trans_obs_arr if "_Y_trans_obs" in locals() else _Y_obs_arr] * n_perm
## run null colocalizations
# function to perform colocalization for one y vector (= per subject); see NiSpace.colocalize()
# the function was saved by NiSpace.colocalize()
_y_colocalize = self._colocs_fun[method]
# function to perform colocalization for one X/Y/Z null array
xsea = True if isinstance(_X_null[0], dict) else False
#n_components = self._coloc_kwargs["n_components"]
def par_fun(X_null, Y_null, X_weights=None):
# run colocalization
null_colocs_list = [
_y_colocalize(X_null, Y_null[i_y, :], X_weights)
for i_y in range(Y_null.shape[0])
]
# sort output with helper function, return as array
null_colocs = _sort_colocs(
method=method,
xsea=xsea,
y_colocs_list=null_colocs_list,
n_X=len(X_null),
n_Y=Y_null.shape[0],
#n_components=n_components,
return_df=False,
dtype=dtype
)
# average colocalization if requested
if pooled_p:
for stat in null_colocs:
if pooled_p == "median":
null_colocs[stat] = np.nanmedian(null_colocs[stat], axis=0)[np.newaxis, :]
else:
null_colocs[stat] = np.nanmean(null_colocs[stat], axis=0)[np.newaxis, :]
# return
return null_colocs
# run in parallel
if not xsea:
_colocs_null = Parallel(n_jobs=n_proc)(
delayed(par_fun)(_X_null[i], _Y_null[i])
for i in tqdm(
range(n_perm),
desc=f"Null colocalizations ({method}, {n_proc} proc)", disable=not verbose
)
)
else:
_colocs_null = Parallel(n_jobs=n_proc)(
delayed(par_fun)(_xsea_perm_data(i), _Y_null[i], X_weights)
for i in tqdm(
range(n_perm),
desc=f"Null colocalizations ({method}, {n_proc} proc)", disable=not verbose
)
)
## calculate exact p values
# get values
p_data, p_tails_resolved = _get_exact_p_values(
method=method,
xsea_aggr=self._xsea_aggregation_method if xsea else None,
colocs_obs=_colocs_obs,
colocs_null=_colocs_null,
p_tails=p_tails,
verbose=verbose,
dtype=dtype
)
# to dataframe
for stat in p_data.keys():
# column names
if p_data[stat].shape[1] == 1:
cols = [stat]
elif p_data[stat].shape[1] == _X_obs.shape[0]:
cols = _X_obs.index
elif isinstance(_X_obs_arr, dict) and p_data[stat].shape[1] == len(_X_obs_arr):
cols = list(_X_obs_arr.keys())
else:
lgr.critical_raise(f"p value array of wrong shape ({p_data[stat].shape})!",
ValueError)
# index names
if (pooled_p in ["mean", "median"]) & (_Y_obs.shape[0]>1):
rows = [pooled_p]
elif "_Y_trans_obs" in locals():
rows = _Y_trans_obs.index
else:
rows = _Y_obs.index
p_data[stat] = pd.DataFrame(p_data[stat], columns=cols, index=rows)
# save and return
if store:
perm = "".join(what).replace("maps", "".join(maps_which)+"maps")
for stat in p_data:
df_str = _get_df_string(
"p",
xdimred=X_reduction,
ytrans=Y_transform,
method=method,
stat=stat,
xsea=xsea,
perm=perm,
pooled_p=pooled_p,
)
self._p_colocs[df_str] = p_data[stat]
df_str = _get_df_string(
"null",
xdimred=X_reduction,
ytrans=Y_transform,
method=method,
xsea=xsea,
perm=perm,
pooled_p=pooled_p,
)
self._nulls["_colocs"][df_str] = _colocs_null
self._nulls[f"p_tails_{df_str}"] = p_tails_resolved
self._set_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
rank=rank,
zy_matched=zy_matched,
regress_z=regress_z,
perm=perm,
pooled_p=pooled_p,
)
## return
if self._return_self:
return self
# return dict of dfs
if force_dict or len(p_data) > 1:
return p_data
else:
return p_data[list(p_data.keys())[0]]
else:
if force_dict or len(p_data) > 1:
return p_data, _colocs_null
else:
k = list(p_data.keys())[0]
return p_data[k], _colocs_null[k]
# CORRECT ======================================================================================
[docs] def correct_p(self, mc_method="meff",
mc_alpha=0.05, mc_dimension="array", coloc_method=None, store=True, verbose=None):
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.correct_p() - Correct p values for multiple comparisons. ***")
# list of all p-value df keys, only uncorrected p-values
p_strs = [k for k in self._p_colocs if "mc-none" in k]
if coloc_method is not None:
p_strs = [s for s in p_strs if f"coloc-{coloc_method}" in s]
# resolve mc method name (aliases → canonical)
mc_method = _get_correct_mc_method(mc_method)
lgr.info(f"Correction method: '{mc_method}', alpha: {mc_alpha}, dimension: '{mc_dimension}'.")
# mc_dimension → how
if mc_dimension in ["x", "X", "c", "col", "cols", "column", "columns"]:
how = "c"
elif mc_dimension in ["y", "Y", "r", "row", "rows"]:
how = "r"
else:
how = "a"
# key suffix (strips _ and - so it's safe as a key fragment)
mc_key = mc_method.replace("_", "").replace("-", "").lower()
p_corr = dict()
if mc_method in _EMPIRICAL_MC_METHODS:
for p_str in p_strs:
fields = _parse_df_string(p_str)
xdimred = _parse_bool(fields.get("xdimred", False))
ytrans = _parse_bool(fields.get("ytrans", False))
coloc = fields.get("coloc")
stat = fields.get("stat")
xsea = _parse_bool(fields.get("xsea", False))
perm = fields.get("perm")
pooled = _parse_bool(fields.get("pooled", False))
p_str_mc = p_str.replace("mc-none", f"mc-{mc_key}")
p_values = self._p_colocs[p_str]
# ── Meff + Sidak ──────────────────────────────────────────────
if mc_method in {"meff_galwey", "meff_li_ji"}:
if how == "c":
lgr.warning("mc_dimension='x'/'c' is not meaningful for 'meff' "
"(Meff is defined over X maps). Ignoring.")
meff_variant = "galwey" if mc_method == "meff_galwey" else "li_ji"
with _quiet():
X_data = np.array(self.get_x(X_reduction=xdimred))
# for XSEA: use per-set mean map so Meff reflects set-level independence
if xsea and hasattr(self._X.index, "get_level_values"):
set_labels = self._X.index.get_level_values("set")
X_data = np.array(
pd.DataFrame(X_data, index=self._X.index)
.groupby(set_labels).mean()
)
meff_x = compute_meff(X_data, method=meff_variant)
lgr.info(f"Meff_X ({meff_variant}) = {meff_x:.2f} "
f"(from {X_data.shape[0]} maps).")
n_y_rows = p_values.shape[0]
if how == "a" and n_y_rows > 1:
# joint correction across X and Y: meff_total = meff_X * meff_Y
with _quiet():
Y_data = np.array(self.get_y(Y_transform=ytrans))
meff_y = compute_meff(Y_data, method=meff_variant)
meff = meff_x * meff_y
lgr.info(f"Meff_Y ({meff_variant}) = {meff_y:.2f} "
f"(from {n_y_rows} maps). Meff_total = {meff:.2f}.")
lgr.warning(
f"Joint Meff correction (Meff_X × Meff_Y = {meff:.2f}) assumes all "
"Y maps are related entities examined together (e.g., disorder effect "
"size maps). It is NOT valid for individual subject maps. "
"Use mc_dimension='y' for independent per-Y correction."
)
else:
meff = meff_x
p_corr[p_str_mc], _ = meff_sidak_correction(
p_values, meff=meff, alpha=mc_alpha, dtype=self._dtype
)
# ── Max-T / Step-down Max-T ───────────────────────────────────
elif mc_method in {"maxT", "step_maxT"}:
# warn if scale comparability assumption may be violated
if stat in {"beta", "individual"} and "x" not in (self._zscore or ""):
lgr.warning(
f"maxT on stat='{stat}' assumes regression coefficients are on a "
"comparable scale across X maps, but X was not z-scored "
f"(standardize='{self._zscore}'). Max-T results may be unreliable. "
"Re-run with standardize='x' (or 'xz') to satisfy this assumption."
)
# maxT corrects across X (columns) per Y row — override "array" default
how_maxt = how if how != "a" else "r"
if how == "a":
lgr.info("mc_dimension not explicitly set to 'y'; defaulting to per-Y "
"(max across X) for maxT.")
elif how == "c":
lgr.critical_raise(
"mc_dimension='x' (per-X column) is not supported for maxT. "
"Use mc_dimension='y' (per-Y row, default) or 'array' (global).",
ValueError
)
# get null key and null colocs
null_str = _get_df_string(
"null", xdimred=xdimred, ytrans=ytrans,
method=coloc, xsea=xsea, perm=perm, pooled_p=pooled,
)
if null_str not in self._nulls["_colocs"]:
lgr.critical_raise(
f"Null colocalizations for '{null_str}' not found. "
f"'{mc_method}' requires the full null distributions. "
"Either re-run permute(), or reload the object with save_nulls=True "
"(note: correct_p('maxT'/'step_maxT') should be called before saving "
"without nulls).",
KeyError
)
null_colocs = self._nulls["_colocs"][null_str]
# get observed statistics (not p-values)
with _quiet():
obs_stats = self.get_colocalizations(
method=coloc, stats=[stat],
X_reduction=xdimred, Y_transform=ytrans,
xsea=xsea, force_dict=True,
)[stat]
# get tail (use stored resolved p_tails if available, else default)
p_tails_stored = self._nulls.get(f"p_tails_{null_str}", {})
tail = p_tails_stored.get(stat, "two")
correction_fn = maxT_correction if mc_method == "maxT" else step_maxT_correction
p_corr[p_str_mc], _ = correction_fn(
obs_stats, null_colocs, stat=stat,
tail=tail, how=how_maxt,
alpha=mc_alpha, dtype=self._dtype
)
else:
# statsmodels path
for p_str in p_strs:
p_str_mc = p_str.replace("mc-none", f"mc-{mc_key}")
p_corr[p_str_mc], _ = mc_correction(
self._p_colocs[p_str],
alpha=mc_alpha,
method=mc_method,
how=how,
dtype=self._dtype
)
# save and return
if store:
for p_str in p_corr:
self._p_colocs[p_str] = p_corr[p_str]
self._set_last(mc_method=mc_method)
if self._return_self:
return self
return p_corr
# ----------------------------------------------------------------------------------------------
[docs] def normalize_colocalizations(self, coloc_method=None, z_method="robust", store=True,
verbose=None):
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.normalize_colocalizations() - Normalize colocalizations against null "
"distribution. ***")
lgr.info(f"Z-score method: {'robust (median/MAD)' if z_method == 'robust' else 'standard (mean/SD)'}.")
if not self._nulls["_colocs"]:
lgr.critical_raise(
"No null colocalizations found. "
"Run permute() first, or reload the object with save_nulls=True "
"(note: normalize_colocalizations() should be called before saving without nulls).",
ValueError
)
score_fn = rzscore_nan if z_method == "robust" else zscore_nan
self._set_last(z_method=z_method)
null_keys = list(self._nulls["_colocs"].keys())
if coloc_method is not None:
null_keys = [k for k in null_keys if f"coloc-{coloc_method.lower()}" in k]
for null_str in null_keys:
fields = _parse_df_string(null_str)
xdimred = _parse_bool(fields.get("xdimred", False))
ytrans = _parse_bool(fields.get("ytrans", False))
coloc = fields.get("coloc")
xsea = _parse_bool(fields.get("xsea", False))
perm = fields.get("perm")
pooled = _parse_bool(fields.get("pooled", False))
null_colocs = self._nulls["_colocs"][null_str]
stats = _get_coloc_stats(coloc, permuted_only=True)
for stat in stats:
with _quiet():
obs_dict = self.get_colocalizations(
method=coloc, stats=[stat],
X_reduction=xdimred, Y_transform=ytrans,
xsea=xsea, force_dict=True,
)
if stat not in obs_dict:
continue
obs_df = obs_dict[stat]
null_arr = _null_stats_to_array(null_colocs, stat).astype(float)
z_arr = score_fn(np.array(obs_df, dtype=float), null_arr)
z_df = pd.DataFrame(z_arr, index=obs_df.index, columns=obs_df.columns,
dtype=self._dtype)
if store:
z_str = _get_df_string(
"z",
xdimred=xdimred, ytrans=ytrans,
method=coloc, stat=stat,
xsea=xsea, perm=perm, pooled_p=pooled,
)
self._z_colocs[z_str] = z_df
lgr.info(f"Stored normalized colocalizations: {z_str}")
if self._return_self:
return self
return self
# PLOT ====================================================================================
[docs] def plot(self, kind="categorical",
method=None, stats=None,
X_reduction=None, Y_transform=None,
xsea=None,
Y_labels=None, X_labels=None,
Y_maps=None, X_maps=None,
values="coloc", mc_method=None,
plot_nulls=True, annot_p=True, permute_what=None,
title="auto",
sort_by=None, # None | 'coloc'|'abs_coloc'|'z'|'abs_z'|'p' — also enables truncation when n_categories is exceeded
sort_colocs=False,
n_categories=50, # max categories shown; exceeded + sort_by set → truncate top N; exceeded + no sort_by → skip with warning; None = no limit
colocalizations_dict=None, nulls_dict=None, p_dict=None, pc_dict=None,
fig=None, ax=None, figsize=None, show=True,
plot_kwargs=None, nullplot_kwargs=None,
verbose=None):
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.plot() - Plot colocalization results. ***")
# kwargs
plot_kwargs = {} if plot_kwargs is None else plot_kwargs
nullplot_kwargs = {} if nullplot_kwargs is None else nullplot_kwargs
# check fit
self._check_fit()
# settings
method, X_reduction, Y_transform, xsea, permute_what = self._get_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
perm=permute_what
)
# check if minimum input provided
if colocalizations_dict is None and method is None:
lgr.critical_raise("Provide either a method name or a colocalization result!",
ValueError)
# TODO (first non-dev release): remove sort_colocs parameter entirely
if sort_colocs:
lgr.warning(_DEPR_SORT_COLOCS)
if sort_by is None:
sort_by = "coloc"
sort_colocs = False
# p mode never shows null distributions; z mode supports them
if values == "p":
plot_nulls = False
# check nulls/p plot
if (plot_nulls or annot_p) and not permute_what:
lgr.warning("if 'plot_nulls' or 'annot_p', provide 'permute_what' ({'groups', "
"'{x|y|xy}maps', 'sets'}). Setting 'plot_nulls' and 'annot_p' to False!")
plot_nulls = False
annot_p = False
# resolve mc_method for p mode
if values == "p":
if isinstance(mc_method, str) and mc_method.lower() in ("uncorrected", "none", "false"):
mc_method = None
elif mc_method is None:
mc_method = self._last_settings.get("mc_method")
if mc_method is not None:
mc_method = _get_correct_mc_method(mc_method)
if mc_method:
lgr.info(f"Plotting −log₁₀(p), corrected: {mc_method}.")
else:
lgr.info("Plotting −log₁₀(p), uncorrected.")
elif values == "z":
lgr.info("Plotting null-normalised z-scores.")
# get arguments
check_kwargs = dict(method=method, stats=stats, xdimred=X_reduction,
ytrans=Y_transform, xsea=xsea)
get_kwargs = dict(method=method, stats=stats, X_reduction=X_reduction,
Y_transform=Y_transform, xsea=xsea)
# get colocalization results
_z_method = self._last_settings.get("z_method", "robust")
if colocalizations_dict is None:
self._check_colocalize(**check_kwargs)
if values == "z":
_z_method = self._last_settings.get("z_method", "robust")
lgr.info(f"Z-score normalisation method: "
f"{'robust (median/MAD)' if _z_method == 'robust' else 'standard (mean/SD)'}.")
with _quiet():
colocalizations_dict = self.get_normalized_colocalizations(
**get_kwargs, force_dict=True,
)
if plot_nulls:
with _quiet():
_raw = self.get_colocalizations(
**get_kwargs, force_dict=True,
get_nulls=True, nulls_permute_what=permute_what,
)
_, nulls_dict = _raw if isinstance(_raw, tuple) else (_raw, None)
if nulls_dict is None:
lgr.warning("No nulls found. Not plotting null distributions in z mode.")
plot_nulls = False
else:
nulls_dict = None
elif values == "p":
_mc = mc_method.replace("_", "").replace("-", "") if mc_method else None
with _quiet():
_p_raw = self.get_p_values(
**get_kwargs, permute_what=permute_what, mc_method=_mc,
force_dict=True,
)
colocalizations_dict = {
stat: df.apply(lambda col: -np.log10(col))
for stat, df in _p_raw.items()
}
nulls_dict = None
else: # values == "coloc"
with _quiet():
coloc_dicts = self.get_colocalizations(
**get_kwargs,
force_dict=True,
get_nulls=plot_nulls,
nulls_permute_what=permute_what,
)
if isinstance(coloc_dicts, tuple):
colocalizations_dict, nulls_dict = coloc_dicts
else:
colocalizations_dict, nulls_dict = coloc_dicts, None
if plot_nulls and nulls_dict is None:
lgr.warning("No nulls found. Not plotting null distributions.")
plot_nulls = False
else:
if not isinstance(colocalizations_dict, dict):
lgr.critical_raise("Provide colocalizations as dict as returned by "
"NiSpace.get_colocalizations(force_dict=True)!",
TypeError)
if nulls_dict:
if not isinstance(nulls_dict, dict):
lgr.error("Provide null colocalizations as dict as returned by NiSpace."
"get_colocalizations(force_dict=True, get_nulls=True)!")
nulls_dict = None
# Y_labels / X_labels are legacy aliases for Y_maps / X_maps
if Y_labels is not None and Y_maps is None:
Y_maps = Y_labels
if X_labels is not None and X_maps is None:
X_maps = X_labels
# restrict to given y maps
keep_y = keep_x = None
if Y_maps is not None:
_ref_df = next(iter(colocalizations_dict.values()))
keep_y = _match_maps(_ref_df.index, Y_maps)
if not keep_y:
lgr.critical_raise(f"No Y maps matching {Y_maps!r} found.", ValueError)
for stat in colocalizations_dict:
colocalizations_dict[stat] = colocalizations_dict[stat].iloc[keep_y]
if nulls_dict is not None:
if isinstance(nulls_dict[stat], dict):
for null_str in nulls_dict[stat]:
nulls_dict[stat][null_str] = nulls_dict[stat][null_str].iloc[keep_y]
else:
nulls_dict[stat] = nulls_dict[stat].iloc[keep_y]
# restrict to given x maps
if X_maps is not None:
_ref_df = next(iter(colocalizations_dict.values()))
keep_x = _match_maps(_ref_df.columns, X_maps)
if not keep_x:
lgr.critical_raise(f"No X maps matching {X_maps!r} found.", ValueError)
for stat in colocalizations_dict:
colocalizations_dict[stat] = colocalizations_dict[stat].iloc[:, keep_x]
if nulls_dict is not None:
if isinstance(nulls_dict[stat], dict):
nulls_dict[stat] = {
k: v for k, v in nulls_dict[stat].items()
if k in colocalizations_dict[stat].columns
}
def _filter_dict(d):
if d is None:
return None
out = {}
for stat, df in d.items():
if keep_y is not None:
df = df.iloc[keep_y]
if keep_x is not None:
df = df.iloc[:, keep_x]
out[stat] = df
return out
# p_dict / pc_dict are always {stat: DataFrame} — plain iloc is fine
# auto-fetch p-values for annotation (and sort_by="p")
# resolve effective mc_method: explicit arg > last stored setting
_annot_mc = mc_method or self._last_settings.get("mc_method")
if _annot_mc:
_annot_mc = _get_correct_mc_method(_annot_mc).replace("_", "").replace("-", "")
if annot_p is not False and values not in ("p",):
if p_dict is None:
try:
with _quiet():
_fetched = self.get_p_values(
**get_kwargs, permute_what=permute_what,
mc_method=None, force_dict=True,
)
if _fetched:
p_dict = _fetched
except Exception:
pass
if pc_dict is None and _annot_mc is not None:
try:
with _quiet():
_fetched = self.get_p_values(
**get_kwargs, permute_what=permute_what,
mc_method=_annot_mc, force_dict=True,
)
if _fetched:
pc_dict = _fetched
except Exception:
pass
p_dict = _filter_dict(p_dict)
pc_dict = _filter_dict(pc_dict)
# loop over stats
stats = [s for s in colocalizations_dict if s not in ["intercept"]]
out = {}
for stat in stats:
lgr.info(f"Creating {kind} plot for method {method}, colocalization stat {stat}.")
if title == "auto":
title = f"{nice_stats_labels(method)} colocalization"
if Y_transform:
title += f" after {nice_stats_labels(Y_transform.replace('(a,b)', ''))} transform"
if permute_what:
_perm_str = nice_stats_labels(permute_what)
if values == "z":
title += f"\n(permutation of {_perm_str} | normalized)"
elif values == "p":
if mc_method:
_mc_sub = mc_method.replace("_", "-")
_p_suffix = rf"$p_{{\mathrm{{{_mc_sub}}}}}$"
else:
_p_suffix = r"$p_{\mathrm{uncorrected}}$"
title += f"\n(permutation of {_perm_str} | {_p_suffix})"
else:
title += f"\n(permutation of {_perm_str})"
# compute column sort order (positional indices) for sort_by
_valid_sort_by = {"coloc", "z", "p", "abs_coloc", "abs_z"}
_sort_order = None
if sort_by is not None and sort_by not in _valid_sort_by:
lgr.warning(f"sort_by='{sort_by}' is not a valid option "
f"({', '.join(sorted(_valid_sort_by))}). Ignoring.")
sort_by = None
if sort_by is not None and colocalizations_dict[stat].shape[1] > 1:
try:
if sort_by in ("coloc", "abs_coloc"):
_src = colocalizations_dict[stat]
sv = _src.mean(axis=0).abs() if sort_by == "abs_coloc" else _src.mean(axis=0)
_ascending = False
elif sort_by in ("z", "abs_z"):
if values == "z":
_src = colocalizations_dict[stat]
else:
with _quiet():
_z = self.get_normalized_colocalizations(
**get_kwargs, force_dict=True,)
_src = _z.get(stat, colocalizations_dict[stat])
sv = _src.mean(axis=0).abs() if sort_by == "abs_z" else _src.mean(axis=0)
_ascending = False
elif sort_by == "p":
if values == "p":
sv = colocalizations_dict[stat].mean(axis=0)
_ascending = False
else:
_pd = ((pc_dict or {}).get(stat) or (p_dict or {}).get(stat))
if _pd is None:
# auto-fetch using same logic as values="p"
_mc = mc_method.replace("_", "").replace("-", "") if mc_method else None
with _quiet():
_p_fetched = self.get_p_values(
**get_kwargs, permute_what=permute_what,
mc_method=_mc, force_dict=True,
)
_pd = _p_fetched.get(stat)
if _pd is None:
raise ValueError("No p-values available for sort_by='p'.")
sv = _pd.mean(axis=0)
_ascending = True # lower p = more significant
else:
sv = None
if sv is not None:
sorted_labels = sv.sort_values(ascending=_ascending).index.tolist()
orig_labels = list(colocalizations_dict[stat].columns)
_sort_order = [orig_labels.index(l) for l in sorted_labels]
except Exception as e:
lgr.warning(f"Could not compute sort order for sort_by='{sort_by}': {e}")
_n_x = colocalizations_dict[stat].shape[1]
if n_categories is not None and _n_x > n_categories:
if sort_by is not None and _sort_order is not None:
# sort_by is set → truncation is meaningful, show top N
_sort_order = _sort_order[:n_categories]
lgr.info(
f"Showing top {n_categories} of {_n_x} categories "
f"(sorted by '{sort_by}')."
)
else:
# no sort_by → arbitrary truncation would be misleading, skip instead
lgr.warning(
f"Plot skipped: {_n_x} categories exceed n_categories={n_categories}. "
f"To show the top {n_categories}, add sort_by='abs_z'. "
f"To show all, pass n_categories=None."
)
continue
if kind == "categorical":
fig_ax = _plot_categorical(
colocs_df=colocalizations_dict[stat],
stat=stat,
nulls_dict=nulls_dict,
p_df=p_dict[stat] if p_dict is not None else None,
pc_df=pc_dict[stat] if pc_dict is not None else None,
values=values,
mc_method=mc_method or _annot_mc,
sort=sort_colocs,
sort_order=_sort_order,
annot_p=annot_p,
z_method=_z_method,
fig=fig,
ax=ax,
title=title,
figsize=figsize,
kwargs=plot_kwargs,
null_kwargs=nullplot_kwargs
)
elif kind == "correlation":
pass
elif kind == "brain":
pass
elif kind == "nullhist":
pass
if show:
plt.show()
out[stat] = fig_ax
if len(out) ==1:
out = out[stat]
return out
# PLOT BRAIN ===================================================================================
[docs] def plot_brain(self,
data="Y",
maps=None,
Y_transform=None,
X_reduction=None,
kind=None,
space=None,
surf_mesh="inflated",
views=None,
cmap=None,
vmin=None, vmax=None,
shared_colorscale=False,
symmetric_cmap="auto",
colorbar=True,
colorbar_label="",
ncols=1,
title="auto",
n_max=5,
figsize=None,
show=True,
verbose=None,
**kwargs):
"""Plot brain maps directly onto surfaces or anatomical volumes.
Parameters
----------
data : {"Y", "X"} or pd.DataFrame
Which data to plot. "Y" (default) uses the fitted Y maps, "X" the
reference maps. A DataFrame can be passed directly.
maps : str or list, optional
Subset of maps to plot. Matches against the DataFrame index
(including MultiIndex levels and tuple entries).
Y_transform : str, optional
Y transform to apply when data="Y". Defaults to the last used transform.
X_reduction : str, optional
X reduction to apply when data="X". Defaults to the last used reduction.
kind : str, optional
Rendering mode: "glass", "slice", or "surface". Defaults to "glass".
space : str, optional
Parcellation space.
surf_mesh : str
Surface mesh ("inflated", "pial", etc.).
views : list, optional
Surface views to render.
cmap : str
Colormap.
vmin, vmax : float, optional
Colorscale limits.
shared_colorscale : bool
Share colorscale across all maps.
symmetric_cmap : bool
Force symmetric colorscale around zero.
colorbar : bool
Show colorbar.
colorbar_label : str
Label for the colorbar title.
ncols : int
Number of columns in the subplot grid.
n_max : int
Maximum number of maps to plot. Raises an error if exceeded.
figsize : tuple, optional
Figure size in inches.
show : bool
Call plt.show() after plotting.
verbose : bool, optional
Verbose logging. Defaults to the instance setting.
**kwargs
Additional keyword arguments forwarded to brainplot() and from
there to the underlying nilearn plotting functions.
Returns
-------
fig : matplotlib.Figure
axes : list of matplotlib.Axes
"""
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
lgr.info("*** NiSpace.plot_brain() ***")
self._check_fit()
Y_transform, X_reduction = self._get_last(
Y_transform=Y_transform, X_reduction=X_reduction
)
# -- resolve data --
if isinstance(data, str):
if data.upper() == "Y":
lgr.info(f"Plotting Y data (Y_transform='{Y_transform}').")
with _quiet():
data_df = self.get_y(Y_transform=Y_transform)
elif data.upper() == "X":
lgr.info(f"Plotting X data (X_reduction='{X_reduction}').")
with _quiet():
data_df = self.get_x(X_reduction=X_reduction)
else:
lgr.critical_raise(
f"data='{data}' not recognised. Use 'Y', 'X', or a DataFrame.",
ValueError,
)
elif isinstance(data, pd.DataFrame):
lgr.info("Plotting custom DataFrame.")
data_df = data
else:
lgr.critical_raise(
"data must be 'Y', 'X', or a pandas DataFrame.", ValueError
)
# -- map selection --
if maps is not None:
keep = _match_maps(data_df.index, maps)
if not keep:
lgr.critical_raise(
f"No maps matching {maps!r} found in the index.", ValueError
)
data_df = data_df.iloc[keep]
# -- safeguard against too many subplots --
if len(data_df) > n_max:
lgr.critical_raise(
f"plot_brain: {len(data_df)} maps selected but n_max={n_max}. "
"Subset with the maps= argument or increase n_max.",
ValueError,
)
# -- call brainplot --
fig, axes = brainplot(
data=data_df,
parcellation=self._parc,
kind=kind,
space=space,
surf_mesh=surf_mesh,
views=views,
cmap=cmap,
vmin=vmin, vmax=vmax,
shared_colorscale=shared_colorscale,
symmetric_cmap=symmetric_cmap,
colorbar=colorbar,
colorbar_label=colorbar_label,
ncols=ncols,
title=title,
figsize=figsize,
verbose=verbose,
**kwargs,
)
if show:
plt.show()
return fig, axes
# GET ==========================================================================================
[docs] def get_x(self, X_reduction=None, maps=None, squeeze=False, verbose=None, copy=True):
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
X_reduction = self._get_last(X_reduction=X_reduction)
if X_reduction is False:
out = self._X
else:
try:
out = self._X_dimred[_get_df_string("xdimred", xdimred=X_reduction)]
except KeyError:
available = "\n".join(list(self._X_dimred.keys()))
lgr.critical_raise(f"No X dataframe for dimensionality reduction '{X_reduction}' "
f"found! Available: {available}",
KeyError)
if maps is not None:
keep = _match_maps(out.index, maps)
if not keep:
lgr.critical_raise(f"No maps matching {maps!r} found in X index.", ValueError)
out = out.iloc[keep]
if squeeze and len(out) == 1:
out = out.squeeze()
lgr.info(f"Returning X dataframe: \n{print_arg_pairs(X_reduction=X_reduction)}")
lgr.setLevel(loglevel)
return out.copy() if copy else out
# ----------------------------------------------------------------------------------------------
[docs] def get_y(self, Y_transform=None, maps=None, squeeze=False, verbose=None, copy=True):
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
Y_transform = self._get_last(Y_transform=Y_transform)
if Y_transform is False:
out = self._Y
else:
try:
out = self._Y_trans[_get_df_string("ytrans", ytrans=Y_transform)]
except KeyError:
available = "\n".join([k.replace("ytrans-", "") for k in self._Y_trans.keys()])
lgr.critical_raise(f"No Y dataframe for transform '{Y_transform}' found! "
f"Available: {available}",
KeyError)
if maps is not None:
keep = _match_maps(out.index, maps)
if not keep:
lgr.critical_raise(f"No maps matching {maps!r} found in Y index.", ValueError)
out = out.iloc[keep]
if squeeze and len(out) == 1:
out = out.squeeze()
lgr.info(f"Returning Y dataframe: \n{print_arg_pairs(Y_transform=Y_transform)}")
lgr.setLevel(loglevel)
return out.copy() if copy else out
# ----------------------------------------------------------------------------------------------
[docs] def get_z(self, verbose=None, copy=True):
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
out = self._Z
if out is None:
lgr.critical_raise("No Z dataframe found!",
ValueError)
lgr.info("Returning Z dataframe.")
lgr.setLevel(loglevel)
return out.copy() if copy else out
# ----------------------------------------------------------------------------------------------
[docs] def get_colocalizations(self, method=None, stats=None,
X_reduction=None, Y_transform=None, xsea=None,
normalized=False, perm=None,
get_nulls=False, nulls_permute_what=None, pooled_p=None,
force_dict=False, verbose=None):
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
method, X_reduction, Y_transform, xsea = self._get_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
)
if normalized:
perm, pooled_p = self._get_last(perm=perm, pooled_p=pooled_p)
if stats is None:
stats = _get_coloc_stats(method, permuted_only=True)
elif isinstance(stats, str):
stats = [stats]
else:
stats = list(stats).copy()
out = dict()
for stat in stats:
z_str = _get_df_string(
"z",
xdimred=X_reduction, ytrans=Y_transform,
method=method, stat=stat,
xsea=xsea, perm=perm, pooled_p=pooled_p,
)
if z_str not in self._z_colocs:
lgr.critical_raise(
f"Normalized colocalizations for '{z_str}' not found. "
"Run normalize_colocalizations() first.",
KeyError
)
out[stat] = self._z_colocs[z_str].copy()
if not force_dict and len(out) == 1:
out = out[stats[0]]
lgr.info(f"Returning z-scored colocalizations.")
lgr.setLevel(loglevel)
return out
if stats is None:
stats = _get_coloc_stats(method)
elif isinstance(stats, str):
stats = [stats]
else:
stats = list(stats).copy()
coloc_keys = list(self._colocs.keys())
out = dict()
for stat in stats:
coloc_str = _get_df_string(
"coloc",
xdimred=X_reduction,
ytrans=Y_transform,
method=method,
stat=stat,
xsea=xsea
)
if coloc_str not in coloc_keys:
if method=="mlr" and \
any([f"stat-{s}" not in coloc_str for s in ["individual", "intercept"]]):
stats.remove(stat)
continue
else:
available = "\n".join(coloc_keys)
lgr.critical_raise(f"Colocalizations for '{coloc_str}' not found! "
f"Available: {available}",
KeyError)
out[stat] = self._colocs[coloc_str].copy() if copy else self._colocs[coloc_str]
if get_nulls and nulls_permute_what is None:
lgr.error("If 'get_nulls' is True, 'nulls_permute_what' must not be None!")
get_nulls = False
if get_nulls:
if nulls_permute_what not in ["groups", "groupsxmaps", "groupssets",
"xmaps", "ymaps", "xymaps", "ymapssets",
"sets"]:
lgr.critical_raise("If 'get_nulls' is True, 'nulls_permute_what' must be one of "
"{'groups', '{x|y|xy}maps', 'sets'}!",
ValueError)
pooled_p = self._get_last(pooled_p=pooled_p)
out_null = None
null_str = _get_df_string(
"null",
xdimred=X_reduction,
ytrans=Y_transform,
method=method,
xsea=xsea,
perm=nulls_permute_what,
pooled_p=pooled_p,
)
if null_str not in self._nulls["_colocs"].keys():
available = "\n".join(list(self._nulls["_colocs"].keys()))
lgr.error(f"Null colocalizations for '{null_str}' not found! Available: {available}")
else:
nulls = self._nulls["_colocs"][null_str].copy()
out_null = dict()
n_nulls = len(nulls)
with _quiet():
idx = self.get_p_values(method, nulls_permute_what, _COLOC_METHODS[method][0],
xsea,
pooled_p=pooled_p,
X_reduction=X_reduction,
Y_transform=Y_transform).index
for stat in stats:
if out[stat].shape[1] == 1:
out_null[stat] = pd.DataFrame(
{i: nulls[i][stat][:, 0] for i in range(n_nulls)},
index=idx
)
else:
out_null[stat] = dict()
for i_x, x in enumerate(out[stat].columns):
out_null[stat][x] = pd.DataFrame(
{i: nulls[i][stat][:, i_x] for i in range(n_nulls)},
index=idx
)
# force return as dict if requested
if not force_dict:
if len(out)==1:
out = out[stats[0]]
if "out_null" in locals():
out_null = out_null[stats[0]]
string = print_arg_pairs(method=method, xsea=xsea, X_reduction=X_reduction,
Y_transform=Y_transform)
lgr.info(f"Returning colocalizations: \n{string}")
lgr.setLevel(loglevel)
return (out, out_null) if get_nulls else out
# ----------------------------------------------------------------------------------------------
[docs] def get_p_values(self, method=None, permute_what=None, stats=None, xsea=None,
mc_method=None, pooled_p=None,
X_reduction=None, Y_transform=None, force_dict=False, verbose=None, copy=True):
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
method, X_reduction, Y_transform, xsea, permute_what, pooled_p = self._get_last(
method=method,
X_reduction=X_reduction,
Y_transform=Y_transform,
xsea=xsea,
perm=permute_what,
pooled_p=pooled_p,
)
if mc_method is not None:
mc_method = _get_correct_mc_method(mc_method).replace("-", "").replace("_", "")
self._check_permute(method, permute_what, mc_method, xsea, stats, X_reduction, Y_transform,
pooled_p=pooled_p)
if stats is None:
stats = _get_coloc_stats(method, permuted_only=True)
elif isinstance(stats, str):
stats = [stats]
out = dict()
for stat in stats:
p_str = _get_df_string(
"p",
xdimred=X_reduction,
ytrans=Y_transform,
method=method,
stat=stat,
xsea=xsea,
perm=permute_what,
pooled_p=pooled_p,
mc=mc_method,
)
if p_str not in self._p_colocs.keys():
if "coloc-mlr_stat-individual" in p_str:
continue
else:
available = "\n".join(list(self._p_colocs.keys()))
lgr.critical_raise(f"Colocalization p values for '{p_str}' not found. "
f"Available: {available}",
KeyError)
out[stat] = self._p_colocs[p_str].copy() if copy else self._p_colocs[p_str]
if not force_dict:
if len(out)==1:
out = out[list(out.keys())[0]]
string = print_arg_pairs(method=method, permute_what=permute_what, xsea=xsea,
mc_method=mc_method,
X_reduction=X_reduction, Y_transform=Y_transform)
lgr.info(f"Returning p values: \n{string}")
lgr.setLevel(loglevel)
return out
# ----------------------------------------------------------------------------------------------
[docs] def get_corrected_p_values(self, mc_method=None, **kwargs):
mc_method = self._get_last(mc_method=mc_method)
if mc_method is None:
lgr.critical_raise(
"No corrected p values found. Run correct_p() first.",
ValueError
)
mc_method = _get_correct_mc_method(mc_method)
return self.get_p_values(mc_method=mc_method, **kwargs)
# ----------------------------------------------------------------------------------------------
[docs] def get_normalized_colocalizations(self, **kwargs):
return self.get_colocalizations(normalized=True, **kwargs)
# SAVE, LOAD, COPY =============================================================================
[docs] def to_pickle(self, filepath, save_nulls=True, verbose=None):
"""
Save the NiSpace object to a pickle file.
Parameters
----------
filepath : str
Filepath to save the NiSpace object to.
save_nulls : bool, optional
Whether to save the null distributions. Defaults to True. If False, null
colocalizations are dropped, which substantially reduces file size but prevents
running correct_p('maxT'), correct_p('step_maxT'), or normalize_colocalizations()
after reloading. Call those methods before saving if you intend to drop nulls.
verbose : bool, optional
"""
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
# remove nulls (very large depending on number of permutations) if requested
self_save = self.copy()
if not save_nulls:
self_save._nulls = {
"_colocs": {}
}
# save
to_pickle(self_save, filepath, use_dill=True)
lgr.debug(f"Saved NiSpace object to {filepath}.")
lgr.setLevel(loglevel)
# ----------------------------------------------------------------------------------------------
[docs] def copy(self, deep=True, verbose=True):
set_log(lgr, verbose)
if deep==True:
return copy.deepcopy(self)
else:
return copy.copy(self)
# ----------------------------------------------------------------------------------------------
[docs] @staticmethod
def from_pickle(filepath, verbose=True):
"""
Load a NiSpace object from a pickle file.
Parameters
----------
filepath : str
Filepath to load the NiSpace object from.
verbose : bool, optional
Whether to print verbose output. Defaults to True.
Returns
-------
nispace_object : NiSpace
The loaded NiSpace object.
"""
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, verbose)
# load
nispace_object = from_pickle(filepath, use_dill=True)
lgr.debug(f"Loaded NiSpace object from {filepath}.")
# return
lgr.setLevel(loglevel)
return nispace_object
# PRIVATE METHODS ==============================================================================
def _check_fit(self, raise_error=True):
if not (hasattr(self, "_X") | hasattr(self, "_Y")):
if raise_error:
lgr.critical_raise("Input data ('X', 'Y') not found. Did you run NiSpace.fit()?!",
ValueError)
else:
return False
else:
return True
# ----------------------------------------------------------------------------------------------
def _check_transform(self, ytrans=False, raise_error=True):
y_str = _get_df_string("ytrans", ytrans=ytrans)
lgr.debug(y_str)
if y_str not in self._Y_trans.keys():
if raise_error:
lgr.critical_raise(f"Y transform = '{ytrans}' not found. Did you run "
f"NiSpace.transform_y()?!",
KeyError)
else:
return False
else:
return True
# ----------------------------------------------------------------------------------------------
def _check_colocalize(self, method, stats=None, xdimred=False, ytrans=False, xsea=False,
raise_error=True):
if stats is None:
stats = _get_coloc_stats(method, drop_optional=True)
elif isinstance(stats, str):
stats = [stats]
for stat in stats:
coloc_str = _get_df_string("coloc", xdimred=xdimred, ytrans=ytrans, method=method,
stat=stat, xsea=xsea)
lgr.debug(coloc_str)
if coloc_str not in self._colocs.keys():
if raise_error:
lgr.critical_raise(
f"Colocalizations for method = '{method}', stat = '{stat}', "
f"X dimensionality reduction = '{xdimred}', and Y transform = '{ytrans}' "
f"not found. Did you run NiSpace.colocalize()?!",
KeyError
)
else:
return False
return True
# ----------------------------------------------------------------------------------------------
def _check_permute(self, method, permute_what, mc_method=None, xsea=False,
stats=None, xdimred=False, ytrans=False, pooled_p=False, raise_error=True):
if stats is None:
stats = _get_coloc_stats(method, drop_optional=True, permuted_only=True)
elif isinstance(stats, str):
stats = [stats]
for stat in stats:
p_str = _get_df_string("p", xdimred=xdimred, ytrans=ytrans, method=method, stat=stat,
perm=permute_what, pooled_p=pooled_p, mc=mc_method, xsea=xsea).lower()
lgr.debug(p_str)
if p_str not in self._p_colocs:
if raise_error:
lgr.critical_raise(
f"P values for permute_what = '{permute_what}', method = '{method}', "
f"stat = '{stat}', xsea = {xsea}, X dimensionality reduction = '{xdimred}', "
f"Y transform = '{ytrans}', and mc_method = '{mc_method}' not found. "
"Did you run NiSpace.permute()?!",
KeyError
)
else:
return False
return True
# ----------------------------------------------------------------------------------------------
def _get_last(self, **kwargs):
out = []
for arg, value in kwargs.items():
if not arg in self._last_settings:
lgr.critical_raise(f"Last setting for '{arg}' not found. "
f"Available: {list(self._last_settings.keys())}")
else:
if value is None:
value_last = self._last_settings[arg]
if isinstance(value_last, str):
value_last = value_last.lower()
out.append(value_last)
else:
out.append(value)
return tuple(out) if len(out) > 1 else out[0]
# ----------------------------------------------------------------------------------------------
def _set_last(self, **kwargs):
for arg, value in kwargs.items():
self._last_settings[arg] = value
# ----------------------------------------------------------------------------------------------
def _get_dist_mat(self, dist_mat_type, centroids=False, parc_resample=2,
n_proc=None, store=True, verbose=None, force_generate=False):
loglevel = lgr.getEffectiveLevel()
verbose = set_log(lgr, self._verbose if verbose is None else verbose)
if self._parc is None:
lgr.critical_raise(
"Distance matrix computation requires a parcellation. "
"Provide one via NiSpace(parcellation=...).",
ValueError,
)
if dist_mat_type not in ["cv", "null_maps"]:
lgr.critical_raise(f"dist_mat_type = '{dist_mat_type}' not defined",
ValueError)
dist_mat_dict = self._parc_dist_mat
generate_dist_mat = True
if not force_generate and dist_mat_type in dist_mat_dict:
dist_mat = dist_mat_dict[dist_mat_type]
if dist_mat is not None:
generate_dist_mat = False
if generate_dist_mat:
null_space, _ = self._parc.get_null_space()
# ensure the null space is loaded and its derived attrs (hemi, idc, …) are computed
self._parc._ensure_image_loaded(null_space)
if null_space not in self._parc._hemi_dict:
self._parc._fit_space(null_space)
dist_mat = get_distance_matrix(
parc=self._parc.get_image(null_space),
parc_space=null_space,
parc_hemi=self._parc.get_hemi(null_space),
parc_resample=parc_resample,
centroids=centroids,
surf_euclidean=True if dist_mat_type=="cv" else False,
n_proc=self._n_proc if not n_proc else n_proc,
verbose=verbose
)
if store:
self._parc_dist_mat[dist_mat_type] = dist_mat
lgr.setLevel(loglevel)
return dist_mat
# ----------------------------------------------------------------------------------------------