diff --git a/FetchMigration/python/metadata_migration.py b/FetchMigration/python/metadata_migration.py
index 60775b8217..f7fc9ec768 100644
--- a/FetchMigration/python/metadata_migration.py
+++ b/FetchMigration/python/metadata_migration.py
@@ -183,7 +183,7 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
     # Write output YAML
     if len(args.output_file) > 0:
         write_output(dp_config, indices_to_create, args.output_file)
-        if args.report:
+        if args.report:  # pragma no cover
             print("Wrote output YAML pipeline to: " + args.output_file)
     if not args.dryrun:
         index_data = dict()
diff --git a/FetchMigration/python/migration_monitor.py b/FetchMigration/python/migration_monitor.py
index ffe345f112..9dd7719773 100644
--- a/FetchMigration/python/migration_monitor.py
+++ b/FetchMigration/python/migration_monitor.py
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import math
 import subprocess
 import time
 from subprocess import Popen
@@ -12,14 +11,20 @@
 from endpoint_info import EndpointInfo
 from migration_monitor_params import MigrationMonitorParams
+from progress_metrics import ProgressMetrics
 
 
 # Path to the Data Prepper Prometheus metrics API endpoint
 # Used to monitor the progress of the migration
-__METRICS_API_PATH = "/metrics/prometheus"
-__SHUTDOWN_API_PATH = "/shutdown"
-__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
-__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
-__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"
+__METRICS_API_PATH: str = "/metrics/prometheus"
+__SHUTDOWN_API_PATH: str = "/shutdown"
+__DOC_SUCCESS_METRIC: str = "_opensearch_documentsSuccess"
+__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight"
+__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired"
+__IDLE_THRESHOLD: int = 5
+
+
+def is_process_alive(proc: Popen) -> bool:
+    return proc.returncode is None
 
 
 # Gracefully shutdown a subprocess
@@ -29,7 +34,7 @@ def shutdown_process(proc: Popen) -> Optional[int]:
     try:
         proc.wait(timeout=60)
     except subprocess.TimeoutExpired:
-        if proc.returncode is None:
+        if is_process_alive(proc):
             # Failed to terminate, send SIGKILL
             proc.kill()
     return proc.returncode
@@ -60,84 +65,63 @@ def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]
     return None
 
 
-def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_partition_count: Optional[int],
-                      prev_no_partition: int, target: int) -> bool:
-    # Check for target doc_count
-    # TODO Add a check for partitionsCompleted = indices
-    if doc_count is not None and doc_count >= target:
-        # Check for idle pipeline
-        logging.info("Target doc count reached, checking for idle pipeline...")
-        # Debug metrics
-        if logging.getLogger().isEnabledFor(logging.DEBUG):  # pragma no cover
-            debug_msg_template: str = "Idle pipeline metrics - " + \
-                                      "Records in flight: [{0}], " + \
-                                      "No-partitions counter: [{1}]" + \
-                                      "Previous no-partition value: [{2}]"
-            logging.debug(debug_msg_template.format(in_flight, no_partition_count, prev_no_partition))
-
-        if in_flight is not None and in_flight == 0:
-            # No-partitions metrics should steadily tick up
-            if no_partition_count is not None and no_partition_count > prev_no_partition > 0:
-                return True
-    return False
-
-
-def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
-        tuple[bool, int]:
-    terminal: bool = False
+def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetrics) -> ProgressMetrics:
     # If the API call fails, the response is empty
     metrics = fetch_prometheus_metrics(endpoint_info)
     if metrics is not None:
+        # Reset API failure counter
+        progress.reset_metric_api_failure()
         success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
-        rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
-        no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
-        if success_docs is not None:  # pragma no cover
-            completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
+        progress.update_records_in_flight_count(get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC))
+        progress.update_no_partitions_count(get_metric_value(metrics, __NO_PARTITIONS_METRIC))
+        if success_docs is not None:
+            completion_percentage = progress.update_success_doc_count(success_docs)
             progress_message: str = "Completed " + str(success_docs) + \
                                     " docs ( " + str(completion_percentage) + "% )"
             logging.info(progress_message)
+            if progress.all_docs_migrated():
+                logging.info("All documents migrated...")
         else:
+            progress.record_success_doc_value_failure()
            logging.warning("Could not fetch progress stats from Data Prepper response, " +
                            "will retry on next polling cycle...")
-        terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
-                                     target_doc_count)
-        if not terminal:
-            # Save no_partitions_count
-            prev_no_partitions_count = no_partitions_count
     else:
+        progress.record_metric_api_failure()
         logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
-    # TODO - Handle idle non-terminal pipeline
-    return terminal, prev_no_partitions_count
+    return progress
 
 
 def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
     endpoint_info = EndpointInfo(args.data_prepper_endpoint)
-    target_doc_count: int = args.target_count
-    # Counter to track the no_partition_count metric
-    no_partition_count: int = 0
-    is_migration_complete = False
-    logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
+    progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
+    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
     # Sets returncode. A value of None means the subprocess has not yet terminated
     dp_process.poll()
-    while dp_process.returncode is None and not is_migration_complete:
+    while is_process_alive(dp_process) and not progress_metrics.is_in_terminal_state():
         try:
             dp_process.wait(timeout=poll_interval_seconds)
         except subprocess.TimeoutExpired:
             pass
-        if dp_process.returncode is None:
-            is_migration_complete, no_partition_count = check_and_log_progress(
-                endpoint_info, target_doc_count, no_partition_count)
-    # Loop terminated
-    if not is_migration_complete:
+        if is_process_alive(dp_process):
+            progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
+    # Log debug metrics
+    progress_metrics.log_idle_pipeline_debug_metrics()
+    # Loop terminated, shut down the Data Prepper pipeline
+    if not progress_metrics.is_in_terminal_state():
         logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
         # TODO - Implement rollback
         logging.error("Please delete any partially migrated indices before retrying the migration.")
         return dp_process.returncode
     else:
+        if progress_metrics.is_migration_complete_success():
+            logging.info("Migration monitor observed successful migration, shutting down...\n")
+        elif progress_metrics.is_migration_idle():
+            logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
+        elif progress_metrics.is_too_may_api_failures():
+            logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
         # Shut down Data Prepper pipeline via API
-        logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
         shutdown_pipeline(endpoint_info)
-        if dp_process.returncode is None:
+        if is_process_alive(dp_process):
             # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
             return shutdown_process(dp_process)
         else:
@@ -146,17 +130,19 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval
 def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
     endpoint_info = EndpointInfo(args.data_prepper_endpoint)
-    target_doc_count: int = args.target_count
-    # Counter to track the no_partition_count metric
-    no_partition_count: int = 0
-    is_migration_complete = False
-    logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
-    while not is_migration_complete:
+    progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
+    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
+    while not progress_metrics.is_in_terminal_state():
         time.sleep(poll_interval_seconds)
-        is_migration_complete, no_partition_count = check_and_log_progress(
-            endpoint_info, target_doc_count, no_partition_count)
+        progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
     # Loop terminated, shut down the Data Prepper pipeline
-    logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
+    if progress_metrics.is_migration_complete_success():
+        logging.info("Migration monitor observed successful migration, shutting down...\n")
+    elif progress_metrics.is_migration_idle():
+        logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
+    elif progress_metrics.is_too_may_api_failures():
+        logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
+    # Shut down Data Prepper pipeline via API
     shutdown_pipeline(endpoint_info)
diff --git a/FetchMigration/python/progress_metrics.py b/FetchMigration/python/progress_metrics.py
new file mode 100644
index 0000000000..4c271caf95
--- /dev/null
+++ b/FetchMigration/python/progress_metrics.py
@@ -0,0 +1,138 @@
+import logging
+import math
+from typing import Optional
+
+
+# Class that tracks metrics on the health and progress of the migration. Specific metric values from the Data Prepper
+# metrics API endpoint are retrieved and stored, as well as idle-value tracking via counters that may indicate an
+# idle pipeline. Counters are also used to keep track of API failures or missing metric values.
+class ProgressMetrics:
+
+    # Private constants
+    __IDLE_VALUE_PREFIX: str = "idle-value-"
+    _METRIC_API_FAIL_KEY: str = "metric_api_fail"
+    _SUCCESS_DOCS_KEY = "success_docs"
+    _REC_IN_FLIGHT_KEY = "records_in_flight"
+    _NO_PART_KEY = "no_partitions"
+
+    target_doc_count: int
+    idle_threshold: int
+    current_values_map: dict[str, Optional[int]]
+    prev_values_map: dict[str, Optional[int]]
+    counter_map: dict[str, int]
+
+    def __init__(self, doc_count, idle_threshold):
+        self.target_doc_count = doc_count
+        self.idle_threshold = idle_threshold
+        self.current_values_map = dict()
+        self.prev_values_map = dict()
+        self.counter_map = dict()
+
+    def __reset_counter(self, key: str):
+        if key in self.counter_map:
+            del self.counter_map[key]
+
+    def __increment_counter(self, key: str):
+        val = self.counter_map.get(key, 0)
+        self.counter_map[key] = val + 1
+
+    def __get_idle_value_key_name(self, key: str) -> str:
+        return self.__IDLE_VALUE_PREFIX + key
+
+    def __get_idle_value_count(self, key: str) -> Optional[int]:
+        idle_value_key = self.__get_idle_value_key_name(key)
+        return self.counter_map.get(idle_value_key)
+
+    def __record_value(self, key: str, val: Optional[int]):
+        if key in self.current_values_map:
+            # Move current value to previous
+            self.prev_values_map[key] = self.current_values_map[key]
+            # Store new value
+            self.current_values_map[key] = val
+            # Track idle value metrics
+            idle_value_key = self.__get_idle_value_key_name(key)
+            if self.prev_values_map[key] == val:
+                self.__increment_counter(idle_value_key)
+            else:
+                self.__reset_counter(idle_value_key)
+        self.current_values_map[key] = val
+
+    def __get_current_value(self, key: str) -> Optional[int]:
+        return self.current_values_map.get(key)
+
+    def reset_metric_api_failure(self):
+        self.__reset_counter(self._METRIC_API_FAIL_KEY)
+
+    def record_metric_api_failure(self):
+        self.__increment_counter(self._METRIC_API_FAIL_KEY)
+
+    def __reset_success_doc_value_failure(self):
+        self.__reset_counter(self._SUCCESS_DOCS_KEY)
+        # Also reset API failure counter
+        self.reset_metric_api_failure()
+
+    def record_success_doc_value_failure(self):
+        self.__record_value(self._SUCCESS_DOCS_KEY, None)
+
+    def update_success_doc_count(self, doc_count: int) -> int:
+        self.__reset_success_doc_value_failure()
+        self.__record_value(self._SUCCESS_DOCS_KEY, doc_count)
+        return self.get_doc_completion_percentage()
+
+    def update_records_in_flight_count(self, rec_in_flight: Optional[int]):
+        self.__record_value(self._REC_IN_FLIGHT_KEY, rec_in_flight)
+
+    def update_no_partitions_count(self, no_part_count: Optional[int]):
+        self.__record_value(self._NO_PART_KEY, no_part_count)
+
+    def get_doc_completion_percentage(self) -> int:
+        success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
+        if success_doc_count is None:
+            success_doc_count = 0
+        return math.floor((success_doc_count * 100) / self.target_doc_count)
+
+    def all_docs_migrated(self) -> bool:
+        # TODO Add a check for partitionsCompleted = indices
+        success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
+        if success_doc_count is None:
+            success_doc_count = 0
+        return success_doc_count >= self.target_doc_count
+
+    def is_migration_complete_success(self) -> bool:
+        is_idle_pipeline: bool = False
+        rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY)
+        no_partitions_count = self.__get_current_value(self._NO_PART_KEY)
+        prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0)
+        # Check for no records in flight
+        if rec_in_flight is not None and rec_in_flight == 0:
+            # No-partitions metrics should steadily tick up
+            if no_partitions_count is not None and no_partitions_count > prev_no_partitions_count > 0:
+                is_idle_pipeline = True
+        return is_idle_pipeline and self.all_docs_migrated()
+
+    def is_migration_idle(self) -> bool:
+        keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY]
+        for key in keys_to_check:
+            val = self.__get_idle_value_count(key)
+            if val is not None and val >= self.idle_threshold:
+                logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " +
+                                str(self.idle_threshold))
+                return True
+        # End of loop
+        return False
+
+    def is_too_may_api_failures(self) -> bool:
+        return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold
+
+    def is_in_terminal_state(self) -> bool:
+        return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures()
+
+    def log_idle_pipeline_debug_metrics(self):  # pragma no cover
+        if logging.getLogger().isEnabledFor(logging.DEBUG):
+            debug_msg_template: str = "Idle pipeline metrics - " + \
+                                      "Records in flight: [{0}], " + \
+                                      "No-partitions counter: [{1}], " + \
+                                      "Previous no-partition value: [{2}]"
+            logging.debug(debug_msg_template.format(self.__get_current_value(self._REC_IN_FLIGHT_KEY),
+                                                    self.__get_current_value(self._NO_PART_KEY),
+                                                    self.prev_values_map.get(self._NO_PART_KEY)))
diff --git a/FetchMigration/python/tests/test_migration_monitor.py b/FetchMigration/python/tests/test_migration_monitor.py
index c6a45f8860..97123b46c5 100644
--- a/FetchMigration/python/tests/test_migration_monitor.py
+++ b/FetchMigration/python/tests/test_migration_monitor.py
@@ -1,7 +1,7 @@
 import logging
 import subprocess
 import unittest
-from unittest.mock import patch, MagicMock, PropertyMock
+from unittest.mock import patch, MagicMock, PropertyMock, ANY
 
 import requests
 import responses
@@ -31,7 +31,7 @@ def tearDown(self) -> None:
         logging.disable(logging.NOTSET)
 
     @patch('requests.post')
-    def test_shutdown(self, mock_post: MagicMock):
+    def test_shutdown_pipeline(self, mock_post: MagicMock):
         expected_shutdown_url = TEST_ENDPOINT + "/shutdown"
         test_endpoint = EndpointInfo(TEST_ENDPOINT, TEST_AUTH, TEST_FLAG)
         migration_monitor.shutdown_pipeline(test_endpoint)
@@ -78,101 +78,145 @@ def test_get_metric_value(self):
         self.assertEqual(expected_val, val)
         # No matching metric returns None
         val = migration_monitor.get_metric_value(test_input, "invalid")
-        self.assertEqual(None, val)
+        self.assertIsNone(val)
+
+    @patch('migration_monitor.fetch_prometheus_metrics')
+    # Note that mock objects are passed bottom-up from the patch order above
+    def test_check_progress_metrics_failure(self, mock_fetch: MagicMock):
+        mock_progress = MagicMock()
+        # On API failure, the metrics fetch returns None
+        mock_fetch.return_value = None
+        # Endpoint info doesn't matter
+        return_value = migration_monitor.check_and_log_progress(MagicMock(), mock_progress)
+        # Same progress object is returned
+        self.assertEqual(mock_progress, return_value)
+        # API metric failure is recorded
+        mock_progress.record_metric_api_failure.assert_called_once()
 
-    @patch('migration_monitor.shutdown_pipeline')
-    @patch('time.sleep')
-    @patch('migration_monitor.check_if_complete')
     @patch('migration_monitor.get_metric_value')
     @patch('migration_monitor.fetch_prometheus_metrics')
+    def test_check_progress_missing_success_docs_metric(self, mock_fetch: MagicMock, mock_get_metric: MagicMock):
+        # Fetch return value is not None, but get-metric returns None
+        mock_fetch.return_value = MagicMock()
+        mock_get_metric.return_value = None
+        mock_progress = MagicMock()
+        # Endpoint info doesn't matter
+        return_value = migration_monitor.check_and_log_progress(MagicMock(), mock_progress)
+        # Same progress object is returned
+        self.assertEqual(mock_progress, return_value)
+        # API failure counter is reset
+        mock_progress.reset_metric_api_failure.assert_called_once()
+        # 3 metric values are read
+        self.assertEqual(3, mock_get_metric.call_count)
+        # Success doc failure metric is recorded
+        mock_progress.record_success_doc_value_failure.assert_called_once()
+
+    @patch('migration_monitor.get_metric_value')
+    @patch('migration_monitor.fetch_prometheus_metrics')
+    def test_check_and_log_progress(self, mock_fetch: MagicMock, mock_get_metric: MagicMock):
+        # Fetch return value is not None
+        mock_fetch.return_value = MagicMock()
+        # Get metric return value is not None
+        expected_value: int = 10
+        mock_get_metric.return_value = expected_value
+        mock_progress = MagicMock()
+        # Set up target-doc-count
+        mock_progress.target_doc_count.return_value = expected_value
+        # Endpoint info doesn't matter
+        return_value = migration_monitor.check_and_log_progress(MagicMock(), mock_progress)
+        # Same progress object is returned
+        self.assertEqual(mock_progress, return_value)
+        # 3 metric values are read
+        self.assertEqual(3, mock_get_metric.call_count)
+        # Success doc count is updated as expected
+        mock_progress.update_success_doc_count.assert_called_once_with(expected_value)
+        # All-docs-migrated check is invoked
+        mock_progress.all_docs_migrated.assert_called_once()
+
+    @patch('migration_monitor.shutdown_pipeline')
+    @patch('time.sleep')
+    @patch('migration_monitor.check_and_log_progress')
     # Note that mock objects are passed bottom-up from the patch order above
-    def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock, mock_sleep: MagicMock,
-                 mock_shut: MagicMock):
+    def test_run(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut: MagicMock):
         # The param values don't matter since we've mocked the check method
         test_input = MigrationMonitorParams(1, "test")
-        mock_get.return_value = None
-        # Check will first fail, then pass
-        mock_check.side_effect = [False, True]
+        mock_progress = MagicMock()
+        mock_progress.is_in_terminal_state.return_value = True
+        mock_check.return_value = mock_progress
         # Run test method
         wait_time = 3
         migration_monitor.run(test_input, wait_time)
         # Test that fetch was called with the expected EndpointInfo
         expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
-        self.assertEqual(2, mock_fetch.call_count)
-        mock_fetch.assert_called_with(expected_endpoint_info)
         mock_sleep.assert_called_with(wait_time)
         mock_shut.assert_called_once_with(expected_endpoint_info)
 
+    @patch('migration_monitor.shutdown_process')
     @patch('migration_monitor.shutdown_pipeline')
-    @patch('time.sleep')
-    @patch('migration_monitor.check_if_complete')
-    @patch('migration_monitor.get_metric_value')
-    @patch('migration_monitor.fetch_prometheus_metrics')
     # Note that mock objects are passed bottom-up from the patch order above
-    def test_run_with_fetch_failure(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: MagicMock,
-                                    mock_sleep: MagicMock, mock_shut: MagicMock):
+    def test_monitor_local_process_exit(self, mock_shut_dp: MagicMock, mock_shut_proc: MagicMock):
         # The param values don't matter since we've mocked the check method
         test_input = MigrationMonitorParams(1, "test")
-        mock_get.return_value = None
-        mock_check.return_value = True
-        # Fetch call will first fail, then succeed
-        mock_fetch.side_effect = [None, MagicMock()]
+        mock_subprocess = MagicMock()
+        # Simulate an exited subprocess
+        expected_return_code: int = 1
+        mock_subprocess.returncode = expected_return_code
         # Run test method
-        wait_time = 3
-        migration_monitor.run(test_input, wait_time)
-        # Test that fetch was called with the expected EndpointInfo
-        expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
-        self.assertEqual(2, mock_fetch.call_count)
-        mock_fetch.assert_called_with(expected_endpoint_info)
-        # We expect one wait cycle
-        mock_sleep.assert_called_with(wait_time)
-        mock_shut.assert_called_once_with(expected_endpoint_info)
-
-    def test_check_if_complete(self):
-        # If any of the optional values are missing, we are not complete
-        self.assertFalse(migration_monitor.check_if_complete(None, 0, 1, 0, 2))
-        self.assertFalse(migration_monitor.check_if_complete(2, None, 1, 0, 2))
-        self.assertFalse(migration_monitor.check_if_complete(2, 0, None, 0, 2))
-        # Target count not reached
-        self.assertFalse(migration_monitor.check_if_complete(1, None, None, 0, 2))
-        # Target count reached, but has records in flight
-        self.assertFalse(migration_monitor.check_if_complete(2, 1, None, 0, 2))
-        # Target count reached, no records in flight, but no prev no_part_count
-        self.assertFalse(migration_monitor.check_if_complete(2, 0, 1, 0, 2))
-        # Terminal state
-        self.assertTrue(migration_monitor.check_if_complete(2, 0, 2, 1, 2))
+        return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0)
+        self.assertEqual(expected_return_code, return_code)
+        mock_shut_dp.assert_not_called()
+        mock_shut_proc.assert_not_called()
 
     @patch('migration_monitor.shutdown_process')
     @patch('migration_monitor.shutdown_pipeline')
+    @patch('migration_monitor.is_process_alive')
+    @patch('migration_monitor.check_and_log_progress')
     # Note that mock objects are passed bottom-up from the patch order above
-    def test_migration_process_exit(self, mock_shut_dp: MagicMock, mock_shut_proc: MagicMock):
+    def test_monitor_local_migration_complete(self, mock_check: MagicMock, mock_is_alive: MagicMock,
+                                              mock_shut_dp: MagicMock, mock_shut_proc: MagicMock):
         # The param values don't matter since we've mocked the check method
         test_input = MigrationMonitorParams(1, "test")
+        # Simulate a successful migration
+        mock_progress = MagicMock()
+        mock_progress.is_in_terminal_state.return_value = True
+        mock_progress.is_migration_complete_success.return_value = True
+        mock_check.return_value = mock_progress
+        # Sequence of expected return values for a process that terminates successfully
+        mock_is_alive.side_effect = [True, True, True, False]
         mock_subprocess = MagicMock()
-        # set subprocess returncode to None to simulate a zombie Data Prepper process
-        mock_subprocess.returncode = 1
+        expected_return_code: int = 0
+        mock_subprocess.returncode = expected_return_code
+        # Simulate timeout on wait
+        mock_subprocess.wait.side_effect = [subprocess.TimeoutExpired("test", 1)]
         # Run test method
-        wait_time = 3
-        return_code = migration_monitor.monitor_local(test_input, mock_subprocess, wait_time)
-        self.assertEqual(1, return_code)
-        mock_shut_dp.assert_not_called()
+        actual_return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0)
+        self.assertEqual(expected_return_code, actual_return_code)
+        expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
+        mock_check.assert_called_once_with(expected_endpoint_info, ANY)
+        mock_shut_dp.assert_called_once_with(expected_endpoint_info)
         mock_shut_proc.assert_not_called()
 
     @patch('migration_monitor.shutdown_process')
     @patch('migration_monitor.shutdown_pipeline')
     @patch('migration_monitor.check_and_log_progress')
     # Note that mock objects are passed bottom-up from the patch order above
-    def test_process_shutdown_invocation(self, mock_check: MagicMock, mock_shut_dp: MagicMock,
-                                         mock_shut_proc: MagicMock):
+    def test_monitor_local_shutdown_process(self, mock_check: MagicMock, mock_shut_dp: MagicMock,
+                                            mock_shut_proc: MagicMock):
         # The param values don't matter since we've mocked the check method
         test_input = MigrationMonitorParams(1, "test")
-        mock_check.side_effect = [(False, 1), (True, 2)]
+        # Simulate a progressing, successful migration
+        mock_progress = MagicMock()
+        mock_progress.is_in_terminal_state.side_effect = [False, True, True]
+        mock_check.return_value = mock_progress
         mock_subprocess = MagicMock()
         # set subprocess returncode to None to simulate a zombie Data Prepper process
         mock_subprocess.returncode = None
+        # Shutdown-process call return code
+        expected_return_code: int = 137
+        mock_shut_proc.return_value = 137
         # Run test method
-        wait_time = 3
-        migration_monitor.monitor_local(test_input, mock_subprocess, wait_time)
+        actual_return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0)
+        self.assertEqual(expected_return_code, actual_return_code)
         expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint)
         mock_shut_dp.assert_called_once_with(expected_endpoint_info)
         mock_shut_proc.assert_called_once_with(mock_subprocess)
diff --git a/FetchMigration/python/tests/test_progress_metrics.py b/FetchMigration/python/tests/test_progress_metrics.py
new file mode 100644
index 0000000000..900dbeb9a3
--- /dev/null
+++ b/FetchMigration/python/tests/test_progress_metrics.py
@@ -0,0 +1,103 @@
+import logging
+import unittest
+
+from progress_metrics import ProgressMetrics
+
+
+class TestProgressMetrics(unittest.TestCase):
+    def setUp(self) -> None:
+        logging.disable(logging.CRITICAL)
+
+    def tearDown(self) -> None:
+        logging.disable(logging.NOTSET)
+
+    def test_doc_completion_percentage(self):
+        metrics = ProgressMetrics(9, 0)
+        # Base case
+        self.assertEqual(0, metrics.get_doc_completion_percentage())
+        # Completion percentage should round down
+        metrics.update_success_doc_count(3)
+        self.assertEqual(33, metrics.get_doc_completion_percentage())
+        metrics.update_success_doc_count(9)
+        self.assertEqual(100, metrics.get_doc_completion_percentage())
+
+    def test_all_docs_migrated(self):
+        metrics = ProgressMetrics(9, 0)
+        # Base case
+        self.assertFalse(metrics.all_docs_migrated())
+        metrics.update_success_doc_count(3)
+        self.assertFalse(metrics.all_docs_migrated())
+        # Return value is true when >= target
+        metrics.update_success_doc_count(9)
+        self.assertTrue(metrics.all_docs_migrated())
+        metrics.update_success_doc_count(10)
+        self.assertTrue(metrics.all_docs_migrated())
+
+    def test_is_migration_complete_success(self):
+        metrics = ProgressMetrics(9, 0)
+        # Base case
+        self.assertFalse(metrics.is_migration_complete_success())
+        # Update success docs
+        metrics.update_success_doc_count(9)
+        self.assertFalse(metrics.is_migration_complete_success())
+        # Non-zero records in flight
+        metrics.update_records_in_flight_count(1)
+        self.assertFalse(metrics.is_migration_complete_success())
+        # Zero records in flight, but no recorded partition count
+        metrics.update_records_in_flight_count(0)
+        self.assertFalse(metrics.is_migration_complete_success())
+        # Record partition count, but no previous count
+        metrics.update_no_partitions_count(1)
+        self.assertFalse(metrics.is_migration_complete_success())
+        # Update partition count, but it matches previous value
+        metrics.update_no_partitions_count(1)
+        self.assertFalse(metrics.is_migration_complete_success())
+        # Update partition count to meet idle pipeline criteria
+        metrics.update_no_partitions_count(2)
+        self.assertTrue(metrics.is_migration_complete_success())
+
+    def test_is_migration_idle(self):
+        metrics = ProgressMetrics(9, 1)
+        # Base case
+        self.assertFalse(metrics.is_migration_idle())
+        # Update success docs
+        metrics.update_success_doc_count(3)
+        self.assertFalse(metrics.is_migration_idle())
+        # Update partition count
+        metrics.update_no_partitions_count(1)
+        self.assertFalse(metrics.is_migration_idle())
+        # Update success docs to same value, which reaches threshold
+        metrics.update_success_doc_count(3)
+        self.assertTrue(metrics.is_migration_idle())
+
+    def test_is_too_may_api_failures(self):
+        metrics = ProgressMetrics(9, 1)
+        # Base case
+        self.assertFalse(metrics.is_too_may_api_failures())
+        # Metric value failure does not count towards API failure
+        metrics.record_success_doc_value_failure()
+        self.assertFalse(metrics.is_too_may_api_failures())
+        metrics.record_metric_api_failure()
+        self.assertTrue(metrics.is_too_may_api_failures())
+
+    def test_is_in_terminal_state(self):
+        metrics = ProgressMetrics(9, 1)
+        metrics.update_success_doc_count(1)
+        metrics.update_no_partitions_count(1)
+        self.assertFalse(metrics.is_in_terminal_state())
+        # Too many API failures
+        metrics.record_metric_api_failure()
+        self.assertTrue(metrics.is_in_terminal_state())
+        metrics.reset_metric_api_failure()
+        # Idle pipeline
+        metrics.update_no_partitions_count(1)
+        self.assertTrue(metrics.is_in_terminal_state())
+        metrics.update_no_partitions_count(2)
+        # Migration complete
+        metrics.update_success_doc_count(10)
+        metrics.update_records_in_flight_count(0)
+        self.assertTrue(metrics.is_in_terminal_state())
+
+
+if __name__ == '__main__':
+    unittest.main()