Skip to content

Commit

Permalink
[Fetch Migration] Handling of idle Data Prepper pipeline with incomplete migration
Browse files Browse the repository at this point in the history

This includes a new ProgressMetrics class that is used by the migration monitor to track various Data Prepper and API failure metrics in order to detect an idle pipeline. Much of the migration-success logic from the monitoring module has now been encapsulated in this class. Unit test updates and improvements are also included.

Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Nov 1, 2023
1 parent f8017c5 commit 7003312
Show file tree
Hide file tree
Showing 5 changed files with 395 additions and 124 deletions.
2 changes: 1 addition & 1 deletion FetchMigration/python/metadata_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
# Write output YAML
if len(args.output_file) > 0:
write_output(dp_config, indices_to_create, args.output_file)
if args.report:
if args.report: # pragma no cover
print("Wrote output YAML pipeline to: " + args.output_file)
if not args.dryrun:
index_data = dict()
Expand Down
116 changes: 51 additions & 65 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import argparse
import logging
import math
import subprocess
import time
from subprocess import Popen
Expand All @@ -12,14 +11,20 @@

from endpoint_info import EndpointInfo
from migration_monitor_params import MigrationMonitorParams
from progress_metrics import ProgressMetrics

# Path to the Data Prepper Prometheus metrics API endpoint
# Used to monitor the progress of the migration
__METRICS_API_PATH = "/metrics/prometheus"
__SHUTDOWN_API_PATH = "/shutdown"
__DOC_SUCCESS_METRIC = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC = "_noPartitionsAcquired"
__METRICS_API_PATH: str = "/metrics/prometheus"
__SHUTDOWN_API_PATH: str = "/shutdown"
__DOC_SUCCESS_METRIC: str = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired"
__IDLE_THRESHOLD: int = 5


# Returns True while the subprocess has not yet terminated.
# Popen.returncode remains None until a poll()/wait() observes process exit.
def is_process_alive(proc: Popen) -> bool:
    exit_code = proc.returncode
    return exit_code is None


# Gracefully shutdown a subprocess
Expand All @@ -29,7 +34,7 @@ def shutdown_process(proc: Popen) -> Optional[int]:
try:
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
if proc.returncode is None:
if is_process_alive(proc):
# Failed to terminate, send SIGKILL
proc.kill()
return proc.returncode
Expand Down Expand Up @@ -60,84 +65,63 @@ def get_metric_value(metric_families: List, metric_suffix: str) -> Optional[int]
return None


def check_if_complete(doc_count: Optional[int], in_flight: Optional[int], no_partition_count: Optional[int],
prev_no_partition: int, target: int) -> bool:
# Check for target doc_count
# TODO Add a check for partitionsCompleted = indices
if doc_count is not None and doc_count >= target:
# Check for idle pipeline
logging.info("Target doc count reached, checking for idle pipeline...")
# Debug metrics
if logging.getLogger().isEnabledFor(logging.DEBUG): # pragma no cover
debug_msg_template: str = "Idle pipeline metrics - " + \
"Records in flight: [{0}], " + \
"No-partitions counter: [{1}]" + \
"Previous no-partition value: [{2}]"
logging.debug(debug_msg_template.format(in_flight, no_partition_count, prev_no_partition))

if in_flight is not None and in_flight == 0:
# No-partitions metrics should steadily tick up
if no_partition_count is not None and no_partition_count > prev_no_partition > 0:
return True
return False


def check_and_log_progress(endpoint_info: EndpointInfo, target_doc_count: int, prev_no_partitions_count: int) -> \
tuple[bool, int]:
terminal: bool = False
def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetrics) -> ProgressMetrics:
# If the API call fails, the response is empty
metrics = fetch_prometheus_metrics(endpoint_info)
if metrics is not None:
# Reset API failure counter
progress.reset_metric_api_failure()
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
if success_docs is not None: # pragma no cover
completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress.update_records_in_flight_count(get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC))
progress.update_no_partitions_count(get_metric_value(metrics, __NO_PARTITIONS_METRIC))
if success_docs is not None:
completion_percentage = progress.update_success_doc_count(success_docs)
progress_message: str = "Completed " + str(success_docs) + \
" docs ( " + str(completion_percentage) + "% )"
logging.info(progress_message)
if progress.all_docs_migrated():
logging.info("All documents migrated...")
else:
progress.record_success_doc_value_failure()
logging.warning("Could not fetch progress stats from Data Prepper response, " +
"will retry on next polling cycle...")
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count, prev_no_partitions_count,
target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count
else:
progress.record_metric_api_failure()
logging.warning("Data Prepper metrics API call failed, will retry on next polling cycle...")
# TODO - Handle idle non-terminal pipeline
return terminal, prev_no_partitions_count
return progress


def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval_seconds: int = 30) -> Optional[int]:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
# Sets returncode. A value of None means the subprocess has not yet terminated
dp_process.poll()
while dp_process.returncode is None and not is_migration_complete:
while is_process_alive(dp_process) and not progress_metrics.is_in_terminal_state():
try:
dp_process.wait(timeout=poll_interval_seconds)
except subprocess.TimeoutExpired:
pass
if dp_process.returncode is None:
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
# Loop terminated
if not is_migration_complete:
if is_process_alive(dp_process):
progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
# Log debug metrics
progress_metrics.log_idle_pipeline_debug_metrics()
# Loop terminated, shut down the Data Prepper pipeline
if not progress_metrics.is_in_terminal_state():
logging.error("Migration did not complete, process exited with code: " + str(dp_process.returncode))
# TODO - Implement rollback
logging.error("Please delete any partially migrated indices before retrying the migration.")
return dp_process.returncode
else:
if progress_metrics.is_migration_complete_success():
logging.info("Migration monitor observed successful migration, shutting down...\n")
elif progress_metrics.is_migration_idle():
logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
elif progress_metrics.is_too_may_api_failures():
logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
# Shut down Data Prepper pipeline via API
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
shutdown_pipeline(endpoint_info)
if dp_process.returncode is None:
if is_process_alive(dp_process):
# Workaround for https://github.com/opensearch-project/data-prepper/issues/3141
return shutdown_process(dp_process)
else:
Expand All @@ -146,17 +130,19 @@ def monitor_local(args: MigrationMonitorParams, dp_process: Popen, poll_interval

def run(args: MigrationMonitorParams, poll_interval_seconds: int = 30) -> None:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
target_doc_count: int = args.target_count
# Counter to track the no_partition_count metric
no_partition_count: int = 0
is_migration_complete = False
logging.info("Starting migration monitor until target doc count: " + str(target_doc_count))
while not is_migration_complete:
progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
logging.info("Starting migration monitor until target doc count: " + str(progress_metrics.target_doc_count))
while not progress_metrics.is_in_terminal_state():
time.sleep(poll_interval_seconds)
is_migration_complete, no_partition_count = check_and_log_progress(
endpoint_info, target_doc_count, no_partition_count)
progress_metrics = check_and_log_progress(endpoint_info, progress_metrics)
# Loop terminated, shut down the Data Prepper pipeline
logging.info("Migration monitor observed successful migration and idle pipeline, shutting down...\n")
if progress_metrics.is_migration_complete_success():
logging.info("Migration monitor observed successful migration, shutting down...\n")
elif progress_metrics.is_migration_idle():
logging.warning("Migration monitor observed idle pipeline (migration may be incomplete), shutting down...")
elif progress_metrics.is_too_may_api_failures():
logging.warning("Migration monitor was unable to fetch migration metrics, terminating...")
# Shut down Data Prepper pipeline via API
shutdown_pipeline(endpoint_info)


Expand Down
138 changes: 138 additions & 0 deletions FetchMigration/python/progress_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import logging
import math
from typing import Optional


# Class that tracks metrics on the health and progress of the migration. Specific metric values
# from the Data Prepper metrics API endpoint are retrieved and stored, as well as idle-value
# tracking via counters that may indicate an idle pipeline. Counters are also used to keep track
# of API failures or missing metric values.
class ProgressMetrics:

    # Private constants
    __IDLE_VALUE_PREFIX: str = "idle-value-"
    _METRIC_API_FAIL_KEY: str = "metric_api_fail"
    _SUCCESS_DOCS_KEY = "success_docs"
    _REC_IN_FLIGHT_KEY = "records_in_flight"
    _NO_PART_KEY = "no_partitions"

    # Total number of documents the migration is expected to move
    target_doc_count: int
    # Number of consecutive unchanged/failed polls after which the pipeline is considered idle
    idle_threshold: int
    # Latest observed value per metric key (None when the metric was missing from the response)
    current_values_map: dict[str, Optional[int]]
    # Previously observed value per metric key, used to detect values that have stopped changing
    prev_values_map: dict[str, Optional[int]]
    # Counters for idle-value streaks and API failures
    counter_map: dict[str, int]

    def __init__(self, doc_count: int, idle_threshold: int):
        self.target_doc_count = doc_count
        self.idle_threshold = idle_threshold
        self.current_values_map = dict()
        self.prev_values_map = dict()
        self.counter_map = dict()

    def __reset_counter(self, key: str):
        # Remove the entry (rather than zeroing it) to keep counter_map sparse
        self.counter_map.pop(key, None)

    def __increment_counter(self, key: str):
        self.counter_map[key] = self.counter_map.get(key, 0) + 1

    def __get_idle_value_key_name(self, key: str) -> str:
        return self.__IDLE_VALUE_PREFIX + key

    def __get_idle_value_count(self, key: str) -> Optional[int]:
        idle_value_key = self.__get_idle_value_key_name(key)
        return self.counter_map.get(idle_value_key)

    def __record_value(self, key: str, val: Optional[int]):
        # Store a new metric value for the key, and advance or reset its idle-streak counter.
        if key in self.current_values_map:
            # Move current value to previous
            self.prev_values_map[key] = self.current_values_map[key]
            # Track idle value metrics: an unchanged value (including a repeated None)
            # counts toward the idle streak
            idle_value_key = self.__get_idle_value_key_name(key)
            if self.prev_values_map[key] == val:
                self.__increment_counter(idle_value_key)
            else:
                self.__reset_counter(idle_value_key)
        # Store new value (single assignment covers both the first-record and update paths)
        self.current_values_map[key] = val

    def __get_current_value(self, key: str) -> Optional[int]:
        return self.current_values_map.get(key)

    def reset_metric_api_failure(self):
        self.__reset_counter(self._METRIC_API_FAIL_KEY)

    def record_metric_api_failure(self):
        self.__increment_counter(self._METRIC_API_FAIL_KEY)

    def __reset_success_doc_value_failure(self):
        # NOTE(review): this resets the bare "success_docs" counter, which nothing increments
        # (value failures are tracked via the "idle-value-" counter inside __record_value) -
        # presumably a defensive reset; confirm intent before removing
        self.__reset_counter(self._SUCCESS_DOCS_KEY)
        # Also reset API failure counter
        self.reset_metric_api_failure()

    def record_success_doc_value_failure(self):
        # Recording None advances the idle streak for the success-doc metric
        self.__record_value(self._SUCCESS_DOCS_KEY, None)

    def update_success_doc_count(self, doc_count: int) -> int:
        # Record the latest successful-doc count and return the completion percentage.
        self.__reset_success_doc_value_failure()
        self.__record_value(self._SUCCESS_DOCS_KEY, doc_count)
        return self.get_doc_completion_percentage()

    def update_records_in_flight_count(self, rec_in_flight: Optional[int]):
        self.__record_value(self._REC_IN_FLIGHT_KEY, rec_in_flight)

    def update_no_partitions_count(self, no_part_count: Optional[int]):
        self.__record_value(self._NO_PART_KEY, no_part_count)

    def get_doc_completion_percentage(self) -> int:
        # Migrated-docs progress as a whole-number percentage, floored; a missing
        # success-doc value is treated as zero progress
        success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
        if success_doc_count is None:
            success_doc_count = 0
        return math.floor((success_doc_count * 100) / self.target_doc_count)

    def all_docs_migrated(self) -> bool:
        # TODO Add a check for partitionsCompleted = indices
        success_doc_count = self.__get_current_value(self._SUCCESS_DOCS_KEY)
        if success_doc_count is None:
            success_doc_count = 0
        return success_doc_count >= self.target_doc_count

    def is_migration_complete_success(self) -> bool:
        # True only when all documents have migrated AND the pipeline appears drained:
        # no records in flight and a steadily increasing no-partitions counter.
        is_idle_pipeline: bool = False
        rec_in_flight = self.__get_current_value(self._REC_IN_FLIGHT_KEY)
        no_partitions_count = self.__get_current_value(self._NO_PART_KEY)
        prev_no_partitions_count = self.prev_values_map.get(self._NO_PART_KEY, 0)
        # A previously recorded missing metric is stored as None; treat it as zero so the
        # ordered comparison below cannot raise a TypeError
        if prev_no_partitions_count is None:
            prev_no_partitions_count = 0
        # Check for no records in flight
        if rec_in_flight is not None and rec_in_flight == 0:
            # No-partitions metrics should steadily tick up
            if no_partitions_count is not None and no_partitions_count > prev_no_partitions_count > 0:
                is_idle_pipeline = True
        return is_idle_pipeline and self.all_docs_migrated()

    def is_migration_idle(self) -> bool:
        # True when any tracked metric has held the same value for idle_threshold polls
        keys_to_check = [self._NO_PART_KEY, self._SUCCESS_DOCS_KEY]
        for key in keys_to_check:
            val = self.__get_idle_value_count(key)
            if val is not None and val >= self.idle_threshold:
                logging.warning("Idle pipeline detected because [" + key + "] value was idle above threshold: " +
                                str(self.idle_threshold))
                return True
        # End of loop
        return False

    def is_too_may_api_failures(self) -> bool:
        # Name retains the original misspelling ("may") for backward compatibility with
        # existing callers; prefer is_too_many_api_failures
        return self.is_too_many_api_failures()

    def is_too_many_api_failures(self) -> bool:
        # True when consecutive metrics-API failures have reached the idle threshold
        return self.counter_map.get(self._METRIC_API_FAIL_KEY, 0) >= self.idle_threshold

    def is_in_terminal_state(self) -> bool:
        # The monitor should stop polling on success, an idle pipeline, or repeated API failures
        return self.is_migration_complete_success() or self.is_migration_idle() or self.is_too_may_api_failures()

    def log_idle_pipeline_debug_metrics(self):  # pragma no cover
        # Emit idle-pipeline metric values at DEBUG level for troubleshooting
        if logging.getLogger().isEnabledFor(logging.DEBUG):
            debug_msg_template: str = "Idle pipeline metrics - " + \
                                      "Records in flight: [{0}], " + \
                                      "No-partitions counter: [{1}], " + \
                                      "Previous no-partition value: [{2}]"
            logging.debug(debug_msg_template.format(self.__get_current_value(self._REC_IN_FLIGHT_KEY),
                                                    self.__get_current_value(self._NO_PART_KEY),
                                                    self.prev_values_map.get(self._NO_PART_KEY)))
Loading

0 comments on commit 7003312

Please sign in to comment.