Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Aug 12, 2024
1 parent e84f81d commit 86eff84
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
35 changes: 21 additions & 14 deletions sdks/python/apache_beam/metrics/monitoring_infos.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# pytype: skip-file

import collections
import logging
import time
from functools import reduce
from typing import FrozenSet
Expand Down Expand Up @@ -116,6 +117,8 @@
common_urns.monitoring_info_labels.INSTANCE_ID.label_props.name)
TABLE_ID_LABEL = common_urns.monitoring_info_labels.TABLE_ID.label_props.name

_LOGGER = logging.getLogger(__name__)


def extract_counter_value(monitoring_info_proto):
"""Returns the counter value of the monitoring info."""
Expand Down Expand Up @@ -213,16 +216,18 @@ def int64_user_distribution(namespace, name, metric, ptransform=None):
metric: The DistributionData for the metric.
ptransform: The ptransform id used as a label.
"""
labels = create_labels(ptransform=ptransform, namespace=namespace, name=name)
if metric.count <= 0:
raise TypeError(
_LOGGER.debug(
'Expected a non zero distribution count for %s metric but received %s' %
(metric, metric.count))

payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)
(metric.name, metric.count))
return
else:
labels = create_labels(
ptransform=ptransform, namespace=namespace, name=name)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(
USER_DISTRIBUTION_URN, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_distribution(urn, metric, ptransform=None, pcollection=None):
Expand All @@ -236,14 +241,16 @@ def int64_distribution(urn, metric, ptransform=None, pcollection=None):
ptransform: The ptransform id used as a label.
pcollection: The pcollection id used as a label.
"""
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
if metric.count <= 0:
raise TypeError(
_LOGGER.debug(
'Expected a non zero distribution count for %s metric but received %s' %
(metric, metric.count))
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)
(metric.name, metric.count))
return
else:
labels = create_labels(ptransform=ptransform, pcollection=pcollection)
payload = _encode_distribution(
coders.VarIntCoder(), metric.count, metric.sum, metric.min, metric.max)
return create_monitoring_info(urn, DISTRIBUTION_INT64_TYPE, payload, labels)


def int64_user_gauge(namespace, name, metric, ptransform=None):
Expand Down
12 changes: 6 additions & 6 deletions sdks/python/apache_beam/runners/worker/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,17 +607,17 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
)
all_monitoring_infos[monitoring_infos.to_key(elem_count_mi)] = elem_count_mi

try:
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
if (count > 0):
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id,
)
all_monitoring_infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
except Exception:
else:
_LOGGER.debug(
"Unable to create distribution for pcollection %s for urn %s",
pcollection_id,
Expand Down Expand Up @@ -1030,14 +1030,14 @@ def pcollection_count_monitoring_infos(self, tag_to_pcollection_id):
infos[monitoring_infos.to_key(mi)] = mi
(unused_mean, sum, count, min, max) = (
receiver.opcounter.mean_byte_counter.value())
try:
if (count > 0):
sampled_byte_count = monitoring_infos.int64_distribution(
monitoring_infos.SAMPLED_BYTE_SIZE_URN,
DistributionData(sum, count, min, max),
pcollection=pcollection_id)
infos[monitoring_infos.to_key(
sampled_byte_count)] = sampled_byte_count
except Exception:
else:
_LOGGER.debug(
"Unable to create distribution for pcollection %s for urn %s",
pcollection_id,
Expand Down

0 comments on commit 86eff84

Please sign in to comment.