Source code for nispace.stats.effectsize

import numpy as np
from numba import njit


@njit(cache=True, nogil=True)
def _welford_1d(arr):
    n = 0
    mean = 0.0
    M2 = 0.0
    for x in arr:
        if not np.isnan(x):
            n += 1
            delta = x - mean
            mean += delta / n
            M2 += delta * (x - mean)
    var = M2 / (n - 1) if n >= 2 else np.nan
    return n, mean, var

# ---------------------------------------------------
# Cohen's d for independent groups
# ---------------------------------------------------
[docs]def cohen(a, b): a = np.array(a) b = np.array(b) # Number of elements in each column na = a.shape[0] nb = b.shape[0] dof = na + nb - 2 # Calculate the pooled standard deviation for each column pooled_std = np.sqrt(((na - 1) * np.var(a, ddof=1, axis=0) + (nb - 1) * np.var(b, ddof=1, axis=0)) / dof) # Calculate Cohen's d for each column d = (np.mean(a, axis=0) - np.mean(b, axis=0)) / pooled_std return d
[docs]def cohen_nan(a, b): a = np.array(a) b = np.array(b) # Number of elements in each column na = np.sum(~np.isnan(a), axis=0) nb = np.sum(~np.isnan(b), axis=0) dof = na + nb - 2 # Calculate the pooled standard deviation for each column pooled_std = np.sqrt(((na - 1) * np.nanvar(a, ddof=1, axis=0) + (nb - 1) * np.nanvar(b, ddof=1, axis=0)) / dof) # Calculate Cohen's d for each column d = (np.nanmean(a, axis=0) - np.nanmean(b, axis=0)) / pooled_std return d
@njit(cache=True, nogil=True) def cohen_nan_fast(a, b): n_cols = a.shape[1] d = np.empty(n_cols, dtype=np.float64) for j in range(n_cols): na, mean_a, var_a = _welford_1d(a[:, j]) nb, mean_b, var_b = _welford_1d(b[:, j]) dof = na + nb - 2 if dof <= 0: d[j] = np.nan else: pooled_std = np.sqrt(((na - 1) * var_a + (nb - 1) * var_b) / dof) d[j] = (mean_a - mean_b) / pooled_std return d # --------------------------------------------------- # Cohen's d for dependent groups # ---------------------------------------------------
[docs]def cohen_paired(a, b): a = np.array(a) b = np.array(b) if a.shape != b.shape: raise ValueError("Arrays 'a' and 'b' must have the same shape.") # Calculate the difference between pairs for each column diff = a - b # Calculate Cohen's d for each column d = np.mean(diff, axis=0) / np.std(diff, ddof=1, axis=0) return d
[docs]def cohen_paired_nan(a, b): a = np.array(a) b = np.array(b) if a.shape != b.shape: raise ValueError("Arrays 'a' and 'b' must have the same shape.") # Calculate the difference between pairs for each column diff = a - b # Calculate Cohen's d for each column d = np.nanmean(diff, axis=0) / np.nanstd(diff, ddof=1, axis=0) return d
@njit(cache=True, nogil=True) def cohen_paired_nan_fast(a, b): n_rows, n_cols = a.shape d = np.empty(n_cols, dtype=np.float64) for j in range(n_cols): n = 0 mean_d = 0.0 M2 = 0.0 for i in range(n_rows): ai, bi = a[i, j], b[i, j] if not np.isnan(ai) and not np.isnan(bi): diff = ai - bi n += 1 delta = diff - mean_d mean_d += delta / n M2 += delta * (diff - mean_d) if n < 2: d[j] = np.nan else: d[j] = mean_d / np.sqrt(M2 / (n - 1)) return d # --------------------------------------------------- # Hedges g # ---------------------------------------------------
[docs]def hedges(a, b): a = np.array(a) b = np.array(b) # Calculate Cohen's d for each column d = cohen(a, b) # Calculate the correction factor for Hedges' g for each column na = a.shape[0] nb = b.shape[0] dof = na + nb - 2 correction = 1 - (3 / (4 * dof - 1)) # Calculate Hedges' g for each column g = d * correction return g
[docs]def hedges_nan(a, b): a = np.array(a) b = np.array(b) # Calculate Cohen's d for each column d = cohen_nan(a, b) # Calculate the correction factor for Hedges' g for each column na = np.sum(~np.isnan(a), axis=0) nb = np.sum(~np.isnan(b), axis=0) dof = na + nb - 2 correction = 1 - (3 / (4 * dof - 1)) # Calculate Hedges' g for each column g = d * correction return g
@njit(cache=True, nogil=True) def hedges_nan_fast(a, b): n_cols = a.shape[1] g = np.empty(n_cols, dtype=np.float64) for j in range(n_cols): na, mean_a, var_a = _welford_1d(a[:, j]) nb, mean_b, var_b = _welford_1d(b[:, j]) dof = na + nb - 2 if dof <= 0: g[j] = np.nan else: pooled_std = np.sqrt(((na - 1) * var_a + (nb - 1) * var_b) / dof) d = (mean_a - mean_b) / pooled_std g[j] = d * (1.0 - 3.0 / (4.0 * dof - 1.0)) return g # def hedges_paired(a, b): # a = np.array(a) # b = np.array(b) # if a.shape != b.shape: # raise ValueError("Arrays 'a' and 'b' must have the same shape.") # # Calculate Cohen's d for each column # d = cohen_paired_nan(a, b) # # Calculate the correction factor for Hedges' g for each column # n = np.sum(~np.isnan(a), axis=0) # correction = 1 - (3 / (4 * n - 1)) # # Calculate Hedges' g for each column # g = d * correction # return g # --------------------------------------------------- # Zscores # ---------------------------------------------------
[docs]def zscore(a, b=None): a = np.array(a) if b is not None: b = np.array(b) z = (a - np.mean(b, axis=0)) / np.std(b, ddof=1, axis=0) else: z = (a - np.mean(a, axis=0)) / np.std(a, ddof=1, axis=0) return z
[docs]def zscore_nan(a, b=None): a = np.array(a) if b is not None: b = np.array(b) z = (a - np.nanmean(b, axis=0)) / np.nanstd(b, ddof=1, axis=0) else: z = (a - np.nanmean(a, axis=0)) / np.nanstd(a, ddof=1, axis=0) return z
@njit(cache=True, nogil=True) def _col_stats(arr): """Per-column Welford mean and std for a 2D array.""" n_cols = arr.shape[1] means = np.empty(n_cols, dtype=np.float64) stds = np.empty(n_cols, dtype=np.float64) for j in range(n_cols): _, mean, var = _welford_1d(arr[:, j]) means[j] = mean stds[j] = np.sqrt(var) # nan propagates when n < 2 return means, stds def zscore_nan_fast(a, b=None): ref = a if b is None else b means, stds = _col_stats(ref) return (a - means) / stds # numpy broadcast: row-major, SIMD-friendly # --------------------------------------------------------------------------- # Robust Zscores # ---------------------------------------------------------------------------
[docs]def rzscore_nan(a, b=None): a = np.array(a) if b is not None: b = np.array(b) med = np.nanmedian(b, axis=0) mad = np.nanmedian(np.abs(b - med), axis=0) else: med = np.nanmedian(a, axis=0) mad = np.nanmedian(np.abs(a - med), axis=0) with np.errstate(divide="ignore", invalid="ignore"): result = (a - med) / (1.4826 * mad) zero_mad = np.atleast_1d(mad == 0) if np.any(zero_mad): result = np.where(zero_mad, np.nan, result) return result
@njit(cache=True, nogil=True) def _nanmedian_1d(arr): """NaN-safe median of a 1D array via sort.""" n = 0 for x in arr: if not np.isnan(x): n += 1 if n == 0: return np.nan valid = np.empty(n, dtype=np.float64) k = 0 for x in arr: if not np.isnan(x): valid[k] = x k += 1 valid = np.sort(valid) mid = n // 2 if n % 2 == 0: return (valid[mid - 1] + valid[mid]) / 2.0 else: return valid[mid] @njit(cache=True, nogil=True) def _col_robust_stats(arr): """Per-column NaN-safe median and MAD for a 2D array.""" n_cols = arr.shape[1] medians = np.empty(n_cols, dtype=np.float64) mads = np.empty(n_cols, dtype=np.float64) for j in range(n_cols): col = arr[:, j] med = _nanmedian_1d(col) medians[j] = med mads[j] = _nanmedian_1d(np.abs(col - med)) # NaN propagates, skipped in median return medians, mads def rzscore_nan_fast(a, b=None): ref = a if b is None else b medians, mads = _col_robust_stats(ref) with np.errstate(divide="ignore", invalid="ignore"): result = (a - medians) / (1.4826 * mads) zero_mad = mads == 0 if np.any(zero_mad): result[:, zero_mad] = np.nan return result # --------------------------------------------------- # Percent change # ---------------------------------------------------
[docs]def prc(a, b): a = np.array(a, dtype=float) # Ensure input is a numpy array and convert to float for safe division b = np.array(b, dtype=float) if a.shape != b.shape: raise ValueError("Arrays 'a' and 'b' must have the same shape.") # Calculate percentage change # Use np.where to avoid division by zero p = np.where(a != 0, (a - b) / a * 100, np.nan) return p
@njit(cache=True, nogil=True) def prc_fast(a, b): n_rows, n_cols = a.shape result = np.empty((n_rows, n_cols), dtype=np.float64) for i in range(n_rows): for j in range(n_cols): ai = a[i, j] result[i, j] = np.nan if ai == 0.0 else (ai - b[i, j]) / ai * 100.0 return result # --------------------------------------------------- # Log fold change: log((a+c) / (b+c)) # c = shift to ensure all values are positive. # For raw positive data (e.g. CT in mm): c = 0. # For z-scored / residual data with negatives: c is # auto-computed as |global_min| + eps. # Symmetric under permutation regardless of c: # swap(a,b) -> log((b+c)/(a+c)) = -logfc(a,b) ✓ # -> null distribution is always exactly 0-centered. # --------------------------------------------------- def centile_fast(a, b=None): """Percentile rank of each row of a within columns of b (or a if b is None). Output in [0, 100].""" a = np.array(a, dtype=float) ref = a if b is None else np.array(b, dtype=float) n_cols = ref.shape[1] result = np.empty_like(a, dtype=float) for j in range(n_cols): col_ref = ref[:, j] col_ref_valid = np.sort(col_ref[~np.isnan(col_ref)]) n_valid = len(col_ref_valid) for i in range(a.shape[0]): v = a[i, j] if np.isnan(v) or n_valid == 0: result[i, j] = np.nan else: result[i, j] = np.searchsorted(col_ref_valid, v, side="right") / n_valid * 100 return result
[docs]def logfc_nan(a, b): a = np.array(a, dtype=float) b = np.array(b, dtype=float) if a.shape != b.shape: raise ValueError("Arrays 'a' and 'b' must have the same shape.") global_min = min(np.nanmin(a), np.nanmin(b)) shift = max(0.0, -global_min) + 1e-6 return np.where(np.isnan(a) | np.isnan(b), np.nan, np.log((a + shift) / (b + shift)))
@njit(cache=True, nogil=True) def logfc_fast(a, b): # compute global min over both arrays (ignoring NaN) to derive shift global_min = np.inf n_rows, n_cols = a.shape for i in range(n_rows): for j in range(n_cols): v = a[i, j] if not np.isnan(v) and v < global_min: global_min = v v = b[i, j] if not np.isnan(v) and v < global_min: global_min = v shift = -global_min + 1e-6 if global_min < 0.0 else 0.0 result = np.empty((n_rows, n_cols), dtype=np.float64) for i in range(n_rows): for j in range(n_cols): ai = a[i, j] bi = b[i, j] if np.isnan(ai) or np.isnan(bi): result[i, j] = np.nan else: result[i, j] = np.log((ai + shift) / (bi + shift)) return result