import numpy as np
import pandas as pd
from nilearn.plotting import plot_design_matrix
import matplotlib.pyplot as plt
from . import lgr, NiSpace
from .utils.utils import set_log
from .modules.constants import (_PARC_DEFAULT,
_COLLECT_DEFAULT,
_COLOC_METHODS)
from .datasets import fetch_reference, reference_lib, _check_parcellation
def _workflow_base(x, y, z, x_collection, #x_load_nulls,
standardize,
parcellation, parcellation_labels,
colocalization_method,
n_proc, verbose,
nispace_object,
fetch_x_kwargs,
init_kwargs,
):
"""Base workflow for colocalization, group comparison, and GSEA.
Will load X data, initialize NiSpace object """
status = {fun: False for fun in ["init", "fit"]}
# check colocalization method
if isinstance(colocalization_method, (list, tuple)):
if not all(method in _COLOC_METHODS for method in colocalization_method):
raise lgr.critical_raise("'colocalization_method' must be one or a list of "
f"{list(_COLOC_METHODS.keys())} not {colocalization_method}!",
ValueError)
else:
raise lgr.critical_raise("'colocalization_method' must be a string or a list of strings!",
TypeError)
# check if nispace object provided
if nispace_object is not None:
lgr.info("NiSpace object provided. Validating.")
nsp = nispace_object
if isinstance(nispace_object, NiSpace):
if nsp._check_fit():
lgr.info("Fitted NiSpace object provided, ignoring 'x', 'y', and 'z'.")
status["init"] = True
status["fit"] = True
else:
lgr.info("NiSpace object provided but .fit() was not run. Running.")
status["init"] = True
else:
lgr.critical(f"Argument 'nispace_object' must be of type NiSpace not {type(nsp)}!")
## INIT
if not status["init"]:
# check provided data
# parcellation
if isinstance(parcellation, str):
# check if parcellation is an integrated parcellation
try:
parc_integrated = _check_parcellation(parcellation, force_str=True)
lgr.info(f"Using integrated parcellation {parcellation}.")
# if not, check if it's a path to a parcellation file
except ValueError:
lgr.info(f"Input '{parcellation}' not recognized as integrated parcellation. "
"Checking if path to parcellation file.")
parc_integrated = None
# y
if y is None:
lgr.error("You must provide 'y' data: (list) of volumetric/surface or pre-parcellated data!")
# x
if isinstance(x, str):
x = x.lower()
if x in reference_lib:
lgr.info(f"Loading integrated {x} dataset as X data.")
if x_collection is None or not isinstance(x_collection, str):
x_collection = _COLLECT_DEFAULT[x]
lgr.info(f"Using collection {x_collection}.")
fetch_x_kwargs = dict(
dataset=x,
collection=x_collection,
standardize_parcellated=False,
parcellation=parc_integrated,
verbose=verbose
) | fetch_x_kwargs
x = fetch_reference(**fetch_x_kwargs)
if isinstance(x, tuple):
x, null_maps = x
else:
null_maps = None
else:
lgr.error(f"'x' must be one of: '{list(reference_lib.keys())}' not '{x}'!")
else:
null_maps = None
# init
init_kwargs = dict(
x=x,
y=y,
z=z,
standardize=standardize,
parcellation=parc_integrated,
parcellation_labels=parcellation_labels,
n_proc=n_proc,
verbose=verbose,
) | init_kwargs
nsp = NiSpace(**init_kwargs)
## FIT
if not status["fit"]:
nsp.fit()
status["fit"] = True
## RETURN status, NiSpace object, pre-loaded nulls
return status, nsp, null_maps
[docs]def simple_colocalization(y,
x="PET",
z=None,
x_collection=None,
standardize="xz",
parcellation=_PARC_DEFAULT,
parcellation_labels=None,
y_covariates=None,
colocalization_method="spearman",
p_from_average_y=False,
plot=True,
combat=False,
n_perm=10000,
seed=None,
#x_load_nulls=True,
n_proc=1,
verbose=True,
nispace_object=None,
fetch_x_kwargs={},
init_kwargs={},
clean_y_kwargs={},
colocalize_kwargs={},
permute_kwargs={},
correct_p_kwargs={},
plot_kwargs={}):
"""Simple colocalization workflow.
Parameters
----------
y : array-like or pandas DataFrame or list
Input Y data to colocalize with X. Can be a numpy array, pandas DataFrame,
(list of) path(s) to a file(s) or list of image objects.
x : str or array-like, default="PET"
Input X data. Can be a string indicating a reference dataset ("PET", "mRNA", ...),
or inputs types as listed for y.
z : array-like or None, default=None
Optional confound data to regress out. Can be "gm", or input types as listed for y.
x_collection : str or None, default=None
If x is a string reference dataset, specifies which collection to use.
standardize : str, default="xz"
Which data to standardize. Can contain "x", "y", and/or "z".
parcellation : str or int, default=_PARC_DEFAULT
Brain parcellation to use. Can be a string name or integer ID.
parcellation_labels : array-like or None, default=None
Optional labels for the parcellation regions.
y_covariates : array-like or None, default=None
Optional covariates to regress from Y data.
colocalization_method : str or list, default="spearman"
Method(s) to use for colocalization. Can be "spearman", "pearson", etc.
p_from_average_y : bool, default=False
Whether to compute p-values from averaged Y values.
plot : bool, default=True
Whether to generate visualization plots.
combat : bool, default=False
Whether to apply ComBat harmonization.
n_perm : int, default=10000
Number of permutations for null distribution.
seed : int or None, default=None
Random seed for reproducibility.
n_proc : int, default=-1
Number of processes for parallel computation. -1 uses all CPUs.
verbose : bool, default=True
Whether to print progress messages.
nispace_object : NiSpace or None, default=None
Optional pre-initialized NiSpace object to use.
fetch_x_kwargs : dict, default={}
Additional arguments for fetching X data.
init_kwargs : dict, default={}
Additional arguments for NiSpace initialization.
clean_y_kwargs : dict, default={}
Additional arguments for Y data cleaning.
colocalize_kwargs : dict, default={}
Additional arguments for colocalization.
permute_kwargs : dict, default={}
Additional arguments for permutation testing.
correct_p_kwargs : dict, default={}
Additional arguments for p-value correction.
plot_kwargs : dict, default={}
Additional arguments for plotting.
Returns
-------
colocs : dict or array
Colocalization values for each method.
p_values : dict or array
Uncorrected p-values for each method.
p_fdr_values : dict or array
FDR-corrected p-values for each method.
nsp : NiSpace
The NiSpace object containing all results.
"""
verbose = set_log(lgr, verbose)
## COMMON FUNCTIONS: COLOC METHOD VALIDATION, DATA LOADING, INIT,
if isinstance(colocalization_method, str):
colocalization_method = [colocalization_method]
status, nsp, null_maps = _workflow_base(
x=x, y=y, z=z,
x_collection=x_collection,
#x_load_nulls=x_load_nulls,
standardize=standardize,
parcellation=parcellation,
parcellation_labels=parcellation_labels,
colocalization_method=colocalization_method,
n_proc=n_proc,
verbose=verbose,
nispace_object=nispace_object,
fetch_x_kwargs=fetch_x_kwargs,
init_kwargs=init_kwargs
)
status = status | {fun: False for fun in ["clean_y", "colocalize", "permute", "correct_p"]}
## CLEAN Y
if not status["clean_y"] and y_covariates is not None:
clean_y_kwargs = dict(
how="between",
covariates_between=y_covariates,
combat=combat,
) | clean_y_kwargs
nsp.clean_y(**clean_y_kwargs)
status["clean_y"] = True
## COLOCALIZE
# xsea must be same for colocalization and permutation
if colocalize_kwargs.get("xsea", False) or permute_kwargs.get("xsea", False):
colocalize_kwargs["xsea"] = True
permute_kwargs["xsea"] = True
if not status["colocalize"]:
for method in colocalization_method:
colocalize_kwargs_curr = dict(
method=method,
Z_regression=True,
) | colocalize_kwargs
nsp.colocalize(**colocalize_kwargs_curr)
status["colocalize"] = True
## PERMUTE
if not status["permute"]:
for method in colocalization_method:
permute_kwargs_curr = dict(
what="maps",
maps_which="X",
maps_nulls=null_maps,
method=method,
p_from_average_y_coloc=p_from_average_y,
n_perm=n_perm,
seed=seed,
) | permute_kwargs
nsp.permute(**permute_kwargs_curr)
permuted = nsp._get_last(perm=None)
status["permute"] = True
## CORRECT
if not status["correct_p"]:
correct_p_kwargs = dict(
) | correct_p_kwargs
nsp.correct_p(**correct_p_kwargs)
status["correct_p"] = True
## VIZ
if plot:
for method in colocalization_method:
plot_kwargs_curr = dict(
method=method,
permute_what=permuted,
) | plot_kwargs
nsp.plot(**plot_kwargs_curr)
## RETURN
colocs = {method: nsp.get_colocalizations(method)
for method in colocalization_method}
p_values = {method: nsp.get_p_values(method, permuted)
for method in colocalization_method}
p_fdr_values = {method: nsp.get_p_values(method, permuted, mc_method="fdrbh")
for method in colocalization_method}
if len(colocalization_method)==1:
k = colocalization_method[0]
colocs, p_values, p_fdr_values = colocs[k], p_values[k], p_fdr_values[k]
return colocs, p_values, p_fdr_values, nsp
[docs]def group_comparison(y, design,
x="PET",
z=None,
x_collection=None,
standardize="xz",
parcellation=_PARC_DEFAULT,
parcellation_labels=None,
colocalization_method="spearman",
comparison_method=None,
paired=False,
plot_design=True,
combat=False,
plot=True,
n_perm=10000,
seed=None,
n_proc=1,
verbose=True,
nispace_object=None,
fetch_x_kwargs={},
init_kwargs={},
clean_y_kwargs={},
transform_y_kwargs={},
colocalize_kwargs={},
permute_kwargs={},
correct_p_kwargs={},
plot_kwargs={}):
verbose = set_log(lgr, verbose)
## COMMON FUNCTIONS: DATA LOADING, INIT, YCOLOC METHOD VALIDATION
if isinstance(colocalization_method, str):
colocalization_method = [colocalization_method]
status, nsp, _ = _workflow_base(
x=x, y=y, z=z,
x_collection=x_collection,
#x_load_nulls=False,
standardize=standardize,
parcellation=parcellation,
parcellation_labels=parcellation_labels,
colocalization_method=colocalization_method,
n_proc=n_proc,
verbose=verbose,
nispace_object=nispace_object,
fetch_x_kwargs=fetch_x_kwargs,
init_kwargs=init_kwargs
)
status = status | {fun: False for fun in ["clean_y", "transform_y", "colocalize", "permute", "correct_p"]}
## DESIGN MATRIX HANDLING
# ensure dtype and format
# 1d
if isinstance(design, (list, tuple)) or \
(isinstance(design, (np.ndarray, pd.Series)) and design.ndim==1):
if paired:
lgr.critical_raise("If paired==True, design must have two columns: 'group' and 'subjects'.",
ValueError)
else:
lgr.info("1d array provided for design. Assuming this to be dummy-coded groups!")
design = pd.DataFrame(
{"groups": np.array(design)},
index=nsp._y_lab
)
# 2darray
elif isinstance(design, np.ndarray) and design.ndim==2:
if paired:
lgr.info("2d array provided for design with paired==True. Assuming first column "
"to be group labels, second column to be subjects, and remaining to be covariates.")
design = pd.DataFrame(
design,
columns=["groups", "subjects"] + [f"V{i}" for i in range(design.shape[1] - 2)],
index=nsp._y_lab
)
else:
lgr.info("2d array provided for design. Assuming first column to be group labels, "
"second column to be subjects.")
design = pd.DataFrame(
design,
columns=["groups"] + [f"V{i}" for i in range(design.shape[1] - 1)],
index=nsp._y_lab
)
# dataframe
elif isinstance(design, pd.DataFrame):
lgr.info("DataFrame provided for design. Expecting 'groups' and, if paired==True, 'subjects' columns.")
if paired:
if "groups" not in design.columns and "subjects" not in design.columns:
lgr.critical_raise("If a DataFrame is passed for design with paired==True, "
"it must have a 'groups' and a 'subjects' column.",
KeyError)
else:
if "groups" not in design.columns:
lgr.critical_raise("If a DataFrame is passed for design, it must have a 'groups' column.",
KeyError)
# unrecognized type
else:
lgr.critical_raise("'design' must be a list, ndarray, Series, or DataFrame!",
TypeError)
# check dimensions
lgr.info(f"Design matrix of shape {design.shape}. Assuming {design.shape[0]} subjects/maps.")
if design.shape[0] != len(y):
lgr.critical_raise(f"The number of rows in design matrix {design.shape[0]} must equal "
f"the length of the y data {len(y)}!",
ValueError)
# plot
if plot_design:
plot_design_matrix(design)
plt.title("Design matrix")
plt.ylabel("Y maps")
plt.show()
## CLEAN Y
if not status["clean_y"] and \
((not paired and design.shape[1] > 1) or (paired and design.shape[1] > 2)):
if combat and not paired:
y_covariates = design
combat_keep = ["groups"]
elif combat and paired:
y_covariates = design
combat_keep = ["groups", "subjects"]
elif not combat and not paired:
y_covariates = design.iloc[:, 1:]
combat_keep = None
elif not combat and paired:
y_covariates = design.iloc[:, 2:]
combat_keep = None
clean_y_kwargs = dict(
how="between",
covariates_between=y_covariates,
combat=combat,
combat_keep=combat_keep,
) | clean_y_kwargs
nsp.clean_y(**clean_y_kwargs)
status["clean_y"] = True
## TRANSFORM
if not status["transform_y"]:
if comparison_method is None and not paired:
comparison_method = "hedges(a,b)"
elif comparison_method is None and paired:
comparison_method = "pairedcohen(a,b)"
transform_y_kwargs = dict(
transform=comparison_method,
groups=design["groups"],
subjects=design["subjects"] if paired else None,
) | transform_y_kwargs
nsp.transform_y(**transform_y_kwargs)
status["transform_y"] = True
## COLOCALIZE
if not status["colocalize"]:
for method in colocalization_method:
colocalize_kwargs_curr = dict(
method=method,
Y_transform=comparison_method,
Z_regression=True,
verbose=verbose,
) | colocalize_kwargs
nsp.colocalize(**colocalize_kwargs_curr)
status["colocalize"] = True
## PERMUTE
if not status["permute"]:
for method in colocalization_method:
permute_kwargs_curr = dict(
what="groups",
method=method,
Y_transform=comparison_method,
groups_paired=paired,
groups_strategy="proportional",
n_perm=n_perm,
seed=seed,
verbose=verbose,
) | permute_kwargs
nsp.permute(**permute_kwargs_curr)
permute_what = "groups"
status["permute"] = True
## CORRECT
if not status["correct_p"]:
correct_p_kwargs = dict(
verbose=verbose,
) | correct_p_kwargs
nsp.correct_p(**correct_p_kwargs)
status["correct_p"] = True
## VIZ
if plot:
for method in colocalization_method:
plot_kwargs_curr = dict(
method=method,
permute_what=permute_what,
Y_transform=comparison_method,
verbose=verbose,
) | plot_kwargs
nsp.plot(**plot_kwargs_curr)
## RETURN
colocs = {method: nsp.get_colocalizations(method, Y_transform=comparison_method)
for method in colocalization_method}
p_values = {method: nsp.get_p_values(method, permute_what, Y_transform=comparison_method)
for method in colocalization_method}
p_fdr_values = {method: nsp.get_p_values(method, permute_what, Y_transform=comparison_method,
mc_method="fdrbh")
for method in colocalization_method}
if len(colocalization_method)==1:
colocs, p_values, p_fdr_values = (colocs[colocalization_method[0]],
p_values[colocalization_method[0]],
p_fdr_values[colocalization_method[0]])
return colocs, p_values, p_fdr_values, nsp
[docs]def simple_xsea(y,
x="mRNA",
z=None,
x_collection=None,
x_background=None,
standardize="xz",
parcellation=_PARC_DEFAULT,
parcellation_labels=None,
y_covariates=None,
colocalization_method="spearman",
xsea_aggregation_method="mean",
permute_sets=False,
p_from_average_y=False,
plot=True,
combat=False,
n_perm=10000,
seed=None,
n_proc=1,
verbose=True,
nispace_object=None,
fetch_x_kwargs={},
init_kwargs={},
clean_y_kwargs={},
colocalize_kwargs={},
permute_kwargs={},
correct_p_kwargs={},
plot_kwargs={}):
verbose = set_log(lgr, verbose)
# GET THE BACKGROUND
if permute_sets:
if x_background is None and isinstance(x, str):
lgr.info("Trying to fetch background X dataset.")
if x.lower() in reference_lib:
try:
x_background = fetch_reference(x.lower(), parcellation=parcellation, print_references=False)
except:
x_background = None
if x_background is None:
lgr.warning(f"Could not fetch background dataset for input x!")
## We go the easy way and just call .simple_colocalization() with some kwargs:
colocs, p_values, p_fdr_values, nsp = simple_colocalization(
y=y,
x=x, z=z,
x_collection=x_collection,
standardize=standardize,
parcellation=parcellation,
parcellation_labels=parcellation_labels,
y_covariates=y_covariates,
colocalization_method=colocalization_method,
p_from_average_y=p_from_average_y,
plot=plot,
combat=combat,
n_perm=n_perm,
seed=seed,
n_proc=n_proc,
verbose=verbose,
nispace_object=nispace_object,
fetch_x_kwargs=fetch_x_kwargs,
init_kwargs=init_kwargs,
clean_y_kwargs=clean_y_kwargs,
colocalize_kwargs={
"xsea_aggregation_method": xsea_aggregation_method,
"xsea": True
} | colocalize_kwargs,
permute_kwargs={
"what": "maps" if not permute_sets else "sets",
"maps_which": "Y",
"sets_X_background": x_background if permute_sets else None,
} | permute_kwargs,
correct_p_kwargs=correct_p_kwargs,
plot_kwargs=plot_kwargs
)
return colocs, p_values, p_fdr_values, nsp