Filter empty distribution metrics #32027

Closed
wants to merge 3 commits
34 changes: 25 additions & 9 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -20,6 +20,7 @@
# pytype: skip-file

import collections
import logging
import time
from functools import reduce
from typing import FrozenSet
@@ -116,6 +117,8 @@
common_urns.monitoring_info_labels.INSTANCE_ID.label_props.name)
TABLE_ID_LABEL = common_urns.monitoring_info_labels.TABLE_ID.label_props.name

_LOGGER = logging.getLogger(__name__)


def extract_counter_value(monitoring_info_proto):
"""Returns the counter value of the monitoring info."""
@@ -213,11 +216,18 @@ def int64_user_distribution(namespace, name, metric, ptransform=None):
metric: The DistributionData for the metric.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
if metric.count <= 0:
Contributor: nit: I'd prefer to lead the condition with the happy path (if metric.count > 0).

_LOGGER.debug(
'Expected a non zero distribution count for %s metric but received %s' %
Contributor: In which situation will this log message be actionable? I wonder if we should remove this log if it fires commonly (e.g. on retries).

(metric.name, metric.count))
return
Contributor: Since we return None (implicitly), we need to change the method typehint to # type: (...) -> Optional[metrics_pb2.MonitoringInfo].

else:
labels = create_labels(
ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_distribution(urn, metric, ptransform=None, pcollection=None):
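
Taken together, the two suggestions above (lead with the happy path, and annotate the Optional return) might look roughly like the following sketch of int64_user_distribution. This is an illustrative rewrite, not code from the PR: it assumes Optional is imported from typing in this module, relies on the existing module-level helpers, and logs the name argument instead of metric.name, since DistributionData does not appear to carry a name field.

def int64_user_distribution(namespace, name, metric, ptransform=None):
  # type: (...) -> Optional[metrics_pb2.MonitoringInfo]
  """Returns the distribution MonitoringInfo, or None if the metric is empty."""
  if metric.count > 0:
    # Happy path first: build the payload and monitoring info as before.
    labels = create_labels(
        ptransform=ptransform, namespace=namespace, name=name)
    payload = _encode_distribution(
        coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
    return create_monitoring_info(
        USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
  # Empty distribution: skip reporting rather than emit a zero-count payload.
  _LOGGER.debug(
      'Expected a non-zero distribution count for %s metric but received %s',
      name, metric.count)
  return None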
@@ -231,10 +241,16 @@
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)
if metric.count <= 0:
_LOGGER.debug(
'Expected a non zero distribution count for %s metric but received %s' %
(metric.name, metric.count))
return
else:
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_user_gauge(namespace, name, metric, ptransform=None):
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/runners/worker/bundle_processor_test.py
@@ -314,6 +314,14 @@ def test_can_sample(self):
])
})
self.assertEqual(samples, expected)

# Since data was successfully processed, there should be a
# beam:metric:sampled_byte_size:v1 metric.
self.assertTrue([
val for val in processor.monitoring_infos()
if val.urn == "beam:metric:sampled_byte_size:v1"
])

finally:
data_sampler.stop()

@@ -382,6 +390,13 @@ def test_can_sample_exceptions(self):
with self.assertRaisesRegex(RuntimeError, 'expected exception'):
processor.process_bundle('instruction_id')

# Since data was not successfully processed, there should not be a
# beam:metric:sampled_byte_size:v1 metric.
self.assertFalse([
val for val in processor.monitoring_infos()
if val.urn == "beam:metric:sampled_byte_size:v1"
])

# NOTE: The expected sample comes from the input PCollection. This is very
# important because there can be coder issues if the sample is put in the
# wrong PCollection.
41 changes: 27 additions & 14 deletions sdks/python/apache_beam/runners/worker/operations.py
@@ -605,18 +605,23 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
receiver.opcounter.element_counter.value(),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
Contributor: Do we need to check that elem_count_mi is not None here?

Contributor: Also, what happens with the distribution if the count is 0 after all? For example, a pipeline reads from a file, but there are no elements, and the pipeline stops. Will this case be handled correctly?


(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())

sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
if (count > 0):
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
else:
_LOGGER.debug(
"Unable to create distribution for pcollection %s for urn %s",
pcollection_id,
monitoring_infos.SAMPLED_BYTE_SIZE_URN)

return all_monitoring_infos
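
On the question above about elem_count_mi: in this change only the distribution helpers can return None, so the element-count entry is unaffected. If the counter helper ever followed the same pattern, a small defensive guard (hypothetical, not part of this PR) could look like:

    # Hypothetical guard around the existing element-count entry; elem_count_mi
    # comes from the unchanged counter helper earlier in this method.
    if elem_count_mi is not None:
      all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi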

@@ -1025,11 +1030,19 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(sampled_byte_count)] = sampled_byte_count
if (count > 0):
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
else:
_LOGGER.debug(
"Unable to create distribution for pcollection %s for urn %s",
pcollection_id,
monitoring_infos.SAMPLED_BYTE_SIZE_URN)
pass
return infos

def _get_runtime_performance_hints(self):