from itertools import combinations
import numpy as np
from numba import njit
from sklearn.linear_model import ElasticNetCV, LassoCV, RidgeCV
from sklearn.cross_decomposition import PLSRegression
from sklearn.decomposition import PCA
from sklearn.feature_selection import mutual_info_regression
from tqdm.auto import tqdm
from .. import lgr
from ..utils.utils import _del_from_tuple
[docs]@njit(cache=True, nogil=True)
def rank_array(array):
"""Rank an array. CAVE: Cannot really deal with nan's!"""
_args = array.argsort()
ranked = np.empty_like(array)
ranked[_args] = np.arange(array.size)
return ranked
[docs]@njit(cache=True, nogil=True)
def corr(x, y, rank=False):
"""Compute Pearson or Spearman correlation for two 1D arrays."""
if rank:
x = rank_array(x)
y = rank_array(y)
m_x = x.mean()
m_y = y.mean()
num = np.sum((x - m_x) * (y - m_y))
den = np.sqrt(np.sum((x - m_x) ** 2) * np.sum((y - m_y) ** 2))
r = num / den
return r
[docs]@njit(cache=True, nogil=True)
def partialcorr(x, y, z, rank=False):
"""Computes partial correlation between {x} and {y} controlled for {z}
Args:
x (array-like): input vector 1
y (array-like): input vector 2
z (array-like): input vector to be controlled for
rank (bool, optional): True or False. Defaults to False. -> Pearson correlation
Returns:
rp (float): (ranked) partial correlation coefficient between x and y
"""
if rank:
x = rank_array(x)
y = rank_array(y)
z = rank_array(z)
C = np.column_stack((x, y, z))
corr = np.corrcoef(C, rowvar=False)
corr_inv = np.linalg.inv(corr) # the (multiplicative) inverse of a matrix.
rp = -corr_inv[0,1] / (np.sqrt(corr_inv[0,0] * corr_inv[1,1]))
return rp
[docs]def mutualinfo(x, y, n_neighbors=3):
"""Compute mutual information between x and y using sklearn.
Args:
x (numpy.ndarray): shape (n_values, n_predictors)
y (numpy.ndarray): shape (n_values, 1) or (n_values,)
n_neighbors (int, optional): Number of neighbors for MI estimation. Defaults to 3.
Returns:
float: mutual information between x and y
"""
if x.ndim == 1:
x = x[:, np.newaxis]
return mutual_info_regression(x, y, discrete_features=False, n_neighbors=n_neighbors)[0]
[docs]@njit(cache=True, nogil=True)
def mlr(x, y, adj_r2=True, intercept=True):
"""Compute Regression of predictor(s) x on target y.
Requires numpy arrays with columns as predictors/target.
Args:
x (numpy.ndarray): shape (n_values, n_predictors)
y (numpy.ndarray): shape (n_values, 1) or (n_values,)
adj_r2 (bool, optional): Calculate adjusted R2. Defaults to True.
intercept(bool, optional): Return intercept in leading position of beta array or omit
Returns:
float: (adjusted) R2
array: parameters, starting with or w/o intercept
"""
n_obs = x.shape[0]
n_x = x.shape[1]
X = np.column_stack((np.ones(n_obs, dtype=x.dtype), x))
beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y))
y_hat = np.dot(X, beta)
ss_res = np.sum((y - y_hat)**2)
ss_tot = np.sum((y - np.mean(y))**2)
rsq = 1 - ss_res / ss_tot
if adj_r2:
rsq = 1 - (1 - rsq) * (n_obs - 1) / (n_obs - n_x - 1)
beta = beta.flatten()
if intercept==False:
beta = beta[1:]
return (rsq, beta)
[docs]@njit(cache=True, nogil=True)
def r2(x, y, adj_r2=True):
"""Compute R2 for Regression of predictor(s) x on target y.
Requires numpy arrays with columns as predictors/target.
Args:
x (numpy.ndarray): shape (n_values, n_predictors)
y (numpy.ndarray): shape (n_values, 1) or (n_values,)
adj_r2 (bool, optional): Calculate adjusted R2. Defaults to True.
Returns:
float: (adjusted) R2
"""
n_obs = x.shape[0]
n_x = x.shape[1]
X = np.column_stack((x, np.ones(n_obs, dtype=x.dtype)))
beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y))
y_hat = np.dot(X, beta)
ss_res = np.sum((y - y_hat)**2)
ss_tot = np.sum((y - np.mean(y))**2)
rsq = 1 - ss_res / ss_tot
if adj_r2:
rsq = 1 - (1 - rsq) * (n_obs - 1) / (n_obs - n_x - 1)
return rsq
[docs]@njit(cache=True, nogil=True)
def beta(x, y, intercept=True):
"""Compute beta coefficients for Regression of predictor(s) x on target y.
Requires numpy arrays with columns as predictors/target.
Args:
x (numpy.ndarray): shape (n_values, n_predictors)
y (numpy.ndarray): shape (n_values, 1) or (n_values,)
intercept(bool, optional): Return intercept in leading position of beta array or omit
Returns:
numpy.ndarray: 1D array of beta coefficients (w or w/o intercept)
"""
X = np.column_stack((np.ones(x.shape[0], dtype=x.dtype), x))
beta = np.linalg.pinv((X.T).dot(X)).dot(X.T.dot(y)).flatten()
if intercept==False:
beta = beta[1:]
return beta
[docs]def dominance(x, y, adj_r2=False, verbose=True):
if verbose: lgr.info(f"Running dominance analysis with {x.shape[1]} "
f"predictors and {len(y)} features.")
## print total rsquare
rsq_total = r2(x=x, y=y, adj_r2=adj_r2)
if verbose: lgr.info(f"Full model R^2 = {rsq_total:.03f}")
dom_stats = dict()
dom_stats["sum"] = rsq_total
## get possible predictor combinations
n_pred = x.shape[1]
pred_combs = [list(combinations(range(n_pred), i)) for i in range(1, n_pred+1)]
## calculate R2s
if verbose: lgr.info("Calculating models...")
rsqs = dict()
for len_group in tqdm(pred_combs, desc='Iterating over len groups', disable=not verbose):
for pred_idc in tqdm(len_group, desc='Inside loop', disable=True):
rsq = r2(x=x[:, pred_idc], y=y, adj_r2=adj_r2)
rsqs[pred_idc] = rsq
## collect metrics
# individual dominance
if verbose: lgr.info("Calculating individual dominance.")
dom_stats["individual"] = np.zeros((n_pred))
for i in range(n_pred):
dom_stats["individual"][i] = rsqs[(i,)]
dom_stats["individual"] = dom_stats["individual"].reshape(1, -1)
# partial dominance
if verbose: lgr.info("Calculating partial dominance.")
dom_stats["partial"] = np.zeros((n_pred, n_pred-1))
for i in range(n_pred - 1):
i_len_combs = list(combinations(range(n_pred), i + 2))
for j_node in range(n_pred):
j_node_sel = [v for v in i_len_combs if j_node in v]
reduced_list = [_del_from_tuple(comb, j_node) for comb in j_node_sel]
diff_values = [rsqs[j_node_sel[i]] - rsqs[reduced_list[i]] for i in range(
len(reduced_list))]
dom_stats["partial"][j_node,i] = np.mean(diff_values)
#dom_stats["partial"] = dom_stats["partial"].mean(axis=1)
# total dominance
if verbose: lgr.info("Calculating total dominance.")
dom_stats["total"] = np.mean(np.c_[dom_stats["individual"].T, dom_stats["partial"]], axis=1)
# relative contribution
dom_stats["relative"] = dom_stats["total"] / rsq_total
## sanity check
if not np.allclose(np.sum(dom_stats["total"]), rsq_total):
lgr.error(f"Sum of total dominance ({np.sum(dom_stats['total'])}) does not "
f"equal full model R^2 ({rsq_total})! ")
return dom_stats
[docs]def pls(x, y, n_components=np.inf, **kwargs):
"""
"""
reg = PLSRegression(
n_components=np.min([n_components, x.shape[1]]).astype(int),
**kwargs,
)
reg.fit(x, y)
out = {
"r2": reg.score(x, y),
"beta": np.squeeze(reg.coef_.T),
"loadings": reg.x_loadings_,
}
return out
[docs]def pcr(x, y, adj_r2=True, n_components=np.inf, **kwargs):
"""
"""
n_components = np.min([n_components, x.shape[1]]).astype(int)
x_pcs = PCA(n_components=n_components, **kwargs).fit_transform(x)
rsq = r2(x_pcs, y, adj_r2=adj_r2)
return {"r2": rsq}
[docs]def elasticnet(x, y, cv=None, seed=None, **kwargs):
"""
"""
regCV = ElasticNetCV(
cv=cv,
random_state=seed,
**kwargs
)
regCV.fit(X=x, y=y)
out = {
"alpha": regCV.alpha_,
"l1ratio": regCV.l1_ratio_,
"r2": regCV.score(x, y),
"beta": regCV.coef_
}
return out
[docs]def lasso(x, y, cv=None, seed=None, kwargs={}):
"""
"""
regCV = LassoCV(
cv=cv,
random_state=seed,
**kwargs
)
regCV.fit(X=x, y=y)
out = {
"alpha": regCV.alpha_,
"r2": regCV.score(x, y),
"beta": regCV.coef_
}
return out
[docs]def ridge(x, y, cv=None, seed=None, kwargs={}):
"""
"""
regCV = RidgeCV(
cv=cv,
**kwargs
)
regCV.fit(X=x, y=y)
out = {
"alpha": regCV.alpha_,
"r2": regCV.score(x, y),
"beta": regCV.coef_
}
return out