Skip to content

Commit

Permalink
Add support for StringSet metric in Java SDK. (#31789)
Browse files Browse the repository at this point in the history
Add support for StringSet metric in Java SDK to track set of unique string as metric.
addresses #31788

* Add support for StringSet metric in Java SDK to track set of unique string as metric.

* Fix compilation and tests

* Add support for StringSet in PortableRunner and fix some spotless java checks

* Add support for StringSet in JetRunner

* Fix precommit errors

* Fixes for review comments

* Other fixes

* Fixes for spotless java

* Fix a couple of tests.

* Null-containing sets don't need to be tested as they can no longer be constructed.
* Use vendered guava.

* unused imports

---------

Co-authored-by: Robert Bradshaw <[email protected]>
  • Loading branch information
rohitsinha54 and robertwb authored Jul 8, 2024
1 parent 746f3c5 commit de4645d
Show file tree
Hide file tree
Showing 43 changed files with 1,522 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,14 @@ message MonitoringInfoTypeUrns {
PROGRESS_TYPE = 10 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:progress:v1"];

// Represents a set of strings.
//
// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
* {@link Iterable}s of counters, distributions, and gauges, and serves queries by applying {@link
* org.apache.beam.sdk.metrics.MetricsFilter}s linearly to them.
* {@link Iterable}s of counters, distributions, gauges, and stringsets, and serves queries by
* applying {@link org.apache.beam.sdk.metrics.MetricsFilter}s linearly to them.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -40,14 +41,17 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges) {
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
}

@Override
Expand All @@ -56,6 +60,8 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
Iterables.filter(counters, counter -> MetricFiltering.matches(filter, counter.getKey())),
Iterables.filter(
distributions, distribution -> MetricFiltering.matches(filter, distribution.getKey())),
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())));
Iterables.filter(gauges, gauge -> MetricFiltering.matches(filter, gauge.getKey())),
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
})
public abstract class MetricUpdates {

public static final MetricUpdates EMPTY =
MetricUpdates.create(
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
* Representation of a single metric update.
Expand All @@ -52,25 +54,33 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {
}
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates()) && Iterables.isEmpty(distributionUpdates());
}

/** All of the counter updates. */
/** All the counter updates. */
public abstract Iterable<MetricUpdate<Long>> counterUpdates();

/** All of the distribution updates. */
/** All the distribution updates. */
public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();

/** All of the gauges updates. */
/** All the gauges updates. */
public abstract Iterable<MetricUpdate<GaugeData>> gaugeUpdates();

/** All the sets updates. */
public abstract Iterable<MetricUpdate<StringSetData>> stringSetUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates) {
return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates, gaugeUpdates);
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
public boolean isEmpty() {
return Iterables.isEmpty(counterUpdates())
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);

Expand Down Expand Up @@ -123,6 +125,7 @@ public void reset() {
distributions.forEachValue(DistributionCell::reset);
gauges.forEachValue(GaugeCell::reset);
histograms.forEachValue(HistogramCell::reset);
stringSets.forEachValue(StringSetCell::reset);
}

/**
Expand Down Expand Up @@ -193,6 +196,23 @@ public GaugeCell getGauge(MetricName metricName) {
return gauges.tryGet(metricName);
}

/**
* Return a {@code StringSetCell} named {@code metricName}. If it doesn't exist, create a {@code
* Metric} with the specified name.
*/
@Override
public StringSetCell getStringSet(MetricName metricName) {
return stringSets.get(metricName);
}

/**
* Return a {@code StringSetCell} named {@code metricName}. If it doesn't exist, return {@code
* null}.
*/
public @Nullable StringSetCell tryGetStringSet(MetricName metricName) {
return stringSets.tryGet(metricName);
}

private <UpdateT, CellT extends MetricCell<UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
Expand All @@ -212,7 +232,10 @@ ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT
*/
public MetricUpdates getUpdates() {
return MetricUpdates.create(
extractUpdates(counters), extractUpdates(distributions), extractUpdates(gauges));
extractUpdates(counters),
extractUpdates(distributions),
extractUpdates(gauges),
extractUpdates(stringSets));
}

/** @return The MonitoringInfo metadata from the metric. */
Expand Down Expand Up @@ -395,13 +418,14 @@ private String getShortId(
}

/**
* Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
* Mark all the updates that were retrieved with the latest call to {@link #getUpdates()} as
* committed.
*/
public void commitUpdates() {
counters.forEachValue(counter -> counter.getDirty().afterCommit());
distributions.forEachValue(distribution -> distribution.getDirty().afterCommit());
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit());
}

private <UserT extends Metric, UpdateT, CellT extends MetricCell<UpdateT>>
Expand All @@ -423,7 +447,8 @@ public MetricUpdates getCumulative() {
return MetricUpdates.create(
extractCumulatives(counters),
extractCumulatives(distributions),
extractCumulatives(gauges));
extractCumulatives(gauges),
extractCumulatives(stringSets));
}

/** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */
Expand All @@ -432,6 +457,7 @@ public void update(MetricsContainerImpl other) {
updateDistributions(distributions, other.distributions);
updateGauges(gauges, other.gauges);
updateHistograms(histograms, other.histograms);
updateStringSets(stringSets, other.stringSets);
}

private void updateForSumInt64Type(MonitoringInfo monitoringInfo) {
Expand Down Expand Up @@ -502,21 +528,28 @@ private void updateHistograms(
updates.forEach((key, value) -> current.get(key).update(value));
}

private void updateStringSets(
MetricsMap<MetricName, StringSetCell> current,
MetricsMap<MetricName, StringSetCell> updates) {
updates.forEach((key, value) -> current.get(key).update(value.getCumulative()));
}

@Override
public boolean equals(@Nullable Object object) {
if (object instanceof MetricsContainerImpl) {
MetricsContainerImpl metricsContainerImpl = (MetricsContainerImpl) object;
return Objects.equals(stepName, metricsContainerImpl.stepName)
&& Objects.equals(counters, metricsContainerImpl.counters)
&& Objects.equals(distributions, metricsContainerImpl.distributions)
&& Objects.equals(gauges, metricsContainerImpl.gauges);
&& Objects.equals(gauges, metricsContainerImpl.gauges)
&& Objects.equals(stringSets, metricsContainerImpl.stringSets);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(stepName, counters, distributions, gauges);
return Objects.hash(stepName, counters, distributions, gauges, stringSets);
}

/**
Expand Down Expand Up @@ -588,6 +621,16 @@ public String getCumulativeString(@Nullable Set<String> allowedMetricUrns) {
}
message.append("\n");
}
for (Map.Entry<MetricName, StringSetCell> cell : stringSets.entries()) {
if (!matchMetric(cell.getKey(), allowedMetricUrns)) {
continue;
}
message.append(cell.getKey().toString());
message.append(" = ");
StringSetData data = cell.getValue().getCumulative();
message.append(data.stringSet().toString());
message.append("\n");
}
return message.toString();
}

Expand Down Expand Up @@ -628,6 +671,10 @@ public static MetricsContainerImpl deltaContainer(
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}
for (Map.Entry<MetricName, StringSetCell> cell : curr.stringSets.entries()) {
// Simply take the most recent value for stringSets, no need to count deltas.
deltaContainer.stringSets.get(cell.getKey()).update(cell.getValue().getCumulative());
}
return deltaContainer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public static MetricResults asMetricResults(
Map<MetricKey, MetricResult<Long>> counters = new HashMap<>();
Map<MetricKey, MetricResult<DistributionData>> distributions = new HashMap<>();
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
Map<MetricKey, MetricResult<StringSetData>> sets = new HashMap<>();

attemptedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -144,6 +145,7 @@ public static MetricResults asMetricResults(
mergeAttemptedResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeAttemptedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
mergeAttemptedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
});
committedMetricsContainers.forEachMetricContainer(
container -> {
Expand All @@ -152,6 +154,7 @@ public static MetricResults asMetricResults(
mergeCommittedResults(
distributions, cumulative.distributionUpdates(), DistributionData::combine);
mergeCommittedResults(gauges, cumulative.gaugeUpdates(), GaugeData::combine);
mergeCommittedResults(sets, cumulative.stringSetUpdates(), StringSetData::combine);
});

return new DefaultMetricResults(
Expand All @@ -161,6 +164,9 @@ public static MetricResults asMetricResults(
.collect(toList()),
gauges.values().stream()
.map(result -> result.transform(GaugeData::extractResult))
.collect(toList()),
sets.values().stream()
.map(result -> result.transform(StringSetData::extractResult))
.collect(toList()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public static final class TypeUrns {
public static final String BOTTOM_N_INT64_TYPE = "beam:metrics:bottom_n_int64:v1";
public static final String BOTTOM_N_DOUBLE_TYPE = "beam:metrics:bottom_n_double:v1";
public static final String PROGRESS_TYPE = "beam:metrics:progress:v1";
public static final String SET_STRING_TYPE = "beam:metrics:set_string:v1";

static {
// Validate that compile time constants match the values stored in the protos.
Expand All @@ -187,6 +188,7 @@ public static final class TypeUrns {
checkArgument(
BOTTOM_N_DOUBLE_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.BOTTOM_N_DOUBLE_TYPE)));
checkArgument(PROGRESS_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.PROGRESS_TYPE)));
checkArgument(SET_STRING_TYPE.equals(getUrn(MonitoringInfoTypeUrns.Enum.SET_STRING_TYPE)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;

/** A set of functions used to encode and decode common monitoring info types. */
public class MonitoringInfoEncodings {
private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
private static final IterableCoder<String> STRING_SET_CODER =
IterableCoder.of(StringUtf8Coder.of());

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#DISTRIBUTION_INT64_TYPE}. */
public static ByteString encodeInt64Distribution(DistributionData data) {
Expand Down Expand Up @@ -98,6 +104,26 @@ public static GaugeData decodeInt64Gauge(ByteString payload) {
}
}

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. */
public static ByteString encodeStringSet(StringSetData data) {
try (ByteStringOutputStream output = new ByteStringOutputStream()) {
STRING_SET_CODER.encode(data.stringSet(), output);
return output.toByteString();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Decodes from {@link MonitoringInfoConstants.TypeUrns#SET_STRING_TYPE}. */
public static StringSetData decodeStringSet(ByteString payload) {
try (InputStream input = payload.newInput()) {
Set<String> elements = Sets.newHashSet(STRING_SET_CODER.decode(input));
return StringSetData.create(elements);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#SUM_INT64_TYPE}. */
public static ByteString encodeInt64Counter(long value) {
ByteStringOutputStream output = new ByteStringOutputStream();
Expand Down
Loading

0 comments on commit de4645d

Please sign in to comment.