Source code for nispace.stats.coloc

from itertools import combinations
import numpy as np
from numba import njit
from sklearn.linear_model import ElasticNetCV, LassoCV, RidgeCV
from sklearn.cross_decomposition import PLSRegression
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_regression
from tqdm.auto import tqdm

import logging
lgr = logging.getLogger(__name__)
from ..utils.utils import _del_from_tuple

# for backwards compatibility
[docs]@njit(nogil=True) def rank_array(array): return rank1d(array)
[docs]@njit(cache=True, nogil=True) def rank1d(arr): """Rank a 1D array using mid-ranks (average rank) for tied values. Constant arrays receive identical ranks -> zero variance -> NaN correlation. CAVE: does not handle nan's; strip them before calling.""" n = arr.size _args = arr.argsort() ranked = np.empty(n, dtype=np.float64) i = 0 while i < n: # find the end of the current run of equal values j = i + 1 while j < n and arr[_args[j]] == arr[_args[i]]: j += 1 # assign the average (mid) rank to all tied elements mid = (i + j - 1) * 0.5 for k in range(i, j): ranked[_args[k]] = mid i = j return ranked
[docs]@njit(cache=True, nogil=True) def rank2d(arr): """Rank a 2D array column-wise using mid-ranks. Handles nan's.""" if arr.ndim == 1: return rank1d(arr) ranked = np.full(arr.shape, np.nan, dtype=np.float64) for i in range(arr.shape[1]): v = arr[:, i] nonan = ~np.isnan(v) ranked[nonan, i] = rank1d(v[nonan]) return ranked
[docs]@njit(cache=True, nogil=True) def corr(x, y, rank=False): """Compute Pearson or Spearman correlation for two 1D arrays.""" if rank: x = rank1d(x) y = rank1d(y) m_x = x.mean() m_y = y.mean() num = np.sum((x - m_x) * (y - m_y)) den = np.sqrt(np.sum((x - m_x) ** 2) * np.sum((y - m_y) ** 2)) if den == 0.0: return np.nan return num / den
[docs]@njit(cache=True, nogil=True) def pearson(x, y): """Compute Pearson correlation for two 1D arrays.""" m_x = x.mean() m_y = y.mean() num = np.sum((x - m_x) * (y - m_y)) den = np.sqrt(np.sum((x - m_x) ** 2) * np.sum((y - m_y) ** 2)) if den == 0.0: return np.nan return num / den
[docs]@njit(cache=True, nogil=True) def partialcorr(x, y, z, rank=False): """Computes partial correlation between {x} and {y} controlled for {z} Args: x (array-like): input vector 1 y (array-like): input vector 2 z (array-like): input vector to be controlled for rank (bool, optional): True or False. Defaults to False. -> Pearson correlation Returns: rp (float): (ranked) partial correlation coefficient between x and y """ if rank: x = rank1d(x) y = rank1d(y) z = rank1d(z) C = np.column_stack((x, y, z)) corr = np.corrcoef(C, rowvar=False) corr_inv = np.linalg.inv(corr) # the (multiplicative) inverse of a matrix. rp = -corr_inv[0,1] / (np.sqrt(corr_inv[0,0] * corr_inv[1,1])) return rp
[docs]@njit(cache=True, nogil=True) def partialpearson(x, y, z): """Computes partial Pearson correlation between {x} and {y} controlled for {z} Args: x (array-like): input vector 1 y (array-like): input vector 2 z (array-like): input array to be controlled for Returns: rp (float): partial correlation coefficient between x and y """ C = np.column_stack((x, y, z)) corr = np.corrcoef(C, rowvar=False) corr_inv = np.linalg.inv(corr) # the (multiplicative) inverse of a matrix. rp = -corr_inv[0,1] / (np.sqrt(corr_inv[0,0] * corr_inv[1,1])) return rp
[docs]def mutualinfo(x, y, n_neighbors=3): """Compute mutual information between x and y using sklearn. Args: x (numpy.ndarray): shape (n_values, n_predictors) y (numpy.ndarray): shape (n_values, 1) or (n_values,) n_neighbors (int, optional): Number of neighbors for MI estimation. Defaults to 3. Returns: float: mutual information between x and y """ if x.ndim == 1: x = x[:, np.newaxis] return mutual_info_regression(x, y, discrete_features=False, n_neighbors=n_neighbors)[0]
[docs]@njit(cache=True, nogil=True) def mlr(x, y, adj_r2=True, intercept=True): """Compute Regression of predictor(s) x on target y. Requires numpy arrays with columns as predictors/target. Args: x (numpy.ndarray): shape (n_values, n_predictors) y (numpy.ndarray): shape (n_values, 1) or (n_values,) adj_r2 (bool, optional): Calculate adjusted R2. Defaults to True. intercept(bool, optional): Return intercept in leading position of beta array or omit Returns: float: (adjusted) R2 array: parameters, starting with or w/o intercept """ n_obs = x.shape[0] n_x = x.shape[1] X = np.column_stack((np.ones(n_obs, dtype=x.dtype), x)) beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y)) y_hat = np.dot(X, beta) ss_res = np.sum((y - y_hat)**2) ss_tot = np.sum((y - np.mean(y))**2) rsq = 1 - ss_res / ss_tot if adj_r2: rsq = 1 - (1 - rsq) * (n_obs - 1) / (n_obs - n_x - 1) beta = beta.flatten() if intercept==False: beta = beta[1:] return (rsq, beta)
[docs]@njit(cache=True, nogil=True) def r2(x, y, adj_r2=True): """Compute R2 for Regression of predictor(s) x on target y. Requires numpy arrays with columns as predictors/target. Args: x (numpy.ndarray): shape (n_values, n_predictors) y (numpy.ndarray): shape (n_values, 1) or (n_values,) adj_r2 (bool, optional): Calculate adjusted R2. Defaults to True. Returns: float: (adjusted) R2 """ n_obs = x.shape[0] n_x = x.shape[1] X = np.column_stack((x, np.ones(n_obs, dtype=x.dtype))) beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y)) y_hat = np.dot(X, beta) ss_res = np.sum((y - y_hat)**2) ss_tot = np.sum((y - np.mean(y))**2) rsq = 1 - ss_res / ss_tot if adj_r2: rsq = 1 - (1 - rsq) * (n_obs - 1) / (n_obs - n_x - 1) return rsq
[docs]@njit(cache=True, nogil=True) def beta(x, y, intercept=True): """Compute beta coefficients for Regression of predictor(s) x on target y. Requires numpy arrays with columns as predictors/target. Args: x (numpy.ndarray): shape (n_values, n_predictors) y (numpy.ndarray): shape (n_values, 1) or (n_values,) intercept(bool, optional): Return intercept in leading position of beta array or omit Returns: numpy.ndarray: 1D array of beta coefficients (w or w/o intercept) """ X = np.column_stack((np.ones(x.shape[0], dtype=x.dtype), x)) beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y)).flatten() if intercept==False: beta = beta[1:] return beta
[docs]def dominance(x, y, adj_r2=False, verbose=False): if verbose: print(f"Dominance analysis with {x.shape[1]} predictors and {len(y)} features.") ## print total rsquare rsq_total = r2(x=x, y=y, adj_r2=adj_r2) if verbose: print(f"Full model R^2 = {rsq_total:.03f}") dom_stats = dict() dom_stats["sum"] = rsq_total ## get possible predictor combinations n_pred = x.shape[1] pred_combs = [list(combinations(range(n_pred), i)) for i in range(1, n_pred+1)] ## calculate R2s if verbose: print("Calculating models...") rsqs = dict() for len_group in tqdm(pred_combs, desc='Iterating over len groups', disable=not verbose): for pred_idc in tqdm(len_group, desc='Inside loop', disable=True): rsq = r2(x=x[:, pred_idc], y=y, adj_r2=adj_r2) rsqs[pred_idc] = rsq ## collect metrics # individual dominance if verbose: print("Calculating individual dominance.") dom_stats["individual"] = np.zeros((n_pred)) for i in range(n_pred): dom_stats["individual"][i] = rsqs[(i,)] dom_stats["individual"] = dom_stats["individual"].reshape(1, -1) # partial dominance if verbose: print("Calculating partial dominance.") dom_stats["partial"] = np.zeros((n_pred, n_pred-1)) for i in range(n_pred - 1): i_len_combs = list(combinations(range(n_pred), i + 2)) for j_node in range(n_pred): j_node_sel = [v for v in i_len_combs if j_node in v] reduced_list = [_del_from_tuple(comb, j_node) for comb in j_node_sel] diff_values = [rsqs[j_node_sel[i]] - rsqs[reduced_list[i]] for i in range( len(reduced_list))] dom_stats["partial"][j_node,i] = np.mean(diff_values) #dom_stats["partial"] = dom_stats["partial"].mean(axis=1) # total dominance if verbose: print("Calculating total dominance.") dom_stats["total"] = np.mean(np.c_[dom_stats["individual"].T, dom_stats["partial"]], axis=1) # relative contribution dom_stats["relative"] = dom_stats["total"] / rsq_total ## sanity check if not np.allclose(np.sum(dom_stats["total"]), rsq_total): raise ValueError(f"Sum of total dominance ({np.sum(dom_stats['total'])}) does not " f"equal full model R^2 ({rsq_total})! ") return dom_stats
[docs]def pls(x, y, n_components=np.inf, **kwargs): """ """ reg = PLSRegression( n_components=np.min([n_components, x.shape[1]]).astype(int), **kwargs, ) reg.fit(x, y) out = { "r2": reg.score(x, y), "beta": np.squeeze(reg.coef_.T), "loadings": reg.x_loadings_, } return out
[docs]def pcr(x, y, adj_r2=True, n_components=np.inf, **kwargs): """ """ n_components = np.min([n_components, x.shape[1]]).astype(int) x_pcs = PCA(n_components=n_components, **kwargs).fit_transform(x) rsq = r2(x_pcs, y, adj_r2=adj_r2) return {"r2": rsq}
[docs]def elasticnet(x, y, cv=None, seed=None, **kwargs): """ """ regCV = ElasticNetCV( cv=cv, random_state=seed, **kwargs ) regCV.fit(X=x, y=y) out = { "alpha": regCV.alpha_, "l1ratio": regCV.l1_ratio_, "r2": regCV.score(x, y), "beta": regCV.coef_ } return out
[docs]def lasso(x, y, cv=None, seed=None, kwargs={}): """ """ regCV = LassoCV( cv=cv, random_state=seed, **kwargs ) regCV.fit(X=x, y=y) out = { "alpha": regCV.alpha_, "r2": regCV.score(x, y), "beta": regCV.coef_ } return out
[docs]def ridge(x, y, cv=None, seed=None, kwargs={}): """ """ regCV = RidgeCV( cv=cv, **kwargs ) regCV.fit(X=x, y=y) out = { "alpha": regCV.alpha_, "r2": regCV.score(x, y), "beta": regCV.coef_ } return out
# Numba-accelerated implementation of sklearn-style SIMPLS for a single target # should return the same as sklearn.cross_decomposition.PLSRegression with ~5x speed-up @njit(fastmath=True, cache=True) def _simpls1_loop(X_res, y_res, n_comp): """ SIMPLS deflation when Y has shape (n_samples,) Returns W, P, Q, T_norms (x-weights, x-loadings, y-loadings, norms of T). """ n, p = X_res.shape W = np.empty((p, n_comp)) P = np.empty((p, n_comp)) Q = np.empty(n_comp) V = np.empty((n_comp, p)).T # orthonormal basis for deflation, flipped to achieve order="F" T_norms = np.empty(n_comp) for a in range(n_comp): # cross-covariance vector (instead of matrix when q == 1) s = X_res.T @ y_res # shape (p,) r = s / np.linalg.norm(s) # first left-singular vector # sklearn sign convention (svd_flip) if r[np.abs(r).argmax()] < 0.0: # largest‐abs entry must be +ve r *= -1.0 t = X_res @ r norm_t = np.linalg.norm(t) T_norms[a] = norm_t t /= norm_t r /= norm_t # make tᵀr == 1 p = X_res.T @ t q = np.dot(y_res, t) # scalar because q == 1 W[:, a] = r P[:, a] = p Q[a] = q # orthogonalise p to build V basis v = p.copy() for j in range(a): v -= V[:, j] * np.dot(V[:, j], p) v /= np.linalg.norm(v) V[:, a] = v # deflate X and y X_res -= np.outer(t, p) y_res -= t * q return W, P, Q, T_norms # full PLS function
[docs]def fast_pls1( x: np.ndarray, y: np.ndarray, n_components: int ): """ Fast PLS (SIMPLS) for a single target. Parameters ---------- x : (n_samples, n_features) array_like y : (n_samples,) or (n_samples, 1) array_like n_components : int Number of latent components. Returns ------- coef : (n_features,) ndarray Regression weights in original data units. intercept : float r2 : float Coefficient of determination. x_loadings : (n_features, n_components) ndarray Same meaning as ``PLSRegression.x_loadings_`` from scikit-learn. """ x = np.asarray(x, dtype=np.float64) y = np.asarray(y, dtype=np.float64).ravel() n, p = x.shape n_components = np.minimum(n_components, p) # centre & scale (matches sklearn default) x_mean = x.mean(axis=0) y_mean = y.mean() xc = x - x_mean yc = y - y_mean x_std = xc.std(axis=0, ddof=1) y_std = yc.std(ddof=1) xc /= x_std yc /= y_std # latent variables via numba loop W, P_raw, Q, t_norms = _simpls1_loop(xc.copy(), yc.copy(), n_components) # sklearn-style loadings x_loadings = P_raw / t_norms # coefficients in scaled space, back-transform inner = np.linalg.solve(P_raw.T @ W, Q) # (n_components,) coef_scaled = W @ inner # (p,) coef = coef_scaled * (y_std / x_std) intercept = y_mean - x_mean @ coef # get R2 y_pred = x @ coef + intercept r2 = 1.0 - np.sum((y - y_pred) ** 2) / np.sum((y - y_mean) ** 2) return { "r2": r2, "beta": coef, "loadings": x_loadings, }