[Fetch Migration] Handle idle pipeline when target doc count is never reached #377

Merged · 3 commits · Nov 7, 2023
2 changes: 1 addition & 1 deletion FetchMigration/python/fetch_orchestrator.py
@@ -28,7 +28,7 @@ def run(dp_base_path: str, dp_config_file: str, dp_endpoint: str) -> Optional[int]
     # Run the migration monitor next
     migration_monitor_params = MigrationMonitorParams(metadata_migration_result.target_doc_count, dp_endpoint)
     logging.info("Starting migration monitor...\n")
-    return migration_monitor.monitor_local(migration_monitor_params, proc)
+    return migration_monitor.run(migration_monitor_params, proc)


 if __name__ == '__main__':  # pragma no cover
2 changes: 1 addition & 1 deletion FetchMigration/python/metadata_migration.py
@@ -183,7 +183,7 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
     # Write output YAML
     if len(args.output_file) > 0:
         write_output(dp_config, indices_to_create, args.output_file)
-        if args.report:
+        if args.report:  # pragma no cover
            print("Wrote output YAML pipeline to: " + args.output_file)
     if not args.dryrun:
         index_data = dict()
134 changes: 56 additions & 78 deletions FetchMigration/python/migration_monitor.py
@@ -1,6 +1,5 @@
 import argparse
 import logging
-import math
 import subprocess
 import time
 from subprocess import Popen
@@ -12,14 +11,20 @@

 from endpoint_info import EndpointInfo
 from migration_monitor_params import MigrationMonitorParams
+from progress_metrics import ProgressMetrics

 # Path to the Data Prepper Prometheus metrics API endpoint
 # Used to monitor the progress of the migration
-__METRICS_API_PATH = "/metrics/prometheus"
-__SHUTDOWN_API_PATH = "/shutdown"
-__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
-__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
-__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"
+__METRICS_API_PATH: str = "/metrics/prometheus"
+__SHUTDOWN_API_PATH: str = "/shutdown"
+__DOC_SUCCESS_METRIC: str = "_opensearch_documentsSuccess"
+__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight"
+__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired"
+__IDLE_THRESHOLD: int = 5
Review comment (Collaborator): I see how this is used in ProgressMetrics, but I don't have much understanding of where this value comes from and why 5 is the right number. Can you elaborate?

Reply (Member, author): 5 was a completely random choice 😄 Data Prepper does not have the notion of an "idle" pipeline, so this threshold is completely up to us. With a default polling interval of 30 seconds, I didn't want the monitor to leave the pipeline running for too long, or close it too quickly. So I picked 5 iterations (i.e. 2.5 minutes) as a reasonable (IMO) threshold.
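
For reference, a back-of-the-envelope check of that trade-off (a minimal sketch; the names are illustrative, not from this PR):

```python
# Worst-case time before an idle pipeline is shut down is simply
# (idle threshold) x (polling interval).
IDLE_THRESHOLD = 5          # polling iterations, the value chosen in this PR
POLL_INTERVAL_SECONDS = 30  # default polling interval

max_idle_seconds = IDLE_THRESHOLD * POLL_INTERVAL_SECONDS
print(max_idle_seconds / 60)  # 2.5 (minutes)
```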



+def is_process_alive(proc: Popen) -> bool:
+    return proc.returncode is None
+
+
 # Gracefully shutdown a subprocess
@@ -29,7 +34,7 @@ def shutdown_process(proc: Popen) -> Optional[int]:
     try:
         proc.wait(timeout=60)
     except subprocess.TimeoutExpired:
-        if proc.returncode is None:
+        if is_process_alive(proc):
             # Failed to terminate, send SIGKILL
             proc.kill()
     return proc.returncode

@@ -60,106 +65,79 @@ def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]
     return None


-def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_partition_count: Optional[int],
-                      prev_no_partition: int, target: int) -> bool:
-    # Check for target doc_count
-    # TODO Add a check for partitionsCompleted = indices
-    if doc_count is not None and doc_count >= target:
-        # Check for idle pipeline
-        logging.info("Target doc count reached, checking for idle pipeline...")
-        # Debug metrics
-        if logging.getLogger().isEnabledFor(logging.DEBUG):  # pragma no cover
-            debug_msg_template: str = "Idle pipeline metrics - " + \
-                                      "Records in flight: [{0}], " + \
-                                      "No-partitions counter: [{1}]" + \
-                                      "Previous no-partition value: [{2}]"
-            logging.debug(debug_msg_template.format(in_flight, no_partition_count, prev_no_partition))
-        if in_flight is not None and in_flight == 0:
-            # No-partitions metrics should steadily tick up
-            if no_partition_count is not None and no_partition_count > prev_no_partition > 0:
-                return True
-    return False
-
-
-def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
-        tuple[bool, int]:
-    terminal: bool = False
+def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetrics) -> ProgressMetrics:
     # If the API call fails, the response is empty
     metrics = fetch_prometheus_metrics(endpoint_info)
     if metrics is not None:
+        # Reset API failure counter
+        progress.reset_metric_api_failure()
         success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
-        rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
-        no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
-        if success_docs is not None:  # pragma no cover
-            completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
+        progress.update_records_in_flight_count(get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC))
+        progress.update_no_partitions_count(get_metric_value(metrics, __NO_PARTITIONS_METRIC))
+        if success_docs is not None:
+            completion_percentage = progress.update_success_doc_count(success_docs)
             progress_message: str = "Completed " + str(success_docs) + \
                                     " docs ( " + str(completion_percentage) + "% )"
             logging.info(progress_message)
+            if progress.all_docs_migrated():
+                logging.info("All documents migrated...")
         else:
+            progress.record_success_doc_value_failure()
             logging.warning("Could not fetch progress stats from Data Prepper response, " +
                             "will retry on next polling cycle...")
Review comment on lines +72 to +87 (Collaborator): I think there's still more logic here than I would expect, given ProgressMetrics. Does it make sense for ProgressMetrics to have an update (or whatever) function that accepts a metrics object, so this function can be stripped down to basically:

```python
metrics = fetch_prometheus_metrics(endpoint_info)
progress.update(metrics)
if progress.all_docs_migrated():
    # do whatever
else:
    # do other stuff
```

Reply (Member, author): I think that's a reasonable ask, but I'd like to defer that change to the point where we have a better mechanism to surface metrics. I intentionally kept all of the logging out of ProgressMetrics so it could function like a pure dataclass.
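
For illustration only, the suggested consolidation might look roughly like the sketch below. The `update` method is hypothetical and not part of this PR; it assumes `get_metric_value` and the metric-name constants are made accessible to ProgressMetrics (here as unprefixed module-level names):

```python
from typing import List, Optional

class ProgressMetrics:
    # ... existing fields and methods from this PR ...

    def update(self, metrics: Optional[List]) -> None:
        # Hypothetical sketch: move check_and_log_progress's bookkeeping here,
        # leaving only logging in the caller. Assumes get_metric_value and the
        # metric-name constants are importable from migration_monitor.
        if metrics is None:
            self.record_metric_api_failure()
            return
        self.reset_metric_api_failure()
        self.update_records_in_flight_count(get_metric_value(metrics, RECORDS_IN_FLIGHT_METRIC))
        self.update_no_partitions_count(get_metric_value(metrics, NO_PARTITIONS_METRIC))
        success_docs = get_metric_value(metrics, DOC_SUCCESS_METRIC)
        if success_docs is not None:
            self.update_success_doc_count(success_docs)
        else:
            self.record_success_doc_value_failure()
```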

-        terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
-                                     target_doc_count)
-        if not terminal:
-            # Save no_partitions_count
-            prev_no_partitions_count = no_partitions_count
     else:
+        progress.record_metric_api_failure()
         logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
-    # TODO - Handle idle non-terminal pipeline
-    return terminal, prev_no_partitions_count
+    return progress


+def __should_continue_monitoring(progress: ProgressMetrics, proc: Optional[Popen] = None) -> bool:
+    return not progress.is_in_terminal_state() and (proc is None or is_process_alive(proc))
+
+
-def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
+# The "dp_process" parameter is optional, and signifies a local Data Prepper process
+def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int:
     endpoint_info = EndpointInfo(args.data_prepper_endpoint)
-    target_doc_count: int = args.target_count
-    # Counter to track the no_partition_count metric
-    no_partition_count: int = 0
-    is_migration_complete = False
-    logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
-    # Sets returncode. A value of None means the subprocess has not yet terminated
-    dp_process.poll()
-    while dp_process.returncode is None and not is_migration_complete:
-        try:
-            dp_process.wait(timeout=poll_interval_seconds)
-        except subprocess.TimeoutExpired:
-            pass
-        if dp_process.returncode is None:
-            is_migration_complete, no_partition_count = check_and_log_progress(
-                endpoint_info, target_doc_count, no_partition_count)
+    progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
+    logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
+    while __should_continue_monitoring(progress_metrics, dp_process):
+        if dp_process is not None:
+            # Wait on local process
+            try:
+                dp_process.wait(timeout=poll_interval_seconds)
+            except subprocess.TimeoutExpired:
+                pass
+        else:
+            # Thread sleep
+            time.sleep(poll_interval_seconds)
+        if dp_process is None or is_process_alive(dp_process):
Review comment (Collaborator): Why might dp_process be None? It seems surprising that it would be handled in the same case as is_process_alive == True.

Reply (Member, author): dp_process being None means the Data Prepper process is not a subprocess - this allows the migration monitor module to be used against remote Data Prepper processes as well. While the Fetch Migration solution today only uses a local subprocess, this flexibility allows the migration monitor to be used in other scenarios.

Here, dp_process is None functions as short-circuit logic for the is_process_alive check - which shouldn't be run unless the Data Prepper process is local.

The comment preceding the run method was incorrect, so I updated this.
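
A minimal sketch of that remote scenario under the new signature (the endpoint URL and document count are illustrative values, not from this PR):

```python
import migration_monitor
from migration_monitor_params import MigrationMonitorParams

# With no Popen handle, run() sleeps between polls instead of waiting on a
# subprocess, and the is_process_alive() check is short-circuited entirely.
params = MigrationMonitorParams(1000000, "http://dataprepper.example.com:4900")
exit_code = migration_monitor.run(params)  # dp_process defaults to None
```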

+            progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
     # Loop terminated
-    if not is_migration_complete:
+    if not progress_metrics.is_in_terminal_state():
         # This will only happen for a local Data Prepper process
         logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
         # TODO - Implement rollback
         logging.error("Please delete any partially migrated indices before retrying the migration.")
         return dp_process.returncode
     else:
+        if progress_metrics.is_migration_complete_success():
+            logging.info("Migration monitor observed successful migration, shutting down...\n")
+        elif progress_metrics.is_migration_idle():
+            logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
+        elif progress_metrics.is_too_may_api_failures():
+            logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
         # Shut down Data Prepper pipeline via API
-        logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
         shutdown_pipeline(endpoint_info)
-        if dp_process.returncode is None:
+        if dp_process is None:
+            # No local process
+            return 0
+        elif is_process_alive(dp_process):
             # Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
             return shutdown_process(dp_process)
         else:
             return dp_process.returncode


-def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
-    endpoint_info = EndpointInfo(args.data_prepper_endpoint)
-    target_doc_count: int = args.target_count
-    # Counter to track the no_partition_count metric
-    no_partition_count: int = 0
-    is_migration_complete = False
-    logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
-    while not is_migration_complete:
-        time.sleep(poll_interval_seconds)
-        is_migration_complete, no_partition_count = check_and_log_progress(
-            endpoint_info, target_doc_count, no_partition_count)
-    # Loop terminated, shut down the Data Prepper pipeline
-    logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
-    shutdown_pipeline(endpoint_info)


 if __name__ == '__main__':  # pragma no cover
     # Set up parsing for command line arguments
     arg_parser = argparse.ArgumentParser(
1 change: 1 addition & 0 deletions FetchMigration/python/migration_monitor_params.py
@@ -5,3 +5,4 @@
 class MigrationMonitorParams:
     target_count: int
     data_prepper_endpoint: str
+    is_local_process: bool = False
134 changes: 134 additions & 0 deletions FetchMigration/python/progress_metrics.py
@@ -0,0 +1,134 @@
import logging
import math
from typing import Optional


# Class that tracks metrics on the health and progress of the migration. Specific metric values from the Data Prepper
# metrics API endpoint are retrieved and stored, as well as idle-value tracking via counters that may indicate an
# idle pipeline. Counters are also used to keep track of API failures or missing metric values.
class ProgressMetrics:

    # Private constants
    __IDLE_VALUE_PREFIX: str = "idle-value-"
    _METRIC_API_FAIL_KEY: str = "metric_api_fail"
    _SUCCESS_DOCS_KEY = "success_docs"
    _REC_IN_FLIGHT_KEY = "records_in_flight"
    _NO_PART_KEY = "no_partitions"

    target_doc_count: int
    idle_threshold: int
    current_values_map: dict[str, Optional[int]]
    prev_values_map: dict[str, Optional[int]]
    counter_map: dict[str, int]
Review comment (Collaborator): Minor, but I'm not sure I get why we're using a single dict with key prefixes to track idle values and failures - does it make more sense to keep them separately?

Review comment (Collaborator): This also isn't a blocker for me, but going forward, I think it's a little cleaner to use something like named tuples or dataclasses for these dictionaries. There's (I believe) a fixed set of possible values for each of these, so it seems clearer to a reader and less error-prone to use defined lists of possible values.

Reply (Member, author): Yes, these could be stored in separate structures. In my mind, they're both counter values - one for the number of times a metric value is idle, and the other for the number of failures - but I can see how this conflation makes the code harder to read.

I'll look into incorporating this, as well as your feedback on stronger control of possible values, in a follow-up PR.
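
For illustration, a dataclass-based alternative along the lines of this feedback might look like the sketch below (hypothetical, not part of this PR):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class MetricState:
    # One record per tracked metric, replacing the three prefix-keyed dicts
    current: Optional[int] = None
    previous: Optional[int] = None
    idle_count: int = 0  # consecutive polls where the value did not change

    def record(self, value: Optional[int]) -> None:
        self.previous = self.current
        if self.previous is not None and self.previous == value:
            self.idle_count += 1
        else:
            self.idle_count = 0
        self.current = value
```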


    def __init__(self, doc_count, idle_threshold):
        self.target_doc_count = doc_count
        self.idle_threshold = idle_threshold
        self.current_values_map = dict()
        self.prev_values_map = dict()
        self.counter_map = dict()

    def __reset_counter(self, key: str):
        if key in self.counter_map:
            del self.counter_map[key]

    def __increment_counter(self, key: str):
        val = self.counter_map.get(key, 0)
        self.counter_map[key] = val + 1

    def __get_idle_value_key_name(self, key: str) -> str:
        return self.__IDLE_VALUE_PREFIX + key

    def __get_idle_value_count(self, key: str) -> Optional[int]:
        idle_value_key = self.__get_idle_value_key_name(key)
        return self.counter_map.get(idle_value_key)

    def __record_value(self, key: str, val: Optional[int]):
        if key in self.current_values_map:
            # Move current value to previous
            self.prev_values_map[key] = self.current_values_map[key]
            # Track idle value metrics
            idle_value_key = self.__get_idle_value_key_name(key)
            if self.prev_values_map[key] == val:
                self.__increment_counter(idle_value_key)
            else:
                self.__reset_counter(idle_value_key)
        # Store new value
        self.current_values_map[key] = val

    def __get_current_value(self, key: str) -> Optional[int]:
        return self.current_values_map.get(key)

    def reset_metric_api_failure(self):
        self.__reset_counter(self._METRIC_API_FAIL_KEY)

    def record_metric_api_failure(self):
        self.__increment_counter(self._METRIC_API_FAIL_KEY)

    def __reset_success_doc_value_failure(self):
        self.__reset_counter(self._SUCCESS_DOCS_KEY)
        # Also reset API failure counter
        self.reset_metric_api_failure()

    def record_success_doc_value_failure(self):
        self.__record_value(self._SUCCESS_DOCS_KEY, None)

    def update_success_doc_count(self, doc_count: int) -> int:
        self.__reset_success_doc_value_failure()
        self.__record_value(self._SUCCESS_DOCS_KEY, doc_count)
        return self.get_doc_completion_percentage()

    def update_records_in_flight_count(self, rec_in_flight: Optional[int]):
        self.__record_value(self._REC_IN_FLIGHT_KEY, rec_in_flight)

    def update_no_partitions_count(self, no_part_count: Optional[int]):
        self.__record_value(self._NO_PART_KEY, no_part_count)

    def get_doc_completion_percentage(self) -> int:
        success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
        if success_doc_count is None:
            success_doc_count = 0
        return math.floor((success_doc_count * 100) / self.target_doc_count)

    def all_docs_migrated(self) -> bool:
        # TODO Add a check for partitionsCompleted = indices
        success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
        if success_doc_count is None:
            success_doc_count = 0
        return success_doc_count >= self.target_doc_count

    def is_migration_complete_success(self) -> bool:
        is_idle_pipeline: bool = False
        rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY)
        no_partitions_count = self.__get_current_value(self._NO_PART_KEY)
        prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0)
        # Check for no records in flight
        if rec_in_flight is not None and rec_in_flight == 0:
            # No-partitions metrics should steadily tick up
            if no_partitions_count is not None and no_partitions_count > prev_no_partitions_count > 0:
                is_idle_pipeline = True
        return is_idle_pipeline and self.all_docs_migrated()

    def is_migration_idle(self) -> bool:
        keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY]
        for key in keys_to_check:
            val = self.__get_idle_value_count(key)
            if val is not None and val >= self.idle_threshold:
                logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " +
                                str(self.idle_threshold))
                return True
        # End of loop
        return False

    def is_too_may_api_failures(self) -> bool:
        return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold

    def is_in_terminal_state(self) -> bool:
        return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures()

    def log_idle_pipeline_debug_metrics(self):  # pragma no cover
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            logging.debug("Idle pipeline metrics - " +
                          f"Records in flight: [{self.__get_current_value(self._REC_IN_FLIGHT_KEY)}], " +
                          f"No-partitions counter: [{self.__get_current_value(self._NO_PART_KEY)}], " +
                          f"Previous no-partition value: [{self.prev_values_map.get(self._NO_PART_KEY)}]")
4 changes: 2 additions & 2 deletions FetchMigration/python/tests/test_fetch_orchestrator.py
@@ -9,7 +9,7 @@

 class TestFetchOrchestrator(unittest.TestCase):

-    @patch('migration_monitor.monitor_local')
+    @patch('migration_monitor.run')
     @patch('subprocess.Popen')
     @patch('metadata_migration.run')
     # Note that mock objects are passed bottom-up from the patch order above
@@ -33,7 +33,7 @@ def test_orchestrator_run(self, mock_metadata_migration: MagicMock, mock_subproc
         mock_subprocess.assert_called_once_with(expected_dp_runnable)
         mock_monitor.assert_called_once_with(expected_monitor_input, mock_subprocess.return_value)

-    @patch('migration_monitor.monitor_local')
+    @patch('migration_monitor.run')
     @patch('subprocess.Popen')
     @patch('metadata_migration.run')
     # Note that mock objects are passed bottom-up from the patch order above