diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index f79a3e386592..b6a9a279892c 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -20,6 +20,7 @@ # pytype: skip-file import collections +import logging import time from functools import reduce from typing import FrozenSet @@ -116,6 +117,8 @@ common_urns.monitoring_info_labels.INSTANCE_ID.label_props.name) TABLE_ID_LABEL = common_urns.monitoring_info_labels.TABLE_ID.label_props.name +_LOGGER = logging.getLogger(__name__) + def extract_counter_value(monitoring_info_proto): """Returns the counter value of the monitoring info.""" @@ -213,16 +216,18 @@ def int64_user_distribution(namespace, name, metric, ptransform=None): metric: The DistributionData for the metric. ptransform: The ptransform id used as a label. """ - labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) if metric.count <= 0: - raise TypeError( + _LOGGER.debug( 'Expected a non zero distribution count for %s metric but received %s' % - (metric, metric.count)) - - payload = _encode_distribution( - coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) - return create_monitoring_info( - USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) + (metric.name, metric.count)) + return + else: + labels = create_labels( + ptransform=ptransform, namespace=namespace, name=name) + payload = _encode_distribution( + coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) + return create_monitoring_info( + USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) def int64_distribution(urn, metric, ptransform=None, pcollection=None): @@ -236,14 +241,16 @@ def int64_distribution(urn, metric, ptransform=None, pcollection=None): ptransform: The ptransform id used as a label. pcollection: The pcollection id used as a label. """ - labels = create_labels(ptransform=ptransform, pcollection=pcollection) if metric.count <= 0: - raise TypeError( + _LOGGER.debug( 'Expected a non zero distribution count for %s metric but received %s' % - (metric, metric.count)) - payload = _encode_distribution( - coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) - return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels) + (metric.name, metric.count)) + return + else: + labels = create_labels(ptransform=ptransform, pcollection=pcollection) + payload = _encode_distribution( + coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) + return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels) def int64_user_gauge(namespace, name, metric, ptransform=None): diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 1de14564de2a..c5d94368e12b 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -607,9 +607,9 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): ) all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi - try: - (unused_mean, sum, count, min, max) = ( - receiver.opcounter.mean_byte_counter.value()) + (unused_mean, sum, count, min, max) = ( + receiver.opcounter.mean_byte_counter.value()) + if (count > 0): sampled_byte_count = monitoring_infos.int64_distribution( monitoring_infos.SAMPLED_BYTE_SIZE_URN, DistributionData(sum, count, min, max), @@ -617,7 +617,7 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): ) all_monitoring_infos[monitoring_infos.to_key( sampled_byte_count)] = sampled_byte_count - except Exception: + else: _LOGGER.debug( "Unable to create distribution for pcollection %s for urn %s", pcollection_id, @@ -1030,14 +1030,14 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): infos[monitoring_infos.to_key(mi)] = mi (unused_mean, sum, count, min, max) = ( receiver.opcounter.mean_byte_counter.value()) - try: + if (count > 0): sampled_byte_count = monitoring_infos.int64_distribution( monitoring_infos.SAMPLED_BYTE_SIZE_URN, DistributionData(sum, count, min, max), pcollection=pcollection_id) infos[monitoring_infos.to_key( sampled_byte_count)] = sampled_byte_count - except Exception: + else: _LOGGER.debug( "Unable to create distribution for pcollection %s for urn %s", pcollection_id,