Skip to content

Commit

Permalink
Incorporated PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Nov 7, 2023
1 parent 686bb94 commit 5eecac5
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 12 deletions.
6 changes: 3 additions & 3 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ def check_and_log_progress(endpoint_info: EndpointInfo, progress: ProgressMetric
return progress


def __should_continue_monitoring(metrics: ProgressMetrics, proc: Optional[Popen] = None) -> bool:
return not metrics.is_in_terminal_state() and (proc is None or is_process_alive(proc))
def __should_continue_monitoring(progress: ProgressMetrics, proc: Optional[Popen] = None) -> bool:
return not progress.is_in_terminal_state() and (proc is None or is_process_alive(proc))


# Last parameter is optional, and signifies a local Data Prepper process
# The "dp_process" parameter is optional, and signifies a local Data Prepper process
def run(args: MigrationMonitorParams, dp_process: Optional[Popen] = None, poll_interval_seconds: int = 30) -> int:
endpoint_info = EndpointInfo(args.data_prepper_endpoint)
progress_metrics = ProgressMetrics(args.target_count, __IDLE_THRESHOLD)
Expand Down
14 changes: 5 additions & 9 deletions FetchMigration/python/progress_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,13 @@ def __record_value(self, key: str, val: Optional[int]):
if key in self.current_values_map:
# Move current value to previous
self.prev_values_map[key] = self.current_values_map[key]
# Store new value
self.current_values_map[key] = val
# Track idle value metrics
idle_value_key = self.__get_idle_value_key_name(key)
if self.prev_values_map[key] == val:
self.__increment_counter(idle_value_key)
else:
self.__reset_counter(idle_value_key)
# Store new value
self.current_values_map[key] = val

def __get_current_value(self, key: str) -> Optional[int]:
Expand Down Expand Up @@ -129,10 +128,7 @@ def is_in_terminal_state(self) -> bool:

def log_idle_pipeline_debug_metrics(self): # pragma no cover
if logging.getLogger().isEnabledFor(logging.DEBUG):
debug_msg_template: str = "Idle pipeline metrics - " + \
"Records in flight: [{0}], " + \
"No-partitions counter: [{1}]" + \
"Previous no-partition value: [{2}]"
logging.debug(debug_msg_template.format(self.__get_current_value(self._REC_IN_FLIGHT_KEY),
self.__get_current_value(self._NO_PART_KEY),
self.prev_values_map.get(self._NO_PART_KEY)))
logging.debug("Idle pipeline metrics - " +
f"Records in flight: [{self.__get_current_value(self._REC_IN_FLIGHT_KEY)}], " +
f"No-partitions counter: [{self.__get_current_value(self._NO_PART_KEY)}]" +
f"Previous no-partition value: [{self.prev_values_map.get(self._NO_PART_KEY)}]")

0 comments on commit 5eecac5

Please sign in to comment.