from typing import Union, List, Dict, Tuple
import pathlib
import pandas as pd
import numpy as np
import os
from . import lgr, __commit__
from .modules.constants import _PARC_DEFAULT, _SPACE_DEFAULT
from .stats.misc import zscore_df
from .utils.utils import _rm_ext, set_log, merge_parcellations
from .utils.utils_datasets import get_file
from .io import read_json, write_json, load_img, load_distmat, load_labels, load_l2rmap
from .nulls import _img_density_for_neuromaps
# Set the default nispace data directory environment variable.
# Only fill it in when the user has not exported a (non-empty) value already: the fetch
# functions in this module point users to NISPACE_DATA_DIR for configuring the data
# location, so overwriting it at import time would silently discard that setting.
if not os.environ.get("NISPACE_DATA_DIR"):
    os.environ["NISPACE_DATA_DIR"] = str(pathlib.Path.home() / "nispace-data")
# Folder next to this module that holds the JSON lookup tables
datalib_dir = pathlib.Path(__file__).parent / "datalib"
# One lookup table per resource type, each read from "<name>.json" inside datalib_dir
reference_lib, template_lib, parcellation_lib, example_lib = (
    read_json(datalib_dir / f"{_lib_name}.json")
    for _lib_name in ("reference", "template", "parcellation", "example")
)
def keys2list(dct):
    """Return the keys of `dct` as a list."""
    return [*dct.keys()]
def keys2str(dct, sep=", "):
    """Return the keys of `dct` joined into a single string, separated by `sep`."""
    key_names = list(dct.keys())
    return sep.join(key_names)
# EMPTY NISPACE DATA DIR ===========================================================================
# _EMPTY_DATA_CONFIRMED = False
# def empty_nispace_data_dir(nispace_data_dir: Union[str, pathlib.Path] = None):
# global _EMPTY_DATA_CONFIRMED
# if nispace_data_dir is None:
# nispace_data_dir = pathlib.Path.home() / "nispace-data"
# if not _EMPTY_DATA_CONFIRMED:
# lgr.warning("If you call this function again, it will remove all contents of your NiSpace "
# f"data directory at {nispace_data_dir}.")
# lgr.warning("Call it again to proceed.")
# _EMPTY_DATA_CONFIRMED = True
# else:
# lgr.warning(f"Emptying nispace data dir at {nispace_data_dir}.")
# shutil.rmtree(nispace_data_dir)
# nispace_data_dir.mkdir(parents=True, exist_ok=True)
# FILE HANDLING ====================================================================================
def _file_desc(fname, feature_position):
if isinstance(fname, pathlib.Path):
fname = fname.name
fname = fname.split(".")[0]
if isinstance(feature_position, int):
return fname.split("_")[feature_position].split("-")[1]
elif isinstance(feature_position, str):
return fname.split(f"{feature_position}-")[1].split("_")[0]
# BRAIN TEMPLATES ==================================================================================
def fetch_template(template: str = _SPACE_DEFAULT,
                   res: str = None,
                   desc: str = None,
                   hemi: Union[List[str], str] = ["L", "R"],
                   nispace_data_dir: Union[str, pathlib.Path] = None,
                   overwrite: bool = False,
                   check_file_hash: bool = True,
                   verbose: bool = True):
    """
    Fetch a brain template.

    Parameters
    ----------
    template : str, optional
        Name of the template; must be a key of `template_lib`. Defaults to
        `_SPACE_DEFAULT`.
    res : str, optional
        Resolution of the template. If None, "1mm" is used for templates whose name
        contains "mni" and "41k" for templates whose name contains "fsa".
    desc : str, optional
        Description of the template. If None, "T1w" is used for "mni" templates and
        "pial" for "fsa" templates.
    hemi : str or list of str, optional
        Hemisphere(s) to fetch for surface templates: "L", "R" or ["L", "R"] (a tuple is
        accepted as well). None selects both hemispheres. Ignored for "mni" templates.
        The default list is never modified in place.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, it is written to the NISPACE_DATA_DIR environment variable,
        which is the value that is actually used.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.
    verbose : bool, optional
        Passed to `set_log` to configure the module logger.

    Returns
    -------
    For "mni" templates, the result of `get_file` for the template file. Otherwise a
    tuple with one `get_file` result per requested hemisphere, or that single result
    if only one hemisphere was requested.

    Raises
    ------
    ValueError
        If `template`, `res`, `desc` or `hemi` is not available/valid.
    """
    verbose = set_log(lgr, verbose)

    # check if template exists
    if template not in template_lib:
        raise ValueError(f"Template '{template}' not found. Available: {keys2str(template_lib)}")

    # data directory: the environment variable is the single source of truth
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # paths
    map_dir = pathlib.Path(nispace_data_dir) / "template" / template / "map"

    # set defaults
    is_volume = "mni" in template.lower()
    if is_volume:
        res = "1mm" if res is None else res
        desc = "T1w" if desc is None else desc
        hemi = None
    else:
        if "fsa" in template.lower():
            res = "41k" if res is None else res
            desc = "pial" if desc is None else desc
        # Every surface template is fetched per hemisphere, so normalise `hemi` here
        # rather than for fsaverage only; otherwise the per-hemisphere loop below
        # would fail on None.
        if hemi is None:
            hemi = ["L", "R"]
        elif isinstance(hemi, str):
            hemi = [hemi]
        elif isinstance(hemi, tuple):
            hemi = list(hemi)

    # check settings
    if res not in template_lib[template]:
        raise ValueError(f"res = '{res}' not defined. Choose one of {keys2str(template_lib[template])}!")
    if desc not in template_lib[template][res]:
        raise ValueError(f"desc = '{desc}' not defined. Choose one of {keys2str(template_lib[template][res])}!")
    if hemi is not None and hemi not in [["L"], ["R"], ["L", "R"]]:
        raise ValueError(f"hemi = '{hemi}' not defined. Choose one of 'L', 'R', or ['L', 'R']!")

    # get file(s)
    get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)
    lgr.info(f"Loading {template} '{desc}' template in '{res}' resolution.")
    if is_volume:
        return get_file(
            map_dir / desc / f"tpl-{template}_desc-{desc}_res-{res}.%s",
            **template_lib[template][res][desc],
            **get_file_kwargs,
        )

    tpl_files = tuple(
        get_file(
            map_dir / desc / f"tpl-{template}_desc-{desc}_res-{res}_hemi-{h}.%s",
            **template_lib[template][res][desc][h],
            **get_file_kwargs,
        )
        for h in hemi
    )
    # a single hemisphere is returned bare instead of as a 1-tuple
    return tpl_files[0] if len(tpl_files) == 1 else tpl_files
# PARCELLATIONS ===================================================================================
def _parc_alias(parcellation: str):
    """
    Return the "alias" stored for `parcellation` in `parcellation_lib`, or
    `parcellation` itself if its entry has no "alias" key.
    """
    entry = parcellation_lib[parcellation]
    return entry["alias"] if "alias" in entry else parcellation
def _parc_symmetric(parc_labels):
labels_lh = [l.split("hemi-L")[1] for l in parc_labels if "hemi-L" in l]
labels_rh = [l.split("hemi-R")[1] for l in parc_labels if "hemi-R" in l]
if not labels_lh or not labels_rh:
return False
if labels_lh == labels_rh:
return True
return False
def _print_parcellations():
    """
    Return a ", "-separated string of all keys in `parcellation_lib` whose entry has no
    "alias" key. Despite the name, nothing is printed.
    """
    names = []
    for name, entry in parcellation_lib.items():
        if "alias" in entry:
            continue
        names.append(name)
    return ", ".join(names)
def _check_parcellation(parcellation: str, force_list: bool = False, force_str: bool = False):
    """
    Validate a parcellation name and resolve it against `parcellation_lib`.

    `parcellation` is either a key of `parcellation_lib` (e.g., "Schaefer100") or a
    concatenation of keys (e.g., "Schaefer100TianS1"). Aliases are resolved via
    `_parc_alias`.

    Parameters
    ----------
    parcellation : str
        Name to check.
    force_list : bool, optional
        If True (and `force_str` is False), a single name is wrapped into a list.
    force_str : bool, optional
        If True (and `force_list` is False), a two-name result is joined into one string.

    Returns
    -------
    str or list of str
        One parcellation name, or a [cortex, subcortex] pair of names.

    Raises
    ------
    AssertionError
        If `parcellation` is not a string.
    ValueError
        Via `lgr.critical_raise` if nothing matches, more than two library names match,
        or two names match that are not one "cortex" and one "subcortex" parcellation.
    """
    assert isinstance(parcellation, str), f"Parcellation must be of type string, not {type(parcellation)}!"

    if parcellation in parcellation_lib:
        # exact hit in the library
        parc = _parc_alias(parcellation)
    else:
        # every library key contained in the requested name is a candidate
        parc = list(set(_parc_alias(name) for name in parcellation_lib if name in parcellation))
        n_matches = len(parc)
        if n_matches == 0:
            lgr.critical_raise(f"Parcellation '{parcellation}' not found.\nAvailable "
                               f"(cortex-subcortex-combinations allowed): {_print_parcellations()}",
                               ValueError)
        elif n_matches > 2:
            lgr.critical_raise(f"Parcellation '{parcellation}' matches more than 2 parcellations: {', '.join(parc)}.",
                               ValueError)
        elif n_matches == 1:
            parc = parc[0]
        else:
            # two candidates: the "level" of each is read from its first listed space
            levels = [
                parcellation_lib[p][list(parcellation_lib[p].keys())[0]]["level"]
                for p in parc
            ]
            if set(levels) == {"cortex", "subcortex"}:
                # fixed output order: cortex first, subcortex second
                parc = [parc[levels.index("cortex")], parc[levels.index("subcortex")]]
            else:
                lgr.critical_raise(f"Only cortex-subcortex combinations are allowed, not: {', '.join(levels)} ",
                                   ValueError)

    # output format (if both flags are set, nothing is converted)
    want_list = force_list and not force_str
    want_str = force_str and not force_list
    if want_list and isinstance(parc, str):
        parc = [parc]
    elif want_str and isinstance(parc, list):
        parc = "".join(parc)
    return parc
def fetch_parcellation(parcellation: str = _PARC_DEFAULT,
                       space: str = None,
                       hemi: Union[List[str], str] = ["L", "R"],
                       return_labels: bool = True,
                       return_space: bool = False,
                       return_resolution: bool = False,
                       return_symmetric: bool = False,
                       return_l2rmap: bool = False,
                       return_dist_mat: bool = False,
                       return_loaded: bool = True,
                       nispace_data_dir: Union[str, pathlib.Path] = None,
                       overwrite: bool = False,
                       check_file_hash: bool = True,
                       verbose: bool = True):
    """
    Fetch a parcellation and, optionally, data associated with it.

    Parameters
    ----------
    parcellation : str, optional
        Parcellation name or cortex-subcortex combination, as accepted by
        `_check_parcellation`. Defaults to `_PARC_DEFAULT`.
    space : str, optional
        Space of the parcellation. If None, the first space listed for the parcellation
        in `parcellation_lib` is used.
    hemi : str or list of str, optional
        Hemisphere(s) for surface parcellations: "L", "R" or ["L", "R"]. Not used for
        spaces whose name contains "mni". The default list is never modified in place.
    return_labels, return_space, return_resolution, return_symmetric, return_l2rmap,
    return_dist_mat : bool, optional
        Select which items are returned in addition to the parcellation.
    return_loaded : bool, optional
        If True, files are passed through the `load_*` helpers; if False, the results
        of `get_file` are returned. Forced to True when two parcellations are merged,
        because merging operates on loaded objects.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, it is written to the NISPACE_DATA_DIR environment variable,
        which is the value that is actually used.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.
    verbose : bool, optional
        Passed to `set_log` to configure the module logger.

    Returns
    -------
    The parcellation alone if nothing else was requested, otherwise a tuple with the
    requested items in the fixed order: parcellation, labels, space, resolution,
    symmetric flag, l2rmap, distance matrix. For merged parcellations the distance
    matrix is always None.

    Raises
    ------
    ValueError
        If the parcellation, space or hemisphere selection is invalid, or if a merged
        l2rmap is requested with `return_labels=False`.
    """
    verbose = set_log(lgr, verbose)

    # check parcellation and return correct name or list of two names
    parc = _check_parcellation(parcellation)

    # A list means two parcellations have to be merged, which requires loaded data.
    # (The previous check tested for `str`, which ignored return_loaded=False for single
    # parcellations and let unloaded file paths reach the merge step.)
    return_loaded = True if isinstance(parc, list) else return_loaded

    # data directory: the environment variable is the single source of truth
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)

    def pack(out):
        # a single requested item is returned bare, several as a tuple
        values = tuple(out.values())
        return values[0] if len(values) == 1 else values

    # Load one parcellation and its associated data into a dict.
    # `space` and `hemi` are bound as defaults because they are re-assigned locally.
    def load_parc(p, space=space, hemi=hemi):
        # check space
        if space is None:
            # default space -> first space listed in parcellation_lib
            space = list(parcellation_lib[p].keys())[0]
        elif space not in parcellation_lib[p]:
            lgr.critical_raise(f"Space '{space}' not found for parcellation '{p}'.\n"
                               f"Available: {keys2str(parcellation_lib[p])}",
                               ValueError)
        parc_lib = parcellation_lib[p][space]
        base_dir = pathlib.Path(nispace_data_dir) / "parcellation" / p / space

        # a parcellation is treated as symmetric when no l2rmap is registered for it
        symmetric = "l2rmap" not in parc_lib

        lgr.info(f"Loading {parc_lib['level']} parcellation '{p}' in '{space}' space.")

        label_file = distmat_file = l2rmap_file = None
        is_volume = "mni" in space.lower()

        if is_volume:
            parcellation_file = get_file(
                base_dir / f"parc-{p}_space-{space}.%s", **parc_lib["map"],
                **get_file_kwargs,
            )
            if return_labels:
                label_file = get_file(
                    base_dir / f"parc-{p}_space-{space}.label.txt", **parc_lib["label"],
                    **get_file_kwargs,
                )
            if return_dist_mat:
                distmat_file = get_file(
                    base_dir / f"parc-{p}_space-{space}.dist.csv.gz", **parc_lib["distmat"],
                    **get_file_kwargs,
                )
        else:
            # check hemis
            if isinstance(hemi, str):
                hemi = [hemi]
            if hemi not in [["L"], ["R"], ["L", "R"]]:
                raise ValueError(f"hemi = '{hemi}' not defined. Choose one of 'L', 'R', or ['L', 'R']!")
            # one entry per hemisphere
            parcellation_file, label_file, distmat_file = (), (), ()
            for h in hemi:
                parcellation_file += get_file(
                    base_dir / f"parc-{p}_space-{space}_hemi-{h}.%s", **parc_lib["map"][h],
                    **get_file_kwargs,
                ),
                if return_labels:
                    label_file += get_file(
                        base_dir / f"parc-{p}_space-{space}_hemi-{h}.label.txt", **parc_lib["label"][h],
                        **get_file_kwargs,
                    ),
                if return_dist_mat:
                    if "fslr" in space.lower():
                        lgr.warning("Distance matrices for fslr spaces are currently not available. Returning None.")
                        distmat_file += None,
                    else:
                        distmat_file += get_file(
                            base_dir / f"parc-{p}_space-{space}_hemi-{h}.dist.csv.gz", **parc_lib["distmat"][h],
                            **get_file_kwargs,
                        ),

        # the left-to-right map is only fetched for asymmetric parcellations
        if return_l2rmap and not symmetric:
            l2rmap_file = get_file(
                base_dir / f"parc-{p}_space-{space}.l2rmap.csv.gz", **parc_lib["l2rmap"],
                **get_file_kwargs,
            )

        if not is_volume and len(parcellation_file) == 1:
            # Single hemisphere: unwrap the 1-tuples. Label and distance-matrix tuples
            # are empty when they were not requested, so they must not be indexed
            # unconditionally. No l2rmap is returned for a single hemisphere.
            parcellation_file = parcellation_file[0]
            label_file = label_file[0] if label_file else None
            distmat_file = distmat_file[0] if distmat_file else None
            l2rmap_file = None

        # build output (insertion order defines the order of the returned tuple)
        out = {}
        out["parc"] = load_img(parcellation_file) if return_loaded else parcellation_file
        if return_labels:
            out["label"] = load_labels(label_file) if return_loaded else label_file
        if return_space:
            out["space"] = space
        if return_resolution:
            out["res"] = _img_density_for_neuromaps(load_img(parcellation_file))
        if return_symmetric:
            out["sym"] = symmetric
        if return_l2rmap:
            out["l2rmap"] = load_l2rmap(l2rmap_file) if return_loaded else l2rmap_file
        if return_dist_mat:
            out["distmat"] = load_distmat(distmat_file) if return_loaded else distmat_file
        return out

    # single parcellation
    if isinstance(parc, str):
        return pack(load_parc(parc))

    # two parcellations: load both, then combine
    out_cortex = load_parc(parc[0])
    out_subcortex = load_parc(parc[1])
    lgr.info(f"Merging to cortex-subcortex parcellation '{parc[0]}{parc[1]}'.")

    out = {}
    out["parc"] = merge_parcellations([out_cortex["parc"], out_subcortex["parc"]], quick=True)
    if return_labels:
        out["label"] = out_cortex["label"] + out_subcortex["label"]
    if return_space:
        out["space"] = out_cortex["space"]
    if return_resolution:
        out["res"] = out_cortex["res"]
    if return_symmetric:
        out["sym"] = True if out_cortex["sym"] and out_subcortex["sym"] else False
    if return_l2rmap:
        if out_cortex["l2rmap"] is None and out_subcortex["l2rmap"] is None:
            out["l2rmap"] = None
        else:
            if not return_labels:
                lgr.critical_raise("Cannot return merged l2rmap when return_labels=False!", ValueError)
            # start from an identity left-to-right mapping over the merged labels, then
            # overwrite the parts for which a parcellation provides its own l2rmap
            out["l2rmap"] = pd.DataFrame(
                np.eye(len(out["label"]) // 2),
                index=[l for l in out["label"] if "hemi-L" in l],
                columns=[l for l in out["label"] if "hemi-R" in l]
            )
            for part in (out_cortex["l2rmap"], out_subcortex["l2rmap"]):
                if part is not None:
                    out["l2rmap"].loc[part.index, part.columns] = part
    if return_dist_mat:
        lgr.info("Distance matrices for merged parcellations are currently not available. Returning None.")
        out["distmat"] = None

    return pack(out)
def fetch_collection(collection: Union[str, pathlib.Path, np.ndarray, pd.DataFrame, pd.Series, list],
                     dataset: str = None,
                     nispace_data_dir: Union[str, pathlib.Path] = None,
                     overwrite: bool = False,
                     check_file_hash: bool = True,
                     verbose: bool = True):
    """
    Fetch a collection to subset a map dataset.

    Parameters
    ----------
    collection : str, pathlib.Path, np.ndarray, pd.DataFrame, pd.Series or list
        With `dataset` given: name of a collection integrated for that dataset.
        Without `dataset`: path to an existing collection file, or an array-like
        object holding the collection.
    dataset : str, optional
        Dataset (key of `reference_lib`) the integrated collection belongs to.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, it is written to the NISPACE_DATA_DIR environment variable,
        which is the value that is actually used.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.
    verbose : bool, optional
        Passed to `set_log` to configure the module logger.

    Returns
    -------
    pd.DataFrame
        The collection as returned by `_load_collection`.

    Raises
    ------
    ValueError
        Via `lgr.critical_raise` if the dataset, the integrated collection or the
        custom collection file cannot be found.
    """
    verbose = set_log(lgr, verbose)

    if dataset is None:
        # no dataset -> custom collection
        if isinstance(collection, (str, pathlib.Path)):
            # strings and paths are interpreted as files
            collection_file = pathlib.Path(collection)
            if not collection_file.exists():
                lgr.critical_raise(f"Assuming collection '{collection_file}' to be a file, but it does not exist! "
                                   "Ensure that the file exists and try again. If you want to load an integrated collection, "
                                   "use the 'dataset' argument.",
                                   ValueError)
            else:
                lgr.info(f"Loading custom collection from file: {collection_file}")
        else:
            # anything else is handed on as an array-like object
            lgr.info(f"Loading custom collection of type {type(collection)}.")
            collection_file = collection
    elif dataset not in reference_lib:
        lgr.critical_raise(f"Dataset '{dataset}' not found. Available: {keys2str(reference_lib)}",
                           ValueError)
    else:
        # dataset given -> integrated collection
        # data directory: the environment variable is the single source of truth
        if nispace_data_dir is not None:
            lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
            os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
        nispace_data_dir = os.getenv('NISPACE_DATA_DIR')
        base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset

        available = reference_lib[dataset]["collection"]
        if collection not in available:
            lgr.critical_raise(f"Collection '{collection}' not found for dataset '{dataset}'. "
                               f"Available: {keys2str(reference_lib[dataset]['collection'])}",
                               ValueError)
        else:
            lgr.info(f"Loading integrated collection '{collection}' for dataset '{dataset}'.")
            collection_file = get_file(
                base_dir / f"collection-{collection}.collect",
                **available[collection],
                overwrite=overwrite, hash_check=check_file_hash,
            )

    # Load collection; 1-column df (= maps), 2-column df (= set and maps) or
    # 3-column df (= set, maps and weights)
    return _load_collection(collection_file)
# REFERENCE DATA - PRIVATE =========================================================================
def _filter_maps(maps_avail: List[str],
maps: Union[str, List[str], Dict[str, Union[str, list]]]) -> List[pathlib.Path]:
def matches_filters(map_name: str, filters: Dict[str, Union[str, List[str]]]) -> bool:
for filter_name, filter_content in filters.items():
if filter_content not in [None, False, "", []]:
if isinstance(filter_content, (str, int)):
filter_content = [filter_content]
filter_content = list(map(str, filter_content))
if filter_name == "n" and filter_content[0].startswith(">"):
try:
filter_n = int(filter_content[0].replace(">", ""))
n_value = int(_file_desc(map_name, 2))
if n_value <= filter_n:
return False
except (ValueError, IndexError):
continue # Skip this filter if parsing fails
else:
if not any(f"{filter_name}-{content}".lower() in map_name.lower()
for content in filter_content):
return False
return True
if isinstance(maps, str):
maps = [maps]
if isinstance(maps, list):
maps = list(set(maps))
filtered_maps = [f for f in maps_avail if any(map_str in f for map_str in maps)]
elif isinstance(maps, dict):
filtered_maps = [f for f in maps_avail if matches_filters(f, maps)]
else:
filtered_maps = maps_avail
return filtered_maps
def _load_collection(collection_path):
# if path, read file
if isinstance(collection_path, (str, pathlib.Path)):
collection_path = pathlib.Path(collection_path)
ext = collection_path.suffix
# if "collect" file, detect if dict or table
if ext == ".collect":
with open(collection_path) as f:
header = f.readline()
if header.startswith("{"):
ext = ".json"
else:
ext = ".csv"
# if json, load into dict
if ext == ".json":
collection = read_json(collection_path)
# else, try to directly load as table file
else:
with open(collection_path) as f:
header = f.readline().strip("\n")
if any([h in header for h in ["set", "map", "weight"]]):
header = 0
else:
header = None
collection = pd.read_csv(collection_path, header=header, sep=",")
else:
collection = collection_path
# if array, convert all do df
if isinstance(collection, (np.ndarray, pd.DataFrame, pd.Series, list)):
collection = pd.DataFrame(collection)
# if dict, convert to df as well
elif isinstance(collection, dict):
collection = pd.concat([pd.DataFrame({0:k, 1:v}) for k, v in collection.items()])
# else
else:
raise TypeError(f"Datatype {type(collection_path)} not accepted for argument 'collection'.")
# process depending on number of columns
n_cols = collection.shape[1]
if n_cols == 0:
raise ValueError("No columns detected in collection file?!")
elif n_cols == 1:
collection.columns = ["map"]
elif n_cols == 2:
collection.columns = ["set", "map"]
elif n_cols == 3:
collection.columns = ["set", "map", "weight"]
else:
raise ValueError(f"Collection file with > 3 columns not supported ({n_cols} columns)!")
# return
return collection.reset_index(drop=True)
def _apply_collection_filter(dataset: str,
                             map_files: List[Union[str, pathlib.Path]],
                             collection: str,
                             nispace_data_dir: Union[str, pathlib.Path],
                             set_size_range: Union[None, Tuple[int, int]] = None,
                             overwrite: bool = False,
                             check_file_hash: bool = True
                             ) -> Tuple[List[Union[str, pathlib.Path]], Union[None, pd.DataFrame]]:
    """
    Restrict `map_files` to the maps listed in a collection.

    Parameters
    ----------
    dataset : str
        Dataset key in `reference_lib`.
    map_files : list of str or pathlib.Path
        Map names, or paths whose extension-stripped file name is the map name.
    collection : str
        Path to an existing custom collection file, or the name of a collection
        integrated for `dataset`.
    nispace_data_dir : str or pathlib.Path
        NiSpace data directory.
    set_size_range : tuple of (int or None, int or None), optional
        (min, max) number of maps a set must contain to be kept; None entries default
        to 1 and infinity. Only applied if the collection has a "set" column.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.

    Returns
    -------
    filtered_map_files : list
        Entries of `map_files` that are part of the (size-filtered) collection.
    collection_df : pd.DataFrame or None
        The collection restricted to the given maps; None (together with the unchanged
        `map_files`) if the collection could not be found.
    """
    base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset

    # Resolve the collection: an existing path wins, otherwise look for an integrated one
    collection_path = pathlib.Path(collection)
    if collection_path.exists():
        # custom file; previously `collection_file` was left unbound in this case
        collection_file = collection_path
    elif collection in reference_lib[dataset]["collection"]:
        collection_file = get_file(
            base_dir / f"collection-{collection}.collect",
            **reference_lib[dataset]["collection"][collection],
            overwrite=overwrite, hash_check=check_file_hash,
        )
    else:
        lgr.warning(f"Collection '{collection}' not found! Available: "
                    f"{keys2str(reference_lib[dataset]['collection'])}")
        return map_files, None

    # Load collection file; 1-column df (= maps) or 2-column df (= set and maps)
    collection_df = _load_collection(collection_file)
    lgr.debug(f"Collection df shape: {collection_df.shape}; "
              f"index names: {collection_df.index.names}; "
              f"column names: {collection_df.columns.names}")

    # Map name per entry: paths are reduced to their extension-less file name. Done per
    # element so that an empty `map_files` does not fail on `map_files[0]`.
    map_names = [_rm_ext(f.name) if isinstance(f, pathlib.Path) else f for f in map_files]

    def select(df):
        # keep the files whose map name occurs in `df`; the set is built once
        listed = set(df["map"])
        return [f for f, name in zip(map_files, map_names) if name in listed]

    # Apply collection filter
    lgr.info(f"Applying collection filter from: {collection_file}.")
    filtered_map_files = select(collection_df)
    collection_df = collection_df[collection_df["map"].isin(map_names)]

    # Apply size filter
    if set_size_range is not None:
        if "set" in collection_df.columns and isinstance(set_size_range, (tuple, list)):
            set_size_range = [
                x if x is not None else x_
                for x, x_
                in zip(set_size_range, (1, np.inf))
            ]
            lgr.info(f"Filtering to collection sets with between {set_size_range[0]} and "
                     f"{set_size_range[1]} maps.")
            collection_df = (
                collection_df
                .groupby("set")
                .filter(lambda x: set_size_range[0] <= x.shape[0] <= set_size_range[1])
            )
            # Re-select by map name; comparing Path objects against the name strings in
            # the collection would drop every file.
            filtered_map_files = select(collection_df)

    return filtered_map_files, collection_df
def _load_parcellated_data(dataset: str,
                           nispace_data_dir: Union[str, pathlib.Path],
                           parc: Union[str, List[str]],
                           map_files: List[str],
                           collection_df: pd.DataFrame,
                           standardize: bool,
                           merge_how: str = "inner",
                           overwrite: bool = False,
                           check_file_hash: bool = True,
                           verbose: bool = True) -> Union[pd.DataFrame, Tuple[pd.DataFrame, Dict]]:
    """
    Load tabulated (parcellated) reference data for one or two parcellations.

    Parameters
    ----------
    dataset : str
        Dataset key in `reference_lib`.
    nispace_data_dir : str or pathlib.Path
        NiSpace data directory.
    parc : str or list of str
        One parcellation name, or a list of names; the tables of the first two list
        entries are merged on their index.
    map_files : list of str or pathlib.Path
        Maps to keep (names, or paths whose extension-stripped file name is the map
        name). May be empty, in which case no rows are kept.
    collection_df : pd.DataFrame or None
        Collection table with a "map" column. If given, rows are arranged according to
        it and its columns become the (multi-)index of the result.
    standardize : bool
        If True, apply `zscore_df(..., along="rows")`.
    merge_how : str, optional
        `how` argument of `DataFrame.merge` when two parcellations are combined.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.
    verbose : bool, optional
        Passed to `set_log` to configure the module logger.

    Returns
    -------
    pd.DataFrame
        The parcellated data, indexed by map name or by the collection columns.

    Raises
    ------
    ValueError
        Via `lgr.critical_raise` if `parc` is neither str nor list.
    FileNotFoundError
        Via `lgr.critical_raise` if the dataset is not available for a parcellation.
    """
    verbose = set_log(lgr, verbose)
    tab_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset / "tab"

    # parcellation can be a string with one name or a list of names
    if isinstance(parc, str):
        parc = [parc]
    elif not isinstance(parc, list):
        lgr.critical_raise(f"Invalid parcellation type: {type(parc)}", ValueError)
    if len(parc) > 1:
        lgr.info(f"Loading and {merge_how}-merging data parcellated with '{parc[0]}' and '{parc[1]}'")
    elif parc:
        # also covers a one-element list, for which indexing parc[1] would fail
        lgr.info(f"Loading data parcellated with '{parc[0]}'")

    # loop through parcellations
    tables = []
    for p in parc:
        # check if parcellation available for this data
        if p not in reference_lib[dataset]["tab"]:
            lgr.critical_raise(f"Dataset '{dataset}' is not available for parcellation '{p}'!\n"
                               f"Available: {keys2str(reference_lib[dataset]['tab'])}",
                               FileNotFoundError)
        parcellation_file = tab_dir / f"dset-{dataset}_parc-{p}.csv.gz"
        lgr.debug(f"Loading {parcellation_file}")
        tables.append(pd.read_csv(
            get_file(
                parcellation_file, **reference_lib[dataset]["tab"][p],
                overwrite=overwrite, hash_check=check_file_hash,
            ),
            index_col=0
        ))
        lgr.debug(f"Loaded parcellated data of shape {tables[-1].shape}")
        lgr.debug(f"First 5 map names: {tables[-1].index.to_list()[:5]}")

    # merge if necessary; which maps survive is decided by `merge_how`
    # (with the default "inner", only maps present in both tables are kept)
    if len(parc) > 1:
        data = tables[0].merge(tables[1], how=merge_how, left_index=True, right_index=True)
    else:
        data = tables[0]

    # Apply filter to the dataframe index. Names are derived per element so that an
    # empty `map_files` yields an empty table instead of an IndexError.
    lgr.debug(f"Applying filtering based on maps, first 5: {map_files[:5]}")
    map_names = [_rm_ext(f.name) if isinstance(f, pathlib.Path) else f for f in map_files]
    data = data.loc[data.index.intersection(map_names)]
    lgr.debug(f"Shape after filtering based on map_names: {data.shape}")

    # Apply collection index (-> handles maps that are present multiple times in different sets)
    if collection_df is not None:
        maps_intersection = data.index.intersection(collection_df["map"].unique())
        collection_df_intersection = collection_df.query("map in @maps_intersection")
        data = data.loc[collection_df_intersection["map"]]
        data.index = pd.MultiIndex.from_frame(collection_df_intersection)

    # Standardize
    if standardize:
        lgr.info("Standardizing parcellated data.")
        data = zscore_df(data, along="rows")

    return data
def _print_references(dataset: str, meta: pd.DataFrame = None):
    """
    Print the reference information for a dataset.

    The general text is read from "reference.txt" in `datalib_dir`: all lines after the
    line starting with "# <dataset>" (case-insensitive) up to the next empty line. If
    `meta` is given and non-empty, one line per row is appended.

    Parameters
    ----------
    dataset : str
        Dataset name.
    meta : pd.DataFrame, optional
        Metadata table. For "pet" the columns "atlas", "publication", "doi", "license"
        and "note" are read; for all other datasets "collection", "author" and "doi".
    """

    def get_ref_info(dataset, add_commit=True):
        # collect the paragraph belonging to `dataset` from the info file
        get_line = False
        msg = ""
        with open(datalib_dir / "reference.txt", "r") as f:
            for line in f:
                if line.lower().startswith(f"# {dataset.lower()}"):
                    get_line = True
                    continue
                if get_line and line == "\n":
                    break
                if get_line:
                    msg += line
        if add_commit:
            msg += f"To ensure reproducibility, note the NiSpace commit/version: {__commit__}\n"
        msg += "\n"
        return msg

    msg = get_ref_info(dataset)
    # an empty table has nothing to list (and would break the max() calls below)
    has_meta = meta is not None and len(meta) > 0

    # PET
    if dataset.lower() == "pet":
        if has_meta:
            atlas_maxlen = max(len(x) for x in meta["atlas"])
            author_maxlen = max(len(x) for x in meta["publication"])
            license_maxlen = max(len(x) for x in meta["license"])
            for atlas, pub, doi, license_, note in zip(
                meta["atlas"], meta["publication"], meta["doi"], meta["license"], meta["note"]
            ):
                # Several DOIs may be given separated by ";". Empty entries are dropped
                # BEFORE the URL prefix is added; filtering the finished URLs for ""
                # could never match anything. Missing DOIs give an empty string.
                if pd.isna(doi):
                    doi_ids = []
                else:
                    doi_ids = [d for d in str(doi).replace(" ", "").split(";") if d]
                doi_str = ", ".join(f"https://doi.org/{d}" for d in doi_ids)
                atlas = atlas.ljust(atlas_maxlen)
                author = pub.capitalize().ljust(author_maxlen)
                license_ = license_.ljust(license_maxlen)
                msg += f"- {atlas} Source: {author} {license_} {doi_str}\n"
                if not pd.isna(note):
                    msg += f" CAVE: {note}\n"
    # all others
    elif has_meta:
        collection_maxlen = max(len(x) for x in meta["collection"])
        author_maxlen = max(len(x) for x in meta["author"])
        for collection, pub, doi in zip(meta["collection"], meta["author"], meta["doi"]):
            collection = collection.ljust(collection_maxlen)
            author = pub.capitalize().ljust(author_maxlen)
            msg += f"- {collection} Source: {author} https://doi.org/{doi}\n"

    print(msg)
# REFERENCE DATA - PUBLIC ==========================================================================
def fetch_reference(dataset: str,
                    maps: Union[None, str, List[str], Dict[str, Union[str, list]]] = None,
                    space: str = _SPACE_DEFAULT,
                    collection: str = None,
                    set_size_range: Union[None, Tuple[int, int]] = None,
                    parcellation: str = None,
                    standardize_parcellated: bool = False,
                    return_metadata: bool = False,
                    print_references: bool = True,
                    osf_config_file: str = None,
                    github_config_file: str = None,
                    nispace_data_dir: Union[str, pathlib.Path] = None,
                    overwrite: bool = False,
                    check_file_hash: bool = True,
                    verbose: bool = True):
    """
    Fetch a reference dataset, either as map files or as parcellated table.

    Parameters
    ----------
    dataset : str
        Dataset name (case-insensitive); must be a key of `reference_lib`.
    maps : None, str, list of str or dict, optional
        Filter passed to `_filter_maps`. If empty/None, no map filter is applied.
    space : str, optional
        Space of the map files. Only used if no `parcellation` is given. Defaults to
        `_SPACE_DEFAULT`.
    collection : str, optional
        Collection name or path passed to `_apply_collection_filter`. "All" is treated
        like None (no collection filter).
    set_size_range : tuple, optional
        (min, max) set size passed to `_apply_collection_filter`.
    parcellation : str, optional
        If given, tabulated data for this parcellation (or cortex-subcortex
        combination) is loaded instead of map files.
    standardize_parcellated : bool, optional
        Passed to `_load_parcellated_data` as `standardize`.
    return_metadata : bool, optional
        If True, return `(data, meta)` instead of `data`.
    print_references : bool, optional
        If True and `verbose` is truthy, print reference information.
    osf_config_file, github_config_file : str, optional
        Forwarded to `get_file` for map files. If neither is given, maps hosted on
        "osfprivate" or "github-nispace-private" are excluded.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, it is written to the NISPACE_DATA_DIR environment variable,
        which is the value that is actually used.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.
    verbose : bool, optional
        Passed to `set_log` to configure the module logger.

    Returns
    -------
    data
        With `parcellation`: the DataFrame from `_load_parcellated_data`. Without: a
        list with one `get_file` result per map for spaces containing "mni152", or one
        tuple of per-hemisphere results per map otherwise.
    meta
        Only if `return_metadata` is True: result of `fetch_metadata`, or None if no
        metadata is fetched for this dataset/collection combination.

    Raises
    ------
    ValueError, TypeError
        Via `lgr.critical_raise` for unknown datasets, datasets that require a
        parcellation, spaces without maps, or a non-string `dataset`.
    """
    verbose = set_log(lgr, verbose)

    # Check dataset availability
    if isinstance(dataset, str):
        dataset = dataset.lower()
        if dataset not in reference_lib:
            lgr.critical_raise(f"Dataset '{dataset}' not found! Available datasets: {keys2str(reference_lib)}",
                               ValueError)
        elif parcellation is None and "map" not in reference_lib[dataset]:
            lgr.critical_raise(f"Dataset '{dataset}' is only available as parcellated data, choose a parcellation!",
                               ValueError)
    else:
        lgr.critical_raise("Invalid dataset type; expecting string.",
                           TypeError)
    lgr.info(f"Loading {dataset} maps.")

    # data directory: the environment variable is the single source of truth
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # base dir
    base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset
    map_dir = base_dir / "map"

    use_parcellated = parcellation is not None

    if use_parcellated:
        # check parcellation and return correct name or list of two names
        parc = _check_parcellation(parcellation)
        # load maps from collection "All", which should be available for all datasets
        maps_avail = _load_collection(get_file(
            base_dir / "collection-All.collect", **reference_lib[dataset]["collection"]["All"],
            overwrite=overwrite, hash_check=check_file_hash,
        ))["map"].to_list()
    else:
        # map image files registered for the requested space
        maps_avail = [m for m, v in reference_lib[dataset]["map"].items() if space in v]
        if len(maps_avail) == 0:
            lgr.critical_raise(f"Found no maps for space '{space}' in dataset '{dataset}'.",
                               ValueError)

        # Remove private maps unless access credentials were supplied
        if not osf_config_file and not github_config_file:
            private_hosts = ["osfprivate", "github-nispace-private"]

            def map_host(m):
                entry = reference_lib[dataset]["map"][m][space]
                if "mni152" in space.lower():
                    return entry["host"]
                # surface maps are stored per hemisphere; use "L" if present, else "R"
                return entry["L"]["host"] if "L" in entry else entry["R"]["host"]

            maps_avail = [m for m in maps_avail if map_host(m) not in private_hosts]

    lgr.debug(f"Loaded {len(maps_avail)} unfiltered map(s). "
              f"First 5: {maps_avail[:5]}")

    # Filter by 'maps'
    if maps:
        n_tmp = len(maps_avail)
        lgr.info(f"Applying filter: {maps}")
        maps_avail = _filter_maps(maps_avail, maps)
        lgr.info(f"Filtered from {n_tmp} to {len(maps_avail)} maps.")

    # Filter by 'collection'
    if collection == "All":
        collection = None
    if collection:
        maps_avail, collection_df = _apply_collection_filter(
            dataset, maps_avail, collection,
            set_size_range=set_size_range,
            nispace_data_dir=nispace_data_dir,
            overwrite=overwrite,
            check_file_hash=check_file_hash
        )
    else:
        collection_df = None

    if use_parcellated:
        # Load tabulated data
        data = _load_parcellated_data(
            dataset=dataset,
            parc=parc,
            map_files=maps_avail,
            collection_df=collection_df,
            standardize=standardize_parcellated,
            nispace_data_dir=nispace_data_dir,
            overwrite=overwrite,
            check_file_hash=check_file_hash,
            verbose=verbose,
        )
    else:
        # Fetch paths to maps
        get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash,
                               osf_config_file=osf_config_file, github_config_file=github_config_file)
        if "mni152" in space.lower():
            # MNI: one file per map
            data = [
                get_file(
                    map_dir / m / f"{m}_space-{space}.%s", **reference_lib[dataset]["map"][m][space],
                    **get_file_kwargs,
                )
                for m in maps_avail
            ]
        else:
            # surface: one file per registered hemisphere, bundled in a tuple per map
            data = []
            for m in maps_avail:
                data.append(tuple([
                    get_file(
                        map_dir / m / f"{m}_space-{space}_hemi-{hemi}.%s", **reference_lib[dataset]["map"][m][space][hemi],
                        **get_file_kwargs,
                    )
                    for hemi in reference_lib[dataset]["map"][m][space].keys()
                ]))

    # Metadata: per map for "pet", per collection for "mrna"/"magicc"
    if return_metadata or print_references:
        if dataset == "pet":
            meta = fetch_metadata(dataset, maps_avail, overwrite=overwrite, check_file_hash=check_file_hash)
        elif dataset in ["mrna", "magicc"] and collection_df is not None:
            meta = fetch_metadata(dataset, collection=collection, overwrite=overwrite, check_file_hash=check_file_hash)
        else:
            meta = None
        if return_metadata:
            data = (data + (meta,)) if isinstance(data, tuple) else (data, meta)
        # logical `and`: both flags are meant as truth values, not as bit masks
        if print_references and verbose:
            _print_references(dataset, meta)

    return data
# EXAMPLE DATA =====================================================================================
def fetch_example(example: str,
                  parcellation: str = None,
                  return_associated_data: bool = True,
                  nispace_data_dir: Union[str, pathlib.Path] = None,
                  overwrite: bool = False,
                  check_file_hash: bool = True,
                  verbose: bool = True):
    """
    Fetch an example dataset.

    Parameters
    ----------
    example : str
        Name of the example (case-insensitive); must be a key of `example_lib`.
    parcellation : str
        Parcellation (or cortex-subcortex combination) of the tabulated example data.
        Required: only parcellated example data is available.
    return_associated_data : bool, optional
        If True and the example has an "info" entry, the info table is returned as well.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, it is written to the NISPACE_DATA_DIR environment variable,
        which is the value that is actually used.
    overwrite : bool, optional
        Forwarded to `get_file`.
    check_file_hash : bool, optional
        Forwarded to `get_file` as `hash_check`.
    verbose : bool, optional
        Passed to `set_log` to configure the module logger.

    Returns
    -------
    pd.DataFrame or tuple of (pd.DataFrame, pd.DataFrame)
        The parcellated example data (tables of several parcellations concatenated
        column-wise) and, if requested and available, the associated info table.

    Raises
    ------
    ValueError
        Via `lgr.critical_raise` if the example is unknown, no parcellation is given, or
        the example is not available for the parcellation.
    """
    verbose = set_log(lgr, verbose)

    # data directory: the environment variable is the single source of truth
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')
    base_dir = pathlib.Path(nispace_data_dir) / "example"

    # check available
    example = example.lower()
    if example not in example_lib:
        lgr.critical_raise(f"Example '{example}' not found. Available: {list(example_lib.keys())}",
                           ValueError)

    # check parcellation
    if parcellation is None:
        lgr.critical_raise("Currently, only parcellated example datasets are available. Please specify a parcellation.",
                           ValueError)
    else:
        parc = _check_parcellation(parcellation, force_list=True)

    get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)

    # load tabulated data: one table per parcellation, joined column-wise
    tab_lib = example_lib[example]["tab"]
    if not all(p in tab_lib for p in parc):
        lgr.critical_raise(f"Parcellation '{parcellation}' not found for example '{example}'.\n"
                           f"Available parcellations: {list(example_lib[example]['tab'].keys())}",
                           ValueError)
    else:
        lgr.info(f"Loading example dataset: '{example}', parcellated with: {''.join(parc)}.")
        tables = []
        for p in parc:
            tab_file = get_file(
                base_dir / f"example-{example}_parc-{p}.csv.gz", **tab_lib[p],
                **get_file_kwargs,
            )
            tables.append(pd.read_csv(tab_file, index_col=0))
        example_data = pd.concat(tables, axis=1)

    # without associated info (not requested or not available) return the data alone
    if not (return_associated_data and "info" in example_lib[example]):
        return example_data

    lgr.info("Returning parcellated and associated subject data.")
    info_file = get_file(
        base_dir / f"example-{example}_info.csv", **example_lib[example]["info"],
        **get_file_kwargs,
    )
    example_info = pd.read_csv(info_file, index_col=0)
    return example_data, example_info