Filter empty distribution metrics #32027

Closed
wants to merge 3 commits into from

Naireen
Contributor

@Naireen Naireen commented Jul 30, 2024

Filter empty distributions

In the Python SDK, filter out empty distributions so they are not sent to the runner for processing, since they add unnecessary overhead. An empty distribution can occur when processing fails, or when an empty split is created (for batch pipelines).



@Naireen Naireen force-pushed the fix_distribution_counter branch 5 times, most recently from afc0bb3 to b0d332d Compare July 30, 2024 22:14
@Naireen Naireen marked this pull request as ready for review July 30, 2024 22:15
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @tvalentyn for label python.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@Naireen
Contributor Author

Naireen commented Jul 31, 2024

Run Python_Coverage PreCommit

@Naireen Naireen force-pushed the fix_distribution_counter branch 6 times, most recently from e17a6fa to d1eee16 Compare August 1, 2024 22:14
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count

try:
Contributor

would it make sense to check here if(count > 0) instead?

Contributor Author

Done
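
The count check suggested in this thread, as opposed to catching an exception, might look like the following minimal sketch. `DistributionData` and `nonempty_distributions` are hypothetical stand-ins for illustration, not Beam's actual classes or functions.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class DistributionData:
  """Hypothetical stand-in for a distribution's accumulated state."""
  count: int
  sum: int
  min: int
  max: int


def nonempty_distributions(
    distributions: Dict[str, DistributionData]
) -> Dict[str, DistributionData]:
  """Keeps only distributions that recorded at least one value."""
  result = {}
  for key, dist in distributions.items():
    # Guard on the count directly instead of wrapping creation in try/except.
    if dist.count > 0:
      result[key] = dist
  return result
```

With this shape, an empty distribution never reaches the code that would build a monitoring info for it, so there is no exception to catch.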

@@ -214,6 +214,11 @@ def int64_user_distribution(namespace, name, metric, ptransform=None):
    ptransform: The ptransform id used as a label.
  """
  labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
  if metric.count <= 0:
    raise TypeError(
Contributor

I don't see where this exception would be caught, hence the same question: can we avoid entering this code path instead of catching the exception in a try/except?

ValueError would be more appropriate here.

Contributor

Also, if the exception is uncaught, it can create a breaking change for users, so as much as possible I'd prefer to not enter the exception path or fail silently.
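
To illustrate both points, here is a minimal sketch with hypothetical names (`build_distribution_payload` and `report` are not Beam functions). The argument has the right type but an unacceptable value, which is the case `ValueError` is meant for, and any caller that does not catch the error fails outright.

```python
from typing import Tuple


def build_distribution_payload(
    name: str, count: int, total: int, minimum: int,
    maximum: int) -> Tuple[int, int, int, int]:
  """Hypothetical builder that rejects empty distributions by raising."""
  if count <= 0:
    # The type (int) is correct; only the value is invalid, hence ValueError.
    raise ValueError(
        'Expected a non zero distribution count for %s metric but received %s' %
        (name, count))
  return (count, total, minimum, maximum)


def report(name: str, count: int) -> str:
  """A caller must catch the error explicitly, or it propagates upward."""
  try:
    build_distribution_payload(name, count, 0, 0, 0)
  except ValueError as e:
    return 'skipped: %s' % e
  return 'emitted'
```

Existing user code has no such `except` clause, which is why raising here would be a breaking change.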

Contributor Author

For the Dataflow Runner, there are checks when we create the distribution, so that should be fine. For a user defined counter, what behaviour do you want? I agree we shouldn't introduce breaking changes. Is it fine to not emit a counter in that case? If we still want to emit something here, then we'll have an empty counter, and we'd have to filter it out in the runner to prevent sending it to the backend (which we don't want to add either based on your previous comments.)

Contributor

I think not emitting sounds fine, left some comments.

  if metric.count <= 0:
    raise TypeError(
        'Expected a non zero distribution count for %s metric but received %s' %
        (metric, metric.count))
Contributor

you probably meant metric.name here

Contributor Author

Done.

@Naireen Naireen force-pushed the fix_distribution_counter branch 2 times, most recently from 40d4591 to cf01c43 Compare August 12, 2024 15:48
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
if metric.count <= 0:
Contributor

nit: I'd prefer to lead the condition with the happy path (if metric.count > 0)

    _LOGGER.debug(
        'Expected a non zero distribution count for %s metric but received %s' %
        (metric.name, metric.count))
    return
Contributor

Since we return None (implicitly), we need to change the method typehint to # type: (...) -> Optional[metrics_pb2.MonitoringInfo].
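
Combined with the happy-path nit earlier in this review, the function might take roughly the following shape. This is a sketch with stand-in types (`Metric`, `MonitoringInfo` and `user_distribution_info` are hypothetical, not the Beam classes), and it uses an inline annotation where the Beam code uses a type comment.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Metric:
  """Hypothetical stand-in for distribution data."""
  count: int
  sum: int
  min: int
  max: int


@dataclass(frozen=True)
class MonitoringInfo:
  """Hypothetical stand-in for metrics_pb2.MonitoringInfo."""
  name: str
  payload: tuple


def user_distribution_info(
    name: str, metric: Metric) -> Optional[MonitoringInfo]:
  """Returns a MonitoringInfo, or None when the distribution is empty."""
  if metric.count > 0:
    payload = (metric.count, metric.sum, metric.min, metric.max)
    return MonitoringInfo(name, payload)
  # Empty distribution: make the None return explicit for readers and mypy.
  return None
```

Every caller then has to handle the `None` case before using the result.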

USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
if metric.count <= 0:
_LOGGER.debug(
'Expected a non zero distribution count for %s metric but received %s' %
Contributor

In which situation will this log message be actionable? I wonder if we should remove this log if it commonly happens (e.g. on retries).

@@ -605,18 +605,23 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
Contributor

Do we need to check that elem_count_mi is not None here?

Contributor

Also, what happens with the distribution if the count is 0 after all? For example, the pipeline reads from a file, but there are no elements, and the pipeline stops. Will this case be handled correctly?

@Naireen
Contributor Author

Naireen commented Aug 12, 2024

So with the current approach of returning None, this fails:

./gradlew :sdks:python:test-suites:tox:pycommon:mypy (part of the PreCommit Python Lint test)

Essentially, the issue is that we take a dict of strings as input, which should produce a dict of MonitoringInfo. Since we aren't raising errors, the type check fails: the return type of monitoring_infos.int64_distribution changes from metrics_pb2.MonitoringInfo to Optional[metrics_pb2.MonitoringInfo], which makes operations.pcollection_count_monitoring_infos invalid, because we can't go from Dict[FrozenSet, metrics_pb2.MonitoringInfo] to Dict[FrozenSet, Optional[metrics_pb2.MonitoringInfo]].

If we went down this route, we would ideally do some filtering here, which isn't the most performant option.
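
One possible shape for that filtering step, as a sketch only: the `is not None` test in the comprehension should let a type checker narrow `Optional[V]` to `V`, at the cost of an extra pass over the dictionary. `drop_missing` is a hypothetical helper, and plain strings stand in for the real key and `MonitoringInfo` types.

```python
from typing import Dict, FrozenSet, Optional, TypeVar

V = TypeVar('V')


def drop_missing(
    infos: Dict[FrozenSet[str], Optional[V]]) -> Dict[FrozenSet[str], V]:
  """Copies the dict, omitting entries whose value is None."""
  return {key: info for key, info in infos.items() if info is not None}
```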

The other option was to raise an error and guard the runner against it, so that if a user creates an invalid distribution, it would error (which is a breaking change, and also isn't desirable).

So which of the two approaches makes sense? Is there a third option?

Contributor

Reminder, please take a look at this pr: @tvalentyn

@tvalentyn
Contributor

Discussed offline

@tvalentyn
Contributor

waiting on author

Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Oct 20, 2024
Contributor

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Oct 28, 2024