import numpy as np
import pandas as pd
from numba import njit
from itertools import permutations
from tqdm.auto import tqdm
from joblib import Parallel, delayed
from scipy.stats import zscore, norm
from statsmodels.stats.multitest import multipletests
[docs]@njit(cache=True)
def np_any_axis1(x):
"""Numba compatible version of np.any(x, axis=1)."""
out = np.zeros(x.shape[0], dtype=np.bool_)
for i in range(x.shape[1]):
out = np.logical_or(out, x[:, i])
return out
[docs]@njit(cache=True, nogil=True)
def residuals(x, y, decenter=False):
"""Compute residuals for Regression with dependent variable y and independent variable(s) x.
Requires numpy arrays with columns as independent variables.
Args:
x (numpy.ndarray): shape (n_values, n_predictors)
y (numpy.ndarray): shape (n_values, 1) or (n_values,)
decenter (bool, optional): add mean of y to residuals before return
Returns:
numpy.ndarray: 1D array of residuals w or w/o added mean of y
"""
X = np.column_stack((x, np.ones(x.shape[0], dtype=x.dtype)))
beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y))
y_hat = np.dot(X, beta)
if decenter:
y_hat += y.mean()
return y - y_hat
[docs]@njit(cache=True, nogil=True)
def residuals_nan(x, y, decenter=False):
"""Compute residuals for Regression with dependent variable y and independent variable(s) x.
Requires numpy arrays with columns as independent variables.
Args:
x (numpy.ndarray): shape (n_values, n_predictors)
y (numpy.ndarray): shape (n_values, 1) or (n_values,)
decenter (bool, optional): add mean of y to residuals before return
Returns:
numpy.ndarray: 1D array of residuals w or w/o added mean of y
"""
nan_mask = np_any_axis1(np.isnan(np.column_stack((x, y))))
x_ = x[~nan_mask]
y_ = y[~nan_mask]
X = np.column_stack((x_, np.ones(x_.shape[0], dtype=x_.dtype)))
beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y_))
y_hat = np.dot(X, beta)
if decenter:
y_hat += y_.mean()
resid = np.full(y.shape, np.nan, dtype=y.dtype)
resid[~nan_mask] = y_ - y_hat
return resid
[docs]def rho_to_z(array, replace_1=1 - np.finfo(float).eps):
"""Fisher's z-transformation of correlation coefficients."""
array = np.array(array)
array[np.isclose(array, 1)] = replace_1
array_z = np.arctanh(array)
return array_z
[docs]def zscore_df(df, along="cols", force_df=True):
"""Z-standardizes array and returns pandas dataframe.
Args:
df (pandas dataframe): input dataframe
along (str, optional): Either "cols" or "rows". Defaults to "cols".
Returns:
pd.DataFrame or pd.Series: standardized dataframe/series
"""
if along=="cols":
axis = 0
elif along=="rows":
axis = 1
else:
raise ValueError(f"Option along=={along} not defined!")
arr_stand = zscore(df, axis=axis, nan_policy="omit")
# DataFrame
if isinstance(df, pd.DataFrame):
df_stand = pd.DataFrame(
data=arr_stand,
columns=df.columns,
index=df.index
)
# Series
elif isinstance(df, pd.Series):
if force_df:
df_stand = pd.DataFrame(
data=arr_stand,
columns=[df.name],
index=df.index
)
else:
df_stand = pd.Series(
data=arr_stand,
index=df.index,
name=df.name
)
# ndarray
elif isinstance(df, np.ndarray):
if force_df:
df_stand = pd.DataFrame(
data=arr_stand
)
else:
df_stand = arr_stand
# not defined
else:
raise TypeError(f"Input data type {type(df)} not defined!")
return df_stand
[docs]def permute_groups(groups, strategy="proportional", paired=False, subjects=None, n_perm=1,
n_proc=1, seed=None, verbose=False):
groups = np.array(groups)
n = len(groups)
group_labels, group_sizes = np.unique(groups, return_counts=True)
n_labels = len(group_labels)
if verbose == "debug":
print(f"{n} samples with labels: {group_labels} with sizes {group_sizes}")
if paired:
# get subjects
if subjects is None:
raise ValueError("Paired permutation requires argument 'subjects'!")
subjects = np.array(subjects)
unique_subjects = np.unique(subjects)
n_subjects = len(unique_subjects)
# check input
if len(subjects) != len(groups):
raise ValueError("Number of subjects must equal length of group/sessions vector!")
if not (np.all(group_sizes == group_sizes[0]) and n_subjects == group_sizes[0]):
raise ValueError("All sessions must have the same number of samples")
groups_perm = []
# unpaired: permute across the whole set
if not paired:
# proportional: permuted groups have the same size & proportion of subjects as the original groups
if "prop" in strategy:
group_idc = {label: np.where(groups==label)[0] for label in group_labels}
def perm_fun(i):
rng = np.random.default_rng(None if seed is None else seed + i)
groups_tmp = np.full(groups.shape, np.nan)
group_idc_tmp = group_idc.copy()
for label, size in zip(group_labels, group_sizes):
if verbose=="debug" and i == 0:
print(f"Permuted group '{label}' of size {size}:")
for l, s in zip(group_labels, group_sizes):
idc = group_idc_tmp[l]
fraction = s / n
n_sample = np.floor(size * fraction).astype(int)
if verbose=="debug" and i == 0:
print(f"{n_sample} samples ({n_sample / size * 100:.02f} %) from group '{l}'.")
idc_perm = rng.choice(idc, n_sample, replace=False)
group_idc_tmp[l] = idc[~np.isin(idc, idc_perm)]
groups_tmp[idc_perm] = label
if np.isnan(groups_tmp).any():
groups_tmp[np.isnan(groups_tmp)] = rng.permutation(group_labels)
if np.isnan(groups_perm).any():
raise ValueError("Problem with group assignment")
else:
return groups_tmp.astype(groups.dtype)
# shuffle: random permutation across the whole set
elif "shuff" in strategy:
def perm_fun(i):
rng = np.random.default_rng(None if seed is None else seed + i)
return rng.permutation(groups)
# draw: random permutation across the whole set, with replacement
elif "draw" in strategy:
def perm_fun(i):
rng = np.random.default_rng(None if seed is None else seed + i)
return rng.choice(groups, len(groups), replace=True)
# not defined
else:
raise ValueError(f"Unknown unpaired permutation strategy: {strategy}!")
# paired: permute within each subject
else:
# proportional: permuted sessions are equally sized and consist of 50% of each original session
# (given two groups, should also work with more)
if "prop" in strategy:
# get a df with indices of each subject in the original vector by session/"group label"
subs_by_session_idc = pd.DataFrame(
np.zeros((n_subjects, n_labels), dtype=int),
index=unique_subjects,
columns=group_labels,
dtype=int
)
for ses in group_labels:
subs_by_session_idc.loc[unique_subjects, ses] = np.where((groups==ses) & np.isin(subjects, unique_subjects))[0]
session_combinations = list(permutations(group_labels))
n_samples = np.floor(n_subjects / len(session_combinations)).astype(int)
def perm_fun(i):
rng = np.random.default_rng(None if seed is None else seed + i)
sessions_perm = np.full(n, np.nan)
subs_tmp = unique_subjects.copy()
for session_labels_perm in session_combinations:
subs = rng.choice(subs_tmp, n_samples, replace=False)
subs_tmp = subs_tmp[~np.isin(subs_tmp, subs)]
sessions_perm[subs_by_session_idc.loc[subs, :]] = session_labels_perm
if len(subs_tmp) > 0:
sessions_perm[subs_by_session_idc.loc[subs_tmp, :]] = rng.permutation(group_labels)
if np.isnan(sessions_perm).any():
raise ValueError("Problem with group assignment")
else:
return sessions_perm.astype(groups.dtype)
# shuffle: random permutation within subjects
elif "shuff" in strategy:
def perm_fun(i):
rng = np.random.default_rng(None if seed is None else seed + i)
sessions_perm = groups.copy()
for sub in np.unique(subjects):
sessions_perm[subjects==sub] = rng.permutation(groups[subjects==sub])
return sessions_perm
# not defined
else:
raise ValueError(f"Unknown paired permutation strategy: {strategy}!")
# Run
groups_perm = Parallel(n_jobs=n_proc)(
[delayed(perm_fun)(i)
for i in tqdm(range(n_perm),
desc=f"Permuting groups ({n_proc} proc)",
disable=not verbose)]
)
# return permuted group vector(s), as 1d array or list thereof, if n_perm > 1
if len(groups_perm) == 1:
groups_perm = groups_perm[0]
return groups_perm
[docs]def null_to_p(test_value, null_array, tail="two", fit_norm=False):
"""Return p-value for test value(s) against null array.
Adopted from NiMARE v0.0.12: https://zenodo.org/record/6600700
(NiMARE/nimare/stats.py)
Parameters
----------
test_value : 1D array_like
Values for which to determine p-value.
null_array : 1D array_like
Null distribution against which test_value is compared.
tail : {'two', 'upper', 'lower'}, optional
Whether to compare value against null distribution in a two-sided
('two') or one-sided ('upper' or 'lower') manner.
If 'upper', then higher values for the test_value are more significant.
If 'lower', then lower values for the test_value are more significant.
Default is 'two'.
fit_norm : boolean
Whether to fit a normal distribution to null_array data and compute p values from that.
Might be useful if the relative order of multiple highly significant p values is of
interest, but the number of null values cannot be increased sufficiently.
Returns
-------
p_value : :obj:`float`
P-value(s) associated with the test value when compared against the null
distribution. Return type matches input type (i.e., a float if
test_value is a single float, and an array if test_value is an array).
Notes
-----
P-values are clipped based on the number of elements in the null array, if fit_norm=False.
In this case, no p-values of 0 or 1 should be produced.
"""
if tail not in {"two", "upper", "lower"}:
raise ValueError('Argument "tail" must be one of ["two", "upper", "lower"]')
return_first = isinstance(test_value, (float, int))
test_value = np.atleast_1d(test_value)
null_array = np.array(null_array)
## empirical p value
if not fit_norm:
def compute_p(t, null):
null = np.sort(null)
idx = np.searchsorted(null, t, side="left").astype(float)
return 1 - idx / len(null)
## fit a normal distribution and get p value
elif fit_norm:
def compute_p(t, null):
mu, sd = norm.fit(null)
return 1 - norm.cdf(t, mu, sd)
## calculate p
null_array_mean = np.mean(null_array)
test_value_centered = test_value - null_array_mean
null_array_centered = null_array - null_array_mean
if tail == "two":
p_l = compute_p(test_value_centered, null_array_centered)
p_r = compute_p(-test_value_centered, -null_array_centered)
p = 2 * np.minimum(p_l, p_r)
elif tail == "lower":
p = compute_p(-test_value_centered, -null_array_centered)
elif tail == "upper":
p = compute_p(test_value_centered, null_array_centered)
# ensure p_value in the following range:
# smallest_value <= p_value <= (1.0 - smallest_value) or 1 if fit_norm
smallest_value = np.maximum(np.finfo(float).eps, 1.0 / len(null_array)) if not fit_norm else 0
largest_value = 1 - smallest_value
result = np.clip(p, a_min=smallest_value, a_max=largest_value)
return result[0] if return_first else result
[docs]def mc_correction(p_array, alpha=0.05, method="fdr_bh", how="array", dtype=None):
# prepare data
p = np.array(p_array)
p_shape = p.shape
## correct across whole input array -> flattern & reshape
if how in ["a", "arr", "array"]:
# flatten row-wise
p_1d = p.flatten("C")
# get corrected p-values
res = multipletests(p_1d, alpha=alpha, method=method)
pcor_1d = res[1]
reject_1d = res[0]
# reshape to original form
pcor = np.reshape(pcor_1d, p_shape, "C")
reject = np.reshape(reject_1d, p_shape, "C")
## correct across each column/row
else:
pcor, reject = np.zeros_like(p, dtype=dtype), np.zeros_like(p, dtype=dtype)
if how in ["c", "col", "cols", "column", "columns"]:
for col in range(p.shape[1]):
res = multipletests(p[:,col], alpha=alpha, method=method)
pcor[:,col], reject[:,col] = res[1], res[0]
elif how in ["r", "row", "rows"]:
for row in range(p.shape[0]):
res = multipletests(p[row,:], alpha=alpha, method=method)
pcor[row,:], reject[row,:] = res[1], res[0]
else:
print(f"Input how='{how}' not defined!")
## return as input dtype
if isinstance(p_array, pd.DataFrame):
pcor = pd.DataFrame(
pcor,
index=p_array.index,
columns=p_array.columns,
dtype=dtype)
reject = pd.DataFrame(
reject,
index=p_array.index,
columns=p_array.columns,
dtype=dtype)
elif isinstance(p_array, pd.Series):
pcor = pd.Series(
pcor,
index=p_array.index,
name=p_array.name,
dtype=dtype)
reject = pd.Series(
reject,
index=p_array.index,
name=p_array.name,
dtype=dtype)
return pcor, reject