# Source code for nispace.datasets

from typing import Union, List, Dict, Tuple
import pathlib
import pandas as pd
import numpy as np
import os

from . import lgr, __commit__
from .modules.constants import _PARC_DEFAULT, _SPACE_DEFAULT
from .stats.misc import zscore_df
from .utils.utils import _rm_ext, set_log, merge_parcellations
from .utils.utils_datasets import get_file
from .io import read_json, write_json, load_img, load_distmat, load_labels, load_l2rmap
from .nulls import _img_density_for_neuromaps

# Set the default nispace data directory environment variable.
# Only fill in the default when the variable is not set yet: the fetch functions below tell
# users to configure the data directory through NISPACE_DATA_DIR, so a value exported before
# import must not be overwritten here.
os.environ.setdefault('NISPACE_DATA_DIR', str(pathlib.Path.home() / "nispace-data"))

# JSON registries shipped next to this module in the "datalib" folder. Each one is read once
# at import time and then used as a lookup table by the fetch functions below.
datalib_dir = pathlib.Path(__file__).parent.joinpath("datalib")
reference_lib = read_json(datalib_dir.joinpath("reference.json"))
template_lib = read_json(datalib_dir.joinpath("template.json"))
parcellation_lib = read_json(datalib_dir.joinpath("parcellation.json"))
example_lib = read_json(datalib_dir.joinpath("example.json"))

def keys2list(dct):
    """Return the keys of ``dct`` as a list, in the mapping's iteration order."""
    return [key for key in dct.keys()]
def keys2str(dct, sep=", "):
    """Join the keys of ``dct`` into a single string, separated by ``sep``."""
    key_names = dct.keys()
    return sep.join(key_names)
# EMPTY NISPACE DATA DIR =========================================================================== # _EMPTY_DATA_CONFIRMED = False # def empty_nispace_data_dir(nispace_data_dir: Union[str, pathlib.Path] = None): # global _EMPTY_DATA_CONFIRMED # if nispace_data_dir is None: # nispace_data_dir = pathlib.Path.home() / "nispace-data" # if not _EMPTY_DATA_CONFIRMED: # lgr.warning("If you call this function again, it will remove all contents of your NiSpace " # f"data directory at {nispace_data_dir}.") # lgr.warning("Call it again to proceed.") # _EMPTY_DATA_CONFIRMED = True # else: # lgr.warning(f"Emptying nispace data dir at {nispace_data_dir}.") # shutil.rmtree(nispace_data_dir) # nispace_data_dir.mkdir(parents=True, exist_ok=True) # FILE HANDLING ==================================================================================== def _file_desc(fname, feature_position): if isinstance(fname, pathlib.Path): fname = fname.name fname = fname.split(".")[0] if isinstance(feature_position, int): return fname.split("_")[feature_position].split("-")[1] elif isinstance(feature_position, str): return fname.split(f"{feature_position}-")[1].split("_")[0] # BRAIN TEMPLATES ==================================================================================
def fetch_template(template: str = _SPACE_DEFAULT,
                   res: str = None,
                   desc: str = None,
                   #parcellation: str = None,
                   hemi: Union[List[str], str] = ["L", "R"],
                   nispace_data_dir: Union[str, pathlib.Path] = None,
                   overwrite: bool = False,
                   check_file_hash: bool = True,
                   verbose: bool = True):
    """
    Fetch a brain template.

    Parameters
    ----------
    template : str, optional
        Name of the template; must be a key of the template registry.
    res : str, optional
        Resolution/density. If None, "1mm" is used for templates whose name contains "mni"
        and "41k" for templates whose name contains "fsa".
    desc : str, optional
        Template description. If None, "T1w" is used for "mni" templates and "pial" for
        "fsa" templates.
    hemi : str or list of str, optional
        Hemisphere(s) to fetch for surface templates: "L", "R", or ["L", "R"].
        Ignored for "mni" templates.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, a warning is logged and the NISPACE_DATA_DIR environment
        variable is overwritten with this value.
    overwrite : bool, optional
        Passed through to ``get_file``.
    check_file_hash : bool, optional
        Passed to ``get_file`` as ``hash_check``.
    verbose : bool, optional
        Passed to ``set_log``.

    Returns
    -------
    The value returned by ``get_file`` for volumetric templates. For surface templates,
    a tuple with one entry per requested hemisphere, or the single entry itself if only
    one hemisphere was requested.

    Raises
    ------
    ValueError
        If ``template``, ``res``, ``desc`` or ``hemi`` is not available/valid.
    """
    verbose = set_log(lgr, verbose)

    # the template has to be registered
    if template not in template_lib:
        raise ValueError(f"Template '{template}' not found. Available: {keys2str(template_lib)}")

    # data directory: the argument is deprecated, the environment variable is authoritative
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # paths
    map_dir = pathlib.Path(nispace_data_dir) / "template" / template / "map"

    # template-family specific defaults
    template_lower = template.lower()
    is_volume = "mni" in template_lower
    if is_volume:
        if res is None:
            res = "1mm"
        if desc is None:
            desc = "T1w"
        hemi = None
    elif "fsa" in template_lower:
        if res is None:
            res = "41k"
        if desc is None:
            desc = "pial"
        if hemi is None:
            hemi = ["L", "R"]

    # validate the requested settings against the registry
    available_res = template_lib[template]
    if res not in available_res:
        raise ValueError(f"res = '{res}' not defined. Choose one of {keys2str(available_res)}!")
    available_desc = available_res[res]
    if desc not in available_desc:
        raise ValueError(f"desc = '{desc}' not defined. Choose one of {keys2str(available_desc)}!")
    if hemi is not None:
        if isinstance(hemi, str):
            hemi = [hemi]
        if hemi not in [["L"], ["R"], ["L", "R"]]:
            raise ValueError(f"hemi = '{hemi}' not defined. Choose one of 'L', 'R', or ['L', 'R']!")

    # shared download options
    get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)

    lgr.info(f"Loading {template} '{desc}' template in '{res}' resolution.")
    file_info = available_desc[desc]

    # volume: a single file
    if is_volume:
        return get_file(
            map_dir / desc / f"tpl-{template}_desc-{desc}_res-{res}.%s",
            **file_info,
            **get_file_kwargs,
        )

    # surface: one file per hemisphere, unwrapped if only one hemisphere was requested
    tpl_file = tuple(
        get_file(
            map_dir / desc / f"tpl-{template}_desc-{desc}_res-{res}_hemi-{h}.%s",
            **file_info[h],
            **get_file_kwargs,
        )
        for h in hemi
    )
    if len(tpl_file) == 1:
        tpl_file = tpl_file[0]
    return tpl_file
# PARCELLATIONS =================================================================================== def _parc_alias(parcellation: str): if "alias" in parcellation_lib[parcellation]: parc = parcellation_lib[parcellation]["alias"] else: parc = parcellation return parc def _parc_symmetric(parc_labels): labels_lh = [l.split("hemi-L")[1] for l in parc_labels if "hemi-L" in l] labels_rh = [l.split("hemi-R")[1] for l in parc_labels if "hemi-R" in l] if not labels_lh or not labels_rh: return False if labels_lh == labels_rh: return True return False def _print_parcellations(): return ", ".join([p for p in parcellation_lib.keys() if "alias" not in parcellation_lib[p]]) def _check_parcellation(parcellation: str, force_list: bool = False, force_str: bool = False): """ Check if a parcellation name is valid and return the correct parcellation name as a string or a list of strings containing a cortex-subcortex combination. """ # Parcellation can be a string as it appears in parcellation_lib (e.g., "Schaefer100") # OR multiple strings from parcellation_lib concatenated (e.g., "Schaefer100TianS1") # (1) We check if parcellation is a string assert isinstance(parcellation, str), f"Parcellation must be of type string, not {type(parcellation)}!" 
# (2) We check if it is in parcellation_lib as is if parcellation in parcellation_lib: parc = _parc_alias(parcellation) # (3) If not, we check if we get a partial match else: # get a list of potential partial matches parc = list(set([_parc_alias(p) for p in parcellation_lib if p in parcellation])) # (3a) No match found: raise error if len(parc) == 0: lgr.critical_raise(f"Parcellation '{parcellation}' not found.\nAvailable " f"(cortex-subcortex-combinations allowed): {_print_parcellations()}", ValueError) # (3b) > 2 matches found: raise error elif len(parc) > 2: lgr.critical_raise(f"Parcellation '{parcellation}' matches more than 2 parcellations: {', '.join(parc)}.", ValueError) # (3c) 1 match found: use it elif len(parc) == 1: parc = parc[0] # (3d) 2 matches found: check if they are cortex-subcortex combinations else: levels = [] for p in parc: p_space = list(parcellation_lib[p].keys())[0] levels.append(parcellation_lib[p][p_space]["level"]) if set(levels) != {"cortex", "subcortex"}: lgr.critical_raise(f"Only cortex-subcortex combinations are allowed, not: {', '.join(levels)} ", ValueError) else: # if we got to here, we have a cortex-subcortex combination; now ensure correct order parc = [parc[levels.index("cortex")], parc[levels.index("subcortex")]] # output format if force_list and not force_str and isinstance(parc, str): parc = [parc] elif force_str and not force_list and isinstance(parc, list): parc = "".join(parc) return parc
def fetch_parcellation(parcellation: str = _PARC_DEFAULT,
                       space: str = None,
                       hemi: Union[List[str], str] = ["L", "R"],
                       return_labels: bool = True,
                       return_space: bool = False,
                       return_resolution: bool = False,
                       return_symmetric: bool = False,
                       return_l2rmap: bool = False,
                       return_dist_mat: bool = False,
                       return_loaded: bool = True,
                       nispace_data_dir: Union[str, pathlib.Path] = None,
                       overwrite: bool = False,
                       check_file_hash: bool = True,
                       verbose: bool = True):
    """
    Fetch a parcellation.

    Parameters
    ----------
    parcellation : str, optional
        Registry name of a parcellation, or a concatenated cortex-subcortex combination
        (see ``_check_parcellation``). Combinations are merged into one parcellation.
    space : str, optional
        Space of the parcellation. If None, the first space listed in the registry for the
        (respective) parcellation is used.
    hemi : str or list of str, optional
        Hemisphere(s) for surface spaces: "L", "R", or ["L", "R"]. Not used for spaces whose
        name contains "mni".
    return_labels, return_space, return_resolution, return_symmetric, return_l2rmap,
    return_dist_mat : bool, optional
        Select which associated outputs are returned in addition to the parcellation.
    return_loaded : bool, optional
        If True, files are loaded (``load_img``, ``load_labels``, ``load_l2rmap``,
        ``load_distmat``); if False, whatever ``get_file`` returned is passed on. Merged
        cortex-subcortex parcellations are always loaded because merging needs the data.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, a warning is logged and the NISPACE_DATA_DIR environment
        variable is overwritten with this value.
    overwrite : bool, optional
        Passed through to ``get_file``.
    check_file_hash : bool, optional
        Passed to ``get_file`` as ``hash_check``.
    verbose : bool, optional
        Passed to ``set_log``.

    Returns
    -------
    The requested outputs in the fixed order: parcellation, labels, space, resolution,
    symmetry flag, left-to-right map, distance matrix. A tuple if more than one output was
    requested, otherwise the single output itself.
    """
    verbose = set_log(lgr, verbose)

    # check parcellation and return correct name or list of two names
    parc = _check_parcellation(parcellation)

    # A list means two parcellations that have to be merged together with their associated
    # data, which only works on loaded objects -> force loading in that case only.
    return_loaded = True if isinstance(parc, list) else return_loaded

    # data directory: the argument is deprecated, the environment variable is authoritative
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # function to load individual parcellation and associated data
    def load_parc(p, space=space, hemi=hemi,
                  return_labels=return_labels, return_space=return_space,
                  return_resolution=return_resolution, return_symmetric=return_symmetric,
                  return_l2rmap=return_l2rmap, return_dist_mat=return_dist_mat,
                  return_loaded=return_loaded, nispace_data_dir=nispace_data_dir,
                  overwrite=overwrite, check_file_hash=check_file_hash):

        # Check space
        if space is None:
            # get default space -> first space listed in parcellation_lib
            space = list(parcellation_lib[p].keys())[0]
        else:
            if space not in parcellation_lib[p]:
                lgr.critical_raise(f"Space '{space}' not found for parcellation '{p}'.\n"
                                   f"Available: {keys2str(parcellation_lib[p])}",
                                   ValueError)

        # data directory
        base_dir = pathlib.Path(nispace_data_dir) / "parcellation" / p / space

        # Symmetry: a parcellation that ships a left-to-right map is treated as asymmetric
        symmetric = "l2rmap" not in parcellation_lib[p][space]

        # LOAD
        lgr.info(f"Loading {parcellation_lib[p][space]['level']} parcellation '{p}' in '{space}' space.")

        # get kwargs
        get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)

        # volume
        if "mni" in space.lower():

            # get files
            parcellation_file = get_file(
                base_dir / f"parc-{p}_space-{space}.%s",
                **parcellation_lib[p][space]["map"],
                **get_file_kwargs,
            )
            if return_labels:
                label_file = get_file(
                    base_dir / f"parc-{p}_space-{space}.label.txt",
                    **parcellation_lib[p][space]["label"],
                    **get_file_kwargs,
                )
            if return_l2rmap and not symmetric:
                l2rmap_file = get_file(
                    base_dir / f"parc-{p}_space-{space}.l2rmap.csv.gz",
                    **parcellation_lib[p][space]["l2rmap"],
                    **get_file_kwargs,
                )
            elif return_l2rmap and symmetric:
                l2rmap_file = None
            if return_dist_mat:
                distmat_file = get_file(
                    base_dir / f"parc-{p}_space-{space}.dist.csv.gz",
                    **parcellation_lib[p][space]["distmat"],
                    **get_file_kwargs,
                )

        # surface
        else:

            # check hemis
            if isinstance(hemi, str):
                hemi = [hemi]
            if hemi not in [["L"], ["R"], ["L", "R"]]:
                raise ValueError(f"hemi = '{hemi}' not defined. Choose one of 'L', 'R', or ['L', 'R']!")

            # get files
            parcellation_file, label_file, distmat_file = (), (), ()
            for h in hemi:
                parcellation_file += get_file(
                    base_dir / f"parc-{p}_space-{space}_hemi-{h}.%s",
                    **parcellation_lib[p][space]["map"][h],
                    **get_file_kwargs,
                ),
                if return_labels:
                    label_file += get_file(
                        base_dir / f"parc-{p}_space-{space}_hemi-{h}.label.txt",
                        **parcellation_lib[p][space]["label"][h],
                        **get_file_kwargs,
                    ),
                if return_dist_mat:
                    if "fslr" in space.lower():
                        lgr.warning("Distance matrices for fslr spaces are currently not available. Returning None.")
                        distmat_file += None,
                    else:
                        distmat_file += get_file(
                            base_dir / f"parc-{p}_space-{space}_hemi-{h}.dist.csv.gz",
                            **parcellation_lib[p][space]["distmat"][h],
                            **get_file_kwargs,
                        ),
            if return_l2rmap and not symmetric:
                l2rmap_file = get_file(
                    base_dir / f"parc-{p}_space-{space}.l2rmap.csv.gz",
                    **parcellation_lib[p][space]["l2rmap"],
                    **get_file_kwargs,
                )
            elif return_l2rmap and symmetric:
                l2rmap_file = None

            # Single hemisphere: unwrap the one-element tuples. The label and distance
            # tuples stay empty when those outputs were not requested, so they must not
            # be indexed unconditionally. No left-to-right map for a single hemisphere.
            if len(parcellation_file) == 1:
                parcellation_file = parcellation_file[0]
                label_file = label_file[0] if label_file else None
                distmat_file = distmat_file[0] if distmat_file else None
                l2rmap_file = None

        # build output (insertion order defines the order of the returned tuple)
        out = {}
        # parc
        out["parc"] = load_img(parcellation_file) if return_loaded else parcellation_file
        # label
        if return_labels:
            out["label"] = load_labels(label_file) if return_loaded else label_file
        # space
        if return_space:
            out["space"] = space
        # res
        if return_resolution:
            out["res"] = _img_density_for_neuromaps(load_img(parcellation_file))
        # symmetric
        if return_symmetric:
            out["sym"] = symmetric
        # l2rmap
        if return_l2rmap:
            out["l2rmap"] = load_l2rmap(l2rmap_file) if return_loaded else l2rmap_file
        # distmat
        if return_dist_mat:
            out["distmat"] = load_distmat(distmat_file) if return_loaded else distmat_file
        return out

    # run load_parc for a single parcellation
    if isinstance(parc, str):
        out = load_parc(parc)
        if len(out) == 1:
            return list(out.values())[0]
        else:
            return tuple(out.values())

    # run load_parc for 2 parcellations
    out_cortex = load_parc(parc[0])
    out_subcortex = load_parc(parc[1])
    lgr.info(f"Merging to cortex-subcortex parcellation '{parc[0]}{parc[1]}'.")

    # now, we will have to combine the data
    out = {}
    # combine parcellations
    out["parc"] = merge_parcellations([out_cortex["parc"], out_subcortex["parc"]], quick=True)#[0]
    # label
    if return_labels:
        out["label"] = out_cortex["label"] + out_subcortex["label"]
    # space (taken from the cortex parcellation)
    if return_space:
        out["space"] = out_cortex["space"]
    # res (taken from the cortex parcellation)
    if return_resolution:
        out["res"] = out_cortex["res"]
    # symmetric only if both parts are symmetric
    if return_symmetric:
        out["sym"] = True if out_cortex["sym"] and out_subcortex["sym"] else False
    # l2rmap
    if return_l2rmap:
        if out_cortex["l2rmap"] is None and out_subcortex["l2rmap"] is None:
            out["l2rmap"] = None
        else:
            if not return_labels:
                lgr.critical_raise("Cannot return merged l2rmap when return_labels=False!", ValueError)
            # start from an identity mapping over all merged labels, then overwrite the
            # sub-blocks for which an explicit left-to-right map exists
            out["l2rmap"] = pd.DataFrame(
                np.eye(len(out["label"]) // 2),
                index=[l for l in out["label"] if "hemi-L" in l],
                columns=[l for l in out["label"] if "hemi-R" in l]
            )
            if out_cortex["l2rmap"] is not None:
                out["l2rmap"].loc[out_cortex["l2rmap"].index, out_cortex["l2rmap"].columns] = out_cortex["l2rmap"]
            if out_subcortex["l2rmap"] is not None:
                out["l2rmap"].loc[out_subcortex["l2rmap"].index, out_subcortex["l2rmap"].columns] = out_subcortex["l2rmap"]
    # distmat
    if return_dist_mat:
        lgr.info("Distance matrices for merged parcellations are currently not available. Returning None.")
        out["distmat"] = None

    # return
    if len(out) == 1:
        return list(out.values())[0]
    else:
        return tuple(out.values())
def fetch_collection(collection: Union[str, pathlib.Path, np.ndarray, pd.DataFrame, pd.Series, list],
                     dataset: str = None,
                     nispace_data_dir: Union[str, pathlib.Path] = None,
                     overwrite: bool = False,
                     check_file_hash: bool = True,
                     verbose: bool = True):
    """
    Fetch a collection to subset a map dataset.

    Parameters
    ----------
    collection : str, pathlib.Path, np.ndarray, pd.DataFrame, pd.Series or list
        With ``dataset`` given: name of an integrated collection of that dataset.
        Without ``dataset``: path to an existing collection file, or an array-like object
        holding the collection.
    dataset : str, optional
        Name of a reference dataset. If given, an integrated collection is loaded.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, a warning is logged and the NISPACE_DATA_DIR environment
        variable is overwritten with this value.
    overwrite : bool, optional
        Passed through to ``get_file``.
    check_file_hash : bool, optional
        Passed to ``get_file`` as ``hash_check``.
    verbose : bool, optional
        Passed to ``set_log``.

    Returns
    -------
    pd.DataFrame
        The collection as returned by ``_load_collection``.
    """
    verbose = set_log(lgr, verbose)

    if dataset is None:
        # no dataset -> custom collection
        if isinstance(collection, (str, pathlib.Path)):
            # strings and paths are interpreted as files
            collection_file = pathlib.Path(collection)
            if not collection_file.exists():
                lgr.critical_raise(f"Assuming collection '{collection_file}' to be a file, but it does not exist! "
                                   "Ensure that the file exists and try again. If you want to load an integrated collection, "
                                   "use the 'dataset' argument.",
                                   ValueError)
            else:
                lgr.info(f"Loading custom collection from file: {collection_file}")
        else:
            # anything else is handed over as array-like object
            lgr.info(f"Loading custom collection of type {type(collection)}.")
            collection_file = collection

    elif dataset not in reference_lib:
        lgr.critical_raise(f"Dataset '{dataset}' not found. Available: {keys2str(reference_lib)}", ValueError)

    else:
        # dataset given and known -> integrated collection
        # data directory: the argument is deprecated, the environment variable is authoritative
        if nispace_data_dir is not None:
            lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
            os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
        nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

        # base dir
        base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset

        integrated = reference_lib[dataset]["collection"]
        if collection not in integrated:
            lgr.critical_raise(f"Collection '{collection}' not found for dataset '{dataset}'. "
                               f"Available: {keys2str(reference_lib[dataset]['collection'])}",
                               ValueError)
        else:
            lgr.info(f"Loading integrated collection '{collection}' for dataset '{dataset}'.")
            collection_file = get_file(
                base_dir / f"collection-{collection}.collect",
                **integrated[collection],
                overwrite=overwrite,
                hash_check=check_file_hash,
            )

    # Load collection file; 1-column df (= maps) or 2-column df (= set and maps)
    return _load_collection(collection_file)
# REFERENCE DATA - PRIVATE ========================================================================= def _filter_maps(maps_avail: List[str], maps: Union[str, List[str], Dict[str, Union[str, list]]]) -> List[pathlib.Path]: def matches_filters(map_name: str, filters: Dict[str, Union[str, List[str]]]) -> bool: for filter_name, filter_content in filters.items(): if filter_content not in [None, False, "", []]: if isinstance(filter_content, (str, int)): filter_content = [filter_content] filter_content = list(map(str, filter_content)) if filter_name == "n" and filter_content[0].startswith(">"): try: filter_n = int(filter_content[0].replace(">", "")) n_value = int(_file_desc(map_name, 2)) if n_value <= filter_n: return False except (ValueError, IndexError): continue # Skip this filter if parsing fails else: if not any(f"{filter_name}-{content}".lower() in map_name.lower() for content in filter_content): return False return True if isinstance(maps, str): maps = [maps] if isinstance(maps, list): maps = list(set(maps)) filtered_maps = [f for f in maps_avail if any(map_str in f for map_str in maps)] elif isinstance(maps, dict): filtered_maps = [f for f in maps_avail if matches_filters(f, maps)] else: filtered_maps = maps_avail return filtered_maps def _load_collection(collection_path): # if path, read file if isinstance(collection_path, (str, pathlib.Path)): collection_path = pathlib.Path(collection_path) ext = collection_path.suffix # if "collect" file, detect if dict or table if ext == ".collect": with open(collection_path) as f: header = f.readline() if header.startswith("{"): ext = ".json" else: ext = ".csv" # if json, load into dict if ext == ".json": collection = read_json(collection_path) # else, try to directly load as table file else: with open(collection_path) as f: header = f.readline().strip("\n") if any([h in header for h in ["set", "map", "weight"]]): header = 0 else: header = None collection = pd.read_csv(collection_path, header=header, sep=",") else: collection 
= collection_path # if array, convert all do df if isinstance(collection, (np.ndarray, pd.DataFrame, pd.Series, list)): collection = pd.DataFrame(collection) # if dict, convert to df as well elif isinstance(collection, dict): collection = pd.concat([pd.DataFrame({0:k, 1:v}) for k, v in collection.items()]) # else else: raise TypeError(f"Datatype {type(collection_path)} not accepted for argument 'collection'.") # process depending on number of columns n_cols = collection.shape[1] if n_cols == 0: raise ValueError("No columns detected in collection file?!") elif n_cols == 1: collection.columns = ["map"] elif n_cols == 2: collection.columns = ["set", "map"] elif n_cols == 3: collection.columns = ["set", "map", "weight"] else: raise ValueError(f"Collection file with > 3 columns not supported ({n_cols} columns)!") # return return collection.reset_index(drop=True) def _apply_collection_filter(dataset: str, map_files: List[Union[str, pathlib.Path]], collection: str, nispace_data_dir: Union[str, pathlib.Path], set_size_range: Union[None, Tuple[int, int]] = None, overwrite: bool = False, check_file_hash: bool = True) -> List[pathlib.Path]: # base dir base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset # Check if path to custom file collection_path = pathlib.Path(collection) if not collection_path.exists(): # If not exists, search integrated collections if collection in reference_lib[dataset]["collection"]: collection_path = base_dir / f"collection-{collection}.collect" collection_file = get_file( collection_path, **reference_lib[dataset]["collection"][collection], overwrite=overwrite, hash_check=check_file_hash, ) else: lgr.warning(f"Collection '{collection}' not found! 
Available: " f"{keys2str(reference_lib[dataset]['collection'])}") return map_files, None # Load collection file; 1-column df (= maps) or 2-column df (= set and maps) collection_df = _load_collection(collection_file) lgr.debug(f"Collection df shape: {collection_df.shape}; " f"index names: {collection_df.index.names}; " f"column names: {collection_df.columns.names}") # Apply collection filter lgr.info(f"Applying collection filter from: {collection_file}.") if isinstance(map_files[0], pathlib.Path): map_names = [_rm_ext(f.name) for f in map_files] filtered_map_files = [f for f, f_name in zip(map_files, map_names) if f_name in collection_df["map"].unique()] collection_df = collection_df[collection_df["map"].isin(map_names)] else: filtered_map_files = [f for f in map_files if f in collection_df["map"].unique()] collection_df = collection_df[collection_df["map"].isin(filtered_map_files)] # Apply size filter if set_size_range is not None: if "set" in collection_df.columns and isinstance(set_size_range, (tuple, list)): set_size_range = [ x if x is not None else x_ for x, x_ in zip(set_size_range, (1, np.inf)) ] lgr.info(f"Filtering to collection sets with between {set_size_range[0]} and " f"{set_size_range[1]} maps.") collection_df = ( collection_df .groupby("set") .filter(lambda x: set_size_range[0] <= x.shape[0] <= set_size_range[1]) ) filtered_map_files = [f for f in map_files if f in collection_df["map"].unique()] return filtered_map_files, collection_df def _load_parcellated_data(dataset: str, nispace_data_dir: Union[str, pathlib.Path], parc: Union[str, List[str]], map_files: List[str], collection_df: pd.DataFrame, standardize: bool, merge_how: str = "inner", overwrite: bool = False, check_file_hash: bool = True, verbose: bool = True) -> Union[pd.DataFrame, Tuple[pd.DataFrame, Dict]]: verbose = set_log(lgr, verbose) # tab dir tab_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset / "tab" # parcellation can be string with one parcellation name or list of two 
parcellation names if isinstance(parc, str): lgr.info(f"Loading data parcellated with '{parc}'") # all to list parc = [parc] elif isinstance(parc, list): lgr.info(f"Loading and {merge_how}-merging data parcellated with '{parc[0]}' and '{parc[1]}'") else: lgr.critical_raise(f"Invalid parcellation type: {type(parc)}", ValueError) # loop through parcellations data = [] for p in parc: # check if parcellation available for this data if p not in reference_lib[dataset]["tab"]: lgr.critical_raise(f"Dataset '{dataset}' is not available for parcellation '{p}'!\n" f"Available: {keys2str(reference_lib[dataset]['tab'])}", FileNotFoundError) # file parcellation_file = tab_dir / f"dset-{dataset}_parc-{p}.csv.gz" lgr.debug(f"Loading {parcellation_file}") # load data data.append(pd.read_csv( get_file( parcellation_file, **reference_lib[dataset]["tab"][p], overwrite=overwrite, hash_check=check_file_hash, ), index_col=0 )) lgr.debug(f"Loaded parcellated data of shape {data[-1].shape}") lgr.debug(f"First 5 map names: {data[-1].index.to_list()[:5]}") # merge if necessary: all maps are kept even if they are not present in both parcellations if len(parc) > 1: data = data[0].merge(data[1], how=merge_how, left_index=True, right_index=True) else: data = data[0] # Apply filter to the dataframe index lgr.debug(f"Applying filtering based on maps, first 5: {map_files[:5]}") if isinstance(map_files[0], pathlib.Path): map_files = [_rm_ext(f.name) for f in map_files] data = data.loc[data.index.intersection(map_files)] lgr.debug(f"Shape after filtering based on map_names: {data.shape}") # Apply collection index (-> handles maps that are present multiple times in different sets) if collection_df is not None: maps_intersection = data.index.intersection(collection_df["map"].unique()) collection_df_intersection = collection_df.query("map in @maps_intersection") data = data.loc[collection_df_intersection["map"]] data.index = pd.MultiIndex.from_frame(collection_df_intersection) # Standardize if 
standardize: lgr.info("Standardizing parcellated data.") data = zscore_df(data, along="rows") return data def _print_references(dataset: str, meta: pd.DataFrame = None): # info file def get_ref_info(dataset, add_commit=True): get_line = False msg = "" with open(datalib_dir / "reference.txt", "r") as f: for line in f: if line.lower().startswith(f"# {dataset.lower()}"): get_line = True continue if get_line and line == "\n": break if get_line: msg += line if add_commit: msg += f"To ensure reproducibility, note the NiSpace commit/version: {__commit__}\n" msg += "\n" return msg # PET if dataset.lower() == "pet": msg = get_ref_info(dataset) if meta is not None: atlas_maxlen = max([len(x) for x in meta["atlas"]]) author_maxlen = max([len(x) for x in meta["publication"]]) license_maxlen = max([len(x) for x in meta["license"]]) for atlas, pub, doi, license, note in zip( meta["atlas"], meta["publication"], meta["doi"], meta["license"], meta["note"] ): doi_list = [f"https://doi.org/{doi}" for doi in doi.replace(" ", "").split(";")] if "" in doi_list: doi_list.remove("") doi_str = ", ".join(doi_list) atlas = atlas.ljust(atlas_maxlen) author = pub.capitalize().ljust(author_maxlen) license = license.ljust(license_maxlen) msg += f"- {atlas} Source: {author} {license} {doi_str}\n" if not pd.isna(note): msg += f" CAVE: {note}\n" # all others else: msg = get_ref_info(dataset) if meta is not None: if len(meta) > 0: collection_maxlen = max([len(x) for x in meta["collection"]]) author_maxlen = max([len(x) for x in meta["author"]]) for collection, pub, doi in zip(meta["collection"], meta["author"], meta["doi"]): collection = collection.ljust(collection_maxlen) author = pub.capitalize().ljust(author_maxlen) msg += f"- {collection} Source: {author} https://doi.org/{doi}\n" # RSN # elif dataset.lower() == "rsn": # msg = get_ref_info(dataset) # if meta is not None: # if len(meta) > 0: # author_maxlen = max([len(x) for x in meta["author"]]) # for pub, doi in zip(meta["author"], meta["doi"]): 
# author = pub.capitalize().ljust(author_maxlen) # msg += f"- {author} https://doi.org/{doi}\n" # print # if msg[-2:] != "\n": # msg += "\n" print(msg) # REFERENCE DATA - PUBLIC ==========================================================================
def fetch_reference(dataset: str,
                    maps: Union[None, str, List[str], Dict[str, Union[str, list]]] = None,
                    space: str = _SPACE_DEFAULT,
                    collection: str = None,
                    set_size_range: Union[None, Tuple[int, int]] = None,
                    parcellation: str = None,
                    standardize_parcellated: bool = False,
                    return_metadata: bool = False,
                    print_references: bool = True,
                    osf_config_file: str = None,
                    github_config_file: str = None,
                    nispace_data_dir: Union[str, pathlib.Path] = None,
                    overwrite: bool = False,
                    check_file_hash: bool = True,
                    verbose: bool = True):
    """
    Fetch a reference dataset, either as map files or as parcellated table.

    Parameters
    ----------
    dataset : str
        Name of the reference dataset (case-insensitive); must be in the reference registry.
    maps : str, list of str, or dict, optional
        Map filter, see ``_filter_maps``.
    space : str, optional
        Space of the map files. Only used if no ``parcellation`` is given.
    collection : str, optional
        Name of an integrated collection or path to a custom collection file used to subset
        the maps. "All" is treated like None.
    set_size_range : tuple of (int or None, int or None), optional
        Size limits for collection sets, see ``_apply_collection_filter``.
    parcellation : str, optional
        If given, the data tabulated for that parcellation (or cortex-subcortex
        combination) is loaded instead of map files.
    standardize_parcellated : bool, optional
        If True, parcellated data is z-scored along rows.
    return_metadata : bool, optional
        If True, the metadata (or None if not available) is returned as additional element.
    print_references : bool, optional
        If True and ``verbose`` is truthy, reference information is printed.
    osf_config_file, github_config_file : str, optional
        Passed through to ``get_file`` for map files. If neither is given, maps hosted on
        "osfprivate" or "github-nispace-private" are excluded.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, a warning is logged and the NISPACE_DATA_DIR environment
        variable is overwritten with this value.
    overwrite : bool, optional
        Passed through to ``get_file``.
    check_file_hash : bool, optional
        Passed to ``get_file`` as ``hash_check``.
    verbose : bool, optional
        Passed to ``set_log``.

    Returns
    -------
    With ``parcellation``: a pd.DataFrame. Without: a list with one entry per map (the value
    returned by ``get_file`` for "mni152" spaces, a tuple of per-hemisphere values
    otherwise). With ``return_metadata=True``: a tuple ``(data, meta)``.
    """
    verbose = set_log(lgr, verbose)

    # Check dataset availability
    if isinstance(dataset, str):
        dataset = dataset.lower()
        if dataset not in reference_lib:
            lgr.critical_raise(f"Dataset '{dataset}' not found! Available datasets: {keys2str(reference_lib)}",
                               ValueError)
        elif parcellation is None and "map" not in reference_lib[dataset]:
            lgr.critical_raise(f"Dataset '{dataset}' is only available as parcellated data, choose a parcellation!",
                               ValueError)
    else:
        lgr.critical_raise("Invalid dataset type; expecting string.", TypeError)
    lgr.info(f"Loading {dataset} maps.")

    # data directory: the argument is deprecated, the environment variable is authoritative
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # base dir
    base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset
    map_dir = base_dir / "map"

    # Check if parcellation is defined correctly and load map lists
    if parcellation is not None:

        # check parcellation and return correct name or list of two names
        parc = _check_parcellation(parcellation)

        # load maps from collection "All", which should be available for all datasets
        maps_avail = _load_collection(get_file(
            base_dir / "collection-All.collect",
            **reference_lib[dataset]["collection"]["All"],
            overwrite=overwrite,
            hash_check=check_file_hash,
        ))["map"].to_list()

    # Check space availability and load map lists
    else:

        # get list of map image files
        maps_avail = [m for m, v in reference_lib[dataset]["map"].items() if space in v]
        if len(maps_avail) == 0:
            lgr.critical_raise(f"Found no maps for space '{space}' in dataset '{dataset}'.", ValueError)

        # Remove private maps (they need credentials from one of the config files)
        if not osf_config_file and not github_config_file:
            private_hosts = ["osfprivate", "github-nispace-private"]
            if "mni152" in space.lower():
                maps_avail = [
                    m for m in maps_avail
                    if reference_lib[dataset]["map"][m][space]["host"] not in private_hosts
                ]
            else:
                # surface maps store the host per hemisphere; use "L" if present, else "R"
                maps_avail = [
                    m for m in maps_avail
                    if (reference_lib[dataset]["map"][m][space]["L"]["host"]
                        if "L" in reference_lib[dataset]["map"][m][space]
                        else reference_lib[dataset]["map"][m][space]["R"]["host"])
                    not in private_hosts
                ]

    lgr.debug(f"Loaded {len(maps_avail)} unfiltered map(s). "
              f"First 5: {maps_avail[:5]}")

    # Filter by 'maps'
    if maps:
        n_tmp = len(maps_avail)
        lgr.info(f"Applying filter: {maps}")
        maps_avail = _filter_maps(maps_avail, maps)
        # if "map" not in reference_lib[dataset]:
        #     maps_avail = _filter_maps(maps_avail, maps)
        # else:
        #     if isinstance(maps, str):
        #         maps = [maps]
        #     elif not isinstance(maps, (list, tuple, set, pd.Series)):
        #         lgr.warning(f"For dataset '{dataset}', 'maps' must be list-like. Skipping filter.")
        #         maps = maps_avail
        #     maps_avail = list(set(maps_avail).intersection(maps))
        lgr.info(f"Filtered from {n_tmp} to {len(maps_avail)} maps.")

    # Filter by 'collection'
    if collection == "All":
        collection = None
    if collection:
        maps_avail, collection_df = _apply_collection_filter(
            dataset,
            maps_avail,
            collection,
            set_size_range=set_size_range,
            nispace_data_dir=nispace_data_dir,
            overwrite=overwrite,
            check_file_hash=check_file_hash
        )
    else:
        collection_df = None

    # Load tabulated data if 'parcellation' is specified
    if parcellation:
        data = _load_parcellated_data(
            dataset=dataset,
            parc=parc,
            map_files=maps_avail,
            collection_df=collection_df,
            standardize=standardize_parcellated,
            nispace_data_dir=nispace_data_dir,
            overwrite=overwrite,
            check_file_hash=check_file_hash,
            verbose=verbose,
        )

    # Fetch paths to maps if no 'parcellation' is specified
    else:
        # get kwargs
        get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash,
                               osf_config_file=osf_config_file, github_config_file=github_config_file)

        # MNI: one file per map
        if "mni152" in space.lower():
            data = [
                get_file(
                    map_dir / m / f"{m}_space-{space}.%s",
                    **reference_lib[dataset]["map"][m][space],
                    **get_file_kwargs,
                )
                for m in maps_avail
            ]
        # surface: one file per hemisphere listed in the registry, collected in a tuple
        else:
            data = []
            for m in maps_avail:
                data.append(tuple([
                    get_file(
                        map_dir / m / f"{m}_space-{space}_hemi-{hemi}.%s",
                        **reference_lib[dataset]["map"][m][space][hemi],
                        **get_file_kwargs,
                    )
                    for hemi in reference_lib[dataset]["map"][m][space].keys()
                ]))

    # Print references
    # for maps if "pet", or for sets if "mrna"
    if return_metadata or print_references:
        if dataset == "pet":
            meta = fetch_metadata(dataset, maps_avail,
                                  overwrite=overwrite, check_file_hash=check_file_hash)
        elif dataset in ["mrna", "magicc"] and collection_df is not None:
            meta = fetch_metadata(dataset, collection=collection,
                                  overwrite=overwrite, check_file_hash=check_file_hash)
        else:
            meta = None
        if return_metadata:
            data = (data + (meta,)) if isinstance(data, tuple) else (data, meta)
        # logical 'and': both flags are switches, not bit masks
        if print_references and verbose:
            _print_references(dataset, meta)

    return data
def fetch_metadata(dataset: str,
                   maps: Union[str, list] = None,
                   collection: str = None,
                   overwrite: bool = False,
                   check_file_hash: bool = True,
                   nispace_data_dir: Union[str, pathlib.Path] = None):
    """
    Fetch the metadata table of a reference dataset.

    Parameters
    ----------
    dataset : str
        Name of the reference dataset (case-insensitive).
    maps : str or list of str, optional
        Only used for "pet": keep rows whose "atlas" column matches at least one of the
        given strings (joined with "|" and passed to ``str.contains``).
    collection : str, optional
        Only used for "mrna" and "magicc": keep rows of that collection.
    overwrite : bool, optional
        Passed through to ``get_file``.
    check_file_hash : bool, optional
        Passed to ``get_file`` as ``hash_check``.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, a warning is logged and the NISPACE_DATA_DIR environment
        variable is overwritten with this value.

    Returns
    -------
    pd.DataFrame or None
        The (filtered) metadata. None if ``dataset`` is not a string, not in the reference
        registry, or "rsn".
    """
    # unknown or invalid datasets have no metadata
    if not isinstance(dataset, str):
        return None
    dataset = dataset.lower()
    if dataset not in reference_lib:
        return None

    # data directory: the argument is deprecated, the environment variable is authoritative
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # base dir
    base_dir = pathlib.Path(nispace_data_dir) / "reference" / dataset

    # load metadata
    metadata_file = get_file(
        base_dir / "metadata.csv",
        **reference_lib[dataset]["metadata"],
        overwrite=overwrite,
        hash_check=check_file_hash,
    )
    meta = pd.read_csv(metadata_file)

    # dataset-specific subsetting
    if dataset == "pet" and maps is not None:
        if isinstance(maps, str):
            maps = [maps]
        pattern = "|".join(maps)
        meta = meta[meta.atlas.str.contains(pattern, na=False)]
    elif dataset in ["mrna", "magicc"] and collection is not None:
        # 'collection' is referenced by name inside the query string
        meta = meta.query("collection == @collection")
    elif dataset == "rsn":
        meta = None

    return meta
# EXAMPLE DATA =====================================================================================
def fetch_example(example: str,
                  parcellation: str = None,
                  return_associated_data: bool = True,
                  nispace_data_dir: Union[str, pathlib.Path] = None,
                  overwrite: bool = False,
                  check_file_hash: bool = True,
                  verbose: bool = True):
    """
    Fetch an example dataset.

    Parameters
    ----------
    example : str
        Name of the example (case-insensitive); must be in the example registry.
    parcellation : str
        Parcellation (or cortex-subcortex combination) the example data was parcellated
        with. Required, as only parcellated example data is available.
    return_associated_data : bool, optional
        If True and the example has an "info" entry in the registry, the associated info
        table is returned as well.
    nispace_data_dir : str or pathlib.Path, optional
        Deprecated. If given, a warning is logged and the NISPACE_DATA_DIR environment
        variable is overwritten with this value.
    overwrite : bool, optional
        Passed through to ``get_file``.
    check_file_hash : bool, optional
        Passed to ``get_file`` as ``hash_check``.
    verbose : bool, optional
        Passed to ``set_log``.

    Returns
    -------
    pd.DataFrame or tuple of pd.DataFrame
        The parcellated example data (tables of several parcellations concatenated
        column-wise), optionally followed by the info table.
    """
    verbose = set_log(lgr, verbose)

    # data directory: the argument is deprecated, the environment variable is authoritative
    if nispace_data_dir is not None:
        lgr.warning("The 'nispace_data_dir' parameter is deprecated. Please use the NISPACE_DATA_DIR environment variable instead.")
        os.environ["NISPACE_DATA_DIR"] = str(nispace_data_dir)
    nispace_data_dir = os.getenv('NISPACE_DATA_DIR')

    # base dir
    base_dir = pathlib.Path(nispace_data_dir) / "example"

    # check available
    example = example.lower()
    if example not in example_lib:
        lgr.critical_raise(f"Example '{example}' not found. Available: {list(example_lib.keys())}",
                           ValueError)

    # check parcellation
    if parcellation is None:
        lgr.critical_raise("Currently, only parcellated example datasets are available. Please specify a parcellation.",
                           ValueError)
    else:
        parc = _check_parcellation(parcellation, force_list=True)

    # get kwargs
    get_file_kwargs = dict(overwrite=overwrite, hash_check=check_file_hash)

    # load tabulated data
    tables_avail = example_lib[example]["tab"]
    if not all(p in tables_avail for p in parc):
        lgr.critical_raise(f"Parcellation '{parcellation}' not found for example '{example}'.\n"
                           f"Available parcellations: {list(example_lib[example]['tab'].keys())}",
                           ValueError)
    else:
        lgr.info(f"Loading example dataset: '{example}', parcellated with: {''.join(parc)}.")
        tables = []
        for p in parc:
            table_file = get_file(
                base_dir / f"example-{example}_parc-{p}.csv.gz",
                **tables_avail[p],
                **get_file_kwargs,
            )
            tables.append(pd.read_csv(table_file, index_col=0))
        example_data = pd.concat(tables, axis=1)

    # without associated data we are done
    if not (return_associated_data and "info" in example_lib[example]):
        return example_data

    # Check for info data
    lgr.info("Returning parcellated and associated subject data.")
    info_file = get_file(
        base_dir / f"example-{example}_info.csv",
        **example_lib[example]["info"],
        **get_file_kwargs,
    )
    example_info = pd.read_csv(info_file, index_col=0)
    return example_data, example_info