from typing import Union, List, Dict, Tuple
import pathlib
import pandas as pd
import numpy as np
import os
from requests import Session
import logging
lgr = logging.getLogger(__name__)
from . import __commit__
from .core.constants import _PARC_DEFAULT, _SPACE_DEFAULT
from .stats.misc import zscore_df
from .utils.utils import _rm_ext, set_log, merge_parcellations
from .utils.utils_datasets import get_file
from .io import read_json, write_json, load_img, load_distmat, load_spinmat, load_labels, load_l2rmap
from .nulls import _img_density_for_neuromaps
# Set the default nispace data directory environment variable
os.environ['NISPACE_DATA_DIR'] = str(pathlib.Path.home() / "nispace-data")
datalib_dir = pathlib.Path(__file__).parent / "datalib"
reference_lib = read_json(datalib_dir / "reference.json")
template_lib = read_json(datalib_dir / "template.json")
parcellation_lib = read_json(datalib_dir / "parcellation.json")
example_lib = read_json(datalib_dir / "example.json")
# ==================================================================================================
# DEPRECATION MESSAGE STRINGS
# ==================================================================================================
_DEPR_NISPACE_DATA_DIR = (
"The 'nispace_data_dir' parameter is deprecated and will be removed in the first non-dev release. "
"Please use the NISPACE_DATA_DIR environment variable instead."
)
_DEPR_FETCH_PARC_LEGACY = (
"Passing 'space=' to fetch_parcellation() and receiving individual arrays is deprecated "
"and will be removed in the first non-dev release. "
"Use return_parcellation_only=True (or omit space=) to get a Parcellation object, "
"then call .get_image(), .get_dist_mat(), etc. as needed."
)
def keys2list(dct):
return list(dct.keys())
def keys2str(dct, sep=", "):
return sep.join(list(dct.keys()))
# TODO (first non-dev release): remove nispace_data_dir parameter from all fetch_* functions and delete _resolve_nispace_data_dir()
def _resolve_nispace_data_dir(nispace_data_dir):
if nispace_data_dir is not None:
lgr.warning(_DEPR_NISPACE_DATA_DIR)
os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
return os.getenv('NISPACE_DATA_DIR')
# EMPTY NISPACE DATA DIR ===========================================================================
# _EMPTY_DATA_CONFIRMED = False
# def empty_nispace_data_dir(nispace_data_dir: Union[str, pathlib.Path] = None):
# global _EMPTY_DATA_CONFIRMED
# if nispace_data_dir is None:
# nispace_data_dir = pathlib.Path.home() / "nispace-data"
# if not _EMPTY_DATA_CONFIRMED:
# lgr.warning("If you call this function again, it will remove all contents of your NiSpace "
# f"data directory at {nispace_data_dir}.")
# lgr.warning("Call it again to proceed.")
# _EMPTY_DATA_CONFIRMED = True
# else:
# lgr.warning(f"Emptying nispace data dir at {nispace_data_dir}.")
# shutil.rmtree(nispace_data_dir)
# nispace_data_dir.mkdir(parents=True, exist_ok=True)
# FILE HANDLING ====================================================================================
def _file_desc(fname, feature_position):
if isinstance(fname, pathlib.Path):
fname = fname.name
fname = fname.split(".")[0]
if isinstance(feature_position, int):
return fname.split("_")[feature_position].split("-")[1]
elif isinstance(feature_position, str):
return fname.split(f"{feature_position}-")[1].split("_")[0]
# BRAIN TEMPLATES ==================================================================================
[docs]def fetch_template(template: str = _SPACE_DEFAULT,
res: str = None,
desc: str = None,
#parcellation: str = None,
hemi: Union[List[str], str] = ["L", "R"],
nispace_data_dir: Union[str, pathlib.Path] = None,
overwrite: bool = False,
check_file_hash: bool = True,
verbose: bool = True):
"""
Fetch a brain template.
Parameters
----------
template : str, optional
The template to fetch. Default is "MNI152NLin2009cAsym".
res : str, optional
The resolution of the template to fetch. Defaults: "1mm" for MNI152, "41k" for
fsaverage (available: 3k/10k/41k/164k), "32k" for fsLR (available: 4k/8k/32k/164k).
desc : str, optional
The image type. Defaults: "T1w" for MNI152 (also: brain/mask/gmprob); "pial" for
fsaverage (also: white/inflated/sphere/sulc/medial/vaavg); "midthickness" for fsLR
(also: inflated/veryinflated[32k+]/sphere/sulc/medial/vaavg).
hemi : list of str, optional
The hemispheres to fetch. Default is ["L", "R"].
nispace_data_dir : str or pathlib.Path, optional
The directory containing the NiSpace data. Default is None.
Returns
-------
The template.
"""
verbose = set_log(lgr, verbose)
# check if template exists
if template not in template_lib:
raise ValueError(f"Template '{template}' not found. Available: {keys2str(template_lib)}")
nispace_data_dir = _resolve_nispace_data_dir(nispace_data_dir)
# paths
base_dir = pathlib.Path(nispace_data_dir) / "template" / template
map_dir = base_dir / "map"
# set defaults:
if "mni" in template.lower():
res = "1mm" if res is None else res
desc = "T1w" if desc is None else desc
hemi = None
elif "fsa" in template.lower():
res = "41k" if res is None else res
desc = "pial" if desc is None else desc
if hemi is None:
hemi = ["L", "R"]
elif "fslr" in template.lower():
res = "32k" if res is None else res
desc = "midthickness" if desc is None else desc
if hemi is None:
hemi = ["L", "R"]
# check settings
if res not in template_lib[template]:
raise ValueError(f"res = '{res}' not defined. Choose one of {keys2str(template_lib[template])}!")
if desc not in template_lib[template][res]:
raise ValueError(f"desc = '{desc}' not defined. Choose one of {keys2str(template_lib[template][res])}!")
if hemi is not None:
if isinstance(hemi, str):
hemi = [hemi]
if hemi not in [["L"], ["R"], ["L", "R"]]:
raise ValueError(f"hemi = '{hemi}' not defined. Choose one of 'L', 'R', or ['L', 'R']!")
# get kwargs
get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)
# get file
lgr.info(f"Loading {template} '{desc}' template in '{res}' resolution.")
if "mni" in template.lower():
tpl_file = get_file(
map_dir / desc / f"tpl-{template}_desc-{desc}_res-{res}.%s",
**template_lib[template][res][desc],
**get_file_kwargs,
)
else:
tpl_file = ()
for h in hemi:
tpl_file += get_file(
map_dir / desc / f"tpl-{template}_desc-{desc}_res-{res}_hemi-{h}.%s",
**template_lib[template][res][desc][h],
**get_file_kwargs,
),
if len(tpl_file) == 1:
tpl_file = tpl_file[0]
# f
return tpl_file
# PARCELLATIONS ===================================================================================
def _parc_alias(parcellation: str):
if "alias" in parcellation_lib[parcellation]:
parc = parcellation_lib[parcellation]["alias"]
else:
parc = parcellation
return parc
def _parc_symmetric(parc_labels):
labels_lh = [l.split("hemi-L")[1] for l in parc_labels if "hemi-L" in l]
labels_rh = [l.split("hemi-R")[1] for l in parc_labels if "hemi-R" in l]
if not labels_lh or not labels_rh:
return False
if labels_lh == labels_rh:
return True
return False
def _print_parcellations():
return ", ".join([p for p in parcellation_lib.keys() if "alias" not in parcellation_lib[p]])
def _check_parcellation(parcellation: str, force_list: bool = False, force_str: bool = False,
raise_not_found=True):
"""
Check if a parcellation name is valid and return the correct parcellation name as a string or
a list of strings containing a cortex-subcortex combination.
"""
# Path strings are not library names — catch before substring matching eats the path
if isinstance(parcellation, os.PathLike) or (
isinstance(parcellation, str) and (
os.sep in parcellation
or "/" in parcellation
or any(parcellation.endswith(ext) for ext in (".nii", ".nii.gz", ".gii", ".gii.gz"))
)
):
if raise_not_found:
lgr.critical_raise(
f"'{parcellation}' looks like a file path, not an integrated parcellation name. "
"Pass a pathlib.Path object or use a registered library name.",
ValueError,
)
return None
# helper function to check if an iterable of n=2 parcellations are cortex-subcortex combinations
def _check_cortex_subcortex(parc):
levels = []
for p in parc:
p_space = list(parcellation_lib[p].keys())[0]
levels.append(parcellation_lib[p][p_space]["level"])
if set(levels) != {"cortex", "subcortex"}:
lgr.critical_raise(f"Only cortex-subcortex combinations are allowed, not: {', '.join(levels)} ",
ValueError)
else:
# if we got to here, we have a cortex-subcortex combination; now ensure correct order
return [parc[levels.index("cortex")], parc[levels.index("subcortex")]]
# Parcellation can be a string as it appears in parcellation_lib (e.g., "Schaefer100")
# OR multiple strings from parcellation_lib concatenated without (e.g., "Schaefer100TianS1")
# or with a space in between (e.g., "Schaefer100 TianS1")
# (1) We check if parcellation is a string
assert isinstance(parcellation, (str)), f"Parcellation must be of type string, not {type(parcellation)}!"
# (2) We check if it is in parcellation_lib as is
parc = None
if parcellation in parcellation_lib:
parc = _parc_alias(parcellation)
# alias may point to a combined name (e.g. "BrainnetomeCorticalBrainnetomeSubcortical")
# that is not itself a key — resolve it through partial matching
if isinstance(parc, str) and parc not in parcellation_lib:
parcellation = parc
parc = None
# (3) If not, we check if we get a partial match
if parc is None:
# get a list of potential partial matches; skip combined-alias keys (their resolved alias
# is not itself a library entry, so they would pollute de-nesting in step 3b)
parc_matches = list(set([
_parc_alias(p) for p in parcellation_lib
if p in parcellation and _parc_alias(p) in parcellation_lib
]))
# (3a) No match found: raise error
if len(parc_matches) == 0:
if raise_not_found:
lgr.critical_raise(f"Parcellation '{parcellation}' not found.\nAvailable "
f"(cortex-subcortex-combinations allowed): {_print_parcellations()}",
FileNotFoundError)
else:
return
# (3b) > 2 matches found: check if matches are contained in each other or raise error
elif len(parc_matches) > 2:
# (3b1) check if matches are contained in each other and remove the contained ones
parc = parc_matches.copy()
for p in parc_matches:
if any([p in p_other for p_other in set(parc_matches) - {p}]):
parc.remove(p)
# (3b2) if still not 2, raise error
if len(parc) != 2:
lgr.critical_raise(f"Parcellation '{parcellation}' matches more than 2 parcellations: "
"{', '.join(parc_matches)}.",
ValueError)
# (3c) 1 match found: use it
elif len(parc_matches) == 1:
parc = parc_matches[0]
# (3d) 2 matches found: check if they are cortex-subcortex combinations
else:
parc = _check_cortex_subcortex(parc_matches)
# output format
if force_list and not force_str and isinstance(parc, str):
parc = [parc]
elif force_str and not force_list and isinstance(parc, list):
parc = "".join(parc)
return parc
[docs]def fetch_parcellation(parcellation: str = _PARC_DEFAULT,
space: str = None,
hemi: Union[List[str], str] = ["L", "R"],
bilateral: bool = False,
return_parcellation_only: bool = False,
# TODO (first non-dev release): remove all return_* kwargs and legacy space= array-return path; keep return_parcellation_only only
return_labels: bool = True,
return_space: bool = False,
return_resolution: bool = False,
return_symmetric: bool = False,
return_l2rmap: bool = False,
return_lrcorr: bool = False,
return_dist_mat: bool = False,
return_spin_mat: bool = False,
return_loaded: bool = True,
lrcorr_threshold: float = 0.0,
nispace_data_dir: Union[str, pathlib.Path] = None,
overwrite: bool = False,
check_file_hash: bool = True,
verbose: bool = True):
"""
Fetch a parcellation.
"""
verbose = set_log(lgr, verbose)
# check parcellation and return correct name or list of two names
parc = _check_parcellation(parcellation)
# if list, we need to merge parcellation and associated data , so we need to load stuff
return_loaded = True if isinstance(parc, str) else return_loaded
nispace_data_dir = _resolve_nispace_data_dir(nispace_data_dir)
# function to load individual parcellation and associated data
def load_parc(p, space=space, hemi=hemi, return_labels=return_labels, return_space=return_space,
return_resolution=return_resolution, return_symmetric=return_symmetric, return_l2rmap=return_l2rmap,
return_lrcorr=return_lrcorr,
return_dist_mat=return_dist_mat, return_spin_mat=return_spin_mat, return_loaded=return_loaded,
nispace_data_dir=nispace_data_dir, overwrite=overwrite, check_file_hash=check_file_hash):
# Check space
if space is None:
# get default space -> first space listed in parcellation_lib
space = list(parcellation_lib[p].keys())[0]
else:
if space not in parcellation_lib[p]:
lgr.critical_raise(f"Space '{space}' not found for parcellation '{p}'.\n"
f"Available: {keys2str(parcellation_lib[p])}",
ValueError)
# data directory
base_dir = pathlib.Path(nispace_data_dir) / "parcellation" / p / space
# Symmetry
if "l2rmap" in parcellation_lib[p][space] or "lrcorr" in parcellation_lib[p][space]:
symmetric = False
else:
symmetric = True
# LOAD
lgr.info(f"Loading {parcellation_lib[p][space]['level']} parcellation '{p}' in '{space}' space.")
# get kwargs
get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)
# volume
if "mni" in space.lower():
# get files
parcellation_file = get_file(
base_dir / f"parc-{p}_space-{space}.%s", **parcellation_lib[p][space]["map"],
**get_file_kwargs,
)
if return_labels:
label_file = get_file(
base_dir / f"parc-{p}_space-{space}.label.txt", **parcellation_lib[p][space]["label"],
**get_file_kwargs,
)
if return_l2rmap and not symmetric:
l2rmap_file = get_file(
base_dir / f"parc-{p}_space-{space}.l2rmap.csv.gz", **parcellation_lib[p][space]["l2rmap"],
**get_file_kwargs,
)
elif return_l2rmap and symmetric:
l2rmap_file = None
if return_lrcorr and not symmetric and "lrcorr" in parcellation_lib[p][space]:
lrcorr_file = get_file(
base_dir / f"parc-{p}_space-{space}.lrcorr.csv.gz", **parcellation_lib[p][space]["lrcorr"],
**get_file_kwargs,
)
else:
lrcorr_file = None
if return_dist_mat:
distmat_file = get_file(
base_dir / f"parc-{p}_space-{space}.dist.csv.gz", **parcellation_lib[p][space]["distmat"],
**get_file_kwargs,
)
if return_spin_mat:
spinmat_file = None # spin tests not available for volumetric parcellations
# surface
else:
# check hemis
if isinstance(hemi, str):
hemi = [hemi]
if hemi not in [["L"], ["R"], ["L", "R"]]:
raise ValueError(f"hemi = '{hemi}' not defined. Choose one of 'L', 'R', or ['L', 'R']!")
# get files
parcellation_file, label_file, distmat_file, spinmat_file = (), (), (), ()
for h in hemi:
parcellation_file += get_file(
base_dir / f"parc-{p}_space-{space}_hemi-{h}.%s", **parcellation_lib[p][space]["map"][h],
**get_file_kwargs,
),
if return_labels:
label_file += get_file(
base_dir / f"parc-{p}_space-{space}_hemi-{h}.label.txt", **parcellation_lib[p][space]["label"][h],
**get_file_kwargs,
),
if return_dist_mat:
if "fslr" in space.lower():
lgr.warning("Distance matrices for fslr spaces are currently not available. Returning None.")
distmat_file += None,
else:
distmat_file += get_file(
base_dir / f"parc-{p}_space-{space}_hemi-{h}.dist.csv.gz", **parcellation_lib[p][space]["distmat"][h],
**get_file_kwargs,
),
if return_spin_mat:
if "spinmat" in parcellation_lib[p][space]:
spinmat_file += get_file(
base_dir / f"parc-{p}_space-{space}_hemi-{h}.spin.npy", **parcellation_lib[p][space]["spinmat"][h],
**get_file_kwargs,
),
else:
spinmat_file += None,
if return_l2rmap and not symmetric:
l2rmap_file = get_file(
base_dir / f"parc-{p}_space-{space}.l2rmap.csv.gz", **parcellation_lib[p][space]["l2rmap"],
**get_file_kwargs,
)
elif return_l2rmap and symmetric:
l2rmap_file = None
if return_lrcorr and not symmetric and "lrcorr" in parcellation_lib[p][space]:
lrcorr_file = get_file(
base_dir / f"parc-{p}_space-{space}.lrcorr.csv.gz", **parcellation_lib[p][space]["lrcorr"],
**get_file_kwargs,
)
else:
lrcorr_file = None
if return_spin_mat and "spinmat" not in parcellation_lib[p][space]:
lgr.info(f"No pre-computed spin matrix available for '{p}' in '{space}' space.")
if len(parcellation_file) == 1:
parcellation_file = parcellation_file[0]
if return_labels:
label_file = label_file[0]
if return_dist_mat:
distmat_file = distmat_file[0]
if return_spin_mat:
spinmat_file = spinmat_file[0]
l2rmap_file = None
# return
# build output
out = {}
# parc
out["parc"] = load_img(parcellation_file) if return_loaded else parcellation_file
# label
if return_labels:
out["label"] = load_labels(label_file) if return_loaded else label_file
# space
if return_space:
out["space"] = space
# res
if return_resolution:
out["res"] = _img_density_for_neuromaps(load_img(parcellation_file))
# symmetric
if return_symmetric:
out["sym"] = symmetric
# l2rmap
if return_l2rmap:
out["l2rmap"] = load_l2rmap(l2rmap_file, threshold=lrcorr_threshold) if return_loaded else l2rmap_file
# lrcorr
if return_lrcorr:
out["lrcorr"] = load_l2rmap(lrcorr_file, threshold=lrcorr_threshold) if return_loaded else lrcorr_file
# distmat
if return_dist_mat:
out["distmat"] = load_distmat(distmat_file) if return_loaded else distmat_file
# spinmat
if return_spin_mat:
out["spinmat"] = load_spinmat(spinmat_file) if return_loaded else spinmat_file
return out
# ---- NEW PATH: space=None OR return_parcellation_only=True → Parcellation object ----
if space is None or return_parcellation_only:
from .core.parcellation import Parcellation
if isinstance(parc, list):
parc_obj = Parcellation.from_nispace_library(
parc,
[parcellation_lib[parc[0]], parcellation_lib[parc[1]]],
nispace_data_dir,
load_dist_mat=return_dist_mat,
load_spin_mat=return_spin_mat,
lrcorr_threshold=lrcorr_threshold,
overwrite=overwrite,
check_file_hash=check_file_hash,
verbose=verbose,
)
else:
parc_obj = Parcellation.from_nispace_library(
parc,
parcellation_lib[parc],
nispace_data_dir,
load_dist_mat=return_dist_mat,
load_spin_mat=return_spin_mat,
lrcorr_threshold=lrcorr_threshold,
overwrite=overwrite,
check_file_hash=check_file_hash,
verbose=verbose,
)
if bilateral:
parc_obj.make_bilateral()
# hemisphere selection: normalise hemi arg and call select_hemi if single hemi
_hemi_set = set([hemi] if isinstance(hemi, str) else list(hemi))
if not _hemi_set >= {"L", "R"}:
h = next(iter(_hemi_set), None)
if h in ("L", "R"):
parc_obj.select_hemi(h, verbose=verbose)
# activate the requested space if one was given
if space is not None:
parc_obj.set_active_space(space)
return parc_obj
# ---- LEGACY PATH: space explicitly given and return_parcellation_only=False ----
lgr.warning(_DEPR_FETCH_PARC_LEGACY)
# run load_parc for a single parcellation
if isinstance(parc, str):
out = load_parc(parc)
if len(out) == 1:
return list(out.values())[0]
else:
return tuple(out.values())
# run load_parc for 2 parcellations
out_cortex = load_parc(parc[0])
out_subcortex = load_parc(parc[1])
lgr.info(f"Merging to cortex-subcortex parcellation '{parc[0]}{parc[1]}'.")
# now, we will have to combine the data
out = {}
# combine parcellations
out["parc"] = merge_parcellations([out_cortex["parc"], out_subcortex["parc"]], quick=True)#[0]
# label
if return_labels:
out["label"] = out_cortex["label"] + out_subcortex["label"]
# space
if return_space:
out["space"] = out_cortex["space"]
# res
if return_resolution:
out["res"] = out_cortex["res"]
# symmetric
if return_symmetric:
out["sym"] = True if out_cortex["sym"] and out_subcortex["sym"] else False
# lrcorr (merged case: just use cortex lrcorr, subcortex is symmetric)
if return_lrcorr:
out["lrcorr"] = out_cortex.get("lrcorr", None)
# l2rmap
if return_l2rmap:
if out_cortex["l2rmap"] is None and out_subcortex["l2rmap"] is None:
out["l2rmap"] = None
else:
if not return_labels:
lgr.critical_raise("Cannot return merged l2rmap when return_labels=False!", ValueError)
out["l2rmap"] = pd.DataFrame(
np.eye(len(out["label"]) // 2),
index=[l for l in out["label"] if "hemi-L" in l],
columns=[l for l in out["label"] if "hemi-R" in l]
)
if out_cortex["l2rmap"] is not None:
out["l2rmap"].loc[out_cortex["l2rmap"].index, out_cortex["l2rmap"].columns] = out_cortex["l2rmap"]
if out_subcortex["l2rmap"] is not None:
out["l2rmap"].loc[out_subcortex["l2rmap"].index, out_subcortex["l2rmap"].columns] = out_subcortex["l2rmap"]
# distmat
if return_dist_mat:
lgr.info("Distance matrices for merged parcellations are currently not available. Returning None.")
out["distmat"] = None
# spinmat
if return_spin_mat:
out["spinmat"] = None
# return
if len(out) == 1:
return list(out.values())[0]
else:
return tuple(out.values())
[docs]def fetch_collection(collection: Union[str, pathlib.Path, np.ndarray, pd.DataFrame, pd.Series, list],
dataset: str = None,
maps: list = None,
set_size_range: Union[None, Tuple[int, int]] = None,
weight_range: Union[None, Tuple[float, float]] = None,
weight_quantile: float = None,
set_specificity: float = None,
return_maps: bool = False,
nispace_data_dir: Union[str, pathlib.Path] = None,
overwrite: bool = False,
check_file_hash: bool = True,
verbose: bool = True):
"""
Fetch a collection that defines a subset (and optional grouping) of maps.
A collection is a mapping from map IDs to optional set labels and weights. The result
is a DataFrame with columns ``["map"]``, ``["set", "map"]``, or ``["set", "map", "weight"]``
depending on the collection content.
Three `.collect` file formats are supported:
1. **Simple list** — plain text, one map ID per line, ``map`` header.
2. **JSON set dict** — ``{"set_name": ["map1", "map2", ...], ...}``.
3. **CSV set table** — columns: ``map``, ``set, map``, or ``set, map, weight``.
Parameters
----------
collection : str, Path, ndarray, DataFrame, Series, or list
When ``dataset`` is given: the name of an integrated collection (e.g. ``"All"``,
``"BrainSpanWeights"``).
When ``dataset`` is None: a path to a ``.collect`` file, or an in-memory
array-like / DataFrame that is used directly.
dataset : str, optional
Name of an integrated NiSpace reference dataset (e.g. ``"mrna"``, ``"pet"``).
If provided, ``collection`` must be the name of one of that dataset's registered
collections.
maps : list, optional
Restrict to this subset of map IDs after loading.
set_size_range : tuple (int, int), optional
Keep only sets whose membership count falls within ``[min, max]`` (inclusive).
weight_range : tuple (float, float), optional
Keep only entries whose weight is within ``[min, max]`` (inclusive).
Ignored when the collection has no weights.
weight_quantile : float, optional
Within each set, keep only entries with weight ≥ this quantile. Ignored when
the collection has no weights.
set_specificity : float in (0, 1], optional
Keep only maps that appear in ≤ ``set_specificity`` fraction of all sets,
i.e. discard ubiquitous maps.
return_maps : bool, default False
If True, return a tuple ``(collection_df, maps_avail)`` where ``maps_avail`` is
the deduplicated list of map IDs after all filters.
nispace_data_dir : str or Path, optional
Override the NiSpace data directory (default: ``$NISPACE_DATA_DIR``).
overwrite : bool, default False
Re-download the collection file even if it is already cached.
check_file_hash : bool, default True
Verify the SHA-256 hash of the downloaded file.
verbose : bool, default True
Print progress messages.
Returns
-------
collection_df : DataFrame
Columns: ``["map"]`` for unstructured collections; ``["set", "map"]`` for grouped
collections; ``["set", "map", "weight"]`` for weighted collections.
maps_avail : list of str
Only returned when ``return_maps=True``. Deduplicated map IDs present in
``collection_df`` after filtering.
"""
verbose = set_log(lgr, verbose)
# If dataset is provided, assume to load integrated collection
# check if dataset is valid
if dataset is not None:
if dataset not in reference_lib:
lgr.critical_raise(f"Dataset '{dataset}' not found. Available: {keys2str(reference_lib)}",
ValueError)
else:
nispace_data_dir = _resolve_nispace_data_dir(nispace_data_dir)
# base dir
base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset
# get integrated collection
if collection in reference_lib[dataset]["collection"]:
lgr.info(f"Loading integrated collection '{collection}' for dataset '{dataset}'.")
collection_path = base_dir / f"collection-{collection}.collect"
collection_file = get_file(
collection_path, **reference_lib[dataset]["collection"][collection],
overwrite=overwrite, hash_check=check_file_hash,
)
else:
lgr.critical_raise(f"Collection '{collection}' not found for dataset '{dataset}'. "
f"Available: {keys2str(reference_lib[dataset]['collection'])}",
ValueError)
# dataset is not provided, assume to load custom collection
else:
# check if collection is a file
if isinstance(collection, (str, pathlib.Path)):
collection_file = pathlib.Path(collection)
if collection_file.exists():
lgr.info(f"Loading custom collection from file: {collection_file}")
else:
lgr.critical_raise(f"Assuming collection '{collection_file}' to be a file, but it does not exist! "
"Ensure that the file exists and try again. If you want to load an integrated collection, "
"use the 'dataset' argument.",
ValueError)
# else, assume to load array-like object
else:
lgr.info(f"Loading custom collection of type {type(collection)}.")
collection_file = collection
# Load collection file; 1-column df (= maps) or 2-column df (= set and maps)
collection_df = _load_collection(collection_file)
# apply filters
collection_df, maps_avail = _apply_collection_filter(
collection_df=collection_df,
maps=maps,
set_size_range=set_size_range,
weight_range=weight_range,
weight_quantile=weight_quantile,
set_specificity=set_specificity
)
# return
return collection_df if not return_maps else (collection_df, maps_avail)
[docs]def apply_collection(data: pd.DataFrame, collection: pd.DataFrame):
if not np.isin(["map"], collection.columns).all():
lgr.critical_raise("collection must have at least a 'map' column.")
maps_intersection = data.index.intersection(collection["map"].unique())
collection_df_intersection = collection[collection["map"].isin(maps_intersection)]
data_out = data.copy()
data_out = data_out.loc[collection_df_intersection["map"]]
data_out.index = pd.MultiIndex.from_frame(collection_df_intersection)
return data_out
# REFERENCE DATA - PRIVATE =========================================================================
def _filter_maps(maps_avail: List[str],
maps: Union[str, List[str], Dict[str, Union[str, list]]]) -> List[pathlib.Path]:
def matches_filters(map_name: str, filters: Dict[str, Union[str, List[str]]]) -> bool:
for filter_name, filter_content in filters.items():
if filter_content not in [None, False, "", []]:
if isinstance(filter_content, (str, int)):
filter_content = [filter_content]
filter_content = list(map(str, filter_content))
if filter_name == "n" and filter_content[0].startswith(">"):
try:
filter_n = int(filter_content[0].replace(">", ""))
n_value = int(_file_desc(map_name, 2))
if n_value <= filter_n:
return False
except (ValueError, IndexError):
continue # Skip this filter if parsing fails
else:
if not any(f"{filter_name}-{content}".lower() in map_name.lower()
for content in filter_content):
return False
return True
if isinstance(maps, str):
maps = [maps]
if isinstance(maps, list):
maps = list(set(maps))
filtered_maps = [f for f in maps_avail if any(map_str in f for map_str in maps)]
elif isinstance(maps, dict):
filtered_maps = [f for f in maps_avail if matches_filters(f, maps)]
else:
filtered_maps = maps_avail
return filtered_maps
def _load_collection(collection_path):
# if path, read file
if isinstance(collection_path, (str, pathlib.Path)):
collection_path = pathlib.Path(collection_path)
ext = collection_path.suffix
# if "collect" file, detect if dict or table
if ext == ".collect":
with open(collection_path) as f:
header = f.readline()
if header.startswith("{"):
ext = ".json"
else:
ext = ".csv"
# if json, load into dict
if ext == ".json":
collection = read_json(collection_path)
# else, try to directly load as table file
else:
with open(collection_path) as f:
header = f.readline().strip("\n")
if any([h in header for h in ["set", "map", "weight"]]):
header = 0
else:
header = None
collection = pd.read_csv(collection_path, header=header, sep=",")
else:
collection = collection_path
# if array, convert all do df
if isinstance(collection, (np.ndarray, pd.DataFrame, pd.Series, list)):
collection = pd.DataFrame(collection)
# if dict, convert to df as well
elif isinstance(collection, dict):
collection = pd.concat([pd.DataFrame({0:k, 1:v}) for k, v in collection.items()])
# else
else:
raise TypeError(f"Datatype {type(collection_path)} not accepted for argument 'collection'.")
# process depending on number of columns
n_cols = collection.shape[1]
if n_cols == 0:
raise ValueError("No columns detected in collection file?!")
elif n_cols == 1:
collection.columns = ["map"]
elif n_cols == 2:
collection.columns = ["set", "map"]
elif n_cols == 3:
collection.columns = ["set", "map", "weight"]
else:
raise ValueError(f"Collection file with > 3 columns not supported ({n_cols} columns)!")
# return
return collection.reset_index(drop=True)
def _apply_collection_filter(#dataset: str,
collection_df: pd.DataFrame,
maps: List[Union[str, pathlib.Path]] = None,
#collection: str,
#nispace_data_dir: Union[str, pathlib.Path],
set_size_range: Union[None, Tuple[int, int]] = None,
weight_range: Union[None, Tuple[float, float]] = None,
weight_quantile: Union[None, float] = None,
set_specificity: Union[None, float] = None,
#overwrite: bool = False,
#check_file_hash: bool = True
) -> List[pathlib.Path]:
# Apply maps filter
lgr.info(f"Filtering maps by collection.")
if maps is None or len(maps) == 0:
filtered_map_files = collection_df["map"].unique()
elif isinstance(maps[0], pathlib.Path):
map_names = [_rm_ext(f.name) for f in maps]
filtered_map_files = [
maps[map_names.index(f_name)] for f_name in collection_df["map"].unique()
if f_name in map_names
]
collection_df = collection_df[collection_df["map"].isin(map_names)]
else:
filtered_map_files = [
f_name for f_name in collection_df["map"].unique()
if any(m == f_name for m in maps)
]
collection_df = collection_df[collection_df["map"].isin(filtered_map_files)]
# Apply
if set_specificity is not None:
n_sets = len(collection_df["set"].unique())
collection_df = (
collection_df
.groupby("map")
.filter(lambda x: x.shape[0] <= n_sets * set_specificity)
.reset_index(drop=True)
)
lgr.info(f"Keeping maps occuring in <= {set_specificity:.02%} of "
f"{collection_df['set'].nunique()} retained sets.")
# Apply weight quantile filter
if weight_quantile is not None:
if "weight" not in collection_df.columns:
lgr.warning("Collection does not seem to contain weights, will not apply weight filter.")
else:
collection_df = (
collection_df
.groupby("set", sort=False)
.apply(lambda x: x[x.weight >= x.weight.quantile(0.9)])
.reset_index(drop=True)
)
lgr.info(f"Filtered to maps with weights >= quantile {weight_quantile} within each set.")
# Apply absolute weight filter
if weight_range is not None:
if "weight" not in collection_df.columns:
lgr.warning("Collection does not seem to contain weights, will not apply weight filter.")
else:
weight_range = [
x if x is not None else x_
for x, x_
in zip(weight_range, (-np.inf, np.inf))
]
collection_df = collection_df[collection_df["weight"].between(*weight_range, inclusive="both")]
lgr.info(f"Filtered to {len(collection_df['set'].unique())} collection sets with weights between "
f"{weight_range[0]} and {weight_range[1]}.")
# Apply size filter
if set_size_range is not None:
if "set" in collection_df.columns and isinstance(set_size_range, (tuple, list)):
set_size_range = [
x if x is not None else x_
for x, x_
in zip(set_size_range, (1, np.inf))
]
collection_df = (
collection_df
.groupby("set", sort=False)
.filter(lambda x: set_size_range[0] <= x.shape[0] <= set_size_range[1])
.reset_index(drop=True)
)
n_sets = len(collection_df["set"].unique())
if n_sets == 0:
lgr.critical_raise(f"No collection sets found with between {set_size_range[0]} and "
f"{set_size_range[1]} maps. Adjust the 'set_size_range' parameter.",
ValueError)
filtered_map_files = list( set(filtered_map_files).intersection(set(collection_df["map"])) )
lgr.info(f"Filtered to {n_sets} collection sets with between "
f"{set_size_range[0]} and {set_size_range[1]} maps.")
return collection_df, filtered_map_files
def _load_parcellated_data(dataset: str,
nispace_data_dir: Union[str, pathlib.Path],
parc: Union[str, List[str]],
map_files: List[str],
collection_df: pd.DataFrame,
standardize: bool,
set_size_range: Union[None, Tuple[int, int]] = None,
merge_how: str = "inner",
overwrite: bool = False,
check_file_hash: bool = True,
verbose: bool = True) -> Union[pd.DataFrame, Tuple[pd.DataFrame, Dict]]:
verbose = set_log(lgr, verbose)
# tab dir
tab_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset / "tab"
# parcellation can be string with one parcellation name or list of two parcellation names
if isinstance(parc, str):
lgr.info(f"Loading data parcellated with '{parc}'")
# all to list
parc = [parc]
elif isinstance(parc, list):
lgr.info(f"Loading and {merge_how}-merging data parcellated with '{parc[0]}' and '{parc[1]}'")
else:
lgr.critical_raise(f"Invalid parcellation type: {type(parc)}", ValueError)
# loop through parcellations
data = []
for p in parc:
# check if parcellation available for this data
if p not in reference_lib[dataset]["tab"]:
lgr.critical_raise(f"Dataset '{dataset}' is not available for parcellation '{p}'!\n"
f"Available: {keys2str(reference_lib[dataset]['tab'])}",
FileNotFoundError)
# file
parcellation_file = tab_dir / f"dset-{dataset}_parc-{p}.csv.gz"
lgr.debug(f"Loading {parcellation_file}")
# load data
data.append(pd.read_csv(
get_file(
parcellation_file, **reference_lib[dataset]["tab"][p],
overwrite=overwrite, hash_check=check_file_hash,
),
index_col=0
))
lgr.debug(f"Loaded parcellated data of shape {data[-1].shape}")
lgr.debug(f"First 5 map names: {data[-1].index.to_list()[:5]}")
# merge if necessary: all maps are kept even if they are not present in both parcellations
if len(parc) > 1:
data = data[0].merge(data[1], how=merge_how, left_index=True, right_index=True)
else:
data = data[0]
# Apply filter to the dataframe index
lgr.debug(f"Applying filtering based on maps, first 5: {map_files[:5]}")
if isinstance(map_files[0], pathlib.Path):
map_files = [_rm_ext(f.name) for f in map_files]
data = data.loc[data.index.intersection(map_files)]
lgr.debug(f"Shape after filtering based on map_names: {data.shape}")
# Apply collection index (-> handles maps that are present multiple times in different sets)
if collection_df is not None:
data = apply_collection(data, collection_df)
# Re-apply set size filter: maps missing from parcellated CSV can silently reduce set sizes
if set_size_range is not None and "set" in data.index.names:
size_range = [
x if x is not None else x_
for x, x_ in zip(set_size_range, (1, np.inf))
]
set_counts = data.index.get_level_values("set").value_counts()
valid_sets = set_counts[(set_counts >= size_range[0]) & (set_counts <= size_range[1])].index
data = data[data.index.get_level_values("set").isin(valid_sets)]
lgr.debug(f"After parcellated-data set size filter: {len(valid_sets)} sets remaining.")
# Standardize
if standardize:
lgr.info("Standardizing parcellated data.")
data = zscore_df(data, along="rows")
return data
def _print_references(dataset: str, meta: pd.DataFrame = None):
# info file
def get_ref_info(dataset, add_commit=True):
get_line = False
msg = ""
with open(datalib_dir / "reference.txt", "r") as f:
for line in f:
if line.lower().startswith(f"# {dataset.lower()}"):
get_line = True
continue
if get_line and line == "\n":
break
if get_line:
msg += line
if add_commit:
msg += f"To ensure reproducibility, note the NiSpace commit/version: {__commit__}\n"
msg += "\n"
return msg
# PET
if dataset.lower() == "pet":
msg = get_ref_info(dataset)
if meta is not None:
atlas_maxlen = max([len(x) for x in meta["atlas"]])
author_maxlen = max([len(x) for x in meta["publication"]])
license_maxlen = max([len(x) for x in meta["license"]])
for atlas, pub, doi, license, note in zip(
meta["atlas"], meta["publication"], meta["doi"], meta["license"], meta["note"]
):
doi_list = [f"https://doi.org/{doi}" for doi in doi.replace(" ", "").split(";")]
if "" in doi_list: doi_list.remove("")
doi_str = ", ".join(doi_list)
atlas = atlas.ljust(atlas_maxlen)
author = pub.capitalize().ljust(author_maxlen)
license = license.ljust(license_maxlen)
msg += f"- {atlas} Source: {author} {license} {doi_str}\n"
if not pd.isna(note):
msg += f" CAVE: {note}\n"
# all others
else:
msg = get_ref_info(dataset)
if meta is not None:
if len(meta) > 0:
collection_maxlen = max([len(x) for x in meta["collection"]])
author_maxlen = max([len(x) for x in meta["author"]])
for collection, pub, doi in zip(meta["collection"], meta["author"], meta["doi"]):
collection = collection.ljust(collection_maxlen)
author = pub.capitalize().ljust(author_maxlen)
msg += f"- {collection} Source: {author} https://doi.org/{doi}\n"
# RSN
# elif dataset.lower() == "rsn":
# msg = get_ref_info(dataset)
# if meta is not None:
# if len(meta) > 0:
# author_maxlen = max([len(x) for x in meta["author"]])
# for pub, doi in zip(meta["author"], meta["doi"]):
# author = pub.capitalize().ljust(author_maxlen)
# msg += f"- {author} https://doi.org/{doi}\n"
# print
# if msg[-2:] != "\n":
# msg += "\n"
print(msg)
# REFERENCE DATA - PUBLIC ==========================================================================
# TODO: fetch_reference is ~280 lines and handles two distinct code paths (image-path fetching vs.
# parcellated data loading). Consider splitting into fetch_reference_maps() and fetch_reference_tab()
# with a shared dispatcher, or at minimum extracting the two branches into private helpers.
[docs]def fetch_reference(dataset: str,
maps: Union[None, str, List[str], Dict[str, Union[str, list]]] = None,
space: str = _SPACE_DEFAULT,
collection: str = None,
sets: Union[None, str, List[str]] = None,
set_size_range: Union[None, Tuple[int, int]] = None,
weight_range: Union[None, Tuple[float, float]] = None,
weight_quantile: Union[None, float] = None,
set_specificity: Union[None, float] = None,
parcellation: str = None,
bilateral: bool = False,
hemi: Union[str, List[str]] = None,
standardize_parcellated: bool = False,
return_metadata: bool = False,
print_references: bool = True,
osf_config_file: str = None,
github_config_file: str = None,
nispace_data_dir: Union[str, pathlib.Path] = None,
overwrite: bool = False,
check_file_hash: bool = True,
verbose: bool = True):
verbose = set_log(lgr, verbose)
# --- handle Parcellation object passed as parcellation= ---
from .core.parcellation import Parcellation as _Parcellation
if isinstance(parcellation, _Parcellation):
if parcellation._is_combined:
parcellation = (parcellation._cx_name or "") + (parcellation._sc_name or "")
else:
parcellation = parcellation._name
elif bilateral and parcellation is None:
lgr.warning("bilateral=True has no effect without a parcellation.")
# Check dataset availability
if isinstance(dataset, str):
dataset = dataset.lower()
if dataset not in reference_lib:
lgr.critical_raise(f"Dataset '{dataset}' not found! Available datasets: {keys2str(reference_lib)}",
ValueError)
elif parcellation is None and "map" not in reference_lib[dataset]:
lgr.critical_raise(f"Dataset '{dataset}' is only available as parcellated data, choose a parcellation!",
ValueError)
else:
lgr.critical_raise(f"Invalid dataset type; expecting string or pandas DataFrame/Series, got {type(dataset)}",
TypeError)
lgr.info(f"Loading {dataset} maps.")
nispace_data_dir = _resolve_nispace_data_dir(nispace_data_dir)
# base dir
base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset
map_dir = base_dir / "map"
tab_dir = base_dir / "tab"
# Check if parcellation is defined correctly and load map lists
if parcellation is not None:
# check parcellation and return correct name or list of two names
parc = _check_parcellation(parcellation)
# bilateral symmetry guard: only valid for symmetric library parcellations
if bilateral:
names_to_check = parc if isinstance(parc, list) else [parc]
for _pname in names_to_check:
if not isinstance(_pname, str) or _pname not in parcellation_lib:
continue
is_sym = all(
space_data.get(
"symmetric",
"l2rmap" not in space_data and "lrcorr" not in space_data,
)
for space_data in parcellation_lib[_pname].values()
if isinstance(space_data, dict)
)
if not is_sym:
raise ValueError(
f"bilateral=True requires a symmetric parcellation. "
f"'{_pname}' is not symmetric."
)
# load maps from collection "All", which should be available for all datasets
maps_avail = _load_collection(get_file(
base_dir / f"collection-All.collect", **reference_lib[dataset]["collection"]["All"],
overwrite=overwrite, hash_check=check_file_hash,
))["map"].to_list()
# Check space availability and load map lists
else:
# get list of map image files
maps_avail = [m for m, v in reference_lib[dataset]["map"].items() if space in v]
if len(maps_avail) == 0:
lgr.critical_raise(f"Found no maps for space '{space}' in dataset '{dataset}'.",
ValueError)
# Remove private maps
if not osf_config_file and not github_config_file:
if "mni152" in space.lower():
maps_avail = [
m for m in maps_avail
if reference_lib[dataset]["map"][m][space]["host"] not in ["osfprivate", "github-nispace-private"]
]
else:
maps_avail = [
m for m in maps_avail
if (reference_lib[dataset]["map"][m][space]["L"]["host"]
if "L" in reference_lib[dataset]["map"][m][space]
else reference_lib[dataset]["map"][m][space]["R"]["host"])
not in ["osfprivate", "github-nispace-private"]
]
lgr.debug(f"Loaded {len(maps_avail)} unfiltered map(s). "
f"First 5: {maps_avail[:5] if len(maps_avail) >= 5 else maps_avail[:len(maps_avail)]}")
# Filter by 'maps'
if maps:
n_tmp = len(maps_avail)
lgr.info(f"Applying filter: {maps}")
maps_avail = _filter_maps(maps_avail, maps)
lgr.info(f"Filtered from {n_tmp} to {len(maps_avail)} maps.")
# Filter by 'collection'
if collection == "All":
collection = None
if collection:
collection_df, maps_avail = fetch_collection(
collection,
dataset,
maps_avail,
return_maps=True,
set_size_range=set_size_range,
weight_range=weight_range,
weight_quantile=weight_quantile,
set_specificity=set_specificity,
overwrite=overwrite,
check_file_hash=check_file_hash,
verbose=verbose
)
else:
collection_df = None
# Filter by 'sets'
if sets is not None:
if collection_df is None or "set" not in collection_df.columns:
lgr.warning(f"'sets={sets}' filter ignored: no collection with sets is loaded.")
else:
if isinstance(sets, str):
sets = [sets]
avail_sets = collection_df["set"].unique().tolist()
missing = [s for s in sets if s not in avail_sets]
if missing:
show = avail_sets[:20]
lgr.critical_raise(
f"Set(s) {missing} not found in collection. Available ({len(avail_sets)}): {show}"
+ (" ..." if len(avail_sets) > 20 else ""),
ValueError
)
collection_df = collection_df[collection_df["set"].isin(sets)]
maps_avail = collection_df["map"].tolist()
# Load tabulated data if 'parcellation' is specified
if parcellation:
data = _load_parcellated_data(
dataset=dataset,
parc=parc,
map_files=maps_avail,
collection_df=collection_df,
standardize=standardize_parcellated,
set_size_range=set_size_range,
nispace_data_dir=nispace_data_dir,
overwrite=overwrite,
check_file_hash=check_file_hash,
verbose=verbose,
)
# bilateral: average matched LH/RH columns by label prefix
if bilateral:
from .core.parcellation import _bilateral_labels_match
ok, lh_idc, rh_idc, bilateral_cols, unmatched = _bilateral_labels_match(
data.columns.tolist()
)
if not ok:
lgr.warning(
f"Bilateral averaging: label matching failed "
f"({len(unmatched)} unmatched label(s): {unmatched[:5]}). "
"Returning non-bilateral data."
)
else:
N_before = data.shape[1]
lh_vals = data.iloc[:, lh_idc].values
rh_vals = data.iloc[:, rh_idc].values
data = pd.DataFrame(
(lh_vals + rh_vals) / 2,
index=data.index,
columns=bilateral_cols,
)
lgr.info(f"Bilateral averaging applied: {N_before} → {len(bilateral_cols)} parcels.")
# Fetch paths to maps if no 'parcellation' is specified
else:
# get kwargs
get_file_kwargs = dict(
overwrite=overwrite, hash_check=check_file_hash,
osf_config_file=osf_config_file, github_config_file=github_config_file)
# MNI: one file per map
if "mni152" in space.lower():
data = [
get_file(
map_dir / m / f"{m}_space-{space}.%s", **reference_lib[dataset]["map"][m][space],
**get_file_kwargs,
)
for m in maps_avail
]
# surface: two files per map
else:
data = []
for m in maps_avail:
data.append(tuple([
get_file(
map_dir / m / f"{m}_space-{space}_hemi-{_h}.%s", **reference_lib[dataset]["map"][m][space][_h],
**get_file_kwargs,
)
for _h in reference_lib[dataset]["map"][m][space].keys()
]))
# Hemisphere filter
if hemi is not None:
_hemi_set = set([hemi] if isinstance(hemi, str) else list(hemi))
if not _hemi_set >= {"L", "R"}:
_h_keep = next(iter(_hemi_set), None)
if _h_keep in ("L", "R"):
if isinstance(data, pd.DataFrame):
_prefix = "hemi-L_" if _h_keep == "L" else "hemi-R_"
_keep_cols = [c for c in data.columns if str(c).startswith(_prefix)]
if _keep_cols:
data = data[_keep_cols]
lgr.info(f"hemi='{_h_keep}': kept {len(_keep_cols)} parcels.")
else:
lgr.warning(
f"hemi='{_h_keep}': no columns with prefix '{_prefix}' found. "
"Returning all columns."
)
else:
lgr.warning(
f"hemi='{_h_keep}' filtering is only supported for parcellated data "
"(parcellation= must be set). Full images returned."
)
# Print references
# for maps if "pet", or for sets if "mrna"
if return_metadata or print_references:
if dataset == "pet":
meta = fetch_metadata(dataset, maps_avail, overwrite=overwrite, check_file_hash=check_file_hash)
elif dataset in ["mrna", "magicc"] and collection_df is not None:
meta = fetch_metadata(dataset, collection=collection, overwrite=overwrite, check_file_hash=check_file_hash)
else:
meta = None
if return_metadata:
data = (data + (meta,)) if isinstance(data, tuple) else (data, meta)
if print_references & verbose:
_print_references(dataset, meta)
return data
# EXAMPLE DATA =====================================================================================
[docs]def fetch_example(example: str,
parcellation: str = None,
return_associated_data: bool = True,
nispace_data_dir: Union[str, pathlib.Path] = None,
overwrite: bool = False,
check_file_hash: bool = True,
verbose: bool = True):
"""
Fetch an example dataset.
"""
verbose = set_log(lgr, verbose)
nispace_data_dir = _resolve_nispace_data_dir(nispace_data_dir)
# base dir
base_dir = pathlib.Path(nispace_data_dir) / "example"
# check available
example = example.lower()
if example not in example_lib:
lgr.critical_raise(f"Example '{example}' not found. Available: {list(example_lib.keys())}",
ValueError)
# check parcellation
if parcellation is not None:
parc = _check_parcellation(parcellation, force_list=True)
else:
lgr.critical_raise("Currently, only parcellated example datasets are available. Please specify a parcellation.",
ValueError)
# get kwargs
get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)
# load tabulated data
if all(p in example_lib[example]["tab"] for p in parc):
lgr.info(f"Loading example dataset: '{example}', parcellated with: {''.join(parc)}.")
example_data = pd.concat([
pd.read_csv(
get_file(
base_dir / f"example-{example}_parc-{p}.csv.gz", **example_lib[example]["tab"][p],
**get_file_kwargs,
),
index_col=0
)
for p in parc
], axis=1)
else:
lgr.critical_raise(f"Parcellation '{parcellation}' not found for example '{example}'.\n"
f"Available parcellations: {list(example_lib[example]['tab'].keys())}",
ValueError)
# Check for info data
if return_associated_data and "info" in example_lib[example]:
lgr.info("Returning parcellated and associated subject data.")
example_info = pd.read_csv(
get_file(
base_dir / f"example-{example}_info.csv", **example_lib[example]["info"],
**get_file_kwargs,
),
index_col=0
)
return example_data, example_info
else:
return example_data