[Fetch Migration] Handle idle pipeline when target doc count is never reached #377
Conversation
Codecov Report
```
@@           Coverage Diff            @@
##               main     #377 +/-  ##
=========================================
  Coverage     63.55%   63.55%
  Complexity      715      715
=========================================
  Files            82       82
  Lines          3298     3298
  Branches        303      303
=========================================
  Hits           2096     2096
  Misses         1014     1014
  Partials        188      188
```

Flags with carried forward coverage won't be shown.
Force-pushed from 5beb8a4 to 686bb94
```python
idle_threshold: int
current_values_map: dict[str, Optional[int]]
prev_values_map: dict[str, Optional[int]]
counter_map: dict[str, int]
```
Minor, but I'm not sure I get why we're using a single dict with key prefixes to track idle values and failures--does it make more sense to keep them separately?
This also isn't a blocker for me, but going forward, I think it's a little cleaner to use something like named tuples or dataclasses for these dictionaries. There's (I believe) a fixed set of possible values for each of these, so it seems clearer to a reader and less error-prone to use defined lists of possible values.
> Minor, but I'm not sure I get why we're using a single dict with key prefixes to track idle values and failures--does it make more sense to keep them separately?
Yes, these could be stored in separate structures. In my mind, they're both counter values - one for the number of times a metric value is idle, and the other for the number of failures - but I can see how this conflation makes the code harder to read.
I'll look into incorporating this, as well as your feedback on stronger control of possible values, in a follow-up PR.
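To illustrate the reviewer's suggestion, here is a minimal sketch of what separate, strongly-typed counter structures might look like. This is a hypothetical design (the class and field names are not from the PR), using dataclasses so a typo in a counter name becomes an `AttributeError` instead of silently creating a new dict key:

```python
from dataclasses import dataclass, field

@dataclass
class MetricCounters:
    """Counters tracked for a single metric (hypothetical names)."""
    idle_count: int = 0     # consecutive polls where the value was unchanged
    failure_count: int = 0  # polls where the value could not be fetched

    def record_idle(self) -> None:
        self.idle_count += 1

    def record_failure(self) -> None:
        self.failure_count += 1

    def reset(self) -> None:
        self.idle_count = 0
        self.failure_count = 0

@dataclass
class ProgressCounters:
    """One counter group per tracked metric, replacing prefixed dict keys."""
    doc_success: MetricCounters = field(default_factory=MetricCounters)
    records_in_flight: MetricCounters = field(default_factory=MetricCounters)
    no_partitions: MetricCounters = field(default_factory=MetricCounters)
```

With this shape, the fixed set of tracked metrics is visible in one place, and each metric's idle and failure counters stay together without string-prefix conventions.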
```python
__DOC_SUCCESS_METRIC: str = "_opensearch_documentsSuccess"
__RECORDS_IN_FLIGHT_METRIC: str = "_BlockingBuffer_recordsInFlight"
__NO_PARTITIONS_METRIC: str = "_noPartitionsAcquired"
__IDLE_THRESHOLD: int = 5
```
I see how this is used in ProgressMetrics, but I don't have much understanding on where this value comes from and why 5 is the right number. Can you elaborate?
`5` was a completely random choice 😄 Data Prepper does not have the notion of an "idle" pipeline, so this threshold is completely up to us. With a default polling interval of 30 seconds, I didn't want the monitor to leave the pipeline running for too long, or close it too quickly. So I picked 5 iterations (i.e. 2.5 minutes) as a reasonable (IMO) threshold.
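The arithmetic behind that choice can be sketched as follows. This is an illustrative snippet, not the PR's code; the constant and function names are assumptions:

```python
# Assumed defaults, per the discussion above
POLL_INTERVAL_SECONDS = 30  # default polling interval
IDLE_THRESHOLD = 5          # consecutive flat polls before declaring idle

def is_idle(unchanged_polls: int) -> bool:
    """The pipeline is considered idle once a tracked metric has been
    unchanged for IDLE_THRESHOLD consecutive polls."""
    return unchanged_polls >= IDLE_THRESHOLD

# Worst-case wall-clock time before the monitor shuts down an idle pipeline:
seconds_to_trip = POLL_INTERVAL_SECONDS * IDLE_THRESHOLD  # 150s = 2.5 minutes
```

A lower threshold risks killing a pipeline during a transient stall; a higher one leaves a finished pipeline running longer than necessary.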
```python
# Reset API failure counter
progress.reset_metric_api_failure()
success_docs = get_metric_value(metrics, __DOC_SUCCESS_METRIC)
rec_in_flight = get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC)
no_partitions_count = get_metric_value(metrics, __NO_PARTITIONS_METRIC)
if success_docs is not None:  # pragma no cover
    completion_percentage: int = math.floor((success_docs * 100) / target_doc_count)
progress.update_records_in_flight_count(get_metric_value(metrics, __RECORDS_IN_FLIGHT_METRIC))
progress.update_no_partitions_count(get_metric_value(metrics, __NO_PARTITIONS_METRIC))
if success_docs is not None:
    completion_percentage = progress.update_success_doc_count(success_docs)
    progress_message: str = "Completed " + str(success_docs) + \
        " docs ( " + str(completion_percentage) + "% )"
    logging.info(progress_message)
    if progress.all_docs_migrated():
        logging.info("All documents migrated...")
else:
    progress.record_success_doc_value_failure()
    logging.warning("Could not fetch progress stats from Data Prepper response, " +
                    "will retry on next polling cycle...")
```
I think there's still more logic here than I would expect, given `ProgressMetrics`. Does it make sense for `ProgressMetrics` to have an `update` (or whatever) function that accepts a `metrics` object, so this function can be stripped down to basically:

```python
metrics = fetch_prometheus_metrics(endpoint_info)
progress.update(metrics)
if progress.all_docs_migrated():
    # do whatever
else:
    # do other stuff.
```
I think that's a reasonable ask, but I'd like to defer that change to the point where we have a better mechanism to surface metrics. I intentionally kept all of the logging out of `ProgressMetrics` so it could function like a pure dataclass.
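For readers following along, the suggested refactor might look roughly like this. It is a hypothetical sketch (the metric key is taken from the constants quoted earlier in the thread; the class shape is assumed, not the PR's actual implementation), illustrating how `update()` could digest a raw metrics payload while all logging stays in the monitoring loop:

```python
from typing import Optional

class ProgressMetrics:
    """Pure data holder: ingests metrics, answers questions, never logs."""

    def __init__(self, target_doc_count: int):
        self.target_doc_count = target_doc_count
        self.success_docs: Optional[int] = None

    def update(self, metrics: dict) -> None:
        # Pull out only the values this class tracks; no side effects
        self.success_docs = metrics.get("_opensearch_documentsSuccess")

    def all_docs_migrated(self) -> bool:
        return self.success_docs is not None and \
            self.success_docs >= self.target_doc_count
```

The caller then shrinks to `progress.update(metrics)` followed by a branch on `progress.all_docs_migrated()`, with all `logging` calls living in the monitor rather than the data class.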
```python
else:
    # Thread sleep
    time.sleep(poll_interval_seconds)
if dp_process is None or is_process_alive(dp_process):
```
Why might `dp_process` be `None`? It seems surprising that would be handled in the same case as `is_process_alive == True`.
`dp_process` being `None` means the Data Prepper process is not a subprocess - this allows the migration monitor module to be used against remote Data Prepper processes as well. While the Fetch Migration solution today only uses a local subprocess, this flexibility allows the migration monitor to be used in other scenarios.

Here, `dp_process is None` functions as short-circuit logic for the `is_process_alive` check - which shouldn't be run unless the Data Prepper process is local.

The comment preceding the `run` method was incorrect, so I updated this.
…ete migration

This includes a new ProgressMetrics class that is used by the migration monitor to track various Data Prepper and API failure metrics in order to detect an idle pipeline. Much of the migration-success logic from the monitoring module has now been encapsulated in this class. Unit test updates and improvements are also included.

Signed-off-by: Kartik Ganesh <[email protected]>

Run and monitor_local have been merged into a single function since most of their code/logic is identical. Unit tests have been updated for improved coverage.

Signed-off-by: Kartik Ganesh <[email protected]>
Force-pushed from 5eecac5 to 21b3717
Description

This commit introduces changes to track various Data Prepper metrics (as well as API failure counters) in order to detect an idle pipeline or non-responsive Data Prepper subprocess. A new `ProgressMetrics` class has been added to track these metrics and encapsulate detection logic. Much of the migration-success logic from the monitoring module has now been moved to this class. Unit test updates and improvements are also included.

This PR also includes a second commit which refactors/merges the `run` and `monitor_local` functions together (since most of their code/logic is identical) for improved unit test coverage.

Testing
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.