elements = Sets.newHashSet(STRING_SET_CODER.decode(input));
+ return StringSetData.create(elements);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}. */
public static ByteString encodeInt64Counter(long value) {
ByteStringOutputStream output = new ByteStringOutputStream();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java
new file mode 100644
index 000000000000..8455f154c0f8
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetCell.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Tracks the current value for a {@link StringSet} metric.
+ *
+ * This class generally shouldn't be used directly. The only exception is within a runner where a
+ * counter is being reported for a specific step (rather than the counter in the current context).
+ * In that case retrieving the underlying cell and reporting directly to it avoids a step of
+ * indirection.
+ */
+public class StringSetCell implements StringSet, MetricCell {
+
+ private final DirtyState dirty = new DirtyState();
+ private final AtomicReference setValue =
+ new AtomicReference<>(StringSetData.empty());
+ private final MetricName name;
+
+ /**
+ * Generally, runners should construct instances using the methods in {@link
+ * MetricsContainerImpl}, unless they need to define their own version of {@link
+ * MetricsContainer}. These constructors are *only* public so runners can instantiate.
+ */
+ public StringSetCell(MetricName name) {
+ this.name = name;
+ }
+
+ @Override
+ public void reset() {
+ setValue.set(StringSetData.empty());
+ dirty.reset();
+ }
+
+ void update(StringSetData data) {
+ StringSetData original;
+ do {
+ original = setValue.get();
+ } while (!setValue.compareAndSet(original, original.combine(data)));
+ dirty.afterModification();
+ }
+
+ @Override
+ public DirtyState getDirty() {
+ return dirty;
+ }
+
+ @Override
+ public StringSetData getCumulative() {
+ return setValue.get();
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+
+ @Override
+ public boolean equals(@Nullable Object object) {
+ if (object instanceof StringSetCell) {
+ StringSetCell stringSetCell = (StringSetCell) object;
+ return Objects.equals(dirty, stringSetCell.dirty)
+ && Objects.equals(setValue.get(), stringSetCell.setValue.get())
+ && Objects.equals(name, stringSetCell.name);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dirty, setValue.get(), name);
+ }
+
+ @Override
+ public void add(String value) {
+ // if the given value is already present in the StringSet then skip this add for efficiency
+ if (this.setValue.get().stringSet().contains(value)) {
+ return;
+ }
+ update(StringSetData.create(ImmutableSet.of(value)));
+ }
+
+ @Override
+ public void add(String... values) {
+ update(StringSetData.create(ImmutableSet.copyOf(values)));
+ }
+}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
new file mode 100644
index 000000000000..93dfb8e3ebc8
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/StringSetData.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.sdk.metrics.StringSetResult;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * Data describing the StringSet. The {@link StringSetData} hold an immutable copy of the set from
+ * which it was initially created. This should retain enough detail that it can be combined with
+ * other {@link StringSetData}.
+ */
+@AutoValue
+public abstract class StringSetData implements Serializable {
+
+ public abstract Set stringSet();
+
+ /** Returns a {@link StringSetData} which is made from an immutable copy of the given set. */
+ public static StringSetData create(Set set) {
+ return new AutoValue_StringSetData(ImmutableSet.copyOf(set));
+ }
+
+ /** Return a {@link EmptyStringSetData#INSTANCE} representing an empty {@link StringSetData}. */
+ public static StringSetData empty() {
+ return EmptyStringSetData.INSTANCE;
+ }
+
+ /**
+ * Combines this {@link StringSetData} with other, both original StringSetData are left intact.
+ */
+ public StringSetData combine(StringSetData other) {
+ // do not merge other on this as this StringSetData might hold an immutable set like in case
+ // of EmptyStringSetData
+ Set combined = new HashSet<>();
+ combined.addAll(this.stringSet());
+ combined.addAll(other.stringSet());
+ return StringSetData.create(combined);
+ }
+
+ /**
+ * Combines this {@link StringSetData} with others, all original StringSetData are left intact.
+ */
+ public StringSetData combine(Iterable others) {
+ Set combined =
+ StreamSupport.stream(others.spliterator(), true)
+ .flatMap(other -> other.stringSet().stream())
+ .collect(Collectors.toSet());
+ combined.addAll(this.stringSet());
+ return StringSetData.create(combined);
+ }
+
+ /** Returns a {@link StringSetResult} representing this {@link StringSetData}. */
+ public StringSetResult extractResult() {
+ return StringSetResult.create(stringSet());
+ }
+
+ /** Empty {@link StringSetData}, representing no values reported and is immutable. */
+ public static class EmptyStringSetData extends StringSetData {
+
+ private static final EmptyStringSetData INSTANCE = new EmptyStringSetData();
+
+ private EmptyStringSetData() {}
+
+ /** Returns an immutable empty set. */
+ @Override
+ public Set stringSet() {
+ return ImmutableSet.of();
+ }
+
+ /** Return a {@link StringSetResult#empty()} which is immutable empty set. */
+ @Override
+ public StringSetResult extractResult() {
+ return StringSetResult.empty();
+ }
+ }
+}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 146b7df10f0c..809919f611b4 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -37,6 +37,7 @@
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.HistogramData;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -338,10 +339,12 @@ public void testDeltaCounters() {
MetricName gName = MetricName.named("namespace", "gauge");
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
MetricName hName = MetricName.named("namespace", "histogram");
+ MetricName stringSetName = MetricName.named("namespace", "stringset");
MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
prevContainer.getCounter(cName).inc(2L);
prevContainer.getGauge(gName).set(4L);
+ prevContainer.getStringSet(stringSetName).add("ab");
// Set buckets counts to: [1,1,1,0,0,0,1]
prevContainer.getHistogram(hName, bucketType).update(-1);
prevContainer.getHistogram(hName, bucketType).update(1);
@@ -351,6 +354,8 @@ public void testDeltaCounters() {
MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
nextContainer.getCounter(cName).inc(9L);
nextContainer.getGauge(gName).set(8L);
+ nextContainer.getStringSet(stringSetName).add("cd");
+ nextContainer.getStringSet(stringSetName).add("ab");
// Set buckets counts to: [2,4,5,0,0,0,3]
nextContainer.getHistogram(hName, bucketType).update(-1);
nextContainer.getHistogram(hName, bucketType).update(-1);
@@ -374,6 +379,10 @@ public void testDeltaCounters() {
GaugeData gValue = deltaContainer.getGauge(gName).getCumulative();
assertEquals(8L, gValue.value());
+ // Expect most recent value of string set which is all unique strings
+ StringSetData stringSetData = deltaContainer.getStringSet(stringSetName).getCumulative();
+ assertEquals(ImmutableSet.of("ab", "cd"), stringSetData.stringSet());
+
// Expect bucket counts: [1,3,4,0,0,0,2]
assertEquals(
1, deltaContainer.getHistogram(hName, bucketType).getCumulative().getBottomBucketCount());
@@ -411,6 +420,11 @@ public void testNotEquals() {
differentGauges.getGauge(MetricName.named("namespace", "name"));
Assert.assertNotEquals(metricsContainerImpl, differentGauges);
Assert.assertNotEquals(metricsContainerImpl.hashCode(), differentGauges.hashCode());
+
+ MetricsContainerImpl differentStringSets = new MetricsContainerImpl("stepName");
+ differentStringSets.getStringSet(MetricName.named("namespace", "name"));
+ Assert.assertNotEquals(metricsContainerImpl, differentStringSets);
+ Assert.assertNotEquals(metricsContainerImpl.hashCode(), differentStringSets.hashCode());
}
@Test
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
index 4718a6f2fed3..868c47f6a2e6 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMapTest.java
@@ -40,6 +40,9 @@
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.apache.beam.sdk.metrics.StringSetResult;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.hamcrest.collection.IsIterableWithSize;
import org.joda.time.Instant;
import org.junit.Assert;
@@ -60,14 +63,22 @@ public class MetricsContainerStepMapTest {
private static final String DISTRIBUTION_NAME = "myDistribution";
private static final String GAUGE_NAME = "myGauge";
+ private static final String STRING_SET_NAME = "myStringSet";
+
private static final long VALUE = 100;
+ private static final String FIRST_STRING = "first";
+ private static final String SECOND_STRING = "second";
+
private static final Counter counter =
Metrics.counter(MetricsContainerStepMapTest.class, COUNTER_NAME);
private static final Distribution distribution =
Metrics.distribution(MetricsContainerStepMapTest.class, DISTRIBUTION_NAME);
private static final Gauge gauge = Metrics.gauge(MetricsContainerStepMapTest.class, GAUGE_NAME);
+ private static final StringSet stringSet =
+ Metrics.stringSet(MetricsContainerStepMapTest.class, STRING_SET_NAME);
+
private static final MetricsContainerImpl metricsContainer;
static {
@@ -77,6 +88,7 @@ public class MetricsContainerStepMapTest {
distribution.update(VALUE);
distribution.update(VALUE * 2);
gauge.set(VALUE);
+ stringSet.add(FIRST_STRING, SECOND_STRING);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
@@ -99,6 +111,7 @@ public void testAttemptedAccumulatedMetricResults() {
assertIterableSize(step1res.getCounters(), 1);
assertIterableSize(step1res.getDistributions(), 1);
assertIterableSize(step1res.getGauges(), 1);
+ assertIterableSize(step1res.getStringSets(), 1);
assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, false);
assertDistribution(
@@ -109,12 +122,20 @@ public void testAttemptedAccumulatedMetricResults() {
false);
assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+ assertStringSet(
+ STRING_SET_NAME,
+ step1res,
+ STEP1,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ false);
+
MetricQueryResults step2res =
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
assertIterableSize(step2res.getCounters(), 1);
assertIterableSize(step2res.getDistributions(), 1);
assertIterableSize(step2res.getGauges(), 1);
+ assertIterableSize(step2res.getStringSets(), 1);
assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, false);
assertDistribution(
@@ -125,11 +146,19 @@ public void testAttemptedAccumulatedMetricResults() {
false);
assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+ assertStringSet(
+ STRING_SET_NAME,
+ step2res,
+ STEP2,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ false);
+
MetricQueryResults allres = metricResults.allMetrics();
assertIterableSize(allres.getCounters(), 2);
assertIterableSize(allres.getDistributions(), 2);
assertIterableSize(allres.getGauges(), 2);
+ assertIterableSize(allres.getStringSets(), 2);
}
@Test
@@ -178,6 +207,21 @@ public void testGaugeCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.empty(), true);
}
+ @Test
+ public void testStringSetCommittedUnsupportedInAttemptedAccumulatedMetricResults() {
+ MetricsContainerStepMap attemptedMetrics = new MetricsContainerStepMap();
+ attemptedMetrics.update(STEP1, metricsContainer);
+ MetricResults metricResults = asAttemptedOnlyMetricResults(attemptedMetrics);
+
+ MetricQueryResults step1res =
+ metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP1).build());
+
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage("This runner does not currently support committed metrics results.");
+
+ assertStringSet(STRING_SET_NAME, step1res, STEP1, StringSetResult.empty(), true);
+ }
+
@Test
public void testUserMetricDroppedOnUnbounded() {
MetricsContainerStepMap testObject = new MetricsContainerStepMap();
@@ -248,6 +292,7 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() {
assertIterableSize(step1res.getCounters(), 1);
assertIterableSize(step1res.getDistributions(), 1);
assertIterableSize(step1res.getGauges(), 1);
+ assertIterableSize(step1res.getStringSets(), 1);
assertCounter(COUNTER_NAME, step1res, STEP1, VALUE * 2, false);
assertDistribution(
@@ -257,6 +302,12 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() {
DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
false);
assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+ assertStringSet(
+ STRING_SET_NAME,
+ step1res,
+ STEP1,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ false);
assertCounter(COUNTER_NAME, step1res, STEP1, VALUE, true);
assertDistribution(
@@ -266,6 +317,12 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() {
DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2),
true);
assertGauge(GAUGE_NAME, step1res, STEP1, GaugeResult.create(VALUE, Instant.now()), true);
+ assertStringSet(
+ STRING_SET_NAME,
+ step1res,
+ STEP1,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ true);
MetricQueryResults step2res =
metricResults.queryMetrics(MetricsFilter.builder().addStep(STEP2).build());
@@ -273,6 +330,7 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() {
assertIterableSize(step2res.getCounters(), 1);
assertIterableSize(step2res.getDistributions(), 1);
assertIterableSize(step2res.getGauges(), 1);
+ assertIterableSize(step2res.getStringSets(), 1);
assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 3, false);
assertDistribution(
@@ -282,6 +340,12 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() {
DistributionResult.create(VALUE * 9, 6, VALUE, VALUE * 2),
false);
assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+ assertStringSet(
+ STRING_SET_NAME,
+ step2res,
+ STEP2,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ false);
assertCounter(COUNTER_NAME, step2res, STEP2, VALUE * 2, true);
assertDistribution(
@@ -291,12 +355,25 @@ public void testAttemptedAndCommittedAccumulatedMetricResults() {
DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
true);
assertGauge(GAUGE_NAME, step2res, STEP2, GaugeResult.create(VALUE, Instant.now()), true);
+ assertStringSet(
+ STRING_SET_NAME,
+ step2res,
+ STEP2,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ true);
+ assertStringSet(
+ STRING_SET_NAME,
+ step2res,
+ STEP2,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ true);
MetricQueryResults allres = metricResults.queryMetrics(MetricsFilter.builder().build());
assertIterableSize(allres.getCounters(), 2);
assertIterableSize(allres.getDistributions(), 2);
assertIterableSize(allres.getGauges(), 2);
+ assertIterableSize(allres.getStringSets(), 2);
}
@Test
@@ -345,6 +422,12 @@ public void testReset() {
DistributionResult.create(VALUE * 3, 2, VALUE, VALUE * 2),
false);
assertGauge(GAUGE_NAME, allres, STEP1, GaugeResult.create(VALUE, Instant.now()), false);
+ assertStringSet(
+ STRING_SET_NAME,
+ allres,
+ STEP1,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ false);
assertCounter(COUNTER_NAME, allres, STEP2, VALUE * 2, false);
assertDistribution(
@@ -354,6 +437,12 @@ public void testReset() {
DistributionResult.create(VALUE * 6, 4, VALUE, VALUE * 2),
false);
assertGauge(GAUGE_NAME, allres, STEP2, GaugeResult.create(VALUE, Instant.now()), false);
+ assertStringSet(
+ STRING_SET_NAME,
+ allres,
+ STEP2,
+ StringSetResult.create(ImmutableSet.of(FIRST_STRING, SECOND_STRING)),
+ false);
attemptedMetrics.reset();
metricResults = asAttemptedOnlyMetricResults(attemptedMetrics);
@@ -364,12 +453,14 @@ public void testReset() {
assertDistribution(
DISTRIBUTION_NAME, allres, STEP1, DistributionResult.IDENTITY_ELEMENT, false);
assertGauge(GAUGE_NAME, allres, STEP1, GaugeResult.empty(), false);
+ assertStringSet(STRING_SET_NAME, allres, STEP1, StringSetResult.empty(), false);
// Check that the metrics container for STEP2 is reset
assertCounter(COUNTER_NAME, allres, STEP2, 0L, false);
assertDistribution(
DISTRIBUTION_NAME, allres, STEP2, DistributionResult.IDENTITY_ELEMENT, false);
assertGauge(GAUGE_NAME, allres, STEP2, GaugeResult.empty(), false);
+ assertStringSet(STRING_SET_NAME, allres, STEP2, StringSetResult.empty(), false);
}
private void assertIterableSize(Iterable iterable, int size) {
@@ -408,4 +499,15 @@ private void assertGauge(
metricQueryResults.getGauges(),
hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted)));
}
+
+ private void assertStringSet(
+ String name,
+ MetricQueryResults metricQueryResults,
+ String step,
+ StringSetResult expected,
+ boolean isCommitted) {
+ assertThat(
+ metricQueryResults.getStringSets(),
+ hasItem(metricsResult(NAMESPACE, name, step, expected, isCommitted)));
+ }
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
index a1b73781cd6c..8a43eef5883d 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -21,14 +21,18 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.junit.Assert.assertEquals;
+import java.util.Collections;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -64,6 +68,30 @@ public void testInt64GaugeEncoding() {
assertEquals(data, decodeInt64Gauge(payload));
}
+ @Test
+ public void testStringSetEncoding() {
+
+ // test empty string set encoding
+ StringSetData data = StringSetData.create(Collections.emptySet());
+ ByteString payload = encodeStringSet(data);
+ assertEquals(data, decodeStringSet(payload));
+
+ // test single element string set encoding
+ data = StringSetData.create(ImmutableSet.of("ab"));
+ payload = encodeStringSet(data);
+ assertEquals(data, decodeStringSet(payload));
+
+ // test multiple element string set encoding
+ data = StringSetData.create(ImmutableSet.of("ab", "cd", "ef"));
+ payload = encodeStringSet(data);
+ assertEquals(data, decodeStringSet(payload));
+
+ // test empty string encoding
+ data = StringSetData.create(ImmutableSet.of("ab", "", "ef"));
+ payload = encodeStringSet(data);
+ assertEquals(data, decodeStringSet(payload));
+ }
+
@Test
public void testInt64CounterEncoding() {
ByteString payload = encodeInt64Counter(1L);
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
new file mode 100644
index 000000000000..f78ed01603fb
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetCellTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for {@link StringSetCell}. */
+public class StringSetCellTest {
+ private final StringSetCell cell = new StringSetCell(MetricName.named("lineage", "sources"));
+
+ @Test
+ public void testDeltaAndCumulative() {
+ cell.add("pubsub");
+ cell.add("bq", "spanner");
+ assertEquals(cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq"));
+ assertEquals(
+ "getCumulative is idempotent",
+ cell.getCumulative().stringSet(),
+ ImmutableSet.of("spanner", "pubsub", "bq"));
+
+ assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+ cell.getDirty().afterCommit();
+ assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+ cell.add("gcs");
+ assertEquals(
+ cell.getCumulative().stringSet(), ImmutableSet.of("spanner", "pubsub", "bq", "gcs"));
+
+ assertThat(
+ "Adding a new value made the cell dirty", cell.getDirty().beforeCommit(), equalTo(true));
+ }
+
+ @Test
+ public void testEquals() {
+ StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+ StringSetCell equal = new StringSetCell(MetricName.named("namespace", "name"));
+ assertEquals(stringSetCell, equal);
+ assertEquals(stringSetCell.hashCode(), equal.hashCode());
+ }
+
+ @Test
+ public void testNotEquals() {
+ StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+
+ Assert.assertNotEquals(stringSetCell, new Object());
+
+ StringSetCell differentDirty = new StringSetCell(MetricName.named("namespace", "name"));
+ differentDirty.getDirty().afterModification();
+ Assert.assertNotEquals(stringSetCell, differentDirty);
+ Assert.assertNotEquals(stringSetCell.hashCode(), differentDirty.hashCode());
+
+ StringSetCell differentSetValues = new StringSetCell(MetricName.named("namespace", "name"));
+ differentSetValues.update(StringSetData.create(ImmutableSet.of("hello")));
+ Assert.assertNotEquals(stringSetCell, differentSetValues);
+ Assert.assertNotEquals(stringSetCell.hashCode(), differentSetValues.hashCode());
+
+ StringSetCell differentName = new StringSetCell(MetricName.named("DIFFERENT", "DIFFERENT"));
+ Assert.assertNotEquals(stringSetCell, differentName);
+ Assert.assertNotEquals(stringSetCell.hashCode(), differentName.hashCode());
+ }
+
+ @Test
+ public void testReset() {
+ StringSetCell stringSetCell = new StringSetCell(MetricName.named("namespace", "name"));
+ stringSetCell.add("hello");
+ Assert.assertNotEquals(stringSetCell.getDirty(), new DirtyState());
+ assertThat(
+ stringSetCell.getCumulative().stringSet(),
+ equalTo(StringSetData.create(ImmutableSet.of("hello")).stringSet()));
+
+ stringSetCell.reset();
+ assertThat(stringSetCell.getCumulative(), equalTo(StringSetData.empty()));
+ assertThat(stringSetCell.getDirty(), equalTo(new DirtyState()));
+ }
+}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java
new file mode 100644
index 000000000000..665ce3743c51
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/StringSetDataTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link StringSetData}. */
+public class StringSetDataTest {
+ @Rule public ExpectedException exception = ExpectedException.none();
+
+ @Test
+ public void testCreate() {
+ // test empty stringset creation
+ assertTrue(StringSetData.create(Collections.emptySet()).stringSet().isEmpty());
+ // single element test
+ ImmutableSet singleElement = ImmutableSet.of("ab");
+ StringSetData setData = StringSetData.create(singleElement);
+ assertEquals(setData.stringSet(), singleElement);
+
+ // multiple element test
+ ImmutableSet multipleElement = ImmutableSet.of("cd", "ef");
+ setData = StringSetData.create(multipleElement);
+ assertEquals(setData.stringSet(), multipleElement);
+ }
+
+ @Test
+ public void testCombine() {
+ StringSetData singleElement = StringSetData.create(ImmutableSet.of("ab"));
+ StringSetData multipleElement = StringSetData.create(ImmutableSet.of("cd", "ef"));
+ StringSetData result = singleElement.combine(multipleElement);
+ assertEquals(result.stringSet(), ImmutableSet.of("cd", "ef", "ab"));
+
+ // original sets in stringsetdata should have remained the same
+ assertEquals(singleElement.stringSet(), ImmutableSet.of("ab"));
+ assertEquals(multipleElement.stringSet(), ImmutableSet.of("cd", "ef"));
+ }
+
+ @Test
+ public void testCombineWithEmpty() {
+ StringSetData empty = StringSetData.empty();
+ StringSetData multipleElement = StringSetData.create(ImmutableSet.of("cd", "ef"));
+ StringSetData result = empty.combine(multipleElement);
+ assertEquals(result.stringSet(), ImmutableSet.of("cd", "ef"));
+ // original sets in stringsetdata should have remained the same
+ assertTrue(empty.stringSet().isEmpty());
+ assertEquals(multipleElement.stringSet(), ImmutableSet.of("cd", "ef"));
+ }
+
+ @Test
+ public void testEmpty() {
+ StringSetData empty = StringSetData.empty();
+ assertTrue(empty.stringSet().isEmpty());
+ }
+
+ @Test
+ public void testStringSetDataEmptyIsImmutable() {
+ StringSetData empty = StringSetData.empty();
+ assertThrows(UnsupportedOperationException.class, () -> empty.stringSet().add("aa"));
+ }
+
+ @Test
+ public void testEmptyExtract() {
+ assertTrue(StringSetData.empty().extractResult().getStringSet().isEmpty());
+ }
+
+ @Test
+ public void testExtract() {
+ ImmutableSet contents = ImmutableSet.of("ab", "cd");
+ StringSetData stringSetData = StringSetData.create(contents);
+ assertEquals(stringSetData.stringSet(), contents);
+ }
+
+ @Test
+ public void testExtractReturnsImmutable() {
+ StringSetData stringSetData = StringSetData.create(ImmutableSet.of("ab", "cd"));
+ // check that immutable copy is returned
+ assertThrows(UnsupportedOperationException.class, () -> stringSetData.stringSet().add("aa"));
+ }
+}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index 5b286dc0b2e0..b02c4f030b27 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -33,6 +33,7 @@
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.runners.core.metrics.MetricsMap;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
@@ -41,6 +42,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -217,6 +219,26 @@ public GaugeResult extract(GaugeData data) {
}
};
+ private static final MetricAggregation STRING_SET =
+ new MetricAggregation() {
+ @Override
+ public StringSetData zero() {
+ return StringSetData.empty();
+ }
+
+ @Override
+ public StringSetData combine(Iterable updates) {
+ StringSetData result = StringSetData.empty();
+ result = result.combine(updates);
+ return result;
+ }
+
+ @Override
+ public StringSetResult extract(StringSetData data) {
+ return data.extractResult();
+ }
+ };
+
/** The current values of counters in memory. */
private final MetricsMap> counters;
@@ -224,12 +246,14 @@ public GaugeResult extract(GaugeData data) {
distributions;
private final MetricsMap> gauges;
+ private final MetricsMap> stringSet;
DirectMetrics(ExecutorService executorService) {
this.counters = new MetricsMap<>(unusedKey -> new DirectMetric<>(COUNTER, executorService));
this.distributions =
new MetricsMap<>(unusedKey -> new DirectMetric<>(DISTRIBUTION, executorService));
this.gauges = new MetricsMap<>(unusedKey -> new DirectMetric<>(GAUGE, executorService));
+ this.stringSet = new MetricsMap<>(unusedKey -> new DirectMetric<>(STRING_SET, executorService));
}
@Override
@@ -249,8 +273,17 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
maybeExtractResult(filter, gaugeResults, gauge);
}
+ ImmutableList.Builder> stringSetResult = ImmutableList.builder();
+ for (Entry> stringSet :
+ stringSet.entries()) {
+ maybeExtractResult(filter, stringSetResult, stringSet);
+ }
+
return MetricQueryResults.create(
- counterResults.build(), distributionResults.build(), gaugeResults.build());
+ counterResults.build(),
+ distributionResults.build(),
+ gaugeResults.build(),
+ stringSetResult.build());
}
private void maybeExtractResult(
@@ -277,6 +310,10 @@ public void updatePhysical(CommittedBundle> bundle, MetricUpdates updates) {
for (MetricUpdate gauge : updates.gaugeUpdates()) {
gauges.get(gauge.getKey()).updatePhysical(bundle, gauge.getUpdate());
}
+
+ for (MetricUpdate sSet : updates.stringSetUpdates()) {
+ stringSet.get(sSet.getKey()).updatePhysical(bundle, sSet.getUpdate());
+ }
}
public void commitPhysical(CommittedBundle> bundle, MetricUpdates updates) {
@@ -289,6 +326,9 @@ public void commitPhysical(CommittedBundle> bundle, MetricUpdates updates) {
for (MetricUpdate gauge : updates.gaugeUpdates()) {
gauges.get(gauge.getKey()).commitPhysical(bundle, gauge.getUpdate());
}
+ for (MetricUpdate sSet : updates.stringSetUpdates()) {
+ stringSet.get(sSet.getKey()).commitPhysical(bundle, sSet.getUpdate());
+ }
}
/** Apply metric updates that represent new logical values from a bundle being committed. */
@@ -302,5 +342,8 @@ public void commitLogical(CommittedBundle> bundle, MetricUpdates updates) {
for (MetricUpdate gauge : updates.gaugeUpdates()) {
gauges.get(gauge.getKey()).commitLogical(bundle, gauge.getUpdate());
}
+ for (MetricUpdate sSet : updates.stringSetUpdates()) {
+ stringSet.get(sSet.getKey()).commitLogical(bundle, sSet.getUpdate());
+ }
}
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 46f74d6b7e05..00df20c4ac39 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -30,13 +30,16 @@
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
@@ -85,7 +88,11 @@ public void testApplyCommittedNoFilter() {
MetricUpdate.create(
MetricKey.create("step1", NAME1), DistributionData.create(8, 2, 3, 5))),
ImmutableList.of(
- MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L)))));
+ MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L))),
+ ImmutableList.of(
+ MetricUpdate.create(
+ MetricKey.create("step1", NAME4),
+ StringSetData.create(ImmutableSet.of("ab"))))));
metrics.commitLogical(
bundle1,
MetricUpdates.create(
@@ -96,7 +103,11 @@ public void testApplyCommittedNoFilter() {
MetricUpdate.create(
MetricKey.create("step1", NAME1), DistributionData.create(4, 1, 4, 4))),
ImmutableList.of(
- MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L)))));
+ MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L))),
+ ImmutableList.of(
+ MetricUpdate.create(
+ MetricKey.create("step1", NAME4),
+ StringSetData.create(ImmutableSet.of("cd"))))));
MetricQueryResults results = metrics.allMetrics();
assertThat(
@@ -128,6 +139,11 @@ public void testApplyCommittedNoFilter() {
contains(
committedMetricsResult(
"ns2", "name2", "step1", GaugeResult.create(27L, Instant.now()))));
+ assertThat(
+ results.getStringSets(),
+ contains(
+ committedMetricsResult(
+ "ns2", "name2", "step1", StringSetResult.create(ImmutableSet.of("ab", "cd")))));
}
@SuppressWarnings("unchecked")
@@ -140,6 +156,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() {
MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
@@ -148,6 +165,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() {
MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
@@ -176,6 +194,7 @@ public void testApplyAttemptedQueryCompositeScope() {
MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
@@ -184,6 +203,7 @@ public void testApplyAttemptedQueryCompositeScope() {
MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
@@ -212,6 +232,7 @@ public void testPartialScopeMatchingInMetricsQuery() {
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L),
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
@@ -220,6 +241,7 @@ public void testPartialScopeMatchingInMetricsQuery() {
MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L),
MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
index a9cea996680b..96c0374067cf 100644
--- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
+++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
@@ -26,6 +26,8 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsSink;
+import org.apache.beam.sdk.metrics.StringSetResult;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
/** Test class to be used as a input to {@link MetricsSink} implementations tests. */
@@ -71,4 +73,13 @@ public List> getGauges() {
GaugeResult.create(100L, new Instant(345862800L)),
GaugeResult.create(120L, new Instant(345862800L)));
}
+
+ @Override
+ public Iterable> getStringSets() {
+ return makeResults(
+ "s3",
+ "n3",
+ StringSetResult.create(ImmutableSet.of("ab")),
+ StringSetResult.create(ImmutableSet.of("cd")));
+ }
}
diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
index afbe77bdb885..10e9481d271b 100644
--- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
+++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
@@ -94,7 +94,9 @@ public void testWriteMetricsWithCommittedSupported() throws Exception {
+ "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":"
- + "\"ns1\"},\"step\":\"s3\"}]}";
+ + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd"
+ + "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\","
+ + "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0));
}
@@ -114,7 +116,8 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception {
+ "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\""
+ ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":"
- + "\"ns1\"},\"step\":\"s3\"}]}";
+ + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]},"
+ + "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0));
}
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
index c5023a57d8d6..1fad140717f6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java
@@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
@@ -38,10 +39,12 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,12 +102,13 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
ImmutableList> counters = ImmutableList.of();
ImmutableList> distributions = ImmutableList.of();
ImmutableList> gauges = ImmutableList.of();
+ ImmutableList> stringSets = ImmutableList.of();
JobMetrics jobMetrics;
try {
jobMetrics = getJobMetrics();
} catch (IOException e) {
LOG.warn("Unable to query job metrics.\n");
- return MetricQueryResults.create(counters, distributions, gauges);
+ return MetricQueryResults.create(counters, distributions, gauges, stringSets);
}
metricUpdates = firstNonNull(jobMetrics.getMetrics(), Collections.emptyList());
return populateMetricQueryResults(metricUpdates, filter);
@@ -127,12 +131,19 @@ private static class DataflowMetricResultExtractor {
private final ImmutableList.Builder> counterResults;
private final ImmutableList.Builder> distributionResults;
private final ImmutableList.Builder> gaugeResults;
+ private final ImmutableList.Builder> stringSetResults;
private final boolean isStreamingJob;
DataflowMetricResultExtractor(boolean isStreamingJob) {
counterResults = ImmutableList.builder();
distributionResults = ImmutableList.builder();
gaugeResults = ImmutableList.builder();
+ stringSetResults = ImmutableList.builder();
+ /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
+ * In Dataflow batch jobs, only COMMITTED metrics are available, but
+ * we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
+ * Reporting the appropriate metric depending on whether it's a batch/streaming job.
+ */
this.isStreamingJob = isStreamingJob;
}
@@ -148,20 +159,14 @@ public void addMetricResult(
// distribution metric
DistributionResult value = getDistributionValue(committed);
distributionResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
- /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
- * In Dataflow batch jobs, only COMMITTED metrics are available, but
- * we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
- * Reporting the appropriate metric depending on whether it's a batch/streaming job.
- */
} else if (committed.getScalar() != null && attempted.getScalar() != null) {
// counter metric
Long value = getCounterValue(committed);
counterResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
- /* In Dataflow streaming jobs, only ATTEMPTED metrics are available.
- * In Dataflow batch jobs, only COMMITTED metrics are available, but
- * we must provide ATTEMPTED, so we use COMMITTED as a good approximation.
- * Reporting the appropriate metric depending on whether it's a batch/streaming job.
- */
+ } else if (committed.getSet() != null && attempted.getSet() != null) {
+ // stringset metric
+ StringSetResult value = getStringSetValue(committed);
+ stringSetResults.add(MetricResult.create(metricKey, !isStreamingJob, value));
} else {
// This is exceptionally unexpected. We expect matching user metrics to only have the
// value types provided by the Metrics API.
@@ -182,6 +187,13 @@ private Long getCounterValue(MetricUpdate metricUpdate) {
return ((Number) metricUpdate.getScalar()).longValue();
}
+ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
+ if (metricUpdate.getSet() == null) {
+ return StringSetResult.empty();
+ }
+ return StringSetResult.create(ImmutableSet.copyOf(((Set) metricUpdate.getSet())));
+ }
+
private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {
if (metricUpdate.getDistribution() == null) {
return DistributionResult.IDENTITY_ELEMENT;
@@ -205,6 +217,10 @@ public Iterable> getCounterResults() {
public Iterable> getGaugeResults() {
return gaugeResults.build();
}
+
+ public Iterable> geStringSetResults() {
+ return stringSetResults.build();
+ }
}
private static class DataflowMetricQueryResultsFactory {
@@ -369,7 +385,8 @@ public MetricQueryResults build() {
return MetricQueryResults.create(
extractor.getCounterResults(),
extractor.getDistributionResults(),
- extractor.getGaugeResults());
+ extractor.getGaugeResults(),
+ extractor.geStringSetResults());
}
}
}
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
index 527273abb42e..9b8e3cc871da 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowMetricsTest.java
@@ -40,6 +40,7 @@
import com.google.api.services.dataflow.model.MetricUpdate;
import java.io.IOException;
import java.math.BigDecimal;
+import java.util.Set;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.DataflowTemplateJob;
import org.apache.beam.sdk.PipelineResult.State;
@@ -48,12 +49,14 @@
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBiMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -115,6 +118,7 @@ public void testEmptyMetricUpdates() throws IOException {
MetricQueryResults result = dataflowMetrics.allMetrics();
assertThat(ImmutableList.copyOf(result.getCounters()), is(empty()));
assertThat(ImmutableList.copyOf(result.getDistributions()), is(empty()));
+ assertThat(ImmutableList.copyOf(result.getStringSets()), is(empty()));
}
@Test
@@ -184,6 +188,13 @@ private MetricUpdate makeCounterMetricUpdate(
return setStructuredName(update, name, namespace, step, tentative);
}
+ private MetricUpdate makeStringSetMetricUpdate(
+ String name, String namespace, String step, Set setValues, boolean tentative) {
+ MetricUpdate update = new MetricUpdate();
+ update.setSet(setValues);
+ return setStructuredName(update, name, namespace, step, tentative);
+ }
+
@Test
public void testSingleCounterUpdates() throws IOException {
AppliedPTransform, ?, ?> myStep = mock(AppliedPTransform.class);
@@ -226,6 +237,54 @@ public void testSingleCounterUpdates() throws IOException {
committedMetricsResult("counterNamespace", "counterName", "myStepName", 1234L)));
}
+ @Test
+ public void testSingleStringSetUpdates() throws IOException {
+ AppliedPTransform, ?, ?> myStep = mock(AppliedPTransform.class);
+ when(myStep.getFullName()).thenReturn("myStepName");
+ BiMap, String> transformStepNames = HashBiMap.create();
+ transformStepNames.put(myStep, "s2");
+
+ JobMetrics jobMetrics = new JobMetrics();
+ DataflowPipelineJob job = mock(DataflowPipelineJob.class);
+ DataflowPipelineOptions options = mock(DataflowPipelineOptions.class);
+ when(options.isStreaming()).thenReturn(false);
+ when(job.getDataflowOptions()).thenReturn(options);
+ when(job.getState()).thenReturn(State.RUNNING);
+ when(job.getJobId()).thenReturn(JOB_ID);
+ when(job.getTransformStepNames()).thenReturn(transformStepNames);
+
+ // The parser relies on the fact that one tentative and one committed metric update exist in
+ // the job metrics results.
+ MetricUpdate mu1 =
+ makeStringSetMetricUpdate(
+ "counterName", "counterNamespace", "s2", ImmutableSet.of("ab", "cd"), false);
+ MetricUpdate mu1Tentative =
+ makeStringSetMetricUpdate(
+ "counterName", "counterNamespace", "s2", ImmutableSet.of("ab", "cd"), true);
+ jobMetrics.setMetrics(ImmutableList.of(mu1, mu1Tentative));
+ DataflowClient dataflowClient = mock(DataflowClient.class);
+ when(dataflowClient.getJobMetrics(JOB_ID)).thenReturn(jobMetrics);
+
+ DataflowMetrics dataflowMetrics = new DataflowMetrics(job, dataflowClient);
+ MetricQueryResults result = dataflowMetrics.allMetrics();
+ assertThat(
+ result.getStringSets(),
+ containsInAnyOrder(
+ attemptedMetricsResult(
+ "counterNamespace",
+ "counterName",
+ "myStepName",
+ StringSetResult.create(ImmutableSet.of("ab", "cd")))));
+ assertThat(
+ result.getStringSets(),
+ containsInAnyOrder(
+ committedMetricsResult(
+ "counterNamespace",
+ "counterName",
+ "myStepName",
+ StringSetResult.create(ImmutableSet.of("ab", "cd")))));
+ }
+
@Test
public void testIgnoreDistributionButGetCounterUpdates() throws IOException {
AppliedPTransform, ?, ?> myStep = mock(AppliedPTransform.class);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
index 901e305b22b1..62ec70ff9b18 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java
@@ -517,7 +517,12 @@ public Iterable extractMetricUpdates(boolean isFinalUpdate) {
.transform(
update ->
MetricsToCounterUpdateConverter.fromDistribution(
- update.getKey(), true, update.getUpdate())));
+ update.getKey(), true, update.getUpdate())),
+ FluentIterable.from(updates.stringSetUpdates())
+ .transform(
+ update ->
+ MetricsToCounterUpdateConverter.fromStringSet(
+ update.getKey(), update.getUpdate())));
});
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
index c3e4fb1388b0..f9cd098edaa6 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMetricsContainer.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.util.HistogramData;
/**
@@ -73,6 +74,11 @@ public Gauge getGauge(MetricName metricName) {
return getCurrentContainer().getGauge(metricName);
}
+ @Override
+ public StringSet getStringSet(MetricName metricName) {
+ return getCurrentContainer().getStringSet(metricName);
+ }
+
@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java
index 22b55058d4f5..dbedc51528a5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricsToCounterUpdateConverter.java
@@ -25,7 +25,10 @@
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.DistributionUpdate;
import com.google.api.services.dataflow.model.IntegerGauge;
+import com.google.api.services.dataflow.model.StringList;
+import java.util.ArrayList;
import org.apache.beam.runners.core.metrics.DistributionData;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
@@ -58,7 +61,8 @@ public enum Kind {
DISTRIBUTION("DISTRIBUTION"),
MEAN("MEAN"),
SUM("SUM"),
- LATEST_VALUE("LATEST_VALUE");
+ LATEST_VALUE("LATEST_VALUE"),
+ SET("SET");
private final String kind;
@@ -94,6 +98,18 @@ public static CounterUpdate fromGauge(
.setIntegerGauge(integerGaugeProto);
}
+ public static CounterUpdate fromStringSet(MetricKey key, StringSetData stringSetData) {
+ CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.SET);
+
+ StringList stringList = new StringList();
+ stringList.setElements(new ArrayList<>(stringSetData.stringSet()));
+
+ return new CounterUpdate()
+ .setStructuredNameAndMetadata(name)
+ .setCumulative(false)
+ .setStringList(stringList);
+ }
+
public static CounterUpdate fromDistribution(
MetricKey key, boolean isCumulative, DistributionData update) {
CounterStructuredNameAndMetadata name = structuredNameAndMetadata(key, Kind.DISTRIBUTION);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index 15fdbf4ab7dd..7cc0dc68f7e7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -33,6 +33,7 @@
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeCell;
import org.apache.beam.runners.core.metrics.MetricsMap;
+import org.apache.beam.runners.core.metrics.StringSetCell;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
@@ -41,6 +42,7 @@
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Function;
@@ -67,6 +69,8 @@ public class StreamingStepMetricsContainer implements MetricsContainer {
private MetricsMap gauges = new MetricsMap<>(GaugeCell::new);
+ private MetricsMap stringSet = new MetricsMap<>(StringSetCell::new);
+
private MetricsMap distributions =
new MetricsMap<>(DeltaDistributionCell::new);
@@ -159,6 +163,11 @@ public Gauge getGauge(MetricName metricName) {
return gauges.get(metricName);
}
+ @Override
+ public StringSet getStringSet(MetricName metricName) {
+ return stringSet.get(metricName);
+ }
+
@Override
public Histogram getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
@@ -176,7 +185,9 @@ public Histogram getPerWorkerHistogram(
}
public Iterable extractUpdates() {
- return counterUpdates().append(distributionUpdates()).append(gaugeUpdates());
+ return counterUpdates()
+ .append(distributionUpdates())
+ .append(gaugeUpdates().append(stringSetUpdates()));
}
private FluentIterable counterUpdates() {
@@ -218,6 +229,20 @@ private FluentIterable gaugeUpdates() {
.filter(Predicates.notNull());
}
+ private FluentIterable stringSetUpdates() {
+ return FluentIterable.from(stringSet.entries())
+ .transform(
+ new Function, CounterUpdate>() {
+ @Override
+ public @Nullable CounterUpdate apply(
+ @Nonnull Map.Entry entry) {
+ return MetricsToCounterUpdateConverter.fromStringSet(
+ MetricKey.create(stepName, entry.getKey()), entry.getValue().getCumulative());
+ }
+ })
+ .filter(Predicates.notNull());
+ }
+
private FluentIterable distributionUpdates() {
return FluentIterable.from(distributions.entries())
.transform(
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
index 0e516b3ffb49..18bd814b4df7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContextTest.java
@@ -30,6 +30,8 @@
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.DistributionUpdate;
+import com.google.api.services.dataflow.model.StringList;
+import java.util.Arrays;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
@@ -42,6 +44,7 @@
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.hamcrest.Matchers;
import org.junit.Test;
@@ -158,6 +161,37 @@ public void extractMetricUpdatesDistribution() {
assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
}
+ @Test
+ public void extractMetricUpdatesStringSet() {
+ BatchModeExecutionContext executionContext =
+ BatchModeExecutionContext.forTesting(PipelineOptionsFactory.create(), "testStage");
+ DataflowOperationContext operationContext =
+ executionContext.createOperationContext(NameContextsForTests.nameContextForTest());
+
+ StringSet stringSet =
+ operationContext
+ .metricsContainer()
+ .getStringSet(MetricName.named("namespace", "some-stringset"));
+ stringSet.add("ab");
+ stringSet.add("cd");
+
+ final CounterUpdate expected =
+ new CounterUpdate()
+ .setStructuredNameAndMetadata(
+ new CounterStructuredNameAndMetadata()
+ .setName(
+ new CounterStructuredName()
+ .setOrigin("USER")
+ .setOriginNamespace("namespace")
+ .setName("some-stringset")
+ .setOriginalStepName("originalName"))
+ .setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
+ .setCumulative(false)
+ .setStringList(new StringList().setElements(Arrays.asList("ab", "cd")));
+
+ assertThat(executionContext.extractMetricUpdates(false), containsInAnyOrder(expected));
+ }
+
@Test
public void extractMsecCounters() {
BatchModeExecutionContext executionContext =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
index a9b8abdca93c..2d5a8d8266ae 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java
@@ -37,11 +37,13 @@
import com.google.api.services.dataflow.model.Linear;
import com.google.api.services.dataflow.model.MetricValue;
import com.google.api.services.dataflow.model.PerStepNamespaceMetrics;
+import com.google.api.services.dataflow.model.StringList;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -57,6 +59,7 @@
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.NoOpCounter;
import org.apache.beam.sdk.metrics.NoOpHistogram;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.hamcrest.collection.IsEmptyIterable;
@@ -267,6 +270,61 @@ public void testGaugeUpdateExtraction() {
DateTimeUtils.setCurrentMillisSystem();
}
+ @Test
+ public void testStringSetUpdateExtraction() {
+ StringSet stringSet = c1.getStringSet(name1);
+ stringSet.add("ab");
+ stringSet.add("cd", "ef");
+ stringSet.add("gh");
+ stringSet.add("gh");
+
+ CounterUpdate name1Update =
+ new CounterUpdate()
+ .setStructuredNameAndMetadata(
+ new CounterStructuredNameAndMetadata()
+ .setName(
+ new CounterStructuredName()
+ .setOrigin(Origin.USER.toString())
+ .setOriginNamespace("ns")
+ .setName("name1")
+ .setOriginalStepName("s1"))
+ .setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
+ .setCumulative(false)
+ .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh")));
+
+ Iterable updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
+ assertThat(updates, containsInAnyOrder(name1Update));
+
+ stringSet = c2.getStringSet(name2);
+ stringSet.add("ij");
+ stringSet.add("kl", "mn");
+ stringSet.add("mn");
+
+ CounterUpdate name2Update =
+ new CounterUpdate()
+ .setStructuredNameAndMetadata(
+ new CounterStructuredNameAndMetadata()
+ .setName(
+ new CounterStructuredName()
+ .setOrigin(Origin.USER.toString())
+ .setOriginNamespace("ns")
+ .setName("name2")
+ .setOriginalStepName("s2"))
+ .setMetadata(new CounterMetadata().setKind(Kind.SET.toString())))
+ .setCumulative(false)
+ .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));
+
+ updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
+ assertThat(updates, containsInAnyOrder(name1Update, name2Update));
+
+ c1.getStringSet(name1).add("op");
+ name1Update.setStringList(
+ new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh", "op")));
+
+ updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
+ assertThat(updates, containsInAnyOrder(name1Update, name2Update));
+ }
+
@Test
public void testPerWorkerMetrics() {
StreamingStepMetricsContainer.setEnablePerWorkerMetrics(false);
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java
index b6dae10da6bc..67cf3280a83c 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java
@@ -25,6 +25,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -84,6 +85,11 @@ public Iterable> getDistributions() {
public Iterable> getGauges() {
return Collections.emptyList();
}
+
+ @Override
+ public Iterable> getStringSets() {
+ return Collections.emptyList();
+ }
};
}
};
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
index 8e28f3fda0e8..44681a626cc0 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
@@ -25,6 +25,7 @@
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
@@ -33,6 +34,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -52,6 +54,9 @@ public class JetMetricResults extends MetricResults {
@GuardedBy("this")
private final Gauges gauges = new Gauges();
+ @GuardedBy("this")
+ private final StringSets stringSet = new StringSets();
+
@GuardedBy("this")
private IMap metricsAccumulator;
@@ -70,18 +75,23 @@ public synchronized MetricQueryResults queryMetrics(@Nullable MetricsFilter filt
updateLocalMetrics(metricsAccumulator);
}
return new QueryResults(
- counters.filter(filter), distributions.filter(filter), gauges.filter(filter));
+ counters.filter(filter),
+ distributions.filter(filter),
+ gauges.filter(filter),
+ stringSet.filter(filter));
}
private synchronized void updateLocalMetrics(IMap metricsAccumulator) {
counters.clear();
distributions.clear();
gauges.clear();
+ stringSet.clear();
for (MetricUpdates metricUpdates : metricsAccumulator.values()) {
counters.merge(metricUpdates.counterUpdates());
distributions.merge(metricUpdates.distributionUpdates());
gauges.merge(metricUpdates.gaugeUpdates());
+ stringSet.merge(metricUpdates.stringSetUpdates());
}
}
@@ -93,14 +103,17 @@ private static class QueryResults extends MetricQueryResults {
private final Iterable> counters;
private final Iterable> distributions;
private final Iterable> gauges;
+ private final Iterable> stringSets;
private QueryResults(
Iterable> counters,
Iterable> distributions,
- Iterable> gauges) {
+ Iterable> gauges,
+ Iterable> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
+ this.stringSets = stringSets;
}
@Override
@@ -117,6 +130,11 @@ public Iterable> getDistributions() {
public Iterable> getGauges() {
return gauges;
}
+
+ @Override
+ public Iterable> getStringSets() {
+ return stringSets;
+ }
}
private static class Counters {
@@ -212,4 +230,36 @@ private MetricResult toUpdateResult(Map.Entry
return MetricResult.create(key, gaugeResult, gaugeResult);
}
}
+
+ private static class StringSets {
+
+ private final Map stringSets = new HashMap<>();
+
+ void merge(Iterable> updates) {
+ for (MetricUpdate update : updates) {
+ MetricKey key = update.getKey();
+ StringSetData oldStringSet = stringSets.getOrDefault(key, StringSetData.empty());
+ StringSetData updatedStringSet = update.getUpdate().combine(oldStringSet);
+ stringSets.put(key, updatedStringSet);
+ }
+ }
+
+ void clear() {
+ stringSets.clear();
+ }
+
+ Iterable> filter(MetricsFilter filter) {
+ return FluentIterable.from(stringSets.entrySet())
+ .filter(matchesFilter(filter))
+ .transform(this::toUpdateResult)
+ .toList();
+ }
+
+ private MetricResult toUpdateResult(
+ Map.Entry entry) {
+ MetricKey key = entry.getKey();
+ StringSetResult stringSetResult = entry.getValue().extractResult();
+ return MetricResult.create(key, stringSetResult, stringSetResult);
+ }
+ }
}
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
index 5441d05dcf76..64455d704c9b 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
@@ -26,12 +26,14 @@
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
import org.apache.beam.runners.core.metrics.MetricUpdates;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
/** Jet specific implementation of {@link MetricsContainer}. */
@@ -47,6 +49,7 @@ public static String getMetricsMapName(long jobId) {
private final Map counters = new HashMap<>();
private final Map distributions = new HashMap<>();
private final Map gauges = new HashMap<>();
+ private final Map stringSets = new HashMap<>();
private final IMap accumulator;
@@ -71,9 +74,14 @@ public Gauge getGauge(MetricName metricName) {
return gauges.computeIfAbsent(metricName, GaugeImpl::new);
}
+ @Override
+ public StringSet getStringSet(MetricName metricName) {
+ return stringSets.computeIfAbsent(metricName, StringSetImpl::new);
+ }
+
@SuppressWarnings("FutureReturnValueIgnored")
public void flush(boolean async) {
- if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty()) {
+ if (counters.isEmpty() && distributions.isEmpty() && gauges.isEmpty() && stringSets.isEmpty()) {
return;
}
@@ -81,7 +89,9 @@ public void flush(boolean async) {
ImmutableList> distributions =
extractUpdates(this.distributions);
ImmutableList> gauges = extractUpdates(this.gauges);
- MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges);
+ ImmutableList> stringSets =
+ extractUpdates(this.stringSets);
+ MetricUpdates updates = new MetricUpdatesImpl(counters, distributions, gauges, stringSets);
if (async) {
accumulator.setAsync(metricsKey, updates);
@@ -110,14 +120,17 @@ private static class MetricUpdatesImpl extends MetricUpdates implements Serializ
private final Iterable> counters;
private final Iterable> distributions;
private final Iterable> gauges;
+ private final Iterable> stringSets;
MetricUpdatesImpl(
Iterable> counters,
Iterable> distributions,
- Iterable> gauges) {
+ Iterable> gauges,
+ Iterable> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
+ this.stringSets = stringSets;
}
@Override
@@ -134,5 +147,10 @@ public Iterable> distributionUpdates() {
public Iterable> gaugeUpdates() {
return gauges;
}
+
+ @Override
+ public Iterable> stringSetUpdates() {
+ return stringSets;
+ }
}
}
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java
new file mode 100644
index 000000000000..4fd67042e3cf
--- /dev/null
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/StringSetImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.jet.metrics;
+
+import org.apache.beam.runners.core.metrics.StringSetData;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.StringSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/** Implementation of {@link StringSet}. */
+public class StringSetImpl extends AbstractMetric implements StringSet {
+
+ private final StringSetData stringSetData = StringSetData.empty();
+
+ public StringSetImpl(MetricName name) {
+ super(name);
+ }
+
+ @Override
+ StringSetData getValue() {
+ return stringSetData;
+ }
+
+ @Override
+ public void add(String value) {
+ if (stringSetData.stringSet().contains(value)) {
+ return;
+ }
+ stringSetData.combine(StringSetData.create(ImmutableSet.of(value)));
+ }
+
+ @Override
+ public void add(String... values) {
+ stringSetData.combine(StringSetData.create(ImmutableSet.copyOf(values)));
+ }
+}
diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
index fc94e408bfd3..1d45a83b1e79 100644
--- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
+++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/PortableMetrics.java
@@ -19,10 +19,12 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import java.util.ArrayList;
import java.util.List;
@@ -32,6 +34,7 @@
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.GaugeResult;
import org.apache.beam.sdk.metrics.MetricFiltering;
@@ -41,6 +44,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@SuppressWarnings({
@@ -53,14 +57,17 @@ public class PortableMetrics extends MetricResults {
private Iterable> counters;
private Iterable> distributions;
private Iterable> gauges;
+ private Iterable> stringSets;
private PortableMetrics(
Iterable> counters,
Iterable> distributions,
- Iterable> gauges) {
+ Iterable> gauges,
+ Iterable> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
+ this.stringSets = stringSets;
}
public static PortableMetrics of(JobApi.MetricResults jobMetrics) {
@@ -75,7 +82,9 @@ public MetricQueryResults queryMetrics(MetricsFilter filter) {
Iterables.filter(
this.distributions,
(distribution) -> MetricFiltering.matches(filter, distribution.getKey())),
- Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey())));
+ Iterables.filter(this.gauges, (gauge) -> MetricFiltering.matches(filter, gauge.getKey())),
+ Iterables.filter(
+ this.stringSets, (stringSet) -> MetricFiltering.matches(filter, stringSet.getKey())));
}
private static PortableMetrics convertMonitoringInfosToMetricResults(
@@ -89,7 +98,10 @@ private static PortableMetrics convertMonitoringInfosToMetricResults(
extractDistributionMetricsFromJobMetrics(monitoringInfoList);
Iterable> gaugesFromMetrics =
extractGaugeMetricsFromJobMetrics(monitoringInfoList);
- return new PortableMetrics(countersFromJobMetrics, distributionsFromMetrics, gaugesFromMetrics);
+ Iterable> stringSetFromMetrics =
+ extractStringSetMetricsFromJobMetrics(monitoringInfoList);
+ return new PortableMetrics(
+ countersFromJobMetrics, distributionsFromMetrics, gaugesFromMetrics, stringSetFromMetrics);
}
private static Iterable>
@@ -123,6 +135,28 @@ private static MetricResult convertGaugeMonitoringInfoToGauge(
return MetricResult.create(key, false, result);
}
+ private static Iterable> extractStringSetMetricsFromJobMetrics(
+ List monitoringInfoList) {
+ return monitoringInfoList.stream()
+ .filter(item -> SET_STRING_TYPE.equals(item.getType()))
+ .filter(item -> item.getLabelsMap().get(NAMESPACE_LABEL) != null)
+ .map(PortableMetrics::convertStringSetMonitoringInfoToStringSet)
+ .collect(Collectors.toList());
+ }
+
+ private static MetricResult convertStringSetMonitoringInfoToStringSet(
+ MetricsApi.MonitoringInfo monitoringInfo) {
+ Map labelsMap = monitoringInfo.getLabelsMap();
+ MetricKey key =
+ MetricKey.create(
+ labelsMap.get(STEP_NAME_LABEL),
+ MetricName.named(labelsMap.get(NAMESPACE_LABEL), labelsMap.get(METRIC_NAME_LABEL)));
+
+ StringSetData data = decodeStringSet(monitoringInfo.getPayload());
+ StringSetResult result = StringSetResult.create(data.stringSet());
+ return MetricResult.create(key, false, result);
+ }
+
private static MetricResult convertDistributionMonitoringInfoToDistribution(
MetricsApi.MonitoringInfo monitoringInfo) {
Map labelsMap = monitoringInfo.getLabelsMap();
diff --git a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
index 25353437a2ec..788d4a43319d 100644
--- a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
+++ b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java
@@ -20,6 +20,7 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
+import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -27,6 +28,7 @@
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
@@ -34,6 +36,7 @@
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.runners.core.metrics.DistributionData;
import org.apache.beam.runners.core.metrics.GaugeData;
+import org.apache.beam.runners.core.metrics.StringSetData;
import org.apache.beam.runners.fnexecution.artifact.ArtifactStagingService;
import org.apache.beam.runners.portability.testing.TestJobService;
import org.apache.beam.sdk.PipelineResult;
@@ -50,6 +53,7 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.testing.GrpcCleanupRule;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -68,6 +72,7 @@ public class PortableRunnerTest implements Serializable {
private static final String COUNTER_TYPE = "beam:metrics:sum_int64:v1";
private static final String DIST_TYPE = "beam:metrics:distribution_int64:v1";
private static final String GAUGE_TYPE = "beam:metrics:latest_int64:v1";
+ private static final String STRING_SET_TYPE = "beam:metrics:set_string:v1";
private static final String NAMESPACE_LABEL = "NAMESPACE";
private static final String METRIC_NAME_LABEL = "NAME";
private static final String STEP_NAME_LABEL = "PTRANSFORM";
@@ -76,6 +81,7 @@ public class PortableRunnerTest implements Serializable {
private static final String STEP_NAME = "testStep";
private static final Long COUNTER_VALUE = 42L;
private static final Long GAUGE_VALUE = 64L;
+ private static final Set STRING_SET_VALUE = ImmutableSet.of("ab", "cd");
private static final Instant GAUGE_TIME =
GlobalWindow.TIMESTAMP_MIN_VALUE.plus(Duration.standardSeconds(1));
private static final Long DIST_SUM = 1000L;
@@ -124,6 +130,9 @@ public void extractsMetrics() throws Exception {
assertThat(
metricQueryResults.getGauges().iterator().next().getAttempted().getValue(),
is(GAUGE_VALUE));
+ assertThat(
+ metricQueryResults.getStringSets().iterator().next().getAttempted().getStringSet(),
+ is(STRING_SET_VALUE));
}
private JobApi.MetricResults generateMetricResults() throws Exception {
@@ -155,10 +164,18 @@ private JobApi.MetricResults generateMetricResults() throws Exception {
.setPayload(encodeInt64Gauge(GaugeData.create(GAUGE_VALUE, GAUGE_TIME)))
.build();
+ MetricsApi.MonitoringInfo stringSetMonitoringInfo =
+ MetricsApi.MonitoringInfo.newBuilder()
+ .setType(STRING_SET_TYPE)
+ .putAllLabels(labelMap)
+ .setPayload(encodeStringSet(StringSetData.create(STRING_SET_VALUE)))
+ .build();
+
return JobApi.MetricResults.newBuilder()
.addAttempted(counterMonitoringInfo)
.addAttempted(distMonitoringInfo)
.addAttempted(gaugeMonitoringInfo)
+ .addAttempted(stringSetMonitoringInfo)
.build();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
index 86b1c1092824..9f60ce3d6c07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -21,9 +21,7 @@
import java.util.List;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-/**
- * The results of a query for metrics. Allows accessing all of the metrics that matched the filter.
- */
+/** The results of a query for metrics. Allows accessing all the metrics that matched the filter. */
@AutoValue
public abstract class MetricQueryResults {
/** Return the metric results for the counters that matched the filter. */
@@ -35,6 +33,9 @@ public abstract class MetricQueryResults {
/** Return the metric results for the gauges that matched the filter. */
public abstract Iterable> getGauges();
+ /** Return the metric results for the sets that matched the filter. */
+ public abstract Iterable> getStringSets();
+
static void printMetrics(String type, Iterable> metrics, StringBuilder sb) {
List> metricsList = ImmutableList.copyOf(metrics);
if (!metricsList.isEmpty()) {
@@ -63,6 +64,7 @@ public final String toString() {
printMetrics("Counters", getCounters(), sb);
printMetrics("Distributions", getDistributions(), sb);
printMetrics("Gauges", getGauges(), sb);
+ printMetrics("StringSets", getStringSets(), sb);
sb.append(")");
return sb.toString();
}
@@ -70,7 +72,8 @@ public final String toString() {
public static MetricQueryResults create(
Iterable> counters,
Iterable> distributions,
- Iterable> gauges) {
- return new AutoValue_MetricQueryResults(counters, distributions, gauges);
+ Iterable> gauges,
+ Iterable> stringSets) {
+ return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
index 25f4d8d9e626..b9cbc8d755ee 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
@@ -37,7 +37,7 @@ public abstract class MetricResult {
/** Return the name of the metric. */
public MetricName getName() {
return getKey().metricName();
- };
+ }
public abstract MetricKey getKey();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 056141284655..916e18647c34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -93,6 +93,22 @@ public static Gauge gauge(Class> namespace, String name) {
return new DelegatingGauge(MetricName.named(namespace, name));
}
+ /**
+ * Create a metric that can have its new value set, and is aggregated by taking the last reported
+ * value.
+ */
+ public static StringSet stringSet(String namespace, String name) {
+ return new DelegatingStringSet(MetricName.named(namespace, name));
+ }
+
+ /**
+ * Create a metric that can have its new value set, and is aggregated by taking the last reported
+ * value.
+ */
+ public static StringSet stringSet(Class> namespace, String name) {
+ return new DelegatingStringSet(MetricName.named(namespace, name));
+ }
+
/**
* Implementation of {@link Distribution} that delegates to the instance for the current context.
*/
@@ -146,4 +162,34 @@ public MetricName getName() {
return name;
}
}
+
+ /** Implementation of {@link StringSet} that delegates to the instance for the current context. */
+ private static class DelegatingStringSet implements Metric, StringSet, Serializable {
+ private final MetricName name;
+
+ private DelegatingStringSet(MetricName name) {
+ this.name = name;
+ }
+
+ @Override
+ public void add(String value) {
+ MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+ if (container != null) {
+ container.getStringSet(name).add(value);
+ }
+ }
+
+ @Override
+ public void add(String... value) {
+ MetricsContainer container = MetricsEnvironment.getCurrentContainer();
+ if (container != null) {
+ container.getStringSet(name).add(value);
+ }
+ }
+
+ @Override
+ public MetricName getName() {
+ return name;
+ }
+ }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index f48b9195c37c..0c4766bb2c0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -53,6 +53,12 @@ default Counter getPerWorkerCounter(MetricName metricName) {
*/
Gauge getGauge(MetricName metricName);
+ /**
+ * Return the {@link StringSet} that should be used for implementing the given {@code metricName}
+ * in this container.
+ */
+ StringSet getStringSet(MetricName metricName);
+
/**
* Return the {@link Histogram} that should be used for implementing the given {@code metricName}
* in this container.
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java
new file mode 100644
index 000000000000..42e8f2388e38
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSet.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+/**
+ * A metric that reports set of unique string values. This metric is backed by {@link
+ * java.util.HashSet} and hence it does not maintain any ordering.
+ */
+public interface StringSet extends Metric {
+
+ /** Add a value to this set. */
+ void add(String value);
+
+ /** Add values to this set. */
+ default void add(String... values) {
+ for (String value : values) {
+ add(value);
+ }
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java
new file mode 100644
index 000000000000..f2ad6292a5aa
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/StringSetResult.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import java.util.Set;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+
+/**
+ * The result of a {@link StringSet} metric. The {@link StringSetResult} hold an immutable copy of
+ * the set from which it was initially created representing that a result cannot be modified once
+ * created.
+ */
+@AutoValue
+public abstract class StringSetResult {
+ public abstract Set getStringSet();
+
+ /**
+ * Creates a {@link StringSetResult} from the given {@link Set} by making an immutable copy.
+ *
+ * @param s the set from which the {@link StringSetResult} should be created.
+ * @return {@link StringSetResult} containing an immutable copy of the given set.
+ */
+ public static StringSetResult create(Set s) {
+ return new AutoValue_StringSetResult(ImmutableSet.copyOf(s));
+ }
+
+ /** @return a {@link EmptyStringSetResult} */
+ public static StringSetResult empty() {
+ return EmptyStringSetResult.INSTANCE;
+ }
+
+ /** Empty {@link StringSetResult}, representing no values reported and is immutable. */
+ public static class EmptyStringSetResult extends StringSetResult {
+
+ private static final EmptyStringSetResult INSTANCE = new EmptyStringSetResult();
+
+ private EmptyStringSetResult() {}
+
+ /** Returns an empty immutable set. */
+ @Override
+ public Set getStringSet() {
+ return ImmutableSet.of();
+ }
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStringSetMetrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStringSetMetrics.java
new file mode 100644
index 000000000000..e645db801e48
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesStringSetMetrics.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Category tag for validation tests which utilize {@link org.apache.beam.sdk.metrics.StringSet}.
+ * Tests tagged with {@link UsesStringSetMetrics} should be run for runners which support StringSet.
+ */
+@Internal
+public class UsesStringSetMetrics {}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
index 089d67993314..79709c89963b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java
@@ -22,6 +22,7 @@
import static org.apache.beam.sdk.metrics.MetricResultsMatchers.metricsResult;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.verify;
@@ -37,12 +38,14 @@
import org.apache.beam.sdk.testing.UsesCounterMetrics;
import org.apache.beam.sdk.testing.UsesDistributionMetrics;
import org.apache.beam.sdk.testing.UsesGaugeMetrics;
+import org.apache.beam.sdk.testing.UsesStringSetMetrics;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.After;
@@ -85,6 +88,7 @@ public void tearDown() {
protected PipelineResult runPipelineWithMetrics() {
final Counter count = Metrics.counter(MetricsTest.class, "count");
+ StringSet sideinputs = Metrics.stringSet(MetricsTest.class, "sideinputs");
final TupleTag output1 = new TupleTag() {};
final TupleTag output2 = new TupleTag() {};
pipeline
@@ -104,11 +108,16 @@ public void startBundle() {
@ProcessElement
public void processElement(ProcessContext c) {
Distribution values = Metrics.distribution(MetricsTest.class, "input");
+ StringSet sources = Metrics.stringSet(MetricsTest.class, "sources");
count.inc();
values.update(c.element());
c.output(c.element());
c.output(c.element());
+ sources.add("gcs");
+ sources.add("gcs"); // repeated should appear once
+ sources.add("gcs", "gcs"); // repeated should appear once
+ sideinputs.add("bigtable", "spanner");
}
@DoFn.FinishBundle
@@ -125,11 +134,14 @@ public void finishBundle() {
public void processElement(ProcessContext c) {
Distribution values = Metrics.distribution(MetricsTest.class, "input");
Gauge gauge = Metrics.gauge(MetricsTest.class, "my-gauge");
+ StringSet sinks = Metrics.stringSet(MetricsTest.class, "sinks");
Integer element = c.element();
count.inc();
values.update(element);
gauge.set(12L);
c.output(element);
+ sinks.add("bq", "kafka", "kafka"); // repeated should appear once
+ sideinputs.add("bigtable", "sql");
c.output(output2, element);
}
})
@@ -233,7 +245,8 @@ public static class CommittedMetricTests extends SharedTestBase {
UsesCommittedMetrics.class,
UsesCounterMetrics.class,
UsesDistributionMetrics.class,
- UsesGaugeMetrics.class
+ UsesGaugeMetrics.class,
+ UsesStringSetMetrics.class
})
@Test
public void testAllCommittedMetrics() {
@@ -267,6 +280,14 @@ public void testCommittedGaugeMetrics() {
assertGaugeMetrics(metrics, true);
}
+ @Category({ValidatesRunner.class, UsesCommittedMetrics.class, UsesStringSetMetrics.class})
+ @Test
+ public void testCommittedStringSetMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertStringSetMetrics(metrics, true);
+ }
+
@Test
@Category({NeedsRunner.class, UsesAttemptedMetrics.class, UsesCounterMetrics.class})
public void testBoundedSourceMetrics() {
@@ -352,7 +373,8 @@ public static class AttemptedMetricTests extends SharedTestBase {
UsesAttemptedMetrics.class,
UsesCounterMetrics.class,
UsesDistributionMetrics.class,
- UsesGaugeMetrics.class
+ UsesGaugeMetrics.class,
+ UsesStringSetMetrics.class
})
@Test
public void testAllAttemptedMetrics() {
@@ -386,6 +408,14 @@ public void testAttemptedGaugeMetrics() {
MetricQueryResults metrics = queryTestMetrics(result);
assertGaugeMetrics(metrics, false);
}
+
+ @Category({ValidatesRunner.class, UsesAttemptedMetrics.class, UsesStringSetMetrics.class})
+ @Test
+ public void testAttemptedStringSetMetrics() {
+ PipelineResult result = runPipelineWithMetrics();
+ MetricQueryResults metrics = queryTestMetrics(result);
+ assertStringSetMetrics(metrics, false);
+ }
}
private static void assertCounterMetrics(MetricQueryResults metrics, boolean isCommitted) {
@@ -415,6 +445,36 @@ private static void assertGaugeMetrics(MetricQueryResults metrics, boolean isCom
isCommitted)));
}
+ private static void assertStringSetMetrics(MetricQueryResults metrics, boolean isCommitted) {
+ assertThat(
+ metrics.getStringSets(),
+ containsInAnyOrder(
+ metricsResult(
+ NAMESPACE,
+ "sources",
+ "MyStep1",
+ StringSetResult.create(ImmutableSet.of("gcs")),
+ isCommitted),
+ metricsResult(
+ NAMESPACE,
+ "sinks",
+ "MyStep2",
+ StringSetResult.create(ImmutableSet.of("kafka", "bq")),
+ isCommitted),
+ metricsResult(
+ NAMESPACE,
+ "sideinputs",
+ "MyStep1",
+ StringSetResult.create(ImmutableSet.of("bigtable", "spanner")),
+ isCommitted),
+ metricsResult(
+ NAMESPACE,
+ "sideinputs",
+ "MyStep2",
+ StringSetResult.create(ImmutableSet.of("sql", "bigtable")),
+ isCommitted)));
+ }
+
private static void assertDistributionMetrics(MetricQueryResults metrics, boolean isCommitted) {
assertThat(
metrics.getDistributions(),
@@ -458,5 +518,6 @@ private static void assertAllMetrics(MetricQueryResults metrics, boolean isCommi
assertCounterMetrics(metrics, isCommitted);
assertDistributionMetrics(metrics, isCommitted);
assertGaugeMetrics(metrics, isCommitted);
+ assertStringSetMetrics(metrics, isCommitted);
}
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java
new file mode 100644
index 000000000000..85c819b4a9cb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/StringSetResultTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets.SetView;
+import org.junit.Test;
+
+public class StringSetResultTest {
+
+ @Test
+ public void getStringSet() {
+ // Test that getStringSet gives an immutable set
+ HashSet initialSet = new HashSet<>(Arrays.asList("ab", "cd"));
+ Set stringSetResultSet = StringSetResult.create(initialSet).getStringSet();
+ assertEquals(initialSet, stringSetResultSet);
+ assertThrows(UnsupportedOperationException.class, () -> stringSetResultSet.add("should-fail"));
+ }
+
+ @Test
+ public void create() {
+ // Test that create makes an immutable copy of the given set
+ HashSet modifiableSet = new HashSet<>(Arrays.asList("ab", "cd"));
+ StringSetResult stringSetResult = StringSetResult.create(modifiableSet);
+ // change the initial set.
+ modifiableSet.add("ef");
+ SetView difference = Sets.difference(modifiableSet, stringSetResult.getStringSet());
+ assertEquals(1, difference.size());
+ assertEquals("ef", difference.iterator().next());
+ assertTrue(Sets.difference(stringSetResult.getStringSet(), modifiableSet).isEmpty());
+ }
+
+ @Test
+ public void empty() {
+ // Test empty returns an immutable set
+ StringSetResult empptyStringSetResult = StringSetResult.empty();
+ assertTrue(empptyStringSetResult.getStringSet().isEmpty());
+ assertThrows(
+ UnsupportedOperationException.class,
+ () -> empptyStringSetResult.getStringSet().add("should-fail"));
+ }
+}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
index 5d856ee63063..bcd243ba746d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java
@@ -42,6 +42,7 @@
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -216,6 +217,14 @@ public Gauge getGauge(MetricName metricName) {
return tracker.metricsContainerRegistry.getUnboundContainer().getGauge(metricName);
}
+ @Override
+ public StringSet getStringSet(MetricName metricName) {
+ if (tracker.currentState != null) {
+ return tracker.currentState.metricsContainer.getStringSet(metricName);
+ }
+ return tracker.metricsContainerRegistry.getUnboundContainer().getStringSet(metricName);
+ }
+
@Override
public Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
if (tracker.currentState != null) {
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java
index c2fd308205a1..1f4341860295 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ExecutionStateSamplerTest.java
@@ -42,10 +42,12 @@
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Duration;
import org.junit.After;
@@ -65,6 +67,8 @@ public class ExecutionStateSamplerTest {
private static final Distribution TEST_USER_DISTRIBUTION =
Metrics.distribution("foo", "distribution");
private static final Gauge TEST_USER_GAUGE = Metrics.gauge("foo", "gauge");
+
+ private static final StringSet TEST_USER_STRING_SET = Metrics.stringSet("foo", "stringset");
private static final Histogram TEST_USER_HISTOGRAM =
new DelegatingHistogram(
MetricName.named("foo", "histogram"), HistogramData.LinearBuckets.of(0, 100, 1), false);
@@ -375,12 +379,14 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep
TEST_USER_COUNTER.inc();
TEST_USER_DISTRIBUTION.update(2);
TEST_USER_GAUGE.set(3);
+ TEST_USER_STRING_SET.add("ab");
TEST_USER_HISTOGRAM.update(4);
state.deactivate();
TEST_USER_COUNTER.inc(11);
TEST_USER_DISTRIBUTION.update(12);
TEST_USER_GAUGE.set(13);
+ TEST_USER_STRING_SET.add("cd");
TEST_USER_HISTOGRAM.update(14);
TEST_USER_HISTOGRAM.update(14);
@@ -411,6 +417,14 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep
.getGauge(TEST_USER_GAUGE.getName())
.getCumulative()
.value());
+ assertEquals(
+ ImmutableSet.of("ab"),
+ tracker
+ .getMetricsContainerRegistry()
+ .getContainer("ptransformId")
+ .getStringSet(TEST_USER_STRING_SET.getName())
+ .getCumulative()
+ .stringSet());
assertEquals(
1L,
(long)
@@ -449,6 +463,14 @@ public void testCountersReturnedAreBasedUponCurrentExecutionState() throws Excep
.getGauge(TEST_USER_GAUGE.getName())
.getCumulative()
.value());
+ assertEquals(
+ ImmutableSet.of("cd"),
+ tracker
+ .getMetricsContainerRegistry()
+ .getUnboundContainer()
+ .getStringSet(TEST_USER_STRING_SET.getName())
+ .getCumulative()
+ .stringSet());
assertEquals(
2L,
(long)