From b482bb52ce1922d22283365a1b4d97e38610ec37 Mon Sep 17 00:00:00 2001 From: Zain Huda Date: Tue, 15 Oct 2024 14:46:53 -0700 Subject: [PATCH] add GPU sync and numerical value tests (#2194) Summary: Added GPU sync tests to simulate gathering metric states on to rank 0 and computing. Tests don't cover this case before, which has resulted in SEVs in the past as users aren't aware of how RecMetrics collects and computes metrics. Added numerical value tests, most metrics are do not have this which can result in issues down the line if metrics need to be changed/accommodate future changes. Also we've found inconsistencies sometimes from other methods, so always good to check here. We compare each metric to a reference implementation from literature to ensure the values are as expected. Fixed and cleaned up some tests too. Reviewed By: henrylhtsang Differential Revision: D59173140 --- torchrec/metrics/tests/test_accuracy.py | 23 ++ torchrec/metrics/tests/test_auprc.py | 23 ++ torchrec/metrics/tests/test_calibration.py | 23 ++ torchrec/metrics/tests/test_ctr.py | 23 ++ torchrec/metrics/tests/test_mae.py | 23 ++ torchrec/metrics/tests/test_mse.py | 23 ++ .../metrics/tests/test_multiclass_recall.py | 23 ++ torchrec/metrics/tests/test_ndcg.py | 328 +++++++++++------- torchrec/metrics/tests/test_ne.py | 49 ++- torchrec/metrics/tests/test_ne_positive.py | 3 +- torchrec/metrics/tests/test_precision.py | 82 +++-- torchrec/metrics/tests/test_recall.py | 23 ++ torchrec/metrics/tests/test_segmented_ne.py | 3 +- .../metrics/tests/test_serving_calibration.py | 28 +- 14 files changed, 496 insertions(+), 181 deletions(-) diff --git a/torchrec/metrics/tests/test_accuracy.py b/torchrec/metrics/tests/test_accuracy.py index 5f9e47416..84d487db1 100644 --- a/torchrec/metrics/tests/test_accuracy.py +++ b/torchrec/metrics/tests/test_accuracy.py @@ -17,8 +17,10 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, RecTaskInfo, + sync_test_helper, TestMetric, ) @@ -251,3 +253,24 @@ def test_accuracy(self) -> None: except AssertionError: print("Assertion error caught with data set ", inputs) raise + + +class AccuracyGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = AccuracyMetric + task_name: str = "accuracy" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=AccuracyMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestAccuracyMetric, + metric_name=AccuracyGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_auprc.py b/torchrec/metrics/tests/test_auprc.py index dacdbab47..95256aa7c 100644 --- a/torchrec/metrics/tests/test_auprc.py +++ b/torchrec/metrics/tests/test_auprc.py @@ -23,7 +23,9 @@ ) from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -346,3 +348,24 @@ def test_required_input_for_grouped_auprc(self) -> None: ) self.assertIn("grouping_keys", auprc.get_required_inputs()) + + +class AUPRCGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = AUPRCMetric + task_name: str = "auprc" + + def test_sync_auprc(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=AUPRCMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestAUPRCMetric, + metric_name=AUPRCGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_calibration.py b/torchrec/metrics/tests/test_calibration.py index fb6f109f7..2ea49026e 100644 --- a/torchrec/metrics/tests/test_calibration.py +++ b/torchrec/metrics/tests/test_calibration.py @@ -15,7 +15,9 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -77,3 +79,24 @@ def test_fused_calibration(self) -> None: world_size=WORLD_SIZE, entry_point=metric_test_helper, ) + + +class CalibrationGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = CalibrationMetric + task_name: str = "calibration" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=CalibrationMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestCalibrationMetric, + metric_name=CalibrationGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_ctr.py b/torchrec/metrics/tests/test_ctr.py index b3397dcc1..61ca6081e 100644 --- a/torchrec/metrics/tests/test_ctr.py +++ b/torchrec/metrics/tests/test_ctr.py @@ -15,7 +15,9 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -71,3 +73,24 @@ def test_fused_ctr(self) -> None: world_size=WORLD_SIZE, entry_point=metric_test_helper, ) + + +class CTRGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = CTRMetric + task_name: str = "ctr" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=CTRMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestCTRMetric, + metric_name=CTRGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_mae.py b/torchrec/metrics/tests/test_mae.py index 8aaa68af0..cff8bd3f7 100644 --- a/torchrec/metrics/tests/test_mae.py +++ b/torchrec/metrics/tests/test_mae.py @@ -15,7 +15,9 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -74,3 +76,24 @@ def test_fused_mae(self) -> None: world_size=WORLD_SIZE, entry_point=metric_test_helper, ) + + +class MAEGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = MAEMetric + task_name: str = "mae" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=MAEMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestMAEMetric, + metric_name=MAEGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_mse.py b/torchrec/metrics/tests/test_mse.py index 9dc0ca337..498122dc0 100644 --- a/torchrec/metrics/tests/test_mse.py +++ b/torchrec/metrics/tests/test_mse.py @@ -15,7 +15,9 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -123,3 +125,24 @@ def test_fused_rmse(self) -> None: world_size=WORLD_SIZE, entry_point=metric_test_helper, ) + + +class MSEGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = MSEMetric + task_name: str = "mse" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=MSEMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestMSEMetric, + metric_name=MSEGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_multiclass_recall.py b/torchrec/metrics/tests/test_multiclass_recall.py index f8d484aef..4e67a8bd3 100644 --- a/torchrec/metrics/tests/test_multiclass_recall.py +++ b/torchrec/metrics/tests/test_multiclass_recall.py @@ -19,7 +19,9 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -113,3 +115,24 @@ def test_multiclass_recall_update_fused(self) -> None: batch_window_size=10, n_classes=N_CLASSES, ) + + +class MulticlassRecallGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = MulticlassRecallMetric + task_name: str = "accuracy" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=MulticlassRecallMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestMulticlassRecallMetric, + metric_name=MulticlassRecallGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_ndcg.py b/torchrec/metrics/tests/test_ndcg.py index 76d798c63..88c227725 100644 --- a/torchrec/metrics/tests/test_ndcg.py +++ b/torchrec/metrics/tests/test_ndcg.py @@ -10,12 +10,13 @@ import unittest from dataclasses import replace -from typing import Dict +from typing import Any, Dict, List import torch from torchrec.metrics.metrics_config import DefaultTaskInfo from torchrec.metrics.ndcg import NDCGMetric, SESSION_KEY +from torchrec.metrics.test_utils import RecTaskInfo WORLD_SIZE = 4 @@ -99,14 +100,13 @@ def get_test_case_negative_task() -> Dict[str, torch.Tensor]: } -""" -predictions = [0.1, 0.5, 0.3, 0.4, 0.2, 0.1] * weights = [1, 0, 0, 1, 0, 0] => [0.1, 0, 0, 0.4, 0.0, 0.0] -labels = [1, 0, 1, 1, 0, 1] * weights = [1, 0, 1, 1, 0, 1] => [1, 0, 0, 1, 0, 0] - => NDCG going to be perfect for both sessions (trivially). -""" - - def get_test_case_scale_by_weights_tensor() -> Dict[str, torch.Tensor]: + """ + For this test case, + predictions * weights = [0.1, 0, 0, 0.4, 0.0, 0.0] + labels * weights = [1, 0, 0, 1, 0, 0] + So NDCG going to be perfect for both sessions. + """ return { "predictions": torch.tensor([[0.1, 0.5, 0.3, 0.4, 0.2, 0.1]]), "session_ids": torch.tensor([[1, 1, 1, 2, 2, 2]]), @@ -118,97 +118,63 @@ def get_test_case_scale_by_weights_tensor() -> Dict[str, torch.Tensor]: class NDCGMetricValueTest(unittest.TestCase): - def setUp(self) -> None: - self.non_exponential_ndcg = NDCGMetric( - world_size=WORLD_SIZE, - my_rank=0, - batch_size=BATCH_SIZE, - tasks=[DefaultTaskInfo], - # pyre-ignore - exponential_gain=False, # exponential_gain is one of the kwargs - # pyre-ignore - session_key=SESSION_KEY, # session_key is one of the kwargs - ) - - self.exponential_ndcg = NDCGMetric( - world_size=WORLD_SIZE, - my_rank=0, - batch_size=BATCH_SIZE, - tasks=[DefaultTaskInfo], - # pyre-ignore - exponential_gain=True, # exponential_gain is one of the kwargs - # pyre-ignore - session_key=SESSION_KEY, # session_key is one of the kwargs - ) - - self.ndcg_at_k = NDCGMetric( - world_size=WORLD_SIZE, - my_rank=0, - batch_size=BATCH_SIZE, - tasks=[DefaultTaskInfo], - # pyre-ignore - exponential_gain=False, # exponential_gain is one of the kwargs - # pyre-ignore - session_key=SESSION_KEY, # session_key is one of the kwargs - # pyre-ignore[6]: In call `NDCGMetric.__init__`, for argument `k`, expected `Dict[str, typing.Any]` but got `int` - k=2, - ) - - self.ndcg_remove_single_length_sessions = NDCGMetric( - world_size=WORLD_SIZE, - my_rank=0, - batch_size=BATCH_SIZE, - tasks=[DefaultTaskInfo], - # pyre-ignore - exponential_gain=False, # exponential_gain is one of the kwargs - # pyre-ignore - session_key=SESSION_KEY, # session_key is one of the kwargs - # pyre-ignore[6]: In call `NDCGMetric.__init__`, for argument `remove_single_length_sessions`, expected `Dict[str, typing.Any]` but got `bool` - remove_single_length_sessions=True, + def generate_metric( + self, + world_size: int, + my_rank: int, + batch_size: int, + tasks: List[RecTaskInfo] = [DefaultTaskInfo], + exponential_gain: bool = False, + session_key: str = SESSION_KEY, + k: int = -1, + remove_single_length_sessions: bool = False, + scale_by_weights_tensor: bool = False, + report_ndcg_as_decreasing_curve: bool = True, + **kwargs: Dict[str, Any], + ) -> NDCGMetric: + return NDCGMetric( + world_size=world_size, + my_rank=my_rank, + batch_size=batch_size, + tasks=tasks, + # pyre-ignore[6] + session_key=session_key, + # pyre-ignore[6] + exponential_gain=exponential_gain, + # pyre-ignore[6] + remove_single_length_sessions=remove_single_length_sessions, + # pyre-ignore[6] + scale_by_weights_tensor=scale_by_weights_tensor, + # pyre-ignore[6] + report_ndcg_as_decreasing_curve=report_ndcg_as_decreasing_curve, + # pyre-ignore[6] + k=k, + # pyre-ignore[6] + **kwargs, ) - TempTaskInfo = replace(DefaultTaskInfo, is_negative_task=True) - self.ndcg_apply_negative_task_mask = NDCGMetric( - world_size=WORLD_SIZE, - my_rank=0, - batch_size=BATCH_SIZE, - tasks=[TempTaskInfo], - # pyre-ignore - exponential_gain=False, # exponential_gain is one of the kwargs - # pyre-ignore - session_key=SESSION_KEY, # session_key is one of the kwargs - ) - - self.ndcg_report_as_increasing_and_scale_by_weights_tensor = NDCGMetric( + def test_single_session_non_exp(self) -> None: + """ + Test single session in a update. + """ + model_output = get_test_case_multiple_sessions_within_batch() + metric = self.generate_metric( world_size=WORLD_SIZE, my_rank=0, batch_size=BATCH_SIZE, tasks=[DefaultTaskInfo], - # pyre-ignore - exponential_gain=False, # exponential_gain is one of the kwargs - # pyre-ignore - session_key=SESSION_KEY, # session_key is one of the kwargs - # pyre-ignore[6]: In call `NDCGMetric.__init__`, for argument `remove_single_length_sessions`, expected `Dict[str, typing.Any]` but got `bool` - remove_single_length_sessions=True, - # pyre-ignore[6]: In call `NDCGMetric.__init__`, for argument `scale_by_weights_tensor`, expected `Dict[str, typing.Any]` but got `bool` - scale_by_weights_tensor=True, - # pyre-ignore[6]: In call `NDCGMetric.__init__`, for argument `report_ndcg_as_decreasing_curve`, expected `Dict[str, typing.Any]` but got `bool` - report_ndcg_as_decreasing_curve=False, + exponential_gain=False, + session_key=SESSION_KEY, ) - def test_single_session(self) -> None: - """ - Test single session in a update. - """ - model_output = get_test_case_multiple_sessions_within_batch() - self.non_exponential_ndcg.update( + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.non_exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -221,14 +187,28 @@ def test_single_session(self) -> None: msg=f"Actual: {actual_metric}, Expected: {expected_metric}", ) - self.exponential_ndcg.update( + def test_single_session_exp(self) -> None: + """ + Test single session in a update for exponential metric. + """ + model_output = get_test_case_multiple_sessions_within_batch() + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=True, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_exp"] torch.testing.assert_close( @@ -241,19 +221,28 @@ def test_single_session(self) -> None: msg=f"Actual: {actual_metric}, Expected: {expected_metric}", ) - def test_multiple_sessions(self) -> None: + def test_multiple_sessions_non_exp(self) -> None: """ Test multiple sessions in a single update. """ model_output = get_test_case_multiple_sessions_within_batch() - self.non_exponential_ndcg.update( + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.non_exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -266,14 +255,25 @@ def test_multiple_sessions(self) -> None: msg=f"Actual: {actual_metric}, Expected: {expected_metric}", ) - self.exponential_ndcg.update( + def test_multiple_sessions_exp(self) -> None: + model_output = get_test_case_multiple_sessions_within_batch() + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=True, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_exp"] torch.testing.assert_close( @@ -291,14 +291,23 @@ def test_negative_sessions(self) -> None: Test sessions where all labels are 0. """ model_output = get_test_case_all_labels_zero() - self.non_exponential_ndcg.update( + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.non_exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -311,14 +320,28 @@ def test_negative_sessions(self) -> None: msg=f"Actual: {actual_metric}, Expected: {expected_metric}", ) - self.exponential_ndcg.update( + def test_negative_sessions_exp(self) -> None: + """ + Test sessions where all labels are 0, for exponential gain. + """ + model_output = get_test_case_all_labels_zero() + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=True, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_exp"] torch.testing.assert_close( @@ -336,14 +359,23 @@ def test_another_multiple_sessions(self) -> None: Test another multiple sessions in a single update. """ model_output = get_test_case_another_multiple_sessions_within_batch() - self.non_exponential_ndcg.update( + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.non_exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -356,14 +388,28 @@ def test_another_multiple_sessions(self) -> None: msg=f"Actual: {actual_metric}, Expected: {expected_metric}", ) - self.exponential_ndcg.update( + def test_another_multiple_sessions_exp(self) -> None: + """ + Test another multiple sessions in a single update, for exponential gain. + """ + model_output = get_test_case_another_multiple_sessions_within_batch() + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=True, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.exponential_ndcg.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_exp"] torch.testing.assert_close( @@ -381,14 +427,23 @@ def test_at_k(self) -> None: Test NDCG @ K. """ model_output = get_test_case_at_k() - self.ndcg_at_k.update( + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + k=2, + ) + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.ndcg_at_k.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -406,14 +461,24 @@ def test_remove_single_length_sessions(self) -> None: Test NDCG with removing single length sessions. """ model_output = get_test_case_remove_single_length_sessions() - self.ndcg_remove_single_length_sessions.update( + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + remove_single_length_sessions=True, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.ndcg_remove_single_length_sessions.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -431,15 +496,26 @@ def test_apply_negative_task_mask(self) -> None: Test NDCG with apply negative task mask. """ model_output = get_test_case_negative_task() - self.ndcg_apply_negative_task_mask.update( + TempTaskInfo = replace(DefaultTaskInfo, is_negative_task=True) + + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[TempTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.ndcg_apply_negative_task_mask.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( @@ -457,15 +533,27 @@ def test_case_report_as_increasing_ndcg_and_scale_by_weights_tensor(self) -> Non Test NDCG with reporting as increasing NDCG and scaling by weights tensor correctly. """ model_output = get_test_case_scale_by_weights_tensor() - self.ndcg_report_as_increasing_and_scale_by_weights_tensor.update( + metric = self.generate_metric( + world_size=WORLD_SIZE, + my_rank=0, + batch_size=BATCH_SIZE, + tasks=[DefaultTaskInfo], + exponential_gain=False, + session_key=SESSION_KEY, + remove_single_length_sessions=True, + scale_by_weights_tensor=True, + report_ndcg_as_decreasing_curve=False, + ) + + metric.update( predictions={DefaultTaskInfo.name: model_output["predictions"][0]}, labels={DefaultTaskInfo.name: model_output["labels"][0]}, weights={DefaultTaskInfo.name: model_output["weights"][0]}, required_inputs={SESSION_KEY: model_output["session_ids"][0]}, ) - metric = self.ndcg_report_as_increasing_and_scale_by_weights_tensor.compute() - actual_metric = metric[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] + output = metric.compute() + actual_metric = output[f"ndcg-{DefaultTaskInfo.name}|lifetime_ndcg"] expected_metric = model_output["expected_ndcg_non_exp"] torch.testing.assert_close( diff --git a/torchrec/metrics/tests/test_ne.py b/torchrec/metrics/tests/test_ne.py index 2bf94cdc9..bd1db6ab5 100644 --- a/torchrec/metrics/tests/test_ne.py +++ b/torchrec/metrics/tests/test_ne.py @@ -21,7 +21,9 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) @@ -157,32 +159,6 @@ def test_ne_update_fused(self) -> None: batch_window_size=10, ) - # TODO(stellaya): support the usage of fused_tasks_computation and - # fused_update for the same RecMetric - # rec_metric_value_test_launcher( - # target_clazz=NEMetric, - # target_compute_mode=RecComputeMode.FUSED_TASKS_COMPUTATION, - # test_clazz=TestNEMetric, - # task_names=["t1", "t2", "t3"], - # fused_update_limit=5, - # compute_on_all_ranks=False, - # should_validate_update=False, - # world_size=WORLD_SIZE, - # entry_point=self._test_ne, - # ) - - # rec_metric_value_test_launcher( - # target_clazz=NEMetric, - # target_compute_mode=RecComputeMode.FUSED_TASKS_COMPUTATION, - # test_clazz=TestNEMetric, - # task_names=["t1", "t2", "t3"], - # fused_update_limit=100, - # compute_on_all_ranks=False, - # should_validate_update=False, - # world_size=WORLD_SIZE, - # entry_point=self._test_ne_large_window_size, - # ) - def test_ne_zero_weights(self) -> None: rec_metric_value_test_launcher( target_clazz=NEMetric, @@ -258,3 +234,24 @@ def test_logloss_update_fused(self) -> None: entry_point=self._logloss_metric_test_helper, batch_window_size=10, ) + + +class NEGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = NEMetric + task_name: str = "ne" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=NEMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestNEMetric, + metric_name=NEGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_ne_positive.py b/torchrec/metrics/tests/test_ne_positive.py index 427d73124..d4487cae7 100644 --- a/torchrec/metrics/tests/test_ne_positive.py +++ b/torchrec/metrics/tests/test_ne_positive.py @@ -29,7 +29,8 @@ def generate_model_output() -> Dict[str, torch._tensor.Tensor]: class NEPositiveValueTest(unittest.TestCase): - r"""This set of tests verify the computation logic of AUC in several + """ + This set of tests verify the computation logic of AUC in several corner cases that we know the computation results. The goal is to provide some confidence of the correctness of the math formula. """ diff --git a/torchrec/metrics/tests/test_precision.py b/torchrec/metrics/tests/test_precision.py index e8141f5e0..991b7e864 100644 --- a/torchrec/metrics/tests/test_precision.py +++ b/torchrec/metrics/tests/test_precision.py @@ -17,8 +17,10 @@ from torchrec.metrics.rec_metric import RecComputeMode, RecMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, RecTaskInfo, + sync_test_helper, TestMetric, ) @@ -32,8 +34,8 @@ def _get_states( labels: torch.Tensor, predictions: torch.Tensor, weights: torch.Tensor ) -> Dict[str, torch.Tensor]: predictions = predictions.double() - true_pos_sum = torch.sum(weights * ((predictions >= 0.5) == labels)) - false_pos_sum = torch.sum(weights * ((predictions >= 0.5) == (1 - labels))) + true_pos_sum = torch.sum(weights * ((predictions >= 0.5) * labels)) + false_pos_sum = torch.sum(weights * ((predictions >= 0.5) * (1 - labels))) return { "true_pos_sum": true_pos_sum, "false_pos_sum": false_pos_sum, @@ -51,34 +53,33 @@ class PrecisionMetricTest(unittest.TestCase): target_clazz: Type[RecMetric] = PrecisionMetric task_name: str = "precision" - # Temporarily comment out fuse unit tests due to unknown failure (D56856649). - # def test_unfused_precision(self) -> None: - # rec_metric_value_test_launcher( - # target_clazz=PrecisionMetric, - # target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, - # test_clazz=TestPrecisionMetric, - # metric_name=PrecisionMetricTest.task_name, - # task_names=["t1", "t2", "t3"], - # fused_update_limit=0, - # compute_on_all_ranks=False, - # should_validate_update=False, - # world_size=WORLD_SIZE, - # entry_point=metric_test_helper, - # ) - - # def test_fused_precision(self) -> None: - # rec_metric_value_test_launcher( - # target_clazz=PrecisionMetric, - # target_compute_mode=RecComputeMode.FUSED_TASKS_COMPUTATION, - # test_clazz=TestPrecisionMetric, - # metric_name=PrecisionMetricTest.task_name, - # task_names=["t1", "t2", "t3"], - # fused_update_limit=0, - # compute_on_all_ranks=False, - # should_validate_update=False, - # world_size=WORLD_SIZE, - # entry_point=metric_test_helper, - # ) + def test_unfused_precision(self) -> None: + rec_metric_value_test_launcher( + target_clazz=PrecisionMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestPrecisionMetric, + metric_name=PrecisionMetricTest.task_name, + task_names=["t1", "t2", "t3"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=WORLD_SIZE, + entry_point=metric_test_helper, + ) + + def test_fused_precision(self) -> None: + rec_metric_value_test_launcher( + target_clazz=PrecisionMetric, + target_compute_mode=RecComputeMode.FUSED_TASKS_COMPUTATION, + test_clazz=TestPrecisionMetric, + metric_name=PrecisionMetricTest.task_name, + task_names=["t1", "t2", "t3"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=WORLD_SIZE, + entry_point=metric_test_helper, + ) class PrecisionMetricValueTest(unittest.TestCase): @@ -253,3 +254,24 @@ def test_precision(self) -> None: except AssertionError: print("Assertion error caught with data set ", inputs) raise + + +class PrecisionGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = PrecisionMetric + task_name: str = "precision" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=PrecisionMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestPrecisionMetric, + metric_name=PrecisionGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_recall.py b/torchrec/metrics/tests/test_recall.py index 3388406aa..4cfd599e1 100644 --- a/torchrec/metrics/tests/test_recall.py +++ b/torchrec/metrics/tests/test_recall.py @@ -17,8 +17,10 @@ from torchrec.metrics.recall import compute_recall, RecallMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, RecTaskInfo, + sync_test_helper, TestMetric, ) @@ -245,3 +247,24 @@ def test_recall(self) -> None: except AssertionError: print("Assertion error caught with data set ", inputs) raise + + +class RecallGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = RecallMetric + task_name: str = "recall" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=RecallMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestRecallMetric, + metric_name=RecallGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + ) diff --git a/torchrec/metrics/tests/test_segmented_ne.py b/torchrec/metrics/tests/test_segmented_ne.py index 6a0acc170..315995599 100644 --- a/torchrec/metrics/tests/test_segmented_ne.py +++ b/torchrec/metrics/tests/test_segmented_ne.py @@ -17,7 +17,8 @@ class SegementedNEValueTest(unittest.TestCase): - r"""This set of tests verify the computation logic of AUC in several + """ + This set of tests verify the computation logic of AUC in several corner cases that we know the computation results. The goal is to provide some confidence of the correctness of the math formula. """ diff --git a/torchrec/metrics/tests/test_serving_calibration.py b/torchrec/metrics/tests/test_serving_calibration.py index 87f9189ad..693b27778 100644 --- a/torchrec/metrics/tests/test_serving_calibration.py +++ b/torchrec/metrics/tests/test_serving_calibration.py @@ -11,10 +11,14 @@ from torchrec.metrics.serving_calibration import ServingCalibrationMetric from torchrec.metrics.test_utils import ( metric_test_helper, + rec_metric_gpu_sync_test_launcher, rec_metric_value_test_launcher, + sync_test_helper, TestMetric, ) +WORLD_SIZE = 4 + class TestServingCalibrationMetric(TestMetric): @staticmethod @@ -39,9 +43,6 @@ def _compute(states: Dict[str, torch.Tensor]) -> torch.Tensor: ).double() -WORLD_SIZE = 4 - - class ServingCalibrationMetricTest(unittest.TestCase): clazz: Type[RecMetric] = ServingCalibrationMetric task_name: str = "calibration" @@ -73,3 +74,24 @@ def test_fused_calibration(self) -> None: world_size=WORLD_SIZE, entry_point=metric_test_helper, ) + + +class ServingCalibrationGPUSyncTest(unittest.TestCase): + clazz: Type[RecMetric] = ServingCalibrationMetric + task_name: str = "calibration" + + def test_sync_ne(self) -> None: + rec_metric_gpu_sync_test_launcher( + target_clazz=ServingCalibrationMetric, + target_compute_mode=RecComputeMode.UNFUSED_TASKS_COMPUTATION, + test_clazz=TestServingCalibrationMetric, + metric_name=ServingCalibrationGPUSyncTest.task_name, + task_names=["t1"], + fused_update_limit=0, + compute_on_all_ranks=False, + should_validate_update=False, + world_size=2, + batch_size=5, + batch_window_size=20, + entry_point=sync_test_helper, + )