From 88cbe4f99c546bc07c093f3971454fa871df0fe7 Mon Sep 17 00:00:00 2001
From: John Arevalo
Date: Sun, 10 Dec 2023 19:29:30 -0500
Subject: [PATCH] hypothesis testing on mAP instead of AP

---
 pyproject.toml         |  2 +-
 src/copairs/compute.py | 13 +++++++---
 src/copairs/map.py     | 58 +++++++++++++++++++++++++-----------------
 3 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index d285e96..c98182f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "copairs"
-version = "0.3.3"
+version = "0.4.0-alpha"
 description = "Find pairs and compute metrics between them"
 readme = "README.md"
 requires-python = ">=3.8"

diff --git a/src/copairs/compute.py b/src/copairs/compute.py
index 41512d0..f91b6b8 100644
--- a/src/copairs/compute.py
+++ b/src/copairs/compute.py
@@ -7,6 +7,7 @@
 import numpy as np
 from tqdm.autonotebook import tqdm
 
+
 def parallel_map(par_func, items):
     '''Execute par_func(i) for every i in items using ThreadPool and tqdm.'''
     num_items = len(items)
@@ -18,12 +19,14 @@ def parallel_map(par_func, items):
         pass
 
 
-def batch_processing(pairwise_op: Callable[[np.ndarray, np.ndarray],np.ndarray],):
+def batch_processing(pairwise_op: Callable[[np.ndarray, np.ndarray],
+                                           np.ndarray], ):
     '''Decorator adding the batch_size param to run the function with
     multithreading using a list of paired indices'''
     def batched_fn(feats: np.ndarray, pair_ix: np.ndarray, batch_size: int):
         num_pairs = len(pair_ix)
         result = np.empty(num_pairs, dtype=np.float32)
+
         def par_func(i):
             x_sample = feats[pair_ix[i:i + batch_size, 0]]
             y_sample = feats[pair_ix[i:i + batch_size, 1]]
@@ -32,6 +35,7 @@ def par_func(i):
 
         parallel_map(par_func, np.arange(0, num_pairs, batch_size))
         return result
+
     return batched_fn
 
 
@@ -112,7 +116,6 @@ def random_ap(num_perm: int, num_pos: int, total: int, seed) -> np.ndarray:
     rng = np.random.default_rng(seed)
     rel_k = random_binary_matrix(num_perm, total, num_pos, rng)
     null_dist = compute_ap(rel_k)
-    null_dist.sort()
     return null_dist
 
 
@@ -137,10 +140,12 @@ def get_null_dists(confs, null_size, seed):
     seeds = rng.integers(8096, size=num_confs)
 
     null_dists = np.empty([len(confs), null_size], dtype=np.float32)
+
     def par_func(i):
         num_pos, total = confs[i]
-        null_dists[i] = null_dist_cached(num_pos, total, seeds[i],
-                                         null_size, cache_dir)
+        null_dists[i] = null_dist_cached(num_pos, total, seeds[i], null_size,
+                                         cache_dir)
+
     parallel_map(par_func, np.arange(num_confs))
     return null_dists

diff --git a/src/copairs/map.py b/src/copairs/map.py
index a08a1b7..f67b13b 100644
--- a/src/copairs/map.py
+++ b/src/copairs/map.py
@@ -8,6 +8,7 @@
 from copairs import compute
 from copairs.matching import Matcher, MatcherMultilabel
+from tqdm.contrib.concurrent import thread_map
 
 logger = logging.getLogger('copairs')
 
 
@@ -61,23 +62,40 @@ def create_matcher(obs: pd.DataFrame,
     return Matcher(obs, columns, seed=0)
 
 
-def aggregate(result: pd.DataFrame, sameby, threshold: float) -> pd.DataFrame:
-    agg_rs = result.groupby(sameby, as_index=False, observed=True).agg({
-        'average_precision':
-        'mean',
-        'p_value':
-        lambda p_values: -np.log10(p_values).mean(),
-    })
+def aggregate(ap_scores: pd.DataFrame, sameby, null_size: int,
+              threshold: float, seed: int) -> pd.DataFrame:
+    ap_scores = ap_scores.reset_index()
+
+    logger.info('Computing null_dist...')
+    null_confs = ap_scores[['n_pos_pairs', 'n_total_pairs']].values
+    null_confs, rev_ix = np.unique(null_confs, axis=0, return_inverse=True)
+    null_dists = compute.get_null_dists(null_confs, null_size, seed=seed)
+    ap_scores['null_ix'] = rev_ix
+
+    def get_p_value(params):
+        map_score, indices = params
+        null_dist = null_dists[rev_ix[indices]].mean(axis=0)
+        num = (null_dist > map_score).sum()
+        p_value = (num + 1) / (null_size + 1)
+        return p_value
+
+    logger.info('Computing p-values...')
+
+    def g(df):
+        return df['average_precision'].agg(['mean', lambda x: list(x.index)])
+
+    map_scores = ap_scores.groupby(sameby).agg(g)
+    map_scores.columns = ['mean_average_precision', 'indices']
+
+    params = map_scores[['mean_average_precision', 'indices']]
+    map_scores['p_value'] = thread_map(get_p_value, params.values)
     reject, pvals_corrected, alphacSidak, alphacBonf = multipletests(
-        10**-agg_rs['p_value'], method='fdr_bh')
-    agg_rs['q_value'] = pvals_corrected
-    agg_rs['nlog10qvalue'] = (-np.log10(agg_rs['q_value']))
-    agg_rs.rename({'p_value': 'nlog10pvalue'}, axis=1, inplace=True)
-    agg_rs['above_p_threshold'] = agg_rs['nlog10pvalue'] > -np.log10(threshold)
-    agg_rs['above_q_threshold'] = agg_rs['nlog10qvalue'] > -np.log10(threshold)
-    agg_rs.rename(columns={'average_precision': 'mean_average_precision'},
-                  inplace=True)
-    return agg_rs
+        map_scores['p_value'], method='fdr_bh')
+    map_scores['corrected_p_value'] = pvals_corrected
+    map_scores['below_p'] = map_scores['p_value'] < threshold
+    map_scores['below_corrected_p'] = map_scores[
+        'corrected_p_value'] < threshold
+    return map_scores
 
 
 def build_rank_lists(pos_pairs, neg_pairs, pos_dists, neg_dists):
@@ -108,7 +126,6 @@ def run_pipeline(meta,
                  pos_diffby,
                  neg_sameby,
                  neg_diffby,
-                 null_size,
                  batch_size=20000,
                  seed=0) -> pd.DataFrame:
     columns = flatten_str_list(pos_sameby, pos_diffby, neg_sameby, neg_diffby)
@@ -147,15 +164,8 @@ def run_pipeline(meta,
     logger.info('Computing average precision...')
     ap_scores, null_confs = compute.compute_ap_contiguous(rel_k_list, counts)
 
-    logger.info('Computing p-values...')
-    p_values = compute.compute_p_values(ap_scores,
-                                        null_confs,
-                                        null_size,
-                                        seed=seed)
-
     logger.info('Creating result DataFrame...')
     meta.loc[paired_ix, 'average_precision'] = ap_scores
-    meta.loc[paired_ix, 'p_value'] = p_values
     meta.loc[paired_ix, "n_pos_pairs"] = null_confs[:, 0]
     meta.loc[paired_ix, "n_total_pairs"] = null_confs[:, 1]
     logger.info('Finished.')
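
Note on the statistics (not part of the patch itself): dropping null_dist.sort()
in random_ap is what makes the mAP-level test work. aggregate averages the
unsorted per-replicate null AP samples element-wise, so each column of the
averaged matrix is one independent draw of a null mean AP; sorting the rows
first would turn the columns into order statistics and invalidate that. A
minimal sketch of the p-value step in get_p_value, with made-up shapes and
scores standing in for real null distributions:

    import numpy as np

    rng = np.random.default_rng(0)
    null_size = 1000

    # One unsorted null AP distribution per replicate in a group of three;
    # in copairs these rows come from random_ap permutations (uniform noise
    # here is only a stand-in).
    null_dists = rng.uniform(size=(3, null_size))

    # Element-wise mean across replicates: column j is one null mAP draw.
    null_mAP = null_dists.mean(axis=0)

    map_score = 0.8  # observed mean AP for the group (made-up value)
    p_value = ((null_mAP > map_score).sum() + 1) / (null_size + 1)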
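
Usage sketch: after this patch, run_pipeline no longer takes null_size or
computes p-values; hypothesis testing moves to aggregate, which permutes at
the mAP level. The call below assumes the unchanged leading parameters of
run_pipeline's signature (meta, feats, pos_sameby, ...); the metadata,
features, and column names are hypothetical:

    import numpy as np
    import pandas as pd
    from copairs.map import run_pipeline, aggregate

    meta = pd.DataFrame({
        'compound': ['cmpd_a'] * 4 + ['cmpd_b'] * 4,  # hypothetical labels
        'plate': ['p1', 'p2'] * 4,
    })
    feats = np.random.default_rng(0).normal(size=(8, 16))  # toy profiles

    # Step 1: per-profile AP plus pair counts (n_pos_pairs, n_total_pairs).
    ap_scores = run_pipeline(meta, feats,
                             pos_sameby=['compound'], pos_diffby=['plate'],
                             neg_sameby=[], neg_diffby=['compound'],
                             batch_size=20000, seed=0)

    # Step 2: mAP per compound with permutation p-values on the mean,
    # FDR-corrected via multipletests(method='fdr_bh').
    map_scores = aggregate(ap_scores, 'compound', null_size=1000,
                           threshold=0.05, seed=0)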