Skip to content

Commit

Permalink
Fix rebase conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Kartik Ganesh <[email protected]>
  • Loading branch information
kartg committed Sep 11, 2023
1 parent a69dd23 commit d857c1f
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 7 deletions.
9 changes: 4 additions & 5 deletions FetchMigration/python/migration_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
# TODO Remove hardcoded EndpointInfo
default_auth = ('admin', 'admin')
endpoint = EndpointInfo(args.dp_endpoint, default_auth, False)
target_doc_count: int = args.target_count
prev_no_partitions_count = 0
terminal = False
logging.debug("Starting migration monitor until target doc count: " + str(target_doc_count))
Expand All @@ -71,15 +72,13 @@ def run(args: MigrationMonitorParams, wait_seconds: int = 30) -> None:
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
logging.debug("Got metrics: " +
",".join(map(str, [success_docs, rec_in_flight, prev_no_partitions_count, no_partitions_count])))
",".join(map(str, [success_docs, rec_in_flight, prev_no_partitions_count,
no_partitions_count])))
terminal = check_if_complete(success_docs, rec_in_flight, no_partitions_count,
prev_no_partitions_count, args.target_count)
prev_no_partitions_count, target_doc_count)
if not terminal:
# Save no_partitions_count
prev_no_partitions_count = no_partitions_count

if not terminal:
time.sleep(wait_seconds)
# Loop terminated, shut down the Data Prepper pipeline
logging.info("Migration monitor at terminal state, shutting down...\n")
shutdown_pipeline(endpoint)
Expand Down
3 changes: 1 addition & 2 deletions FetchMigration/python/tests/test_migration_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ def test_run(self, mock_fetch: MagicMock, mock_get: MagicMock, mock_check: Magic
expected_endpoint_info = EndpointInfo(test_input.dp_endpoint, ('admin', 'admin'), False)
self.assertEqual(2, mock_fetch.call_count)
mock_fetch.assert_called_with(expected_endpoint_info)
# We expect one wait cycle
mock_sleep.assert_called_once_with(wait_time)
mock_sleep.assert_called_with(wait_time)
mock_shut.assert_called_once_with(expected_endpoint_info)

@patch('migration_monitor.shutdown_pipeline')
Expand Down

0 comments on commit d857c1f

Please sign in to comment.