Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter empty distribution metrics #32027

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,11 @@ def int64_user_distribution(namespace, name, metric, ptransform=None):
ptransform: The ptransform id used as a label.
"""
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if metric.count <= 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd prefer to lead the condition with the happy path (if metric.count > 0)

raise TypeError(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not seeing where this exception will be caught, hence same question - can we avoid entering this codepath instead of catching the exception in a try-catch.

ValueError would be more appropriate here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if exception is uncaught, it can create a breaking change for users, so as much as possible I'd prefer to not enter the exception path or fail silently

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Dataflow Runner, there are checks when we create the distribution, so that should be fine. For a user defined counter, what behaviour do you want? I agree we shouldn't introduce breaking changes. Is it fine to not emit a counter in that case? If we still want to emit something here, then we'll have an empty counter, and we'd have to filter it out in the runner to prevent sending it to the backend (which we don't want to add either based on your previous comments.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not emitting sounds fine, left some comments.

'Expected a non zero distribution count for %s metric but received %s' %
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which situation this log message will be actionable? I wonder if we should remove this log if it commonly happens (e.g. retries).

(metric, metric.count))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you probably meant metric.name here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
Expand All @@ -232,6 +237,10 @@ def int64_distribution(urn, metric, ptransform=None, pcollection=None):
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
if metric.count <= 0:
raise TypeError(
'Expected a non zero distribution count for %s metric but received %s' %
(metric, metric.count))
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)
Expand Down
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/runners/worker/bundle_processor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,14 @@ def test_can_sample(self):
])
})
self.assertEqual(samples, expected)

# Since data was successfully processed, there should be a
# beam:metric:sampled_byte_size:v1 metric.
self.assertTrue([
val for val in processor.monitoring_infos()
if val.urn == "beam:metric:sampled_byte_size:v1"
])

finally:
data_sampler.stop()

Expand Down Expand Up @@ -382,6 +390,13 @@ def test_can_sample_exceptions(self):
with self.assertRaisesRegex(RuntimeError, 'expected exception'):
processor.process_bundle('instruction_id')

# Since data was not successfully processed, there should not be a
# beam:metric:sampled_byte_size:v1 metric.
self.assertFalse([
val for val in processor.monitoring_infos()
if val.urn == "beam:metric:sampled_byte_size:v1"
])

# NOTE: The expected sample comes from the input PCollection. This is very
# important because there can be coder issues if the sample is put in the
# wrong PCollection.
Expand Down
45 changes: 29 additions & 16 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,18 +605,23 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id,
)

(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check that elem_count_mi is not None here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, what happens with distribution if the count is 0 after all? For example , pipeline reads from a file, but there are no elements, and pipeline stops. will this case be handled correctly?

all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count

try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to check here if(count > 0) instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
except Exception:
_LOGGER.debug(
"Unable to create distribution for pcollection %s for urn %s",
pcollection_id,
monitoring_infos.SAMPLED_BYTE_SIZE_URN)

return all_monitoring_infos

Expand Down Expand Up @@ -1025,11 +1030,19 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
try:
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
except Exception:
_LOGGER.debug(
"Unable to create distribution for pcollection %s for urn %s",
pcollection_id,
monitoring_infos.SAMPLED_BYTE_SIZE_URN)
pass
return infos

def _get_runtime_performance_hints(self):
Expand Down
Loading