-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Fetch Migration] Adding orchestration script for Fetch Migration ste…
…ps (#294) This change adds an "orchestration" Python script that runs the Fetch Migration steps in order: 1) The metadata migration script is run first to compare the indices on source and target clusters. A human-readable report is printed, and an updated Data Prepper pipeline YAML file is written to the Data Prepper `pipelines` directory 2) Next, Data Prepper is kicked off as a [sub-process](https://docs.python.org/3/library/subprocess.html) to migrate data based on the output pipeline YAML file 3) After the Data Prepper process is kicked off, the monitoring script is run to poll DP's Prometheus metrics endpoint and determine when migration is complete and the process can be shut down The orchestrator script takes multiple command-line inputs to enable execution of these steps. The Dockerfile definition has also been updated to use the orchestrator script as the entrypoint. * Adding logging to migration_monitor.py Add logging to track progress of the migration. This commit also moves the sleep state to the start of the loop. This allows an initial 30 seconds to go by, letting the Data Prepper process startup. Otherwise, we run the risk of the GET call failing. --------- Signed-off-by: Kartik Ganesh <[email protected]>
- Loading branch information
Showing
11 changed files
with
226 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import argparse | ||
import logging | ||
import os | ||
import subprocess | ||
|
||
import migration_monitor | ||
import metadata_migration | ||
from migration_monitor_params import MigrationMonitorParams | ||
from metadata_migration_params import MetadataMigrationParams | ||
|
||
|
||
__DP_EXECUTABLE_SUFFIX = "/bin/data-prepper" | ||
__PIPELINE_OUTPUT_FILE_SUFFIX = "/pipelines/pipeline.yaml" | ||
|
||
|
||
def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str): | ||
dp_exec_path = dp_base_path + __DP_EXECUTABLE_SUFFIX | ||
output_file = dp_base_path + __PIPELINE_OUTPUT_FILE_SUFFIX | ||
metadata_migration_params = MetadataMigrationParams(dp_config_file, output_file, report=True) | ||
logging.info("Running pre-migration steps...\n") | ||
metadata_migration_result = metadata_migration.run(metadata_migration_params) | ||
if len(metadata_migration_result.created_indices) > 0: | ||
# Kick off a subprocess for Data Prepper | ||
logging.info("Running Data Prepper...\n") | ||
proc = subprocess.Popen(dp_exec_path) | ||
# Data Prepper started successfully, run the migration monitor | ||
migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint) | ||
logging.info("Starting migration monitor...\n") | ||
migration_monitor.run(migration_monitor_params) | ||
# Migration ended, the following is a workaround for | ||
# https://github.com/opensearch-project/data-prepper/issues/3141 | ||
if proc.returncode is None: | ||
proc.terminate() | ||
|
||
|
||
if __name__ == '__main__': # pragma no cover | ||
# Set log level | ||
logging.basicConfig(level=logging.INFO) | ||
# Set up parsing for command line arguments | ||
arg_parser = argparse.ArgumentParser( | ||
prog="python fetch_orchestrator.py", | ||
description="Orchestrator script for fetch migration", | ||
formatter_class=argparse.RawTextHelpFormatter | ||
) | ||
# Required positional argument | ||
arg_parser.add_argument( | ||
"data_prepper_path", | ||
help="Path to the base directory where Data Prepper is installed " | ||
) | ||
arg_parser.add_argument( | ||
"config_file_path", | ||
help="Path to the Data Prepper pipeline YAML file to parse for source and target endpoint information" | ||
) | ||
arg_parser.add_argument( | ||
"data_prepper_endpoint", | ||
help="Data Prepper endpoint for monitoring the migration" | ||
) | ||
cli_args = arg_parser.parse_args() | ||
base_path = os.path.expandvars(cli_args.data_prepper_path) | ||
run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import unittest | ||
from unittest.mock import patch, MagicMock, ANY | ||
|
||
import fetch_orchestrator as orchestrator | ||
from migration_monitor_params import MigrationMonitorParams | ||
from metadata_migration_params import MetadataMigrationParams | ||
from metadata_migration_result import MetadataMigrationResult | ||
|
||
|
||
class TestFetchOrchestrator(unittest.TestCase): | ||
|
||
@patch('migration_monitor.run') | ||
@patch('subprocess.Popen') | ||
@patch('metadata_migration.run') | ||
# Note that mock objects are passed bottom-up from the patch order above | ||
def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, | ||
mock_monitor: MagicMock): | ||
test_path = "test_path" | ||
test_file = "test_file" | ||
test_host = "test_host" | ||
# Setup mock pre-migration | ||
expected_metadata_migration_input = MetadataMigrationParams(test_file, test_path + "/pipelines/pipeline.yaml", | ||
report=True) | ||
test_result = MetadataMigrationResult(10, {"index1", "index2"}) | ||
expected_monitor_input = MigrationMonitorParams(test_result.target_doc_count, test_host) | ||
mock_metadata_migration.return_value = test_result | ||
# setup subprocess return value | ||
mock_subprocess.return_value.returncode = 0 | ||
# Run test | ||
orchestrator.run(test_path, test_file, test_host) | ||
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input) | ||
expected_dp_runnable = test_path + "/bin/data-prepper" | ||
mock_subprocess.assert_called_once_with(expected_dp_runnable) | ||
mock_monitor.assert_called_once_with(expected_monitor_input) | ||
mock_subprocess.return_value.terminate.assert_not_called() | ||
|
||
@patch('migration_monitor.run') | ||
@patch('subprocess.Popen') | ||
@patch('metadata_migration.run') | ||
# Note that mock objects are passed bottom-up from the patch order above | ||
def test_orchestrator_shutdown_workaround(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, | ||
mock_monitor: MagicMock): | ||
test_path = "test_path" | ||
test_file = "test_file" | ||
test_host = "test_host" | ||
# Setup mock pre-migration | ||
expected_metadata_migration_input = MetadataMigrationParams(test_file, test_path + "/pipelines/pipeline.yaml", | ||
report=True) | ||
test_result = MetadataMigrationResult(10, {"index1", "index2"}) | ||
expected_monitor_input = MigrationMonitorParams(test_result.target_doc_count, test_host) | ||
mock_metadata_migration.return_value = test_result | ||
# set subprocess return value to None to simulate a zombie Data Prepper process | ||
mock_subprocess.return_value.returncode = None | ||
# Run test | ||
orchestrator.run(test_path, test_file, test_host) | ||
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input) | ||
expected_dp_runnable = test_path + "/bin/data-prepper" | ||
mock_subprocess.assert_called_once_with(expected_dp_runnable) | ||
mock_monitor.assert_called_once_with(expected_monitor_input) | ||
mock_subprocess.return_value.terminate.assert_called_once() | ||
|
||
@patch('migration_monitor.run') | ||
@patch('subprocess.Popen') | ||
@patch('metadata_migration.run') | ||
# Note that mock objects are passed bottom-up from the patch order above | ||
def test_orchestrator_no_migration(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock, | ||
mock_monitor: MagicMock): | ||
# Setup empty result from pre-migration | ||
mock_metadata_migration.return_value = MetadataMigrationResult() | ||
orchestrator.run("test", "test", "test") | ||
mock_metadata_migration.assert_called_once_with(ANY) | ||
# Subsequent steps should not be called | ||
mock_subprocess.assert_not_called() | ||
mock_monitor.assert_not_called() | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
Oops, something went wrong.