diff --git a/FetchMigration/python/fetch_orchestrator.py b/FetchMigration/python/fetch_orchestrator.py index 22b7d516dd..d51f6a58cb 100644 --- a/FetchMigration/python/fetch_orchestrator.py +++ b/FetchMigration/python/fetch_orchestrator.py @@ -28,7 +28,7 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[in # Run the migration monitor next migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint) logging.info("Starting migration monitor...\n") - return migration_monitor.monitor_local(migration_monitor_params, proc) + return migration_monitor.run(migration_monitor_params, proc) if __name__ == '__main__': # pragma no cover diff --git a/FetchMigration/python/migration_monitor.py b/FetchMigration/python/migration_monitor.py index 9dd7719773..9e62ce40f4 100644 --- a/FetchMigration/python/migration_monitor.py +++ b/FetchMigration/python/migration_monitor.py @@ -91,23 +91,30 @@ def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetric return progress -def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]: +def __should_continue_monitoring(metrics: ProgressMetrics, proc: Optional[Popen] = None) -> bool: + return not metrics.is_in_terminal_state() and (proc is None or is_process_alive(proc)) + + +# Last parameter is optional, and signifies a local Data Prepper process +def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int: endpoint_info = EndpointInfo(args.data_prepper_endpoint) progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD) logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count)) - # Sets returncode. A value of None means the subprocess has not yet terminated - dp_process.poll() - while is_process_alive(dp_process) and not progress_metrics.is_in_terminal_state(): - try: - dp_process.wait(timeout=poll_interval_seconds) - except subprocess.TimeoutExpired: - pass - if is_process_alive(dp_process): + while __should_continue_monitoring(progress_metrics, dp_process): + if dp_process is not None: + # Wait on local process + try: + dp_process.wait(timeout=poll_interval_seconds) + except subprocess.TimeoutExpired: + pass + else: + # Thread sleep + time.sleep(poll_interval_seconds) + if dp_process is None or is_process_alive(dp_process): progress_metrics = check_and_log_progress(endpoint_info, progress_metrics) - # Log debug metrics - progress_metrics.log_idle_pipeline_debug_metrics() - # Loop terminated, shut down the Data Prepper pipeline + # Loop terminated if not progress_metrics.is_in_terminal_state(): + # This will only happen for a local Data Prepper process logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode)) # TODO - Implement rollback logging.error("Please delete any partially migrated indices before retrying the migration.") @@ -121,31 +128,16 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval logging.warning("Migration monitor was unable to fetch migration metrics, terminating...") # Shut down Data Prepper pipeline via API shutdown_pipeline(endpoint_info) - if is_process_alive(dp_process): + if dp_process is None: + # No local process + return 0 + elif is_process_alive(dp_process): # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141 return shutdown_process(dp_process) else: return dp_process.returncode -def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None: - endpoint_info = EndpointInfo(args.data_prepper_endpoint) - progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD) - logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count)) - while not progress_metrics.is_in_terminal_state(): - time.sleep(poll_interval_seconds) - progress_metrics = check_and_log_progress(endpoint_info, progress_metrics) - # Loop terminated, shut down the Data Prepper pipeline - if progress_metrics.is_migration_complete_success(): - logging.info("Migration monitor observed successful migration, shutting down...\n") - elif progress_metrics.is_migration_idle(): - logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...") - elif progress_metrics.is_too_may_api_failures(): - logging.warning("Migration monitor was unable to fetch migration metrics, terminating...") - # Shut down Data Prepper pipeline via API - shutdown_pipeline(endpoint_info) - - if __name__ == '__main__': # pragma no cover # Set up parsing for command line arguments arg_parser = argparse.ArgumentParser( diff --git a/FetchMigration/python/migration_monitor_params.py b/FetchMigration/python/migration_monitor_params.py index e04599a989..5476f23cf8 100644 --- a/FetchMigration/python/migration_monitor_params.py +++ b/FetchMigration/python/migration_monitor_params.py @@ -5,3 +5,4 @@ class MigrationMonitorParams: target_count: int data_prepper_endpoint: str + is_local_process: bool = False diff --git a/FetchMigration/python/tests/test_fetch_orchestrator.py b/FetchMigration/python/tests/test_fetch_orchestrator.py index 25f004129c..92f8baf87c 100644 --- a/FetchMigration/python/tests/test_fetch_orchestrator.py +++ b/FetchMigration/python/tests/test_fetch_orchestrator.py @@ -9,7 +9,7 @@ class TestFetchOrchestrator(unittest.TestCase): - @patch('migration_monitor.monitor_local') + @patch('migration_monitor.run') @patch('subprocess.Popen') @patch('metadata_migration.run') # Note that mock objects are passed bottom-up from the patch order above @@ -33,7 +33,7 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc mock_subprocess.assert_called_once_with(expected_dp_runnable) mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value) - @patch('migration_monitor.monitor_local') + @patch('migration_monitor.run') @patch('subprocess.Popen') @patch('metadata_migration.run') # Note that mock objects are passed bottom-up from the patch order above diff --git a/FetchMigration/python/tests/test_migration_monitor.py b/FetchMigration/python/tests/test_migration_monitor.py index 97123b46c5..edfb1fe91c 100644 --- a/FetchMigration/python/tests/test_migration_monitor.py +++ b/FetchMigration/python/tests/test_migration_monitor.py @@ -133,11 +133,13 @@ def test_check_and_log_progress(self, mock_fetch: MagicMock, mock_get_metric: Ma # All-docs-migrated check is invoked mock_progress.all_docs_migrated.assert_called_once() + @patch('migration_monitor.shutdown_process') @patch('migration_monitor.shutdown_pipeline') @patch('time.sleep') @patch('migration_monitor.check_and_log_progress') # Note that mock objects are passed bottom-up from the patch order above - def test_run(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut: MagicMock): + def test_monitor_non_local(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut_dp: MagicMock, + mock_shut_proc: MagicMock): # The param values don't matter since we've mocked the check method test_input = MigrationMonitorParams(1, "test") mock_progress = MagicMock() @@ -145,11 +147,12 @@ def test_run(self, mock_check: MagicMock, mock_sleep: MagicMock, mock_shut: Magi mock_check.return_value = mock_progress # Run test method wait_time = 3 - migration_monitor.run(test_input, wait_time) + migration_monitor.run(test_input, None, wait_time) # Test that fetch was called with the expected EndpointInfo expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) mock_sleep.assert_called_with(wait_time) - mock_shut.assert_called_once_with(expected_endpoint_info) + mock_shut_dp.assert_called_once_with(expected_endpoint_info) + mock_shut_proc.assert_not_called() @patch('migration_monitor.shutdown_process') @patch('migration_monitor.shutdown_pipeline') @@ -162,7 +165,7 @@ def test_monitor_local_process_exit(self, mock_shut_dp: MagicMock, mock_shut_pro expected_return_code: int = 1 mock_subprocess.returncode = expected_return_code # Run test method - return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0) + return_code = migration_monitor.run(test_input, mock_subprocess) self.assertEqual(expected_return_code, return_code) mock_shut_dp.assert_not_called() mock_shut_proc.assert_not_called() @@ -178,18 +181,18 @@ def test_monitor_local_migration_complete(self, mock_check: MagicMock, mock_is_a test_input = MigrationMonitorParams(1, "test") # Simulate a successful migration mock_progress = MagicMock() - mock_progress.is_in_terminal_state.return_value = True + mock_progress.is_in_terminal_state.side_effect = [False, True] mock_progress.is_migration_complete_success.return_value = True mock_check.return_value = mock_progress # Sequence of expected return values for a process that terminates successfully - mock_is_alive.side_effect = [True, True, True, False] + mock_is_alive.side_effect = [True, True, False, False] mock_subprocess = MagicMock() expected_return_code: int = 0 mock_subprocess.returncode = expected_return_code # Simulate timeout on wait mock_subprocess.wait.side_effect = [subprocess.TimeoutExpired("test", 1)] # Run test method - actual_return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0) + actual_return_code = migration_monitor.run(test_input, mock_subprocess) self.assertEqual(expected_return_code, actual_return_code) expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) mock_check.assert_called_once_with(expected_endpoint_info, ANY) @@ -215,7 +218,7 @@ def test_monitor_local_shutdown_process(self, mock_check: MagicMock, mock_shut_d expected_return_code: int = 137 mock_shut_proc.return_value = 137 # Run test method - actual_return_code = migration_monitor.monitor_local(test_input, mock_subprocess, 0) + actual_return_code = migration_monitor.run(test_input, mock_subprocess) self.assertEqual(expected_return_code, actual_return_code) expected_endpoint_info = EndpointInfo(test_input.data_prepper_endpoint) mock_shut_dp.assert_called_once_with(expected_endpoint_info)