Skip to content

Commit

Permalink
update proto to add special per worker urn
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Oct 28, 2024
1 parent 1855c5e commit 59f5752
Show file tree
Hide file tree
Showing 18 changed files with 331 additions and 405 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,6 @@ message MonitoringInfoSpecs {
}]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down Expand Up @@ -378,6 +368,22 @@ message MonitoringInfoSpecs {
}
]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_metric:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
PER_WORKER_METRIC = 23 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_metric:v1"];

}
}

Expand Down Expand Up @@ -593,6 +599,12 @@ message MonitoringInfoTypeUrns {
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];

// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
PER_WORKER_METRIC = 14 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_metric:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,22 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<GaugeResult>> perWorkerGauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<GaugeResult>> perWorkerGauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
LOG.info("xxx does this get here? DefaultMetricResults ");
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.perWorkerGauges = perWorkerGauges;
this.stringSets = stringSets;
this.perWorkerHistograms = perWorkerHistograms;
}
Expand All @@ -69,6 +72,7 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
Iterables.filter(
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(perWorkerGauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class HistogramCell
private final DirtyState dirty = new DirtyState();
private final HistogramData value;
private final MetricName name;
// create per worker check here?

/**
* Generally, runners should construct instances using the methods in {@link
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public abstract class MetricUpdates {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
Expand Down Expand Up @@ -65,6 +66,9 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
/** All the gauges updates. */
public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();

/** All the gauges updates. */
public abstract Iterable<MetricUpdate<GaugeData>> perWorkerGaugeUpdates();

/** All the sets updates. */
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

Expand All @@ -75,14 +79,16 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates, // ideally combine this with perworker metrics
Iterable<MetricUpdate<GaugeData>> perWorkerGaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
// System.out.println("xxx metric create");
return new AutoValue_MetricUpdates(
counterUpdates,
distributionUpdates,
gaugeUpdates,
perWorkerGaugeUpdates,
stringSetUpdates,
perWorkerHistogramsUpdates);
}
Expand All @@ -93,6 +99,7 @@ public boolean isEmpty() {
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(perWorkerHistogramsUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(perWorkerGaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates());
}
}
Loading

0 comments on commit 59f5752

Please sign in to comment.