Skip to content

Commit

Permalink
update proto to add special per worker urn
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Oct 24, 2024
1 parent 1855c5e commit 6846335
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,16 +198,6 @@ message MonitoringInfoSpecs {
}]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down Expand Up @@ -378,6 +368,22 @@ message MonitoringInfoSpecs {
}
]
}];

USER_PER_WORKER_HISTOGRAM = 22 [(monitoring_info_spec) = {
urn: "beam:metric:user:per_worker_histogram_int64:v1",
type: "beam:metrics:per_worker_metric:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
PER_WORKER_METRIC = 23 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_metric:v1"];

}
}

Expand Down Expand Up @@ -593,6 +599,12 @@ message MonitoringInfoTypeUrns {
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];

// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
PER_WORKER_METRIC = 14 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_metric:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.PER_WORKER_HISTOGRAM_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;

import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
Expand All @@ -46,6 +47,7 @@
import java.util.function.Function;
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.Metric;
Expand Down Expand Up @@ -191,6 +193,7 @@ public HistogramCell getPerWorkerHistogram(
// }
// return no op histogram instead
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
LOG.info("xxx getPerWorkerHistogram {}" , val.toString());
return val; // no null chceks for the others
}

Expand All @@ -212,7 +215,7 @@ public HistogramCell getHistogram(MetricName metricName, HistogramData.BucketTyp
}

public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell>
getPerWorkerHistogram() {
getPerWorkerHistograms() {
return perWorkerHistograms;
}

Expand Down Expand Up @@ -315,12 +318,14 @@ public MetricUpdates getUpdates() {
extractHistogramUpdates(perWorkerHistograms) /* */);
}

// why is this returning null?
/** @return The MonitoringInfo metadata from the metric. */
private @Nullable SimpleMonitoringInfoBuilder metricToMonitoringMetadata(
MetricKey metricKey, String typeUrn, String userUrn) {
LOG.info(
"xxx metricToMonitoringMetadata urn {}, metrics key {} step name {}",
"xxx metricToMonitoringMetadata typeUrn {}, user urn {} metrics key {} step name {}",
typeUrn,
userUrn,
metricKey.metricName().getName(),
metricKey.stepName());
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(true);
Expand All @@ -332,16 +337,18 @@ public MetricUpdates getUpdates() {
LOG.info("xxx metric name is instance of MonitoringInfoMetricName {}", metricName.getName());
MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) metricName;
// Represents a specific MonitoringInfo for a specific URN.
LOG.info("xxx monitoringInfoName {}", monitoringInfoName.toString());

builder.setUrn(monitoringInfoName.getUrn());
for (Entry<String, String> e : monitoringInfoName.getLabels().entrySet()) {
builder.setLabel(e.getKey(), e.getValue());
}
} else { // Represents a user counter.
// LOG.info("xxx metric name is a user counter {}", metricName.getName());
LOG.info("xxx metric name is a user counter {}", metricName.getName());
// Drop if the stepname is not set. All user counters must be
// defined for a PTransform. They must be defined on a container bound to a step.
if (this.stepName == null) {
// LOG.info("xxx dropping {} since step name is null", metricName.getName());
LOG.info("xxx dropping {} since step name is null", metricName.getName());
return null;
}
builder
Expand Down Expand Up @@ -404,7 +411,7 @@ public MetricUpdates getUpdates() {

/**
* @param metricUpdate
* @return The MonitoringInfo generated from the histogram metricUpdate.
* @return The MonitoringInfo generated from the histogram metricUpdate. // check if per worker or not
*/
private @Nullable MonitoringInfo histogramUpdateToMonitoringInfo(
MetricUpdate<HistogramData> metricUpdate) {
Expand Down Expand Up @@ -536,7 +543,7 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
(metricName, histogramCell) -> {
if (histogramCell.getDirty().beforeCommit()) {
LOG.info(
"xxx does metricName for perWorkerHist? {}",
"xxx does metricName for hist? {}",
metricName.getKey().getName()); // add per worker metrics here?
String shortId =
getShortId(metricName.getKey(), this::histogramToMonitoringMetadata, shortIds);
Expand Down Expand Up @@ -576,11 +583,14 @@ private String getShortId(
if (shortId == null) {
SimpleMonitoringInfoBuilder monitoringInfoBuilder =
toInfo.apply(MetricKey.create(stepName, metricName));
LOG.info("xxx monitoringInfoBuilder {}", monitoringInfoBuilder.toString());
if (monitoringInfoBuilder == null) {
LOG.info("xxx monitoringInfoBuilder is null return no short id");
shortId = Optional.empty();
} else {
MonitoringInfo monitoringInfo = monitoringInfoBuilder.build();
if (monitoringInfo == null) {
LOG.info("xxx monitoringInfo is null return no short id");
shortId = Optional.empty();
} else {
shortId = Optional.of(shortIds.getOrCreateShortId(monitoringInfo));
Expand Down Expand Up @@ -719,8 +729,9 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
updateForStringSetType(monitoringInfo);
break;

// Per worker histogram , de mangle metrics in backend?
// Per worker histogram , de mangle metrics in backend? update case
case PER_WORKER_HISTOGRAM_TYPE:
LOG.info("xxx here");
updateForPerWorkerHistogramInt64(monitoringInfo);
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public static final class Urns {
extractUrn(MonitoringInfoSpecs.Enum.API_REQUEST_COUNT);
public static final String API_REQUEST_LATENCIES =
extractUrn(MonitoringInfoSpecs.Enum.API_REQUEST_LATENCIES);

public static final String PER_WORKER_METRIC =
extractUrn(MonitoringInfoSpecs.Enum.PER_WORKER_METRIC);
static {
// Validate that compile time constants match the values stored in the protos.
// Defining these as constants allows for usage in switch case statements and also
Expand Down

0 comments on commit 6846335

Please sign in to comment.