diff --git a/docs/python_api/methods/sumstat_imputation.md b/docs/python_api/methods/sumstat_imputation.md new file mode 100644 index 000000000..6e64d35b1 --- /dev/null +++ b/docs/python_api/methods/sumstat_imputation.md @@ -0,0 +1,28 @@ +--- +title: Summary Statistics Imputation +--- + +Summary statistics imputation leverages linkage disequilibrium (LD) information to compute Z-scores of missing SNPs from neighbouring observed SNPs +SNPs by taking advantage of the Linkage Disequilibrium. + +We implemented the basic model from RAISS (Robust and Accurate Imputation from Summary Statistics) package (see the original [paper](https://academic.oup.com/bioinformatics/article/35/22/4837/5512360)). + +The full repository for the RAISS package can be found [here](https://gitlab.pasteur.fr/statistical-genetics/raiss). + +The original model was suggested in 2014 by Bogdan Pasaniuc et al. [here](https://pubmed.ncbi.nlm.nih.gov/24990607/). + +It represents the following formula: + +E(z*i|z_t) = M*{i,t} \cdot (M\_{t,t})^{-1} \cdot z_t + +Where: + +- E(z_i|z_t) represents the expected z-score of SNP 'i' given the observed z-scores at known SNP indexes 't'. + +- M\_{i,t} represents the LD (Linkage Disequilibrium) matrix between SNP 'i' and the known SNPs at indexes 't'. + +- (M\_{t,t})^{-1} represents the inverse of the LD matrix of the known SNPs at indexes 't'. + +- z_t represents the vector of observed z-scores at the known SNP indexes 't'. + +:::gentropy.method.sumstat_imputation.SummaryStatisticsImputation diff --git a/src/gentropy/method/sumstat_imputation.py b/src/gentropy/method/sumstat_imputation.py new file mode 100644 index 000000000..8375fa84f --- /dev/null +++ b/src/gentropy/method/sumstat_imputation.py @@ -0,0 +1,172 @@ +"""RAISS summary statstics imputation model.""" + +from __future__ import annotations + +from typing import Any + +import numpy as np +import scipy.linalg + + +class SummaryStatisticsImputation: + """Implementation of RAISS summary statstics imputation model.""" + + @staticmethod + def raiss_model( + z_scores_known: np.ndarray, + ld_matrix_known: np.ndarray, + ld_matrix_known_missing: np.ndarray, + lamb: float = 0.01, + rtol: float = 0.01, + ) -> dict[str, Any]: + """Compute the imputation of the z-score using the RAISS model. + + Args: + z_scores_known (np.ndarray): the vector of known Z scores + ld_matrix_known (np.ndarray) : the matrix of known LD correlations + ld_matrix_known_missing (np.ndarray): LD matrix of known SNPs with other unknown SNPs in large matrix (similar to ld[unknowns, :][:,known]) + lamb (float): size of the small value added to the diagonal of the covariance matrix before inversion. Defaults to 0.01. + rtol (float): threshold to filter eigenvectos by its eigenvalue. It makes an inversion biased but much more numerically robust. Default to 0.01. + + Returns: + dict[str, Any]: + - var (np.ndarray): variance of the imputed SNPs + - mu (np.ndarray): the estimation of the zscore of the imputed SNPs + - ld_score (np.ndarray): the linkage disequilibrium score of the imputed SNPs + - condition_number (np.ndarray): the condition number of the correlation matrix + - correct_inversion (np.ndarray): a boolean array indicating if the inversion was successful + - imputation_r2 (np.ndarray): the R2 of the imputation + """ + sig_t_inv = SummaryStatisticsImputation._invert_sig_t( + ld_matrix_known, lamb, rtol + ) + if sig_t_inv is None: + return { + "var": None, + "mu": None, + "ld_score": None, + "condition_number": None, + "correct_inversion": None, + "imputation_r2": None, + } + else: + condition_number = np.array( + [np.linalg.cond(ld_matrix_known)] * ld_matrix_known_missing.shape[0] + ) + correct_inversion = np.array( + [ + SummaryStatisticsImputation._check_inversion( + ld_matrix_known, sig_t_inv + ) + ] + * ld_matrix_known_missing.shape[0] + ) + + var, ld_score = SummaryStatisticsImputation._compute_var( + ld_matrix_known_missing, sig_t_inv, lamb + ) + + mu = SummaryStatisticsImputation._compute_mu( + ld_matrix_known_missing, sig_t_inv, z_scores_known + ) + var_norm = SummaryStatisticsImputation._var_in_boundaries(var, lamb) + + R2 = (1 + lamb) - var_norm + + mu = mu / np.sqrt(R2) + return { + "var": var, + "mu": mu, + "ld_score": ld_score, + "condition_number": condition_number, + "correct_inversion": correct_inversion, + "imputation_r2": 1 - var, + } + + @staticmethod + def _compute_mu( + sig_i_t: np.ndarray, sig_t_inv: np.ndarray, zt: np.ndarray + ) -> np.ndarray: + """Compute the estimation of z-score from neighborring snp. + + Args: + sig_i_t (np.ndarray) : correlation matrix with line corresponding to unknown Snp (snp to impute) and column to known SNPs + sig_t_inv (np.ndarray): inverse of the correlation matrix of known matrix + zt (np.ndarray): Zscores of known snp + Returns: + np.ndarray: a vector of length i containing the estimate of zscore + + """ + return np.dot(sig_i_t, np.dot(sig_t_inv, zt)) + + @staticmethod + def _compute_var( + sig_i_t: np.ndarray, sig_t_inv: np.ndarray, lamb: float + ) -> tuple[np.ndarray, np.ndarray]: + """Compute the expected variance of the imputed SNPs. + + Args: + sig_i_t (np.ndarray) : correlation matrix with line corresponding to unknown Snp (snp to impute) and column to known SNPs + sig_t_inv (np.ndarray): inverse of the correlation matrix of known matrix + lamb (float): regularization term added to matrix + + Returns: + tuple[np.ndarray, np.ndarray]: a tuple containing the variance and the ld score + """ + var = (1 + lamb) - np.einsum( + "ij,jk,ki->i", sig_i_t, sig_t_inv, sig_i_t.transpose() + ) + ld_score = (sig_i_t**2).sum(1) + + return var, ld_score + + @staticmethod + def _check_inversion(sig_t: np.ndarray, sig_t_inv: np.ndarray) -> bool: + """Check if the inversion is correct. + + Args: + sig_t (np.ndarray): the correlation matrix + sig_t_inv (np.ndarray): the inverse of the correlation matrix + Returns: + bool: True if the inversion is correct, False otherwise + """ + return np.allclose(sig_t, np.dot(sig_t, np.dot(sig_t_inv, sig_t))) + + @staticmethod + def _var_in_boundaries(var: np.ndarray, lamb: float) -> np.ndarray: + """Forces the variance to be in the 0 to 1+lambda boundary. Theoritically we shouldn't have to do that. + + Args: + var (np.ndarray): the variance of the imputed SNPs + lamb (float): regularization term added to the diagonal of the sig_t matrix + + Returns: + np.ndarray: the variance of the imputed SNPs + """ + id_neg = np.where(var < 0) + var[id_neg] = 0 + id_inf = np.where(var > (0.99999 + lamb)) + var[id_inf] = 1 + + return var + + @staticmethod + def _invert_sig_t(sig_t: np.ndarray, lamb: float, rtol: float) -> np.ndarray: + """Invert the correlation matrix. If the provided regularization values are not enough to stabilize the inversion process for the given matrix, the function calls itself recursively, increasing lamb and rtol by 10%. + + Args: + sig_t (np.ndarray): the correlation matrix + lamb (float): regularization term added to the diagonal of the sig_t matrix + rtol (float): threshold to filter eigenvector with a eigenvalue under rtol make inversion biased but much more numerically robust + + Returns: + np.ndarray: the inverse of the correlation matrix + """ + try: + np.fill_diagonal(sig_t, (1 + lamb)) + sig_t_inv = scipy.linalg.pinv(sig_t, rtol=rtol, atol=0) + return sig_t_inv + except np.linalg.LinAlgError: + return SummaryStatisticsImputation._invert_sig_t( + sig_t, lamb * 1.1, rtol * 1.1 + ) diff --git a/tests/gentropy/method/test_sumstat_imputation.py b/tests/gentropy/method/test_sumstat_imputation.py new file mode 100644 index 000000000..aea59f76b --- /dev/null +++ b/tests/gentropy/method/test_sumstat_imputation.py @@ -0,0 +1,31 @@ +"""Test of sumstat imputation functions.""" + +from __future__ import annotations + +import numpy as np +from gentropy.method.sumstat_imputation import SummaryStatisticsImputation + + +class TestSSImp: + """Test of RAISS sumstat imputation main function.""" + + def test_sumstat_imputation( + self: TestSSImp, sample_data_for_carma: list[np.ndarray] + ) -> None: + """Test of RAISS.""" + ld = sample_data_for_carma[0] + z = sample_data_for_carma[1] + + unknowns = [5] + known = [index for index in list(range(21)) if index not in unknowns] + sig_t = ld[known, :][:, known] + sig_i_t = ld[unknowns, :][:, known] + zt = z[known] + + _l = SummaryStatisticsImputation.raiss_model( + zt, sig_t, sig_i_t, lamb=0.01, rtol=0.01 + ) + assert ( + np.round(_l["imputation_r2"][0], decimals=4) == 0.9304 + and np.round(_l["mu"][0], decimals=4) == 9.7215 + )