
[Fetch Migration] Improvements to subprocess handling #372

Merged · 4 commits · Oct 31, 2023
26 changes: 15 additions & 11 deletions FetchMigration/python/fetch_orchestrator.py
@@ -3,18 +3,19 @@
import logging
import os
import subprocess
import sys
from typing import Optional

import migration_monitor
import metadata_migration
from migration_monitor_params import MigrationMonitorParams
import migration_monitor
from metadata_migration_params import MetadataMigrationParams

from migration_monitor_params import MigrationMonitorParams

__DP_EXECUTABLE_SUFFIX = "/bin/data-prepper"
__PIPELINE_OUTPUT_FILE_SUFFIX = "/pipelines/pipeline.yaml"


def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]:
dp_exec_path = dp_base_path + __DP_EXECUTABLE_SUFFIX
output_file = dp_base_path + __PIPELINE_OUTPUT_FILE_SUFFIX
metadata_migration_params = MetadataMigrationParams(dp_config_file, output_file, report=True)
Collaborator:

Can your code confirm that Data Prepper is bound only to localhost? I could see a lot of difficulty if a second managing agent came along and started acting on it, or if somebody else DDoS'ed the API (while it was putting a lot of load on the source cluster).

Member (Author):

Yes, this code needs an update to stop accepting dp_endpoint. Currently the localhost configuration is driven only by the Dockerfile:

ENTRYPOINT python3 -u ./fetch_orchestrator.py $DATA_PREPPER_PATH $FM_CODE_PATH/input.yaml http://localhost:4900

This is obviously not a strong check. I'll make that change in a follow-up PR.
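
A minimal sketch of what such a localhost-only guard could look like while dp_endpoint is still accepted; the helper name and loopback allow-list below are illustrative, not part of this PR or the planned follow-up:

# Hypothetical sketch, not part of this PR: refuse to act on a Data Prepper
# endpoint whose host is not loopback.
from urllib.parse import urlparse

_LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}  # illustrative allow-list


def validate_local_endpoint(dp_endpoint: str) -> str:
    host = urlparse(dp_endpoint).hostname
    if host not in _LOOPBACK_HOSTS:
        raise ValueError("Data Prepper endpoint must be local, got: " + dp_endpoint)
    return dp_endpoint

The follow-up PR mentioned above could instead drop the dp_endpoint argument entirely and hardcode the local endpoint; this sketch only shows an interim validation.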

@@ -24,14 +25,10 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
# Kick off a subprocess for Data Prepper
logging.info("Running Data Prepper...\n")
proc = subprocess.Popen(dp_exec_path)
# Data Prepper started successfully, run the migration monitor
# Run the migration monitor next
migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
logging.info("Starting migration monitor...\n")
migration_monitor.run(migration_monitor_params)
# Migration ended, the following is a workaround for
# https://github.com/opensearch-project/data-prepper/issues/3141
if proc.returncode is None:
proc.terminate()
return migration_monitor.monitor_local(migration_monitor_params, proc)


if __name__ == '__main__': # pragma no cover
@@ -64,4 +61,11 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str):
decoded_bytes = base64.b64decode(inline_pipeline)
with open(cli_args.config_file_path, 'wb') as config_file:
config_file.write(decoded_bytes)
run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
return_code = run(base_path, cli_args.config_file_path, cli_args.data_prepper_endpoint)
if return_code == 0:
sys.exit(0)
else:
logging.error("Process exited with non-zero return code: " + str(return_code))
if return_code is None:
return_code = 1
sys.exit(return_code)
16 changes: 10 additions & 6 deletions FetchMigration/python/index_operations.py
@@ -15,13 +15,17 @@
def fetch_all_indices(endpoint: EndpointInfo) -> dict:
actual_endpoint = endpoint.url + __ALL_INDICES_ENDPOINT
resp = requests.get(actual_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
# Remove internal settings
result = dict(resp.json())
for index in result:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
for index in list(result.keys()):
# Remove system indices
if index.startswith("."):
del result[index]
# Remove internal settings
else:
for setting in __INTERNAL_SETTINGS_KEYS:
index_settings = result[index][SETTINGS_KEY]
if __INDEX_KEY in index_settings:
index_settings[__INDEX_KEY].pop(setting, None)
return result


9 changes: 6 additions & 3 deletions FetchMigration/python/metadata_migration.py
@@ -1,10 +1,10 @@
import argparse
import yaml
from typing import Optional

import yaml

import index_operations
import utils

# Constants
from endpoint_info import EndpointInfo
from metadata_migration_params import MetadataMigrationParams
@@ -163,14 +163,17 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
# We expect the Data Prepper pipeline to only have a single top-level value
pipeline_config = next(iter(dp_config.values()))
validate_pipeline_config(pipeline_config)
result = MetadataMigrationResult()
# Fetch EndpointInfo and indices
source_endpoint_info, source_indices = compute_endpoint_and_fetch_indices(pipeline_config, SOURCE_KEY)
# If source indices is empty, return immediately
if len(source_indices.keys()) == 0:
return result
target_endpoint_info, target_indices = compute_endpoint_and_fetch_indices(pipeline_config, SINK_KEY)
# Compute index differences and print report
diff = get_index_differences(source_indices, target_indices)
# The first element in the tuple is the set of indices to create
indices_to_create = diff[0]
result = MetadataMigrationResult()
if indices_to_create:
result.created_indices = indices_to_create
result.target_doc_count = index_operations.doc_count(indices_to_create, source_endpoint_info)
120 changes: 92 additions & 28 deletions FetchMigration/python/migration_monitor.py
@@ -1,8 +1,10 @@
import argparse
import logging
import math
import subprocess
import time
from subprocess import Popen
from typing import Optional, List
import math

import requests
from prometheus_client import Metric
@@ -11,20 +13,35 @@
from endpoint_info import EndpointInfo
from migration_monitor_params import MigrationMonitorParams

__PROMETHEUS_METRICS_ENDPOINT = "/metrics/prometheus"
__SHUTDOWN_ENDPOINT = "/shutdown"
# Path to the Data Prepper Prometheus metrics API endpoint
# Used to monitor the progress of the migration
__METRICS_API_PATH = "/metrics/prometheus"
__SHUTDOWN_API_PATH = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"


# Gracefully shutdown a subprocess
def shutdown_process(proc: Popen) -> Optional[int]:
# Process is still running, send SIGTERM
proc.terminate()
try:
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
if proc.returncode is None:
# Failed to terminate, send SIGKILL
proc.kill()
return proc.returncode


def shutdown_pipeline(endpoint: EndpointInfo):
shutdown_endpoint = endpoint.url + __SHUTDOWN_ENDPOINT
shutdown_endpoint = endpoint.url + __SHUTDOWN_API_PATH
requests.post(shutdown_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)


def fetch_prometheus_metrics(endpoint: EndpointInfo) -> Optional[List[Metric]]:
metrics_endpoint = endpoint.url + __PROMETHEUS_METRICS_ENDPOINT
metrics_endpoint = endpoint.url + __METRICS_API_PATH
try:
response = requests.get(metrics_endpoint, auth=endpoint.auth, verify=endpoint.verify_ssl)
response.raise_for_status()
@@ -65,35 +82,82 @@ def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_par
return False


def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
tuple[bool, int]:
terminal: bool = False
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint_info)
if metrics is not None:
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
Comment on lines +91 to +93
Collaborator:

What can go wrong in getting these values, since we're relying on not just one but two processes?
It's a Socratic question. If this were in a single Docker container and not able to be run elsewhere, I'd be a bit less concerned, but this is Python code that can run in any deployment scenario, so the faults of a distributed application are present.
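
One hedged way to bound that risk, sketched below and not part of this PR (the PR itself simply retries on the next polling cycle), is to cap consecutive metrics-API failures so a monitor whose Data Prepper API has become unreachable eventually aborts instead of polling forever. MAX_CONSECUTIVE_FAILURES and the helper name are illustrative:

# Hypothetical sketch, not part of this PR: give up after a run of failed
# metrics-API calls instead of retrying indefinitely.
MAX_CONSECUTIVE_FAILURES = 5  # illustrative threshold


def fetch_metrics_with_budget(endpoint_info: EndpointInfo, consecutive_failures: int):
    # Wraps fetch_prometheus_metrics (defined in migration_monitor.py above);
    # returns (metrics_or_None, updated_failure_count).
    metrics = fetch_prometheus_metrics(endpoint_info)
    if metrics is None:
        consecutive_failures += 1
        if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
            raise RuntimeError("Data Prepper metrics API failed "
                               + str(consecutive_failures) + " times in a row")
        return None, consecutive_failures
    return metrics, 0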

if success_docs is not None: # pragma no cover
completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress_message: str = "Completed " + str(success_docs) + \
" docs ( " + str(completion_percentage) + "% )"
logging.info(progress_message)
else:
logging.warning("Could not fetch progress stats from Data Prepper response, " +
"will retry on next polling cycle...")
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
else:
logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
# TODO - Handle idle non-terminal pipeline
return terminal, prev_no_partitions_count


def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
# Sets returncode. A value of None means the subprocess has not yet terminated
dp_process.poll()
while dp_process.returncode is None and not is_migration_complete:
try:
dp_process.wait(timeout=poll_interval_seconds)
except subprocess.TimeoutExpired:
pass
if dp_process.returncode is None:
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
# Loop terminated
if not is_migration_complete:
logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
# TODO - Implement rollback
logging.error("Please delete any partially migrated indices before retrying the migration.")
return dp_process.returncode
else:
# Shut down Data Prepper pipeline via API
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint_info)
if dp_process.returncode is None:
# Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
return shutdown_process(dp_process)
else:
return dp_process.returncode


def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
endpoint = EndpointInfo(args.data_prepper_endpoint)
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
prev_no_partitions_count = 0
terminal = False
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
while not terminal:
while not is_migration_complete:
time.sleep(poll_interval_seconds)
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint)
if metrics is not None:
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
if success_docs is not None: # pragma no cover
completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress_message: str = "Completed " + str(success_docs) + \
" docs ( " + str(completion_percentage) + "% )"
logging.info(progress_message)
else:
logging.info("Could not fetch metrics from Data Prepper, will retry on next polling cycle...")
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
prev_no_partitions_count, target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
# Loop terminated, shut down the Data Prepper pipeline
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint)
shutdown_pipeline(endpoint_info)


if __name__ == '__main__': # pragma no cover
32 changes: 3 additions & 29 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -9,7 +9,7 @@

class TestFetchOrchestrator(unittest.TestCase):

@patch('migration_monitor.run')
@patch('migration_monitor.monitor_local')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
@@ -31,35 +31,9 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
expected_dp_runnable = test_path + "/bin/data-prepper"
mock_subprocess.assert_called_once_with(expected_dp_runnable)
mock_monitor.assert_called_once_with(expected_monitor_input)
mock_subprocess.return_value.terminate.assert_not_called()
mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value)

@patch('migration_monitor.run')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
def test_orchestrator_shutdown_workaround(self, mock_metadata_migration: MagicMock, mock_subprocess: MagicMock,
mock_monitor: MagicMock):
test_path = "test_path"
test_file = "test_file"
test_host = "test_host"
# Setup mock pre-migration
expected_metadata_migration_input = MetadataMigrationParams(test_file, test_path + "/pipelines/pipeline.yaml",
report=True)
test_result = MetadataMigrationResult(10, {"index1", "index2"})
expected_monitor_input = MigrationMonitorParams(test_result.target_doc_count, test_host)
mock_metadata_migration.return_value = test_result
# set subprocess return value to None to simulate a zombie Data Prepper process
mock_subprocess.return_value.returncode = None
# Run test
orchestrator.run(test_path, test_file, test_host)
mock_metadata_migration.assert_called_once_with(expected_metadata_migration_input)
expected_dp_runnable = test_path + "/bin/data-prepper"
mock_subprocess.assert_called_once_with(expected_dp_runnable)
mock_monitor.assert_called_once_with(expected_monitor_input)
mock_subprocess.return_value.terminate.assert_called_once()

@patch('migration_monitor.run')
@patch('migration_monitor.monitor_local')
@patch('subprocess.Popen')
@patch('metadata_migration.run')
# Note that mock objects are passed bottom-up from the patch order above
12 changes: 11 additions & 1 deletion FetchMigration/python/tests/test_index_operations.py
@@ -14,7 +14,17 @@ class TestSearchEndpoint(unittest.TestCase):
@responses.activate
def test_fetch_all_indices(self):
# Set up GET response
responses.get(test_constants.SOURCE_ENDPOINT + "*", json=test_constants.BASE_INDICES_DATA)
test_data = copy.deepcopy(test_constants.BASE_INDICES_DATA)
# Add system index
test_data[".system-index"] = {
test_constants.SETTINGS_KEY: {
test_constants.INDEX_KEY: {
test_constants.NUM_SHARDS_SETTING: 1,
test_constants.NUM_REPLICAS_SETTING: 1
}
}
}
responses.get(test_constants.SOURCE_ENDPOINT + "*", json=test_data)
# Now send request
index_data = index_operations.fetch_all_indices(EndpointInfo(test_constants.SOURCE_ENDPOINT))
self.assertEqual(3, len(index_data.keys()))
10 changes: 10 additions & 0 deletions FetchMigration/python/tests/test_metadata_migration.py
@@ -314,6 +314,16 @@ def test_missing_output_file_non_report(self):
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH)
self.assertRaises(ValueError, metadata_migration.run, test_input)

@patch('index_operations.fetch_all_indices')
# Note that mock objects are passed bottom-up from the patch order above
def test_no_indices_in_source(self, mock_fetch_indices: MagicMock):
mock_fetch_indices.return_value = {}
test_input = MetadataMigrationParams(test_constants.PIPELINE_CONFIG_RAW_FILE_PATH, "dummy")
test_result = metadata_migration.run(test_input)
mock_fetch_indices.assert_called_once()
self.assertEqual(0, test_result.target_doc_count)
self.assertEqual(0, len(test_result.created_indices))


if __name__ == '__main__':
unittest.main()