From 400b23685097ea8ddec894da720b03d20bd5aba9 Mon Sep 17 00:00:00 2001 From: Kartik Ganesh Date: Tue, 7 Nov 2023 10:03:41 -0800 Subject: [PATCH] [Fetch Migration] Handle idle pipeline when target doc count is never reached (#377) This commit introduces changes to track various Data Prepper metrics (as well as API failure counters) in order to detect an idle pipeline or non-responsive Data Prepper subprocess. Without these logic changes, the monitoring module would only shutdown the Data Prepper pipeline when the target doc count was reached. If this failed to occur for any reason, or if the Data Prepper API was unresponsive, the overall Fetch Migration workflow would never conclude. A new ProgressMetrics class has been added to track all metrics and encapsulate detection logic. Much of the migration-success logic from the monitoring module has been moved to this class. Unit test updates and improvements are also included. This PR also refactors/merges the run and monitor_local functions together (since most of their code/logic is identical) for improved unit test coverage. --------- Signed-off-by: Kartik Ganesh --- FetchMigration/python/fetch_orchestrator.py | 2 +- FetchMigration/python/metadata_migration.py | 2 +- FetchMigration/python/migration_monitor.py | 134 ++++++-------- .../python/migration_monitor_params.py | 1 + FetchMigration/python/progress_metrics.py | 134 ++++++++++++++ .../python/tests/test_fetch_orchestrator.py | 4 +- .../python/tests/test_migration_monitor.py | 167 +++++++++++------- .../python/tests/test_progress_metrics.py | 103 +++++++++++ 8 files changed, 405 insertions(+), 142 deletions(-) create mode 100644 FetchMigration/python/progress_metrics.py create mode 100644 FetchMigration/python/tests/test_progress_metrics.py diff --git a/FetchMigration/python/fetch_orchestrator.py b/FetchMigration/python/fetch_orchestrator.py index 22b7d516d..d51f6a58c 100644 --- a/FetchMigration/python/fetch_orchestrator.py +++ b/FetchMigration/python/fetch_orchestrator.py @@ -28,7 +28,7 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[in # Run the migration monitor next migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint) logging.info("Starting migration monitor...\n") - return migration_monitor.monitor_local(migration_monitor_params, proc) + return migration_monitor.run(migration_monitor_params, proc) if __name__ == '__main__': # pragma no cover diff --git a/FetchMigration/python/metadata_migration.py b/FetchMigration/python/metadata_migration.py index 60775b821..f7fc9ec76 100644 --- a/FetchMigration/python/metadata_migration.py +++ b/FetchMigration/python/metadata_migration.py @@ -183,7 +183,7 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult: # Write output YAML if len(args.output_file) > 0: write_output(dp_config, indices_to_create, args.output_file) - if args.report: + if args.report: # pragma no cover print("Wrote output YAML pipeline to: " + args.output_file) if not args.dryrun: index_data = dict() diff --git a/FetchMigration/python/migration_monitor.py b/FetchMigration/python/migration_monitor.py index ffe345f11..5a0150cb4 100644 --- a/FetchMigration/python/migration_monitor.py +++ b/FetchMigration/python/migration_monitor.py @@ -1,6 +1,5 @@ import argparse import logging -import math import subprocess import time from subprocess import Popen @@ -12,14 +11,20 @@ from endpoint_info import EndpointInfo from migration_monitor_params import MigrationMonitorParams +from progress_metrics import ProgressMetrics # Path to the Data Prepper Prometheus metrics API endpoint # Used to monitor the progress of the migration -__METRICS_API_PATH = "/metrics/prometheus" -__SHUTDOWN_API_PATH = "/shutdown" -__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess" -__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight" -__NO_PARTITIONS_METRIC = "_noPartitionsAcquired" +__METRICS_API_PATH: str = "/metrics/prometheus" +__SHUTDOWN_API_PATH: str = "/shutdown" +__DOC_SUCCESS_METRIC: str = "_opensearch_documentsSuccess" +__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight" +__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired" +__IDLE_THRESHOLD: int = 5 + + +def is_process_alive(proc: Popen) -> bool: + return proc.returncode is None # Gracefully shutdown a subprocess @@ -29,7 +34,7 @@ def shutdown_process(proc: Popen) -> Optional[int]: try: proc.wait(timeout=60) except subprocess.TimeoutExpired: - if proc.returncode is None: + if is_process_alive(proc): # Failed to terminate, send SIGKILL proc.kill() return proc.returncode @@ -60,106 +65,79 @@ def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int] return None -def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_partition_count: Optional[int], - prev_no_partition: int, target: int) -> bool: - # Check for target doc_count - # TODO Add a check for partitionsCompleted = indices - if doc_count is not None and doc_count >= target: - # Check for idle pipeline - logging.info("Target doc count reached, checking for idle pipeline...") - # Debug metrics - if logging.getLogger().isEnabledFor(logging.DEBUG): # pragma no cover - debug_msg_template: str = "Idle pipeline metrics - " + \ - "Records in flight: [{0}], " + \ - "No-partitions counter: [{1}]" + \ - "Previous no-partition value: [{2}]" - logging.debug(debug_msg_template.format(in_flight, no_partition_count, prev_no_partition)) - - if in_flight is not None and in_flight == 0: - # No-partitions metrics should steadily tick up - if no_partition_count is not None and no_partition_count > prev_no_partition > 0: - return True - return False - - -def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \ - tuple[bool, int]: - terminal: bool = False +def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetrics) -> ProgressMetrics: # If the API call fails, the response is empty metrics = fetch_prometheus_metrics(endpoint_info) if metrics is not None: + # Reset API failure counter + progress.reset_metric_api_failure() success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC) - rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC) - no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC) - if success_docs is not None: # pragma no cover - completion_percentage: int = math.floor((success_docs * 100) / target_doc_count) + progress.update_records_in_flight_count(get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)) + progress.update_no_partitions_count(get_metric_value(metrics, __NO_PARTITIONS_METRIC)) + if success_docs is not None: + completion_percentage = progress.update_success_doc_count(success_docs) progress_message: str = "Completed " + str(success_docs) + \ " docs ( " + str(completion_percentage) + "% )" logging.info(progress_message) + if progress.all_docs_migrated(): + logging.info("All documents migrated...") else: + progress.record_success_doc_value_failure() logging.warning("Could not fetch progress stats from Data Prepper response, " + "will retry on next polling cycle...") - terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count, - target_doc_count) - if not terminal: - # Save no_partitions_count - prev_no_partitions_count = no_partitions_count else: + progress.record_metric_api_failure() logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...") - # TODO - Handle idle non-terminal pipeline - return terminal, prev_no_partitions_count + return progress + + +def __should_continue_monitoring(progress: ProgressMetrics, proc: Optional[Popen] = None) -> bool: + return not progress.is_in_terminal_state() and (proc is None or is_process_alive(proc)) -def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]: +# The "dp_process" parameter is optional, and signifies a local Data Prepper process +def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int: endpoint_info = EndpointInfo(args.data_prepper_endpoint) - target_doc_count: int = args.target_count - # Counter to track the no_partition_count metric - no_partition_count: int = 0 - is_migration_complete = False - logging.info("Starting migration monitor until target doc count: " + str(target_doc_count)) - # Sets returncode. A value of None means the subprocess has not yet terminated - dp_process.poll() - while dp_process.returncode is None and not is_migration_complete: - try: - dp_process.wait(timeout=poll_interval_seconds) - except subprocess.TimeoutExpired: - pass - if dp_process.returncode is None: - is_migration_complete, no_partition_count = check_and_log_progress( - endpoint_info, target_doc_count, no_partition_count) + progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD) + logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count)) + while __should_continue_monitoring(progress_metrics, dp_process): + if dp_process is not None: + # Wait on local process + try: + dp_process.wait(timeout=poll_interval_seconds) + except subprocess.TimeoutExpired: + pass + else: + # Thread sleep + time.sleep(poll_interval_seconds) + if dp_process is None or is_process_alive(dp_process): + progress_metrics = check_and_log_progress(endpoint_info, progress_metrics) # Loop terminated - if not is_migration_complete: + if not progress_metrics.is_in_terminal_state(): + # This will only happen for a local Data Prepper process logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode)) # TODO - Implement rollback logging.error("Please delete any partially migrated indices before retrying the migration.") return dp_process.returncode else: + if progress_metrics.is_migration_complete_success(): + logging.info("Migration monitor observed successful migration, shutting down...\n") + elif progress_metrics.is_migration_idle(): + logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...") + elif progress_metrics.is_too_may_api_failures(): + logging.warning("Migration monitor was unable to fetch migration metrics, terminating...") # Shut down Data Prepper pipeline via API - logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n") shutdown_pipeline(endpoint_info) - if dp_process.returncode is None: + if dp_process is None: + # No local process + return 0 + elif is_process_alive(dp_process): # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141 return shutdown_process(dp_process) else: return dp_process.returncode -def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None: - endpoint_info = EndpointInfo(args.data_prepper_endpoint) - target_doc_count: int = args.target_count - # Counter to track the no_partition_count metric - no_partition_count: int = 0 - is_migration_complete = False - logging.info("Starting migration monitor until target doc count: " + str(target_doc_count)) - while not is_migration_complete: - time.sleep(poll_interval_seconds) - is_migration_complete, no_partition_count = check_and_log_progress( - endpoint_info, target_doc_count, no_partition_count) - # Loop terminated, shut down the Data Prepper pipeline - logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n") - shutdown_pipeline(endpoint_info) - - if __name__ == '__main__': # pragma no cover # Set up parsing for command line arguments arg_parser = argparse.ArgumentParser( diff --git a/FetchMigration/python/migration_monitor_params.py b/FetchMigration/python/migration_monitor_params.py index e04599a98..5476f23cf 100644 --- a/FetchMigration/python/migration_monitor_params.py +++ b/FetchMigration/python/migration_monitor_params.py @@ -5,3 +5,4 @@ class MigrationMonitorParams: target_count: int data_prepper_endpoint: str + is_local_process: bool = False diff --git a/FetchMigration/python/progress_metrics.py b/FetchMigration/python/progress_metrics.py new file mode 100644 index 000000000..f3d08cf9c --- /dev/null +++ b/FetchMigration/python/progress_metrics.py @@ -0,0 +1,134 @@ +import logging +import math +from typing import Optional + + +# Class that tracks metrics on the health and progress of the migration.Specific metric values from the Data Prepper +# metrics API endpoint are retrieved and stored, as well as idle-value tracking via counters that may indicate an +# idle pipeline. Counters are also used to keep track of API failures or missing metric values. +class ProgressMetrics: + + # Private constants + __IDLE_VALUE_PREFIX: str = "idle-value-" + _METRIC_API_FAIL_KEY: str = "metric_api_fail" + _SUCCESS_DOCS_KEY = "success_docs" + _REC_IN_FLIGHT_KEY = "records_in_flight" + _NO_PART_KEY = "no_partitions" + + target_doc_count: int + idle_threshold: int + current_values_map: dict[str, Optional[int]] + prev_values_map: dict[str, Optional[int]] + counter_map: dict[str, int] + + def __init__(self, doc_count, idle_threshold): + self.target_doc_count = doc_count + self.idle_threshold = idle_threshold + self.current_values_map = dict() + self.prev_values_map = dict() + self.counter_map = dict() + + def __reset_counter(self, key: str): + if key in self.counter_map: + del self.counter_map[key] + + def __increment_counter(self, key: str): + val = self.counter_map.get(key, 0) + self.counter_map[key] = val + 1 + + def __get_idle_value_key_name(self, key: str) -> str: + return self.__IDLE_VALUE_PREFIX + key + + def __get_idle_value_count(self, key: str) -> Optional[int]: + idle_value_key = self.__get_idle_value_key_name(key) + return self.counter_map.get(idle_value_key) + + def __record_value(self, key: str, val: Optional[int]): + if key in self.current_values_map: + # Move current value to previous + self.prev_values_map[key] = self.current_values_map[key] + # Track idle value metrics + idle_value_key = self.__get_idle_value_key_name(key) + if self.prev_values_map[key] == val: + self.__increment_counter(idle_value_key) + else: + self.__reset_counter(idle_value_key) + # Store new value + self.current_values_map[key] = val + + def __get_current_value(self, key: str) -> Optional[int]: + return self.current_values_map.get(key) + + def reset_metric_api_failure(self): + self.__reset_counter(self._METRIC_API_FAIL_KEY) + + def record_metric_api_failure(self): + self.__increment_counter(self._METRIC_API_FAIL_KEY) + + def __reset_success_doc_value_failure(self): + self.__reset_counter(self._SUCCESS_DOCS_KEY) + # Also reset API falure counter + self.reset_metric_api_failure() + + def record_success_doc_value_failure(self): + self.__record_value(self._SUCCESS_DOCS_KEY, None) + + def update_success_doc_count(self, doc_count: int) -> int: + self.__reset_success_doc_value_failure() + self.__record_value(self._SUCCESS_DOCS_KEY, doc_count) + return self.get_doc_completion_percentage() + + def update_records_in_flight_count(self, rec_in_flight: Optional[int]): + self.__record_value(self._REC_IN_FLIGHT_KEY, rec_in_flight) + + def update_no_partitions_count(self, no_part_count: Optional[int]): + self.__record_value(self._NO_PART_KEY, no_part_count) + + def get_doc_completion_percentage(self) -> int: + success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY) + if success_doc_count is None: + success_doc_count = 0 + return math.floor((success_doc_count * 100) / self.target_doc_count) + + def all_docs_migrated(self) -> bool: + # TODO Add a check for partitionsCompleted = indices + success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY) + if success_doc_count is None: + success_doc_count = 0 + return success_doc_count >= self.target_doc_count + + def is_migration_complete_success(self) -> bool: + is_idle_pipeline: bool = False + rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY) + no_partitions_count = self.__get_current_value(self._NO_PART_KEY) + prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0) + # Check for no records in flight + if rec_in_flight is not None and rec_in_flight == 0: + # No-partitions metrics should steadily tick up + if no_partitions_count is not None and no_partitions_count > prev_no_partitions_count > 0: + is_idle_pipeline = True + return is_idle_pipeline and self.all_docs_migrated() + + def is_migration_idle(self) -> bool: + keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY] + for key in keys_to_check: + val = self.__get_idle_value_count(key) + if val is not None and val >= self.idle_threshold: + logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " + + str(self.idle_threshold)) + return True + # End of loop + return False + + def is_too_may_api_failures(self) -> bool: + return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold + + def is_in_terminal_state(self) -> bool: + return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures() + + def log_idle_pipeline_debug_metrics(self): # pragma no cover + if logging.getLogger().isEnabledFor(logging.DEBUG): + logging.debug("Idle pipeline metrics - " + + f"Records in flight: [{self.__get_current_value(self._REC_IN_FLIGHT_KEY)}], " + + f"No-partitions counter: [{self.__get_current_value(self._NO_PART_KEY)}]" + + f"Previous no-partition value: [{self.prev_values_map.get(self._NO_PART_KEY)}]") diff --git a/FetchMigration/python/tests/test_fetch_orchestrator.py b/FetchMigration/python/tests/test_fetch_orchestrator.py index 25f004129..92f8baf87 100644 --- a/FetchMigration/python/tests/test_fetch_orchestrator.py +++ b/FetchMigration/python/tests/test_fetch_orchestrator.py @@ -9,7 +9,7 @@ class TestFetchOrchestrator(unittest.TestCase): - @patch('migration_monitor.monitor_local') + @patch('migration_monitor.run') @patch('subprocess.Popen') @patch('metadata_migration.run') # Note that mock objects are passed bottom-up from the patch order above @@ -33,7 +33,7 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc mock_subprocess.assert_called_once_with(expected_dp_runnable) mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value) - @patch('migration_monitor.monitor_local') + @patch('migration_monitor.run') @patch('subprocess.Popen') @patch('metadata_migration.run') # Note that mock objects are passed bottom-up from the patch order above diff --git a/FetchMigration/python/tests/test_migration_monitor.py b/FetchMigration/python/tests/test_migration_monitor.py index c6a45f886..edfb1fe91 100644 --- a/FetchMigration/python/tests/test_migration_monitor.py +++ b/FetchMigration/python/tests/test_migration_monitor.py @@ -1,7 +1,7 @@ import logging import subprocess import unittest -from unittest.mock import patch, MagicMock, PropertyMock +from unittest.mock import patch, MagicMock, PropertyMock, ANY import requests import responses @@ -31,7 +31,7 @@ def tearDown(self) -> None: logging.disable(logging.NOTSET) @patch('requests.post') - def test_shutdown(self, mock_post: MagicMock): + def test_shutdown_pipeline(self, mock_post: MagicMock): expected_shutdown_url = TEST_ENDPOINT + "/shutdown" test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG) migration_monitor.shutdown_pipeline(test_endpoint) @@ -78,101 +78,148 @@ def test_get_metric_value(self): self.assertEqual(expected_val, val) # No matching metric returns None val = migration_monitor.get_metric_value(test_input, "invalid") - self.assertEqual(None, val) + self.assertIsNone(val) + + @patch('migration_monitor.fetch_prometheus_metrics') + # Note that mock objects are passed bottom-up from the patch order above + def test_check_progress_metrics_failure(self, mock_fetch: MagicMock): + mock_progress = MagicMock() + # On API failure, check returns None + mock_fetch.return_value = None + # Endpoint info doesn't matter + return_value = migration_monitor.check_and_log_progress(MagicMock(), mock_progress) + # Same progress object is returned + self.assertEqual(mock_progress, return_value) + # API metric failure is recorded + mock_progress.record_metric_api_failure.assert_called_once() - @patch('migration_monitor.shutdown_pipeline') - @patch('time.sleep') - @patch('migration_monitor.check_if_complete') @patch('migration_monitor.get_metric_value') @patch('migration_monitor.fetch_prometheus_metrics') + def test_check_progress_missing_success_docs_metric(self, mock_fetch: MagicMock, mock_get_metric: MagicMock): + # Fetch return value is not None, but get-metric returns None + mock_fetch.return_value = MagicMock() + mock_get_metric.return_value = None + mock_progress = MagicMock() + # Endpoint info doesn't matter + return_value = migration_monitor.check_and_log_progress(MagicMock(), mock_progress) + # Same progress object is returned + self.assertEqual(mock_progress, return_value) + # API failure metric is reset is recorded + mock_progress.reset_metric_api_failure.assert_called_once() + # 3 metric values are read + self.assertEqual(3, mock_get_metric.call_count) + # Success doc failure metric is recorded + mock_progress.record_success_doc_value_failure.assert_called_once() + + @patch('migration_monitor.get_metric_value') + @patch('migration_monitor.fetch_prometheus_metrics') + def test_check_and_log_progress(self, mock_fetch: MagicMock, mock_get_metric: MagicMock): + # Fetch return value is not None + mock_fetch.return_value = MagicMock() + # Get metric return value is not None + expected_value: int = 10 + mock_get_metric.return_value = expected_value + mock_progress = MagicMock() + # Set up target-doc-count + mock_progress.target_doc_count.return_value = expected_value + # Endpoint info doesn't matter + return_value = migration_monitor.check_and_log_progress(MagicMock(), mock_progress) + # Same progress object is returned + self.assertEqual(mock_progress, return_value) + # 3 metric values are read + self.assertEqual(3, mock_get_metric.call_count) + # Success doc count is updated as expected + mock_progress.update_success_doc_count.assert_called_once_with(expected_value) + # All-docs-migrated check is invoked + mock_progress.all_docs_migrated.assert_called_once() + + @patch('migration_monitor.shutdown_process') + @patch('migration_monitor.shutdown_pipeline') + @patch('time.sleep') + @patch('migration_monitor.check_and_log_progress') # Note that mock objects are passed bottom-up from the patch order above - def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock, - mock_shut: MagicMock): + def test_monitor_non_local(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut_dp: MagicMock, + mock_shut_proc: MagicMock): # The param values don't matter since we've mocked the check method test_input = MigrationMonitorParams(1, "test") - mock_get.return_value = None - # Check will first fail, then pass - mock_check.side_effect = [False, True] + mock_progress = MagicMock() + mock_progress.is_in_terminal_state.return_value = True + mock_check.return_value = mock_progress # Run test method wait_time = 3 - migration_monitor.run(test_input, wait_time) + migration_monitor.run(test_input, None, wait_time) # Test that fetch was called with the expected EndpointInfo expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) - self.assertEqual(2, mock_fetch.call_count) - mock_fetch.assert_called_with(expected_endpoint_info) mock_sleep.assert_called_with(wait_time) - mock_shut.assert_called_once_with(expected_endpoint_info) + mock_shut_dp.assert_called_once_with(expected_endpoint_info) + mock_shut_proc.assert_not_called() + @patch('migration_monitor.shutdown_process') @patch('migration_monitor.shutdown_pipeline') - @patch('time.sleep') - @patch('migration_monitor.check_if_complete') - @patch('migration_monitor.get_metric_value') - @patch('migration_monitor.fetch_prometheus_metrics') # Note that mock objects are passed bottom-up from the patch order above - def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, - mock_sleep: MagicMock, mock_shut: MagicMock): + def test_monitor_local_process_exit(self, mock_shut_dp: MagicMock, mock_shut_proc: MagicMock): # The param values don't matter since we've mocked the check method test_input = MigrationMonitorParams(1, "test") - mock_get.return_value = None - mock_check.return_value = True - # Fetch call will first fail, then succeed - mock_fetch.side_effect = [None, MagicMock()] + mock_subprocess = MagicMock() + # Simulate an exited subprocess + expected_return_code: int = 1 + mock_subprocess.returncode = expected_return_code # Run test method - wait_time = 3 - migration_monitor.run(test_input, wait_time) - # Test that fetch was called with the expected EndpointInfo - expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) - self.assertEqual(2, mock_fetch.call_count) - mock_fetch.assert_called_with(expected_endpoint_info) - # We expect one wait cycle - mock_sleep.assert_called_with(wait_time) - mock_shut.assert_called_once_with(expected_endpoint_info) - - def test_check_if_complete(self): - # If any of the optional values are missing, we are not complete - self.assertFalse(migration_monitor.check_if_complete(None, 0, 1, 0, 2)) - self.assertFalse(migration_monitor.check_if_complete(2, None, 1, 0, 2)) - self.assertFalse(migration_monitor.check_if_complete(2, 0, None, 0, 2)) - # Target count not reached - self.assertFalse(migration_monitor.check_if_complete(1, None, None, 0, 2)) - # Target count reached, but has records in flight - self.assertFalse(migration_monitor.check_if_complete(2, 1, None, 0, 2)) - # Target count reached, no records in flight, but no prev no_part_count - self.assertFalse(migration_monitor.check_if_complete(2, 0, 1, 0, 2)) - # Terminal state - self.assertTrue(migration_monitor.check_if_complete(2, 0, 2, 1, 2)) + return_code = migration_monitor.run(test_input, mock_subprocess) + self.assertEqual(expected_return_code, return_code) + mock_shut_dp.assert_not_called() + mock_shut_proc.assert_not_called() @patch('migration_monitor.shutdown_process') @patch('migration_monitor.shutdown_pipeline') + @patch('migration_monitor.is_process_alive') + @patch('migration_monitor.check_and_log_progress') # Note that mock objects are passed bottom-up from the patch order above - def test_migration_process_exit(self, mock_shut_dp: MagicMock, mock_shut_proc: MagicMock): + def test_monitor_local_migration_complete(self, mock_check: MagicMock, mock_is_alive: MagicMock, + mock_shut_dp: MagicMock, mock_shut_proc: MagicMock): # The param values don't matter since we've mocked the check method test_input = MigrationMonitorParams(1, "test") + # Simulate a successful migration + mock_progress = MagicMock() + mock_progress.is_in_terminal_state.side_effect = [False, True] + mock_progress.is_migration_complete_success.return_value = True + mock_check.return_value = mock_progress + # Sequence of expected return values for a process that terminates successfully + mock_is_alive.side_effect = [True, True, False, False] mock_subprocess = MagicMock() - # set subprocess returncode to None to simulate a zombie Data Prepper process - mock_subprocess.returncode = 1 + expected_return_code: int = 0 + mock_subprocess.returncode = expected_return_code + # Simulate timeout on wait + mock_subprocess.wait.side_effect = [subprocess.TimeoutExpired("test", 1)] # Run test method - wait_time = 3 - return_code = migration_monitor.monitor_local(test_input, mock_subprocess, wait_time) - self.assertEqual(1, return_code) - mock_shut_dp.assert_not_called() + actual_return_code = migration_monitor.run(test_input, mock_subprocess) + self.assertEqual(expected_return_code, actual_return_code) + expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) + mock_check.assert_called_once_with(expected_endpoint_info, ANY) + mock_shut_dp.assert_called_once_with(expected_endpoint_info) mock_shut_proc.assert_not_called() @patch('migration_monitor.shutdown_process') @patch('migration_monitor.shutdown_pipeline') @patch('migration_monitor.check_and_log_progress') # Note that mock objects are passed bottom-up from the patch order above - def test_process_shutdown_invocation(self, mock_check: MagicMock, mock_shut_dp: MagicMock, - mock_shut_proc: MagicMock): + def test_monitor_local_shutdown_process(self, mock_check: MagicMock, mock_shut_dp: MagicMock, + mock_shut_proc: MagicMock): # The param values don't matter since we've mocked the check method test_input = MigrationMonitorParams(1, "test") - mock_check.side_effect = [(False, 1), (True, 2)] + # Simulate a progressing, successful migration + mock_progress = MagicMock() + mock_progress.is_in_terminal_state.side_effect = [False, True, True] + mock_check.return_value = mock_progress mock_subprocess = MagicMock() # set subprocess returncode to None to simulate a zombie Data Prepper process mock_subprocess.returncode = None + # Shtudown-process call return code + expected_return_code: int = 137 + mock_shut_proc.return_value = 137 # Run test method - wait_time = 3 - migration_monitor.monitor_local(test_input, mock_subprocess, wait_time) + actual_return_code = migration_monitor.run(test_input, mock_subprocess) + self.assertEqual(expected_return_code, actual_return_code) expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) mock_shut_dp.assert_called_once_with(expected_endpoint_info) mock_shut_proc.assert_called_once_with(mock_subprocess) diff --git a/FetchMigration/python/tests/test_progress_metrics.py b/FetchMigration/python/tests/test_progress_metrics.py new file mode 100644 index 000000000..900dbeb9a --- /dev/null +++ b/FetchMigration/python/tests/test_progress_metrics.py @@ -0,0 +1,103 @@ +import logging +import unittest + +from progress_metrics import ProgressMetrics + + +class TestProgressMetrics(unittest.TestCase): + def setUp(self) -> None: + logging.disable(logging.CRITICAL) + + def tearDown(self) -> None: + logging.disable(logging.NOTSET) + + def test_doc_completion_percentage(self): + metrics = ProgressMetrics(9, 0) + # Base case + self.assertEqual(0, metrics.get_doc_completion_percentage()) + # Completion percentage should round down + metrics.update_success_doc_count(3) + self.assertEqual(33, metrics.get_doc_completion_percentage()) + metrics.update_success_doc_count(9) + self.assertEqual(100, metrics.get_doc_completion_percentage()) + + def test_all_docs_migrated(self): + metrics = ProgressMetrics(9, 0) + # Base case + self.assertFalse(metrics.all_docs_migrated()) + metrics.update_success_doc_count(3) + self.assertFalse(metrics.all_docs_migrated()) + # Return value is true when >= target + metrics.update_success_doc_count(9) + self.assertTrue(metrics.all_docs_migrated()) + metrics.update_success_doc_count(10) + self.assertTrue(metrics.all_docs_migrated()) + + def test_is_migration_complete_success(self): + metrics = ProgressMetrics(9, 0) + # Base case + self.assertFalse(metrics.is_migration_complete_success()) + # Update success docs + metrics.update_success_doc_count(9) + self.assertFalse(metrics.is_migration_complete_success()) + # Non-zero records in flight + metrics.update_records_in_flight_count(1) + self.assertFalse(metrics.is_migration_complete_success()) + # Zero records in flight, but no recorded partition count + metrics.update_records_in_flight_count(0) + self.assertFalse(metrics.is_migration_complete_success()) + # Record partition count, but no previous count + metrics.update_no_partitions_count(1) + self.assertFalse(metrics.is_migration_complete_success()) + # Update partition count, but it matches previous value + metrics.update_no_partitions_count(1) + self.assertFalse(metrics.is_migration_complete_success()) + # Update partition count to meet idle pipeline criteria + metrics.update_no_partitions_count(2) + self.assertTrue(metrics.is_migration_complete_success()) + + def test_is_migration_idle(self): + metrics = ProgressMetrics(9, 1) + # Base case + self.assertFalse(metrics.is_migration_idle()) + # Update success docs + metrics.update_success_doc_count(3) + self.assertFalse(metrics.is_migration_idle()) + # Update partition count + metrics.update_no_partitions_count(1) + self.assertFalse(metrics.is_migration_idle()) + # Update success docs to same value, which reaches threshold + metrics.update_success_doc_count(3) + self.assertTrue(metrics.is_migration_idle()) + + def test_is_too_may_api_failures(self): + metrics = ProgressMetrics(9, 1) + # Base case + self.assertFalse(metrics.is_too_may_api_failures()) + # Metric value failure does not count towards API failure + metrics.record_success_doc_value_failure() + self.assertFalse(metrics.is_too_may_api_failures()) + metrics.record_metric_api_failure() + self.assertTrue(metrics.is_too_may_api_failures()) + + def test_is_in_terminal_state(self): + metrics = ProgressMetrics(9, 1) + metrics.update_success_doc_count(1) + metrics.update_no_partitions_count(1) + self.assertFalse(metrics.is_in_terminal_state()) + # Too many API failures + metrics.record_metric_api_failure() + self.assertTrue(metrics.is_in_terminal_state()) + metrics.reset_metric_api_failure() + # Idle pipeline + metrics.update_no_partitions_count(1) + self.assertTrue(metrics.is_in_terminal_state()) + metrics.update_no_partitions_count(2) + # Migration complete + metrics.update_success_doc_count(10) + metrics.update_records_in_flight_count(0) + self.assertTrue(metrics.is_in_terminal_state()) + + +if __name__ == '__main__': + unittest.main()