diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index 7bc7cced280c..b6a9a279892c 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -20,6 +20,7 @@ # pytype: skip-file import collections +import logging import time from functools import reduce from typing import FrozenSet @@ -116,6 +117,8 @@ common_urns.monitoring_info_labels.INSTANCE_ID.label_props.name) TABLE_ID_LABEL = common_urns.monitoring_info_labels.TABLE_ID.label_props.name +_LOGGER = logging.getLogger(__name__) + def extract_counter_value(monitoring_info_proto): """Returns the counter value of the monitoring info.""" @@ -213,11 +216,18 @@ def int64_user_distribution(namespace, name, metric, ptransform=None): metric: The DistributionData for the metric. ptransform: The ptransform id used as a label. """ - labels = create_labels(ptransform=ptransform, namespace=namespace, name=name) - payload = _encode_distribution( - coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) - return create_monitoring_info( - USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) + if metric.count <= 0: + _LOGGER.debug( + 'Expected a non zero distribution count for %s metric but received %s', + name, metric.count) + return + else: + labels = create_labels( + ptransform=ptransform, namespace=namespace, name=name) + payload = _encode_distribution( + coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) + return create_monitoring_info( + USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels) @@ -231,10 +241,16 @@ def int64_distribution(urn, metric, ptransform=None, pcollection=None): ptransform: The ptransform id used as a label. pcollection: The pcollection id used as a label. 
""" - labels = create_labels(ptransform=ptransform, pcollection=pcollection) - payload = _encode_distribution( - coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) - return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels) + if metric.count <= 0: + _LOGGER.debug( + 'Expected a non zero distribution count for %s metric but received %s' % + (metric.name, metric.count)) + return + else: + labels = create_labels(ptransform=ptransform, pcollection=pcollection) + payload = _encode_distribution( + coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max) + return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels) def int64_user_gauge(namespace, name, metric, ptransform=None): diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py index dafb4dbd4bf0..1fa763e034fc 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor_test.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor_test.py @@ -314,6 +314,14 @@ def test_can_sample(self): ]) }) self.assertEqual(samples, expected) + + # Since data was successfully processed, there should be a + # beam:metric:sampled_byte_size:v1 metric. + self.assertTrue([ + val for val in processor.monitoring_infos() + if val.urn == "beam:metric:sampled_byte_size:v1" + ]) + finally: data_sampler.stop() @@ -382,6 +390,13 @@ def test_can_sample_exceptions(self): with self.assertRaisesRegex(RuntimeError, 'expected exception'): processor.process_bundle('instruction_id') + # Since data was not successfully processed, there should not be a + # beam:metric:sampled_byte_size:v1 metric. + self.assertFalse([ + val for val in processor.monitoring_infos() + if val.urn == "beam:metric:sampled_byte_size:v1" + ]) + # NOTE: The expected sample comes from the input PCollection. 
This is very # important because there can be coder issues if the sample is put in the # wrong PCollection. diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 00a652c49e66..c5d94368e12b 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -605,18 +605,23 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): receiver.opcounter.element_counter.value(), pcollection=pcollection_id, ) + all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi (unused_mean, sum, count, min, max) = ( receiver.opcounter.mean_byte_counter.value()) - - sampled_byte_count = monitoring_infos.int64_distribution( - monitoring_infos.SAMPLED_BYTE_SIZE_URN, - DistributionData(sum, count, min, max), - pcollection=pcollection_id, - ) - all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi - all_monitoring_infos[monitoring_infos.to_key( - sampled_byte_count)] = sampled_byte_count + if (count > 0): + sampled_byte_count = monitoring_infos.int64_distribution( + monitoring_infos.SAMPLED_BYTE_SIZE_URN, + DistributionData(sum, count, min, max), + pcollection=pcollection_id, + ) + all_monitoring_infos[monitoring_infos.to_key( + sampled_byte_count)] = sampled_byte_count + else: + _LOGGER.debug( + "Unable to create distribution for pcollection %s for urn %s", + pcollection_id, + monitoring_infos.SAMPLED_BYTE_SIZE_URN) return all_monitoring_infos @@ -1025,11 +1030,19 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id): infos[monitoring_infos.to_key(mi)] = mi (unused_mean, sum, count, min, max) = ( receiver.opcounter.mean_byte_counter.value()) - sampled_byte_count = monitoring_infos.int64_distribution( - monitoring_infos.SAMPLED_BYTE_SIZE_URN, - DistributionData(sum, count, min, max), - pcollection=pcollection_id) - infos[monitoring_infos.to_key(sampled_byte_count)] = 
sampled_byte_count + if (count > 0): + sampled_byte_count = monitoring_infos.int64_distribution( + monitoring_infos.SAMPLED_BYTE_SIZE_URN, + DistributionData(sum, count, min, max), + pcollection=pcollection_id) + infos[monitoring_infos.to_key( + sampled_byte_count)] = sampled_byte_count + else: + _LOGGER.debug( + "Unable to create distribution for pcollection %s for urn %s", + pcollection_id, + monitoring_infos.SAMPLED_BYTE_SIZE_URN) + pass return infos def _get_runtime_performance_hints(self):