import os
import warnings
import json
import dill, pickle, blosc, gzip
from pathlib import Path
from joblib import Parallel, delayed
from tqdm.auto import tqdm
import numpy as np
import pandas as pd
import nibabel as nib
from neuromaps import images
import logging
lgr = logging.getLogger(__name__)
from .utils.utils import set_log
from .parcellate import Parcellater
[docs]def parcellate_data(data,
data_labels=None,
data_space=None,
parcellation=None,
parc_labels=None,
parc_space=None,
parc_hemi=None,
resampling_target="data",
ignore_background_data=True,
background_value=["auto", 0.0],
drop_background_parcels=False,
min_num_valid_datapoints=None,
min_fraction_valid_datapoints=None,
return_parc=False,
dtype=None,
n_proc=1,
verbose=True,
ignore_zero_division_warning=True):
"""
Parcellates given imaging data using a specified parcellation.
Parameters
----------
parcellation : str, os.PathLike, nib.Nifti1Image, nib.GiftiImage, or tuple
The parcellation image or surfaces, where each region is identified by a unique integer ID.
parc_labels : list
Labels for the parcellation regions.
parc_space : str
The space in which the parcellation is defined.
parc_hemi : list of str
Hemispheres to consider for parcellation, e.g., ["L", "R"].
resampling_target : {'data', 'parcellation'}
Specifies which image gives the final shape/size.
data : list, dict, pd.DataFrame, pd.Series, or np.ndarray
The imaging data to be parcellated.
data_labels : list
Labels for the input data.
data_space : str
The space in which the input data is defined.
ignore_background_data : bool
Whether to exclude background voxels from parcel-mean computation.
When True, values specified by `background_value` are masked before
averaging, so they do not dilute parcel means. Default: True
background_value : float, list, set, array, or 'auto'
Value(s) to treat as background when `ignore_background_data=True`.
Accepts a scalar, or any collection of scalars and/or the sentinel
string ``'auto'``/``None``:
- float (e.g. ``0.0``): exclude that specific value
- ``'auto'`` or ``None``: auto-detect from border voxels (volumetric)
or medial wall median (surface)
- list/set/array: any combination of the above
Default: ``['auto', 0.0]`` (excludes detected background and zeros)
drop_background_parcels : bool
Whether to set parcels whose mean equals `background_value` to NaN
after aggregation. Only meaningful when `ignore_background_data` is
False: if `ignore_background_data=True`, all-background parcels
already return NaN from aggregation (no valid values → empty mean),
making this flag redundant. Default: False
min_num_valid_datapoints : int, optional
Minimum number of valid datapoints required per parcel.
min_fraction_valid_datapoints : float, optional
Minimum fraction of valid datapoints required per parcel.
n_proc : int
Number of processors to use for parallel processing.
dtype : data-type
Desired data type of the output.
Returns
-------
pd.DataFrame
Parcellated data in a DataFrame.
Raises
------
TypeError
If the input data type is not recognized.
ValueError
If the resampling target is invalid.
Notes
-----
This function handles different types of input data, including lists, DataFrames, Series, and ndarrays.
It also manages different parcellation formats and resampling targets.
"""
verbose = set_log(lgr, verbose)
# unpack Parcellation object into flat args (lazy import avoids circular dependency)
from .core.parcellation import Parcellation
if isinstance(parcellation, Parcellation):
# bilateral surface parcellating is not yet supported
if getattr(parcellation, "_bilateral", False) and parcellation._space is not None:
active_space = parcellation._space or ""
if "mni" not in active_space.lower():
raise NotImplementedError(
"Surface parcellating with a bilateral Parcellation is not yet supported. "
"Use an MNI space or fetch pre-parcellated data via fetch_reference(). # TODO"
)
parc_labels = parc_labels if parc_labels is not None else parcellation._labels
parc_hemi = parc_hemi if parc_hemi is not None else parcellation._hemi
parc_space = parc_space if parc_space is not None else parcellation._space
# _image_obj requires an active space; if none is set (pre-parcellated data path),
# set to None — list inputs will raise their own error, DataFrame inputs don't need it
parcellation = parcellation._image_obj if parcellation._space is not None else None
## put data into list
if isinstance(data, Path):
data = str(data)
if isinstance(data, (str, tuple, nib.Nifti1Image, nib.GiftiImage)):
data = [data]
## case list
if isinstance(data, (list, dict)):
if isinstance(data, dict):
lgr.info("Input type: dict, assuming (img_name, img) pairs for imaging data.")
data_labels = list(data.keys()) if data_labels is None else data_labels
data = list(data.values())
else:
lgr.info("Input type: list, assuming imaging data.")
# load parcellation
if parcellation is None:
lgr.critical_raise("If input 'data' is list, 'parcellation' must not be None!",
TypeError)
if isinstance(parcellation, Path):
parcellation = str(parcellation)
if isinstance(parcellation, str):
if parcellation.endswith(".nii") | parcellation.endswith(".nii.gz"):
parcellation = images.load_nifti(parcellation)
elif parcellation.endswith(".gii") | parcellation.endswith(".gii.gz"):
parcellation = images.load_gifti(parcellation)
if parc_hemi is None:
lgr.warning("Input is single GIFTI image but 'hemi' is not given. Assuming left!")
parc_hemi = "left"
else:
lgr.error(f"Argument 'parcellation' of type string, but no path ('{parcellation}')!")
elif isinstance(parcellation, nib.GiftiImage):
parcellation = images.load_gifti(parcellation)
elif isinstance(parcellation, nib.Nifti1Image):
parcellation = images.load_nifti(parcellation)
elif isinstance(parcellation, tuple):
parcellation = (images.load_gifti(parcellation[0]),
images.load_gifti(parcellation[1]))
else:
lgr.critical(f"Parcellation data type not recognized! ({type(parcellation)})")
# catch problems
if ("mni" in data_space.lower()) & \
("mni" not in parc_space.lower()) & \
(resampling_target=="data"):
lgr.warning("Data in MNI space but parcellation in surface space and "
"'resampling_target' is 'data'! Cannot resample surface to MNI: "
"Setting 'resampling_target' to 'parcellation'.")
resampling_target = "parcellation"
# number of parcels
if isinstance(parcellation, nib.Nifti1Image):
parc_data = parcellation.get_fdata()
elif isinstance(parcellation, nib.GiftiImage):
parc_data = parcellation.darrays[0].data
elif isinstance(parcellation, tuple):
parc_data = np.c_[parcellation[0].darrays[0].data, parcellation[1].darrays[0].data]
else:
lgr.error("Something is wrong with the loaded parcellation image!")
parc_idc = np.trim_zeros(np.unique(parc_data))
parc_n = len(parc_idc)
# modified neuromaps parcellater: can deal with str, path, nifti, gifti, tuple
parcellater = Parcellater(
parcellation=parcellation,
space="mni152" if "mni" in parc_space.lower() else parc_space,
resampling_target=resampling_target,
hemi=parc_hemi
).fit()
# data extraction function
def extract_data(file):
# apply parcellater
kwargs = dict(
data=file,
space="mni152" if "mni" in data_space.lower() else data_space,
hemi=parc_hemi,
ignore_background_data=ignore_background_data,
background_value=background_value,
fill_dropped=True,
background_parcels_to_nan=drop_background_parcels,
min_num_valid_datapoints=min_num_valid_datapoints,
min_fraction_valid_datapoints=min_fraction_valid_datapoints
)
# apply parcellater
if ignore_zero_division_warning:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "invalid value encountered in divide", RuntimeWarning)
file_parc = parcellater.transform(**kwargs).squeeze()
else:
file_parc = parcellater.transform(**kwargs).squeeze()
# return data and dropped parcels
return (file_parc, parcellater._parc_idc_dropped, parcellater._parc_idc_bg,
parcellater._parc_idc_excl)
# extract data (in parallel)
lgr.info(
f"Background (bg) handling: ignoring bg: {ignore_background_data}"
+ (f" (bg value: {background_value})" if ignore_background_data else "")
+ f"; dropping bg parcels: {drop_background_parcels}"
)
lgr.info(f"Parcellating imaging data.")
# run
data_parc_list = Parallel(n_jobs=n_proc)(
delayed(extract_data)(f) for f in tqdm(
data, desc=f"Parcellating ({n_proc} proc)", disable=not verbose)
)
# collect data
data_parc = np.zeros((len(data), parc_n))
nan_parcels = {"drop": set(), "bg": set(), "excl": set()}
for i, par_out in enumerate(data_parc_list):
data_parc[i, :] = par_out[0]
nan_parcels["drop"].update(set(par_out[1]))
nan_parcels["bg"].update(set(par_out[2]))
nan_parcels["excl"].update(set(par_out[3]))
# dropped parcels
if len(nan_parcels["drop"]) > 0:
lgr.warning(f"Combined across images, up to {len(nan_parcels['drop'])} parcel(s) were dropped "
"after resampling of the parcellation! Data was replaced with nan values."
"Try a coarser parcellation or set 'resampling_target' = 'parcellation' to "
f"avoid this behavior ({[int(i) for i in nan_parcels['drop']]}).")
# background intensity parcels
if drop_background_parcels:
lgr.info(f"Combined across images, {len(nan_parcels['bg'])} parcel(s) had only background "
f"intensity and were set to nan ({[int(i) for i in nan_parcels['bg']]}).")
# below parcel threshold parcels
if min_num_valid_datapoints or min_fraction_valid_datapoints:
msg = (f"Combined across images, {len(nan_parcels['excl'])} parcels were dropped "
f"due to exclusion criteria: ")
if min_num_valid_datapoints and min_fraction_valid_datapoints:
msg += (f"min. n = {min_num_valid_datapoints} and "
f"{min_fraction_valid_datapoints * 100}% non-background datapoints.")
elif min_num_valid_datapoints:
msg += f"min. n = {min_num_valid_datapoints} non-background datapoints."
elif min_fraction_valid_datapoints:
msg += f"min. {min_fraction_valid_datapoints * 100}% non-background datapoints."
msg += f" ({[int(i) for i in nan_parcels['excl']]})."
lgr.info(msg)
# output dataframe
if data_labels is None:
try:
if isinstance(data[0], tuple):
data_labels = [os.path.basename(f[0]).replace(".gii","").replace(".gz","") \
for f in data]
else:
data_labels = [os.path.basename(f).replace(".nii","").replace(".gz","") \
for f in data]
except:
data_labels = list(range(len(data)))
if isinstance(data_labels, str):
data_labels = [data_labels]
df_parc = pd.DataFrame(
data=data_parc,
index=data_labels,
columns=parc_labels
)
## case array
elif isinstance(data, np.ndarray):
lgr.info("Input type: ndarray, assuming parcellated data with shape "
"(n_files/subjects/etc, n_parcels).")
if len(data.shape)==1:
data = data[np.newaxis, :]
df_parc = pd.DataFrame(
data=data,
index=data_labels,
columns=parc_labels
)
## case dataframe
elif isinstance(data, pd.DataFrame):
lgr.info("Input type: DataFrame, assuming parcellated data with shape "
"(n_files/subjects/etc, n_parcels).")
df_parc = pd.DataFrame(
data=data.values,
index=data_labels if data_labels is not None else data.index,
columns=parc_labels if parc_labels is not None else data.columns
)
## case series
elif isinstance(data, pd.Series):
lgr.info("Input type: Series, assuming parcellated data with shape (1, n_parcels).")
df_parc = pd.DataFrame(
data=data.values,
index=parc_labels if parc_labels is not None else data.index,
columns=data_labels if data_labels is not None else [data.name],
)
df_parc = df_parc.T
## case not defined
else:
lgr.critical_raise(f"Can't import from data with type {type(data)}!",
TypeError)
## check for nan's
if df_parc.isnull().any(axis=None):
lgr.warning("Parcellated data contains nan values!")
## return data array
if return_parc:
return df_parc.astype(dtype), parcellation
else:
return df_parc.astype(dtype)
[docs]def read_json(json_path):
if isinstance(json_path, (str, Path)):
with open(json_path) as f:
json_dict = json.load(f)
else:
try:
json_dict = dict(json_path)
except ValueError:
print("Provide path to json-like file or dict-like object!")
return json_dict
[docs]def write_json(json_dict, json_path):
if isinstance(json_path, (str, Path)):
json_path = Path(json_path)
with open(json_path, "w") as f:
json.dump(json_dict, f, indent=4)
else:
print("Provide path-like object for argument 'json_path'")
return json_path
[docs]def load_img(img, override_file_format=False):
# check override_file_format
if override_file_format not in [False, ".nii", ".nii.gz", ".gii", ".gii.gz"]:
raise ValueError("'override_file_format' must be False, '.nii', '.nii.gz', '.gii' or '.gii.gz'")
# to tuple
if isinstance(img, (str, Path, nib.Nifti1Image, nib.GiftiImage)):
img = (img,)
elif isinstance(img, list):
img = tuple(img)
elif isinstance(img, tuple):
pass
else:
raise ValueError("Input must be path, list, tuple or image object")
# load
img_load = []
for i in img:
# if preloaded image, continue
if isinstance(i, (nib.Nifti1Image, nib.GiftiImage)):
pass
# if string, load
elif isinstance(i, (str, Path)):
i = str(i)
if i.endswith(".nii") or i.endswith(".nii.gz"):
i = images.load_nifti(i)
elif override_file_format in [".nii", ".nii.gz"]:
i = Path(i).rename(Path(i).with_suffix(override_file_format))
i = images.load_nifti(i)
elif i.endswith(".gii") or i.endswith(".gii.gz"):
i = images.load_gifti(i)
elif override_file_format in [".gii", ".gii.gz"]:
i = Path(i).rename(Path(i).with_suffix(override_file_format))
i = images.load_gifti(i)
elif i.endswith(".curv"):
i = nib.GiftiImage(
darrays=[nib.gifti.GiftiDataArray(data=nib.freesurfer.read_morph_data(i))]
)
else:
raise ValueError(f"File format of '{i}' not supported. Path must end with .nii(.gz) or .gii(.gz)")
else:
raise ValueError(f"Type {type(i)} not supported! Provide nifti, gifti or path to image file.")
img_load.append(i)
# return as tuple if two, or
return img_load[0] if len(img_load) == 1 else tuple(img_load)
[docs]def load_labels(labels, concat=True, header=None, index=None):
# to tuple
if isinstance(labels, (str, Path, list, np.ndarray, pd.Series)):
labels = (labels,)
elif isinstance(labels, tuple):
pass
else:
raise ValueError("Input must be path, list, ndarray or Series")
# load
labels_load = []
for l in labels:
# return if array/list, to string if path
if isinstance(l, (list, np.ndarray, pd.Series)):
labels_load.append(list(l))
continue
elif isinstance(l, Path):
l = str(l)
# load
try:
l = pd.read_csv(l, header=header, index_col=index).iloc[:,0].to_list()
except:
raise ValueError("File format not supported. Provide path to csv-like text file.")
labels_load.append(l)
# return as tuple if two, or list of one
if len(labels_load) == 1:
labels_load = labels_load[0]
else:
if concat:
labels_load = labels_load[0] + labels_load[1]
else:
labels_load = tuple(labels_load)
return labels_load
[docs]def load_l2rmap(l2rmap, header=0, index=0, threshold=0.0):
# catch None content
if l2rmap is None:
return l2rmap
# catch DataFrame
elif isinstance(l2rmap, pd.DataFrame):
df = l2rmap
# catch path
elif isinstance(l2rmap, (str, Path)):
df = pd.read_csv(l2rmap, header=header, index_col=index)
else:
raise ValueError("Input must be str, path, or DataFrame")
if threshold is not None:
df = df.where(df >= threshold, other=0.0)
return df
[docs]def load_distmat(distmat):
# catch None content
if distmat is None or (isinstance(distmat, tuple) and all([d is None for d in distmat])):
return distmat
# to tuple
if isinstance(distmat, (str, Path, np.ndarray, pd.DataFrame)):
distmat = (distmat,)
elif isinstance(distmat, list):
distmat = tuple(distmat)
elif isinstance(distmat, tuple):
pass
else:
raise ValueError("Input must be path, list, tuple, ndarray, or DataFrame")
# load
distmat_load = []
for d in distmat:
# return if array, to string if path
if isinstance(d, (np.ndarray, pd.DataFrame)):
distmat_load.append(np.array(d))
continue
elif isinstance(d, Path):
d = str(d)
# load
try:
d = pd.read_csv(d, header=None, index_col=None).values
except:
raise ValueError("File format not supported. Provide path to csv-like text file.")
distmat_load.append(d)
# return as tuple if two, or as array if one
return distmat_load[0] if len(distmat_load) == 1 else tuple(distmat_load)
[docs]def load_spinmat(spinmat):
if spinmat is None or (isinstance(spinmat, tuple) and all(s is None for s in spinmat)):
return spinmat
if isinstance(spinmat, (str, Path, np.ndarray)):
spinmat = (spinmat,)
elif isinstance(spinmat, list):
spinmat = tuple(spinmat)
elif isinstance(spinmat, tuple):
pass
else:
raise ValueError("Input must be path, ndarray, or list/tuple thereof")
loaded = []
for s in spinmat:
if isinstance(s, np.ndarray):
loaded.append(s)
elif isinstance(s, (str, Path)):
loaded.append(np.load(Path(s), allow_pickle=False, mmap_mode='c'))
else:
raise ValueError(f"Unsupported spinmat element type: {type(s)}")
return loaded[0] if len(loaded) == 1 else tuple(loaded)
[docs]def to_pickle(obj, filepath, use_dill=False):
"""
Pickle, compress, and save to a file.
Parameters
----------
obj : object
Any python object to be pickled.
filepath : str
File path destination.
"""
# use dill instead of pickle
if use_dill:
pkl = dill
else:
pkl = pickle
# save
if filepath.endswith(".pkl"):
with open(filepath, "wb") as f:
pkl.dump(obj, f)
elif filepath.endswith(".pkl.gz"):
with gzip.open(filepath, "wb") as f:
pkl.dump(obj, f)
elif filepath.endswith(".pkl.blosc"):
arr = pkl.dumps(obj, -1)
with open(filepath, "wb") as f:
s = 0
while s < len(arr):
e = min(s + blosc.MAX_BUFFERSIZE, len(arr))
carr = blosc.compress(arr[s:e], typesize=8)
f.write(carr)
s = e
else:
raise ValueError(f"Unsupported file extension of path: {filepath}")
[docs]def from_pickle(filepath, use_dill=False):
"""
Unpickle a python object.
"""
# use dill instead of pickle
if use_dill:
pkl = dill
else:
pkl = pickle
# load
if filepath.endswith(".pkl"):
with open(filepath, "rb") as f:
return pkl.load(f)
elif filepath.endswith(".pkl.gz"):
with gzip.open(filepath, "rb") as f:
return pkl.load(f)
elif filepath.endswith(".pkl.blosc"):
arr = []
buffsize = blosc.MAX_BUFFERSIZE
with open(filepath, "rb") as f:
while buffsize > 0:
try:
carr = f.read(buffsize)
except (OverflowError, MemoryError):
buffsize = buffsize // 2
continue
if len(carr) == 0:
break
arr.append(blosc.decompress(carr))
if buffsize == 0:
raise RuntimeError("Could not determine a buffer size.")
return pkl.loads(b"".join(arr))
else:
raise ValueError(f"Unsupported file extension of path: {filepath}")
def read_msigdb_json(json_path):
"""
Read MSigDB gene set JSON file and return a "clean" dictionary of gene sets as expected for
collection files in NiSpace.
MSigDB json files can be found at https://www.gsea-msigdb.org/gsea/msigdb/human/genesets.jsp
after selecting a specific gene set.
Parameters
----------
json_path : str, os.PathLike
Path to MSigDB gene set JSON file.
Returns
-------
dict
Dictionary of gene sets. Keys are gene set names, values are lists of gene symbols.
Both keys and values are sorted alphabetically.
"""
in_dict = read_json(json_path)
gene_sets = sorted( in_dict.keys() )
out_dict = {
gene_set: sorted( in_dict[gene_set]["geneSymbols"] )
for gene_set in gene_sets
}
return out_dict