Skip to content

Commit

Permalink
fetch_orchestrator.py __main__ now exits via sys.exit()
Browse files Browse the repository at this point in the history
This enables quitting the Fetch Migration workflow with a return_code bubbled up from migration_monitor.py, which can help indicate a successful (return code zero) or non-successful execution.

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Oct 26, 2023
1 parent 133fc96 commit 9309b34
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
19 changes: 12 additions & 7 deletions FetchMigration/python/fetch_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
import logging
import os
import subprocess
import sys
from typing import Optional

import migration_monitor
import metadata_migration
from migration_monitor_params import MigrationMonitorParams
import migration_monitor
from metadata_migration_params import MetadataMigrationParams

from migration_monitor_params import MigrationMonitorParams

__DP_EXECUTABLE_SUFFIX = "/bin/data-prepper"
__PIPELINE_OUTPUT_FILE_SUFFIX = "/pipelines/pipeline.yaml"


def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]:
dp_exec_path = dp_base_path + __DP_EXECUTABLE_SUFFIX
output_file = dp_base_path + __PIPELINE_OUTPUT_FILE_SUFFIX
metadata_migration_params = MetadataMigrationParams(dp_config_file, output_file, report=True)
Expand All @@ -27,8 +28,7 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
# Run the migration monitor next
migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
logging.info("Starting migration monitor...\n")
migration_monitor.monitor_local(migration_monitor_params, proc)
# TODO - return process return code
return migration_monitor.monitor_local(migration_monitor_params, proc)


if __name__ == '__main__': # pragma no cover
Expand Down Expand Up @@ -61,4 +61,9 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
decoded_bytes = base64.b64decode(inline_pipeline)
with open(cli_args.config_file_path, 'wb') as config_file:
config_file.write(decoded_bytes)
run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
return_code = run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
if return_code == 0:
sys.exit(0)
else:
logging.error("Process exited with non-zero return code: " + str(return_code))
sys.exit(1)
2 changes: 2 additions & 0 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval
if dp_process.returncode is None:
# Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
return shutdown_process(dp_process)
else:
return dp_process.returncode


def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
Expand Down

0 comments on commit 9309b34

Please sign in to comment.