Skip to content

Commit

Permalink
Export string sets in monitoring infos. (#31838)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Jul 11, 2024
1 parent e646c28 commit 50a3403
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,17 @@ message MonitoringInfoSpecs {
}]
}];

// Represents a set of strings seen across bundles.
// Spec for the user string-set metric: identified by the URN
// "beam:metric:user:set_string:v1", carrying a payload of type
// "beam:metrics:set_string:v1", and requiring the PTRANSFORM, NAMESPACE and
// NAME labels.
USER_SET_STRING = 21 [(monitoring_info_spec) = {
urn: "beam:metric:user:set_string:v1",
type: "beam:metrics:set_string:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report user metric."
}]
}];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.io.Serializable;
Expand Down Expand Up @@ -331,6 +334,28 @@ public MetricUpdates getUpdates() {
return builder.build();
}

/**
 * Produces the MonitoringInfo metadata builder for a string set metric by delegating to {@code
 * metricToMonitoringMetadata} with the string set type URN and the user string set URN.
 *
 * @param metricKey key identifying the string set metric
 * @return the metadata builder, or {@code null} if the delegate yields none
 */
private @Nullable SimpleMonitoringInfoBuilder stringSetToMonitoringMetadata(MetricKey metricKey) {
  final String typeUrn = MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
  final String userUrn = MonitoringInfoConstants.Urns.USER_SET_STRING;
  return metricToMonitoringMetadata(metricKey, typeUrn, userUrn);
}

/**
 * Converts a string set metric update into a MonitoringInfo.
 *
 * @param metricUpdate the string set update supplying both the metric key and the value
 * @return the MonitoringInfo built from the update, or {@code null} when no metadata builder is
 *     available for the update's key
 */
private @Nullable MonitoringInfo stringSetUpdateToMonitoringInfo(
    MetricUpdate<StringSetData> metricUpdate) {
  SimpleMonitoringInfoBuilder metadata = stringSetToMonitoringMetadata(metricUpdate.getKey());
  // Without metadata there is nothing to attach the payload to.
  return metadata == null
      ? null
      : metadata.setStringSetValue(metricUpdate.getUpdate()).build();
}

/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
@Override
public Iterable<MonitoringInfo> getMonitoringInfos() {
Expand Down Expand Up @@ -358,6 +383,13 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
monitoringInfos.add(mi);
}
}

for (MetricUpdate<StringSetData> metricUpdate : metricUpdates.stringSetUpdates()) {
MonitoringInfo mi = stringSetUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
monitoringInfos.add(mi);
}
}
return monitoringInfos;
}

Expand Down Expand Up @@ -391,6 +423,15 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
}
}
});
stringSets.forEach(
(metricName, stringSetCell) -> {
if (stringSetCell.getDirty().beforeCommit()) {
String shortId = getShortId(metricName, this::stringSetToMonitoringMetadata, shortIds);
if (shortId != null) {
builder.put(shortId, encodeStringSet(stringSetCell.getCumulative()));
}
}
});
return builder.build();
}

Expand Down Expand Up @@ -418,7 +459,7 @@ private String getShortId(
}

/**
* Mark all the updates that were retrieved with the latest call to {@link #getUpdates()} as
* Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
* committed.
*/
public void commitUpdates() {
Expand Down Expand Up @@ -480,6 +521,12 @@ private void updateForLatestInt64Type(MonitoringInfo monitoringInfo) {
gauge.update(decodeInt64Gauge(monitoringInfo.getPayload()));
}

/**
 * Applies a string set MonitoringInfo to this container: the cell is looked up by the metric name
 * derived from {@code monitoringInfo} and updated with the decoded payload.
 */
private void updateForStringSetType(MonitoringInfo monitoringInfo) {
  // The cell lookup is evaluated before the payload is decoded, as in the step-by-step form.
  getStringSet(MonitoringInfoMetricName.of(monitoringInfo))
      .update(decodeStringSet(monitoringInfo.getPayload()));
}

/** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */
public void update(Iterable<MonitoringInfo> monitoringInfos) {
for (MonitoringInfo monitoringInfo : monitoringInfos) {
Expand All @@ -500,6 +547,10 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
updateForLatestInt64Type(monitoringInfo);
break;

case SET_STRING_TYPE:
updateForStringSetType(monitoringInfo);
break;

default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public static final class Urns {
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_INT64);
public static final String USER_DISTRIBUTION_DOUBLE =
extractUrn(MonitoringInfoSpecs.Enum.USER_DISTRIBUTION_DOUBLE);
public static final String USER_SET_STRING =
extractUrn(MonitoringInfoSpecs.Enum.USER_SET_STRING);
public static final String SAMPLED_BYTE_SIZE =
extractUrn(MonitoringInfoSpecs.Enum.SAMPLED_BYTE_SIZE);
public static final String WORK_COMPLETED = extractUrn(MonitoringInfoSpecs.Enum.WORK_COMPLETED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.HashMap;
Expand Down Expand Up @@ -148,6 +149,16 @@ public SimpleMonitoringInfoBuilder setDoubleDistributionValue(
return this;
}

/**
 * Stores {@code value} as the encoded string set payload and marks the MonitoringInfo type as
 * {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}.
 *
 * @param value the string set data to encode into the payload
 * @return this builder, to allow call chaining
 */
public SimpleMonitoringInfoBuilder setStringSetValue(StringSetData value) {
  this.builder
      .setPayload(encodeStringSet(value))
      .setType(MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE);
  return this;
}

/** Sets the MonitoringInfo label to the given name and value. */
public SimpleMonitoringInfoBuilder setLabel(String labelName, String labelValue) {
this.builder.putLabels(labelName, labelValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,38 @@ public void testMonitoringInfosArePopulatedForUserGauges() {
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
}

/**
 * Checks that each user string set cell in the container is reported by {@code
 * getMonitoringInfos()} with the user string set URN, its labels and its cumulative value.
 */
@Test
public void testMonitoringInfosArePopulatedForUserStringSets() {
  MetricsContainerImpl container = new MetricsContainerImpl("step1");
  StringSetCell cellA = container.getStringSet(MetricName.named("ns", "nameA"));
  StringSetCell cellB = container.getStringSet(MetricName.named("ns", "nameB"));
  cellA.add("A");
  cellB.add("BBB");

  // Expected infos are assembled from the cells' cumulative values before reading the container.
  SimpleMonitoringInfoBuilder expectedA =
      new SimpleMonitoringInfoBuilder()
          .setUrn(MonitoringInfoConstants.Urns.USER_SET_STRING)
          .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
          .setLabel(MonitoringInfoConstants.Labels.NAME, "nameA")
          .setStringSetValue(cellA.getCumulative())
          .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  SimpleMonitoringInfoBuilder expectedB =
      new SimpleMonitoringInfoBuilder()
          .setUrn(MonitoringInfoConstants.Urns.USER_SET_STRING)
          .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns")
          .setLabel(MonitoringInfoConstants.Labels.NAME, "nameB")
          .setStringSetValue(cellB.getCumulative())
          .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  List<MonitoringInfo> reported = new ArrayList<>();
  container.getMonitoringInfos().forEach(reported::add);

  assertThat(reported, containsInAnyOrder(expectedA.build(), expectedB.build()));
}

@Test
public void testMonitoringInfosArePopulatedForSystemDistributions() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
Expand Down

0 comments on commit 50a3403

Please sign in to comment.