diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
index feb763fb5fed..d45686494576 100644
--- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
+++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/metrics.proto
@@ -352,7 +352,6 @@ message MonitoringInfoSpecs {
]
}];
- //import com.google.api.services.dataflow.model.PerWorkerMetrics;
API_REQUEST_LATENCIES = 20 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_latencies:v1",
type: "beam:metrics:histogram_int64:v1",
@@ -587,9 +586,7 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];
- // Encoding: ...
- // - iter: beam:coder:iterable:v1
- // - valueX: beam:coder:stringutf8:v1
+ // Represents a histogram of int64 values.
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];
diff --git a/runners/core-java/build.gradle b/runners/core-java/build.gradle
index 4898bcdc401e..639ccbed86f1 100644
--- a/runners/core-java/build.gradle
+++ b/runners/core-java/build.gradle
@@ -38,29 +38,21 @@ test {
}
}
-// def google_api_services_dataflow = library.java.google_api_services_dataflow
-
dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
+ implementation library.java.google_api_services_dataflow
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
- // implementation library.java.proto_google_common_protos
- implementation library.java.google_cloud_dataflow_java_proto_library_all
-
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_api
testRuntimeOnly library.java.slf4j_simple
- provided(library.java.google_api_services_dataflow)
- provided library.java.google_cloud_dataflow_java_proto_library_all
- testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation(library.java.google_api_services_dataflow)
- implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java
index eeda5f31707e..f45dd154eb9e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/DefaultMetricResults.java
@@ -28,8 +28,6 @@
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
@@ -40,7 +38,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DefaultMetricResults extends MetricResults {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricResults.class);
private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
@@ -54,7 +51,6 @@ public DefaultMetricResults(
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
- LOG.info("xxx does this get here? DefaultMetricResults ");
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
index 14972156e079..b0da2c6b5993 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/HistogramCell.java
@@ -62,7 +62,6 @@ public void update(double value) {
dirty.afterModification();
}
- /** Update it by another Histogram Data. */
@Override
public void update(HistogramData data) {
this.value.update(data);
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java
index cebc70e717c0..f3d27937763f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LabeledMetrics.java
@@ -52,13 +52,4 @@ public static Histogram histogram(
boolean processWideContainer) {
return new DelegatingHistogram(metricName, bucketType, processWideContainer);
}
-
- public static Histogram histogram(
- MonitoringInfoMetricName metricName,
- HistogramData.BucketType bucketType,
- boolean processWideContainer,
- boolean perWorkerHistogram) {
- return new DelegatingHistogram(
- metricName, bucketType, processWideContainer, perWorkerHistogram);
- }
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LockFreeHistogram.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LockFreeHistogram.java
deleted file mode 100644
index 884c535c2632..000000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/LockFreeHistogram.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.metrics;
-
-import com.google.auto.value.AutoValue;
-import com.google.auto.value.extension.memoized.Memoized;
-import java.io.Serializable;
-import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLongArray;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.metrics.Histogram;
-import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.util.HistogramData;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.ImmutableLongArray;
-
-/**
- * A lock free implementation of {@link org.apache.beam.sdk.metrics.Histogram}. This class supports
- * extracting delta updates with the {@link #getSnapshotAndReset} method.
- */
-@ThreadSafe
-@Internal
-public final class LockFreeHistogram implements Histogram {
- private final HistogramData.BucketType bucketType;
- private final AtomicLongArray buckets;
- private final MetricName name;
- private final AtomicReference<OutlierStatistic> underflowStatistic;
- private final AtomicReference<OutlierStatistic> overflowStatistic;
-
- /**
- * Whether this histogram has updates that have not been extracted by {@code getSnapshotAndReset}.
- * This value should be flipped to true AFTER recording a value, and flipped to false BEFORE
- * extracting a snapshot. This ensures that recorded values will always be seen by a future {@code
- * getSnapshotAndReset} call.
- */
- private final AtomicBoolean dirty;
-
- /** Create a histogram. */
- public LockFreeHistogram(MetricName name, HistogramData.BucketType bucketType) {
- this.name = name;
- this.bucketType = bucketType;
- this.buckets = new AtomicLongArray(bucketType.getNumBuckets());
- this.underflowStatistic =
- new AtomicReference<OutlierStatistic>(OutlierStatistic.EMPTY);
- this.overflowStatistic =
- new AtomicReference<OutlierStatistic>(OutlierStatistic.EMPTY);
- this.dirty = new AtomicBoolean(false);
- }
-
- /**
- * Represents the sum and mean of a collection of numbers. Used to represent the
- * underflow/overflow statistics of a histogram.
- */
- @AutoValue
- public abstract static class OutlierStatistic implements Serializable {
- abstract double sum();
-
- public abstract long count();
-
- public static final OutlierStatistic EMPTY = create(0, 0);
-
- public static OutlierStatistic create(double sum, long count) {
- return new AutoValue_LockFreeHistogram_OutlierStatistic(sum, count);
- }
-
- public OutlierStatistic combine(double value) {
- return create(sum() + value, count() + 1);
- }
-
- public double mean() {
- if (count() == 0) {
- return 0;
- }
- return sum() / count();
- }
- }
-
- /**
- * The snapshot of a histogram. The snapshot contains the overflow/underflow statistic, number of
- * values recorded in each bucket, and the BucketType of the underlying histogram.
- */
- @AutoValue
- public abstract static class Snapshot {
- public abstract OutlierStatistic underflowStatistic();
-
- public abstract OutlierStatistic overflowStatistic();
-
- public abstract ImmutableLongArray buckets();
-
- public abstract HistogramData.BucketType bucketType();
-
- public static Snapshot create(
- OutlierStatistic underflowStatistic,
- OutlierStatistic overflowStatistic,
- ImmutableLongArray buckets,
- HistogramData.BucketType bucketType) {
- return new AutoValue_LockFreeHistogram_Snapshot(
- underflowStatistic, overflowStatistic, buckets, bucketType);
- }
-
- @Memoized
- public long totalCount() {
- long count = 0;
- count += underflowStatistic().count();
- count += overflowStatistic().count();
- count += buckets().stream().sum();
-
- return count;
- }
- }
-
- /**
- * Extract a delta update of this histogram. Update represents values that have been recorded in
- * this histogram since the last time this method was called.
- *
- * <p>If this histogram is being updated concurrently with this method, the returned snapshot is
- * not guaranteed to contain those updates. However, those updates are not dropped and will be
- * represented in a future call to this method.
- *
- * <p>If this histogram has not been updated since the last call to this method, an empty optional
- * is returned.
- */
- public Optional<Snapshot> getSnapshotAndReset() {
- if (!dirty.getAndSet(false)) {
- return Optional.empty();
- }
-
- ImmutableLongArray.Builder bucketsSnapshotBuilder =
- ImmutableLongArray.builder(buckets.length());
- for (int i = 0; i < buckets.length(); i++) {
- bucketsSnapshotBuilder.add(buckets.getAndSet(i, 0));
- }
- OutlierStatistic overflowSnapshot = overflowStatistic.getAndSet(OutlierStatistic.EMPTY);
- OutlierStatistic underflowSnapshot = underflowStatistic.getAndSet(OutlierStatistic.EMPTY);
-
- return Optional.of(
- Snapshot.create(
- underflowSnapshot, overflowSnapshot, bucketsSnapshotBuilder.build(), bucketType));
- }
-
- @Override
- public MetricName getName() {
- return name;
- }
-
- private void updateInternal(double value) {
- double rangeTo = bucketType.getRangeTo();
- double rangeFrom = bucketType.getRangeFrom();
- if (value >= rangeTo) {
- recordTopRecordsValue(value);
- } else if (value < rangeFrom) {
- recordBottomRecordsValue(value);
- } else {
- recordInBoundsValue(value);
- }
- }
-
- @Override
- public void update(double value) {
- updateInternal(value);
- dirty.set(true);
- }
-
- @Override
- public void update(double... values) {
- for (double value : values) {
- updateInternal(value);
- }
- dirty.set(true);
- }
-
- /** Record an in-bounds value to the appropriate bucket. */
- private void recordInBoundsValue(double value) {
- int index = bucketType.getBucketIndex(value);
- if (index < 0 || index >= bucketType.getNumBuckets()) {
- return;
- }
-
- buckets.getAndIncrement(index);
- }
-
- /**
- * Record a new value in {@code overflowStatistic}. This method should only be called when a
- * Histogram is recording a value greater than the upper bound of its largest bucket.
- *
- * @param value
- */
- private void recordTopRecordsValue(double value) {
- OutlierStatistic original;
- do {
- original = overflowStatistic.get();
- } while (!overflowStatistic.compareAndSet(original, original.combine(value)));
- }
-
- /**
- * Record a new value in {@code underflowStatistic}. This method should only be called when a
- * Histogram is recording a value smaller than the lower bound of its smallest bucket.
- */
- private void recordBottomRecordsValue(double value) {
- OutlierStatistic original;
- do {
- original = underflowStatistic.get();
- } while (!underflowStatistic.compareAndSet(original, original.combine(value)));
- }
-}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
index 0838463d0fd0..eb3437422b17 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUpdates.java
@@ -78,7 +78,6 @@ public static MetricUpdates create(
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates) {
- // System.out.println("xxx metric create");
return new AutoValue_MetricUpdates(
counterUpdates,
distributionUpdates,
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 1c15774c36e4..e4ae5d196159 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -59,8 +59,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Holds the metrics for a single step and uses metric cells that allow extracting the cumulative
@@ -79,8 +77,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class MetricsContainerImpl implements Serializable, MetricsContainer {
- private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class);
-
protected final @Nullable String stepName;
private final boolean isProcessWide;
@@ -92,28 +88,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);
- // Should it be a cell Instead?
- // Can this be a regular histogram instead of a cell'? see
- // dirty state acts as being lock free, commits only non dirty metrics.
- // also of type DISTRIBUTION_INT64_TYPE
- // refactor to use Lock free histograms? later?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);
private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);
- // assume the same bucket type?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);
private MetricsContainerImpl(@Nullable String stepName, boolean isProcessWide) {
- LOG.info("xxx create metric container {}: isProcessWide {}", stepName, isProcessWide);
this.stepName = stepName;
this.isProcessWide = isProcessWide;
}
- // private static boolean enablePerWorkerMetrics = true; // default should be false
-
/**
* Create a new {@link MetricsContainerImpl} associated with the given {@code stepName}. If
* stepName is null, this MetricsContainer is not bound to a step.
@@ -127,7 +114,6 @@ public MetricsContainerImpl(@Nullable String stepName) {
* collecting processWide metrics for HarnessMonitoringInfoRequest/Response.
*/
public static MetricsContainerImpl createProcessWideContainer() {
- LOG.info("xxx create createProcessWideContainer");
return new MetricsContainerImpl(null, true);
}
@@ -171,7 +157,6 @@ public CounterCell getCounter(MetricName metricName) {
*/
@Override
public DistributionCell getDistribution(MetricName metricName) {
- // LOG.info("xxx stepName {}, distribution metric {}", stepName, metricName.getName());
return distributions.get(metricName);
}
@@ -182,16 +167,8 @@ public DistributionCell getDistribution(MetricName metricName) {
@Override
public HistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
- // LOG.info("xxx stepName {}, getPerWorkerHistogram metric {}", stepName, metricName.getName());
- // if not enabled, return a no op container from parent class
- // if (!enablePerWorkerMetrics) {
- // // will be a no op
- // return null;
- // // return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
- // }
- // return no op histogram instead
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
- return val; // no null chceks for the others
+ return val;
}
/**
@@ -235,7 +212,6 @@ public MetricsMap<MetricName, DistributionCell> distributions() {
*/
@Override
public GaugeCell getGauge(MetricName metricName) {
- LOG.info("xxx stepName {}, gauge metric {}", stepName, metricName.getName());
return gauges.get(metricName);
}
@@ -285,13 +261,11 @@ ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT> cells) {
ImmutableList<MetricUpdate<UpdateT>> extractHistogramUpdates(
MetricsMap<KV<MetricName, HistogramData.BucketType>, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
cells.forEach(
- // metric namd and bucket type pair, then cell
(key, value) -> {
if (value.getDirty().beforeCommit()) {
updates.add(
@@ -318,18 +292,11 @@ public MetricUpdates getUpdates() {
/** @return The MonitoringInfo metadata from the metric. */
private @Nullable SimpleMonitoringInfoBuilder metricToMonitoringMetadata(
MetricKey metricKey, String typeUrn, String userUrn) {
- LOG.info(
- "xxx metricToMonitoringMetadata urn {}, metrics key {} step name {}",
- typeUrn,
- metricKey.metricName().getName(),
- metricKey.stepName());
SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder(true);
builder.setType(typeUrn);
MetricName metricName = metricKey.metricName();
- // LOG.info("xxx metric name {}, step name {}", metricName.getName(), metricKey.stepName());
if (metricName instanceof MonitoringInfoMetricName) {
- LOG.info("xxx metric name is instance of MonitoringInfoMetricName {}", metricName.getName());
MonitoringInfoMetricName monitoringInfoName = (MonitoringInfoMetricName) metricName;
// Represents a specific MonitoringInfo for a specific URN.
builder.setUrn(monitoringInfoName.getUrn());
@@ -337,13 +304,12 @@ public MetricUpdates getUpdates() {
builder.setLabel(e.getKey(), e.getValue());
}
} else { // Represents a user counter.
- // LOG.info("xxx metric name is a user counter {}", metricName.getName());
// Drop if the stepname is not set. All user counters must be
// defined for a PTransform. They must be defined on a container bound to a step.
if (this.stepName == null) {
- // LOG.info("xxx dropping {} since step name is null", metricName.getName());
return null;
}
+
builder
.setUrn(userUrn)
.setLabel(MonitoringInfoConstants.Labels.NAMESPACE, metricKey.metricName().getNamespace())
@@ -482,7 +448,6 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
}
for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.perWorkerHistogramsUpdates()) {
- // LOG.info("xxx look at histograms updates {}", metricUpdate.getUpdate().toString());
MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
monitoringInfos.add(mi);
@@ -506,7 +471,6 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
}
public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
- // LOG.info("xxx does getMonitoringData?"); // add per worker metrics here?
ImmutableMap.Builder<String, ByteString> builder = ImmutableMap.builder();
counters.forEach(
(metricName, counterCell) -> {
@@ -517,30 +481,22 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
}
}
});
-
distributions.forEach(
(metricName, distributionCell) -> {
if (distributionCell.getDirty().beforeCommit()) {
String shortId =
getShortId(metricName, this::distributionToMonitoringMetadata, shortIds);
- // LOG.info("xxx does metricName for distributions? {}", metricName.getName()); // add
- // per worker metrics here?
if (shortId != null) {
builder.put(shortId, encodeInt64Distribution(distributionCell.getCumulative()));
}
}
});
- // LOG.info("xxx perWorkerHistograms size: {}", perWorkerHistograms.size());
perWorkerHistograms.forEach(
(metricName, histogramCell) -> {
if (histogramCell.getDirty().beforeCommit()) {
- LOG.info(
- "xxx does metricName for perWorkerHist? {}",
- metricName.getKey().getName()); // add per worker metrics here?
String shortId =
getShortId(metricName.getKey(), this::histogramToMonitoringMetadata, shortIds);
- LOG.info("xxx short id {}", shortId);
if (shortId != null) {
builder.put(shortId, encodeInt64Histogram(histogramCell.getCumulative()));
}
@@ -596,14 +552,12 @@ private String getShortId(
* committed.
*/
public void commitUpdates() {
- // LOG.info("xxx does is commitUpdates?"); // add per worker metrics here?
counters.forEachValue(counter -> counter.getDirty().afterCommit());
distributions.forEachValue(distribution -> distribution.getDirty().afterCommit());
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit());
perWorkerHistograms.forEachValue(
histogram -> {
- LOG.info("xxx commit histogram");
histogram.getDirty().afterCommit();
});
}
@@ -625,7 +579,6 @@ ImmutableList<MetricUpdate<UpdateT>> extractHistogramCumulatives(
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
cells.forEach(
(key, value) -> {
- LOG.info("xxx update histogram");
UpdateT update = checkNotNull(value.getCumulative());
updates.add(MetricUpdate.create(MetricKey.create(stepName, key.getKey()), update));
});
@@ -662,7 +615,6 @@ private void updateForSumInt64Type(MonitoringInfo monitoringInfo) {
private void updateForDistributionInt64Type(MonitoringInfo monitoringInfo) {
MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
- LOG.info("xxx metricName distribution {}", metricName);
Distribution distribution = getDistribution(metricName);
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
distribution.update(data.sum(), data.count(), data.min(), data.max());
@@ -670,16 +622,10 @@ private void updateForDistributionInt64Type(MonitoringInfo monitoringInfo) {
private void updateForPerWorkerHistogramInt64(MonitoringInfo monitoringInfo) {
MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
- LOG.info("xxx metricName {}", metricName);
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
- Histogram histogram =
- getPerWorkerHistogram(metricName, buckets); // add flag, based on what kind of hist it is
+ Histogram histogram = getPerWorkerHistogram(metricName, buckets);
HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload());
histogram.update(data);
- LOG.info(
- "xxx histogram decoded payload {} from {}",
- monitoringInfo.getPayload(),
- histogram.toString());
}
private void updateForLatestInt64Type(MonitoringInfo monitoringInfo) {
@@ -696,9 +642,7 @@ private void updateForStringSetType(MonitoringInfo monitoringInfo) {
/** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */
public void update(Iterable<MonitoringInfo> monitoringInfos) {
- LOG.info("xxx metrics size:");
for (MonitoringInfo monitoringInfo : monitoringInfos) {
- LOG.info("xxx update metric type: {}", monitoringInfo.getType());
if (monitoringInfo.getPayload().isEmpty()) {
return;
}
@@ -719,7 +663,6 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
updateForStringSetType(monitoringInfo);
break;
- // Per worker histogram , de mangle metrics in backend?
case PER_WORKER_HISTOGRAM_TYPE:
updateForPerWorkerHistogramInt64(monitoringInfo);
break;
@@ -897,13 +840,12 @@ public static MetricsContainerImpl deltaContainer(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}
- // treat per worker histograms differently
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
- HistogramData prevValue = prev.histograms.get(cell.getKey()).getCumulative();
+ HistogramData prevValue = prev.perWorkerHistograms.get(cell.getKey()).getCumulative();
HistogramData currValue = cell.getValue().getCumulative();
- HistogramCell deltaValueCell = deltaContainer.histograms.get(cell.getKey());
+ HistogramCell deltaValueCell = deltaContainer.perWorkerHistograms.get(cell.getKey());
deltaValueCell.incBottomBucketCount(
currValue.getBottomBucketCount() - prevValue.getBottomBucketCount());
for (int i = 0; i < bt.getNumBuckets(); i++) {
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
index 62d0f1a39c43..cb74b26ff0bd 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerStepMap.java
@@ -40,8 +40,6 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Metrics containers by step.
@@ -52,7 +50,6 @@ public class MetricsContainerStepMap implements Serializable {
private Map<String, MetricsContainerImpl> metricsContainers;
private MetricsContainerImpl unboundContainer = new MetricsContainerImpl(null);
- private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerStepMap.class);
public MetricsContainerStepMap() {
this.metricsContainers = new ConcurrentHashMap<>();
@@ -142,13 +139,9 @@ public static MetricResults asMetricResults(
Map<MetricKey, MetricResult<GaugeData>> gauges = new HashMap<>();
Map<MetricKey, MetricResult<StringSetData>> sets = new HashMap<>();
Map<MetricKey, MetricResult<HistogramData>> perWorkerHistograms = new HashMap<>();
- // LOG.info("xxx asMetricresults");
+
attemptedMetricsContainers.forEachMetricContainer(
container -> {
- LOG.info(
- "xxx asMetricResults {} per worker histogram size {}",
- container.stepName,
- container.getPerWorkerHistogram().size());
MetricUpdates cumulative = container.getCumulative();
mergeAttemptedResults(counters, cumulative.counterUpdates(), (l, r) -> l + r);
mergeAttemptedResults(
@@ -160,10 +153,6 @@ public static MetricResults asMetricResults(
});
committedMetricsContainers.forEachMetricContainer(
container -> {
- LOG.info(
- "xxx asMetricResults {} per worker histogram size {}",
- container.stepName,
- container.getPerWorkerHistogram().size());
MetricUpdates cumulative = container.getCumulative();
mergeCommittedResults(counters, cumulative.counterUpdates(), (l, r) -> l + r);
mergeCommittedResults(
@@ -173,10 +162,7 @@ public static MetricResults asMetricResults(
mergeCommittedResults(
perWorkerHistograms, cumulative.perWorkerHistogramsUpdates(), HistogramData::combine);
});
- LOG.info("xxx export results {}", perWorkerHistograms.size());
- perWorkerHistograms
- .values()
- .forEach(hist -> LOG.info("xxx {}", hist.getKey().metricName().getName()));
+
return new DefaultMetricResults(
counters.values(),
distributions.values().stream()
@@ -199,12 +185,7 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<>();
forEachMetricContainer(
container -> {
- LOG.info(
- "xxx get getMonitoringInfos {} per worker histogram size {}",
- container.stepName,
- container.getPerWorkerHistogram().size());
for (MonitoringInfo mi : container.getMonitoringInfos()) {
- LOG.info("xxx monitoring info {}", mi.toString());
monitoringInfos.add(mi);
}
});
@@ -215,18 +196,6 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
// Extract user metrics and store as MonitoringInfos.
ImmutableMap.Builder<String, ByteString> builder = ImmutableMap.builder();
- // it does get here.
- forEachMetricContainer(
- (container) -> {
- LOG.info(
- "xxx get getMonitoringData {} per worker histogram size {}, distribution size {}",
- container.stepName,
- container.getPerWorkerHistogram().size(),
- container.distributions().size());
- container
- .getPerWorkerHistogram()
- .forEach((histogram, data) -> LOG.info("xxx {}", histogram.getKey().getName()));
- });
forEachMetricContainer((container) -> builder.putAll(container.getMonitoringData(shortIds)));
return builder.build();
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
index 79fb5b440b9c..f8751ece2dcf 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsMap.java
@@ -106,8 +106,4 @@ public boolean equals(@Nullable Object object) {
public int hashCode() {
return metrics.hashCode();
}
-
- public int size() {
- return metrics.size();
- }
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
index ff69e0d6a31e..8c6fabeb742b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -17,15 +17,20 @@
*/
package org.apache.beam.runners.core.metrics;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.dataflow.model.Base2Exponent;
+import com.google.api.services.dataflow.model.BucketOptions;
+import com.google.api.services.dataflow.model.DataflowHistogramValue;
+import com.google.api.services.dataflow.model.Linear;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Base2Exponent;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Linear;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
@@ -36,17 +41,11 @@
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.api.services.dataflow.model.DataflowHistogramValue;
-
-// TODO Refactor out DataflowHistogramValue to be runner agnostic.
+// TODO(naireenhussain): Refactor out DataflowHistogramValue to be runner agnostic.
/** A set of functions used to encode and decode common monitoring info types. */
public class MonitoringInfoEncodings {
- private static final Logger LOG = LoggerFactory.getLogger(MonitoringInfoEncodings.class);
-
private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
private static final Coder<Double> DOUBLE_CODER = DoubleCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
@@ -60,57 +59,54 @@ public static ByteString encodeInt64Distribution(DistributionData data) {
VARINT_CODER.encode(data.sum(), output);
VARINT_CODER.encode(data.min(), output);
VARINT_CODER.encode(data.max(), output);
- } catch (Exception e) {
+ } catch (IOException e) {
throw new RuntimeException(e);
}
return output.toByteString();
}
/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
- // encode specific fields from histogramData in DataflowHistogramValue
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
- LOG.info("Xxx: data {}", inputHistogram.getPercentileString("poll latency", "seconds"));
try {
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.Builder outputHistogram =
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.newBuilder();
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();
- // refactor out different bucket types?
+
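+ // Encode the histogram as a JSON-serialized DataflowHistogramValue carrying the
+ // bucket options (linear or base-2 exponential), the total count, and the
+ // per-bucket counts.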
+ DataflowHistogramValue outputHistogram2 = new DataflowHistogramValue();
+
if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
HistogramData.LinearBuckets buckets =
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
- Linear.Builder linearOptions =
- Linear.newBuilder()
- .setNumberOfBuckets(numberOfBuckets)
- .setWidth(buckets.getWidth())
- .setStart(buckets.getStart());
- outputHistogram.getBucketOptionsBuilder().setLinear(linearOptions);
+ Linear linear = new Linear();
+ linear.setNumberOfBuckets(numberOfBuckets);
+ linear.setWidth(buckets.getWidth());
+ linear.setStart(buckets.getStart());
+ outputHistogram2.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
HistogramData.ExponentialBuckets buckets =
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
- Base2Exponent.Builder exponentialOptions =
- Base2Exponent.newBuilder().setNumberOfBuckets(numberOfBuckets).setScale(buckets.getScale());
- outputHistogram.getBucketOptionsBuilder().setExponential(exponentialOptions);
- } else { // unsupported type
- // should an error be thrown here?
+ Base2Exponent base2Exp = new Base2Exponent();
+ base2Exp.setNumberOfBuckets(numberOfBuckets);
+ base2Exp.setScale(buckets.getScale());
+ outputHistogram2.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
}
- outputHistogram.setCount(inputHistogram.getTotalCount());
- LOG.info("xxx inputHistogram.getBucketType().getNumBuckets() {}", inputHistogram.getBucketType().getNumBuckets());
- for (int i = 0; i < inputHistogram.getBucketType().getNumBuckets(); i++) {
- LOG.info("xxx bucket counts {}, num buckets {}", i, inputHistogram.getBucketType().getNumBuckets());
- // dont count overflow and underflow records
- outputHistogram.addBucketCounts(inputHistogram.getCount(i));
- }
+ outputHistogram2.setCount(inputHistogram.getTotalCount());
- Method[] methods = outputHistogram.getClass().getMethods();
- for (Method method : methods) {
- System.out.println(method.toString());
- }
- LOG.info("Xxx: encoded data {} ", outputHistogram.toString());
+ List<Long> bucketCounts = new ArrayList<>();
+
+ Arrays.stream(inputHistogram.getBucketCount()).forEach(bucketCounts::add);
+
+ outputHistogram2.setBucketCounts(bucketCounts);
- // try with new proto:
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonString = objectMapper.writeValueAsString(outputHistogram2);
- return outputHistogram.build().toByteString();
+ return ByteString.copyFromUtf8(jsonString);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -118,13 +114,39 @@ public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
- // decode to DataflowHistogramValue, then create Histogram Data from it, and pass that along.
try {
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram outputHistogram =
- org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.parseFrom(payload);
- LOG.info("Xxx: data {}, {} ", outputHistogram.toString(), payload);
- return new HistogramData(outputHistogram);
- } catch (Exception e) {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ // Parse the JSON payload and rebuild the DataflowHistogramValue field by field.
+ JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8());
+ DataflowHistogramValue newHist = new DataflowHistogramValue();
+ newHist.setCount(jsonNode.get("count").asLong());
+
+ List<Long> bucketCounts = new ArrayList<>();
+ Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
+ while (itr.hasNext()) {
+ Long item = itr.next().asLong();
+ bucketCounts.add(item);
+ }
+ newHist.setBucketCounts(bucketCounts);
+
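+ // Rebuild the bucket options from whichever flavor ("linear" or "exponential")
+ // is present in the payload.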
+ if (jsonNode.get("bucketOptions").has("linear")) {
+ Linear linear = new Linear();
+ JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
+ linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
+ linear.setWidth(linearNode.get("width").asDouble());
+ linear.setStart(linearNode.get("start").asDouble());
+ newHist.setBucketOptions(new BucketOptions().setLinear(linear));
+ } else if (jsonNode.get("bucketOptions").has("exponential")) {
+ Base2Exponent base2Exp = new Base2Exponent();
+ JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
+ base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
+ base2Exp.setScale(expNode.get("scale").asInt());
+ newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
+ } else {
+ throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
+ }
+ return new HistogramData(newHist);
+ } catch (IOException e) {
throw new RuntimeException(e);
}
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java
index 3f25d6810217..29561342ff8a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/HistogramCellTest.java
@@ -64,6 +64,15 @@ public void testEquals() {
Assert.assertEquals(equalCell.hashCode(), cell.hashCode());
}
+ @Test
+ public void testUpdateWithHistogramData() {
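+ // update(HistogramData) should merge the recorded values of another histogram
+ // into this cell, so the cumulative value equals the input data.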
+ HistogramCell cell = new HistogramCell(KV.of(MetricName.named("hello", "world"), bucketType));
+ HistogramData data = HistogramData.linear(0, 10, 100);
+ data.record(5, 7, 9);
+ cell.update(data);
+ assertThat(cell.getCumulative(), equalTo(data));
+ }
+
@Test
public void testNotEquals() {
HistogramCell cell = new HistogramCell(KV.of(MetricName.named("hello", "world"), bucketType));
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
index 5b3d71f4873e..84ec603d5be3 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java
@@ -371,6 +371,7 @@ public void testDeltaCounters() {
MetricName gName = MetricName.named("namespace", "gauge");
HistogramData.BucketType bucketType = HistogramData.LinearBuckets.of(0, 2, 5);
MetricName hName = MetricName.named("namespace", "histogram");
+ MetricName pwhName = MetricName.named("namespace", "perWorkerHistogram");
MetricName stringSetName = MetricName.named("namespace", "stringset");
MetricsContainerImpl prevContainer = new MetricsContainerImpl(null);
@@ -383,6 +384,10 @@ public void testDeltaCounters() {
prevContainer.getHistogram(hName, bucketType).update(3);
prevContainer.getHistogram(hName, bucketType).update(20);
+ // Set PerWorkerBucketCounts to [0,1,1,0,0,0,0]
+ prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(1);
+ prevContainer.getPerWorkerHistogram(pwhName, bucketType).update(3);
+
MetricsContainerImpl nextContainer = new MetricsContainerImpl(null);
nextContainer.getCounter(cName).inc(9L);
nextContainer.getGauge(gName).set(8L);
@@ -401,6 +406,10 @@ public void testDeltaCounters() {
nextContainer.getHistogram(hName, bucketType).update(20);
nextContainer.getHistogram(hName, bucketType).update(20);
+ // Set PerWorkerBucketCounts to [1,0,0,0,0,0,1]
+ nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(-1);
+ nextContainer.getPerWorkerHistogram(pwhName, bucketType).update(20);
+
MetricsContainerImpl deltaContainer =
MetricsContainerImpl.deltaContainer(prevContainer, nextContainer);
// Expect counter value: 7 = 9 - 2
@@ -426,6 +435,20 @@ public void testDeltaCounters() {
}
assertEquals(
2, deltaContainer.getHistogram(hName, bucketType).getCumulative().getTopBucketCount());
+
+ // Expect per worker bucket counts: [1,0,0,0,0,0,1]
+ assertEquals(
+ 1,
+ deltaContainer
+ .getPerWorkerHistogram(pwhName, bucketType)
+ .getCumulative()
+ .getBottomBucketCount());
+ assertEquals(
+ 1,
+ deltaContainer
+ .getPerWorkerHistogram(pwhName, bucketType)
+ .getCumulative()
+ .getTopBucketCount());
}
@Test
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
index c4cef7d69c0e..9c6dd756a0ac 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -33,22 +33,32 @@
import static org.junit.Assert.assertEquals;
import java.util.Collections;
+import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link MonitoringInfoEncodings}. */
@RunWith(JUnit4.class)
public class MonitoringInfoEncodingsTest {
+
+ @Rule
+ public ExpectedLogs monitoringInfoCodingsExpectedLogs =
+ ExpectedLogs.none(MonitoringInfoEncodings.class);
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
@Test
public void testInt64DistributionEncoding() {
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
ByteString payload = encodeInt64Distribution(data);
- System.out.println("xxxx " + payload);
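+ // The payload is the count, sum, min, and max encoded as consecutive varints.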
+ assertEquals(ByteString.copyFrom(new byte[] {2, 1, 3, 4}), payload);
assertEquals(data, decodeInt64Distribution(payload));
}
@@ -64,18 +74,37 @@ public void testDoubleDistributionEncoding() {
}
@Test
- public void testHistgramInt64Encoding() {
+ public void testHistogramInt64EncodingLinearHist() {
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(5, 10, 15, 20);
- // LOG.info("Xxx: inputHistogram {}, {} ", inputHistogram.getBoun, payload);
ByteString payload = encodeInt64Histogram(inputHistogram);
- // HistogramData data = inputHistogram.extractResult();
- // System.out.println("xxx data {}" + data);
+
+ assertEquals(inputHistogram, decodeInt64Histogram(payload));
+ }
+
+ @Test
+ public void testHistogramInt64EncodingExpHist() {
+ HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
+ HistogramData inputHistogram = new HistogramData(buckets);
+ inputHistogram.record(2, 4, 8, 16, 32);
+ ByteString payload = encodeInt64Histogram(inputHistogram);
assertEquals(inputHistogram, decodeInt64Histogram(payload));
}
+ @Test
+ public void testHistogramInt64EncodingUnsupportedBucket() {
+ thrown.expect(Exception.class);
+ thrown.expectMessage("Unable to encode histogram, bucket is not recognized");
+
+ HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();
+
+ HistogramData inputHistogram = new HistogramData(buckets);
+ inputHistogram.record(2, 4, 8, 16, 32);
+ encodeInt64Histogram(inputHistogram);
+ }
+
@Test
public void testInt64GaugeEncoding() {
GaugeData data = GaugeData.create(1L, new Instant(2L));
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
index 00df20c4ac39..cf06e5b01143 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -91,8 +91,8 @@ public void testApplyCommittedNoFilter() {
MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(15L))),
ImmutableList.of(
MetricUpdate.create(
- MetricKey.create("step1", NAME4),
- StringSetData.create(ImmutableSet.of("ab"))))));
+ MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("ab")))),
+ ImmutableList.of()));
metrics.commitLogical(
bundle1,
MetricUpdates.create(
@@ -106,8 +106,8 @@ public void testApplyCommittedNoFilter() {
MetricUpdate.create(MetricKey.create("step1", NAME4), GaugeData.create(27L))),
ImmutableList.of(
MetricUpdate.create(
- MetricKey.create("step1", NAME4),
- StringSetData.create(ImmutableSet.of("cd"))))));
+ MetricKey.create("step1", NAME4), StringSetData.create(ImmutableSet.of("cd")))),
+ ImmutableList.of()));
MetricQueryResults results = metrics.allMetrics();
assertThat(
@@ -157,6 +157,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() {
MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
ImmutableList.of(),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
@@ -166,6 +167,7 @@ public void testApplyAttemptedCountersQueryOneNamespace() {
MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
ImmutableList.of(),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
@@ -195,6 +197,7 @@ public void testApplyAttemptedQueryCompositeScope() {
MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
ImmutableList.of(),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
@@ -204,6 +207,7 @@ public void testApplyAttemptedQueryCompositeScope() {
MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
ImmutableList.of(),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
@@ -233,6 +237,7 @@ public void testPartialScopeMatchingInMetricsQuery() {
MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)),
ImmutableList.of(),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
metrics.updatePhysical(
bundle1,
@@ -242,6 +247,7 @@ public void testPartialScopeMatchingInMetricsQuery() {
MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)),
ImmutableList.of(),
ImmutableList.of(),
+ ImmutableList.of(),
ImmutableList.of()));
MetricQueryResults results =
diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
index 96c0374067cf..5e3319a0f0df 100644
--- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
+++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/CustomMetricQueryResults.java
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.metrics.StringSetResult;
+import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
@@ -82,4 +83,9 @@ public Iterable<MetricResult<StringSetResult>> getStringSets() {
StringSetResult.create(ImmutableSet.of("ab")),
StringSetResult.create(ImmutableSet.of("cd")));
}
+
+ @Override
+ public Iterable<MetricResult<HistogramData>> getPerWorkerHistograms() {
+ return Collections.emptyList();
+ }
}
diff --git a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
index 10e9481d271b..ed7b408926a3 100644
--- a/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
+++ b/runners/extensions-java/metrics/src/test/java/org/apache/beam/runners/extensions/metrics/MetricsHttpSinkTest.java
@@ -94,7 +94,7 @@ public void testWriteMetricsWithCommittedSupported() throws Exception {
+ "\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"committed\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":100},\"name\":{\"name\":\"n3\",\"namespace\":"
- + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd"
+ + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd"
+ "\"]},\"committed\":{\"stringSet\":[\"ab\"]},\"name\":{\"name\":\"n3\","
+ "\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
@@ -116,7 +116,7 @@ public void testWriteMetricsWithCommittedUnSupported() throws Exception {
+ "{\"count\":4,\"max\":9,\"mean\":6.25,\"min\":3,\"sum\":25},\"name\":{\"name\":\"n2\""
+ ",\"namespace\":\"ns1\"},\"step\":\"s2\"}],\"gauges\":[{\"attempted\":{\"timestamp\":"
+ "\"1970-01-05T00:04:22.800Z\",\"value\":120},\"name\":{\"name\":\"n3\",\"namespace\":"
- + "\"ns1\"},\"step\":\"s3\"}],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]},"
+ + "\"ns1\"},\"step\":\"s3\"}],\"perWorkerHistograms\":[],\"stringSets\":[{\"attempted\":{\"stringSet\":[\"cd\"]},"
+ "\"name\":{\"name\":\"n3\",\"namespace\":\"ns1\"},\"step\":\"s3\"}]}";
assertEquals("Wrong number of messages sent to HTTP server", 1, messages.size());
assertEquals("Wrong messages sent to HTTP server", expected, messages.get(0));
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index abe7d0d364d3..ae5dfd090b6d 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -98,6 +98,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.SubscribeTransform;
import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -400,6 +401,20 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}
+ boolean hasExperimentEnableKafkaMetrics = false;
+ if (dataflowOptions.getExperiments() != null) {
+ for (String experiment : dataflowOptions.getExperiments()) {
+ if (experiment.startsWith("enable_kafka_metrics")) {
+ hasExperimentEnableKafkaMetrics = true;
+ break;
+ }
+ }
+ }
+
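+ // Per-worker Kafka metrics are only reported for streaming jobs that opt in via
+ // the "enable_kafka_metrics" experiment.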
+ if (dataflowOptions.isStreaming() && hasExperimentEnableKafkaMetrics) {
+ KafkaSinkMetrics.setSupportKafkaMetrics(true);
+ }
+
// Adding the Java version to the SDK name for user's and support convenience.
String agentJavaVer = "(JRE 8 environment)";
if (Environments.getJavaVersion() != Environments.JavaVersion.java8) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 33c92e097868..ff72add83e4d 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -662,10 +662,9 @@ public static void main(String[] args) throws Exception {
enableBigQueryMetrics();
}
- // if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) {
- // KafkaSinkMetrics.setSupportKafkaMetrics(true);
- // }
- KafkaSinkMetrics.setSupportKafkaMetrics(true);
+ if (DataflowRunner.hasExperiment(options, "enable_kafka_metrics")) {
+ KafkaSinkMetrics.setSupportKafkaMetrics(true);
+ }
JvmInitializers.runBeforeProcessing(options);
worker.startStatusPages();
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
index c716f1430ba5..7cc0dc68f7e7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java
@@ -346,11 +346,7 @@ Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
});
perWorkerHistograms.forEach(
(k, v) -> {
- System.out.println("xxx per worker histogram: " + k.getName());
- // System.out.println("xxx per worker histogram: " + v.toString());
v.getSnapshotAndReset().ifPresent(snapshot -> histograms.put(k, snapshot));
- System.out.println("xxx per worker histogram snapshot: " + histograms.get(k).toString());
-
});
deleteStaleCounters(currentZeroValuedCounters, Instant.now(clock));
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java
index 67cf3280a83c..281f903f2c54 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/FailedRunningPipelineResults.java
@@ -26,6 +26,7 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
+import org.apache.beam.sdk.util.HistogramData;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
@@ -90,6 +91,11 @@ public Iterable<MetricResult<GaugeResult>> getGauges() {
public Iterable<MetricResult<StringSetResult>> getStringSets() {
return Collections.emptyList();
}
+
+ @Override
+ public Iterable<MetricResult<HistogramData>> getPerWorkerHistograms() {
+ return Collections.emptyList();
+ }
};
}
};
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
index 44681a626cc0..fdedcc0086be 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricResults.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.jet.metrics;
import com.hazelcast.map.IMap;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.concurrent.GuardedBy;
@@ -35,6 +36,7 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
+import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -104,6 +106,7 @@ private static class QueryResults extends MetricQueryResults {
private final Iterable<MetricResult<DistributionResult>> distributions;
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
+ private final Iterable<MetricResult<HistogramData>> perWorkerHistograms;
private QueryResults(
Iterable> counters,
@@ -114,6 +117,7 @@ private QueryResults(
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
+ this.perWorkerHistograms = Collections.emptyList(); // not implemented
}
@Override
@@ -135,6 +139,11 @@ public Iterable<MetricResult<GaugeResult>> getGauges() {
public Iterable<MetricResult<StringSetResult>> getStringSets() {
return stringSets;
}
+
+ @Override
+ public Iterable<MetricResult<HistogramData>> getPerWorkerHistograms() {
+ return perWorkerHistograms;
+ }
}
private static class Counters {
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
index 64455d704c9b..09e19219ed9b 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/metrics/JetMetricsContainer.java
@@ -21,6 +21,7 @@
import com.hazelcast.jet.core.Processor;
import com.hazelcast.map.IMap;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.core.metrics.DistributionData;
@@ -34,6 +35,7 @@
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.StringSet;
+import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
/** Jet specific implementation of {@link MetricsContainer}. */
@@ -152,5 +154,10 @@ public Iterable<MetricUpdate<GaugeData>> gaugeUpdates() {
public Iterable<MetricUpdate<StringSetData>> stringSetUpdates() {
return stringSets;
}
+
+ @Override
+ public Iterable<MetricUpdate<HistogramData>> perWorkerHistogramsUpdates() {
+ return Collections.emptyList(); // not implemented
+ }
}
}
diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 9502eaabbeae..a95667fd0b0f 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -71,7 +71,7 @@ test {
dependencies {
antlr library.java.antlr
- implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
+ provided library.java.google_api_services_dataflow
// antlr is used to generate code from sdks/java/core/src/main/antlr/
permitUnusedDeclared library.java.antlr
// Required to load constants from the model, e.g. max timestamp for global window
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
index e6c3ac6843e3..e657b5483a78 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java
@@ -21,9 +21,6 @@
import java.util.Optional;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.util.HistogramData;
-// import org.slf4j.Logger;
-// import org.slf4j.LoggerFactory;
-import org.apache.beam.sdk.util.Preconditions;
/** Implementation of {@link Histogram} that delegates to the instance for the current context. */
@Internal
@@ -33,8 +30,6 @@ public class DelegatingHistogram implements Metric, Histogram, Serializable {
private final boolean processWideContainer;
private final boolean perWorkerHistogram;
- // private static final Logger LOG = LoggerFactory.getLogger(DelegatingHistogram.class);
-
/**
* Create a {@code DelegatingHistogram} with {@code perWorkerHistogram} set to false.
*
@@ -64,14 +59,6 @@ public DelegatingHistogram(
this.bucketType = bucketType;
this.processWideContainer = processWideContainer;
this.perWorkerHistogram = perWorkerHistogram;
- // What is the container here?
- MetricsContainer container =
- processWideContainer
- ? MetricsEnvironment.getProcessWideContainer()
- : MetricsEnvironment.getCurrentContainer();
- if (container == null) {
- } else {
- }
}
private Optional<Histogram> getHistogram() {
@@ -80,33 +67,23 @@ private Optional<Histogram> getHistogram() {
? MetricsEnvironment.getProcessWideContainer()
: MetricsEnvironment.getCurrentContainer();
if (container == null) {
- // LOG.info("xxx getHistogram container is null {}");
return Optional.empty();
}
if (perWorkerHistogram) {
- // LOG.info("xxx is this null? perWorkerHistogram {}", container.getPerWorkerHistogram(name, bucketType).toString());
return Optional.of(container.getPerWorkerHistogram(name, bucketType));
} else {
- // LOG.info("xxx is this null? histogram {}", container.getHistogram(name, bucketType).toString());
return Optional.of(container.getHistogram(name, bucketType));
}
}
@Override
public void update(double value) {
- // LOG.info("xxx updating histogram in container");// SPAMS logs
getHistogram().ifPresent(histogram -> histogram.update(value));
}
@Override
public void update(double... values) {
- MetricsContainer container =
- this.processWideContainer
- ? MetricsEnvironment.getProcessWideContainer()
- : MetricsEnvironment.getCurrentContainer();
- if (container != null) {
- getHistogram().ifPresent(histogram -> histogram.update(values));
- }
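+ // getHistogram() already resolves the correct container (and handles a null container), so the explicit lookup above was redundant.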
+ getHistogram().ifPresent(histogram -> histogram.update(values));
}
@Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java
index a8c4116a30b8..943e00dc11da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Histogram.java
@@ -30,7 +30,7 @@ default void update(double... values) {
update(value);
}
}
-
+
/** Add a histogram to this histogram. Requires the underlying implementation to support this. */
- default void update(HistogramData data) {}
+ default void update(HistogramData data) {}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
index 1e604e883cd7..a83c33d2d4f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -18,12 +18,10 @@
package org.apache.beam.sdk.metrics;
import com.google.auto.value.AutoValue;
-
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.util.HistogramData;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
/** The results of a query for metrics. Allows accessing all the metrics that matched the filter. */
@AutoValue
@@ -72,7 +70,7 @@ public final String toString() {
printMetrics("Distributions", getDistributions(), sb);
printMetrics("Gauges", getGauges(), sb);
printMetrics("StringSets", getStringSets(), sb);
- printMetrics("perWorkerHistograms", getPerWorkerHistograms(), sb);
+ // printMetrics("perWorkerHistograms", getPerWorkerHistograms(), sb);
sb.append(")");
return sb.toString();
}
@@ -81,16 +79,18 @@ public static MetricQueryResults create(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
- Iterable<MetricResult<StringSetResult>> stringSets,
- Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
- return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets, perWorkerHistograms);
+ Iterable<MetricResult<StringSetResult>> stringSets) {
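+ // Callers that supply no per-worker histograms get an empty list.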
+ return new AutoValue_MetricQueryResults(
+ counters, distributions, gauges, stringSets, Collections.emptyList());
}
public static MetricQueryResults create(
- Iterable<MetricResult<Long>> counters,
- Iterable<MetricResult<DistributionResult>> distributions,
- Iterable<MetricResult<GaugeResult>> gauges,
- Iterable<MetricResult<StringSetResult>> stringSets) {
- return new AutoValue_MetricQueryResults(counters, distributions, gauges, stringSets, Collections.emptyList());
-}
+ Iterable<MetricResult<Long>> counters,
+ Iterable<MetricResult<DistributionResult>> distributions,
+ Iterable<MetricResult<GaugeResult>> gauges,
+ Iterable<MetricResult<StringSetResult>> stringSets,
+ Iterable<MetricResult<HistogramData>> histogramData) {
+ return new AutoValue_MetricQueryResults(
+ counters, distributions, gauges, stringSets, histogramData);
+ }
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
index 345d0b775b5c..a963015e98a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -16,10 +16,8 @@
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import java.io.Serializable;
-import org.apache.beam.sdk.util.HistogramData;
/**
* The Metrics is a utility class for producing various kinds of metrics for reporting
@@ -53,8 +51,6 @@
*/
public class Metrics {
- // private static final Logger LOG = LoggerFactory.getLogger(Metrics.class);
-
private Metrics() {}
/**
@@ -158,7 +154,6 @@ private DelegatingGauge(MetricName name) {
public void set(long value) {
MetricsContainer container = MetricsEnvironment.getCurrentContainer();
if (container != null) {
- // LOG.info("xxx delegating gauge container name {}", container.stepName);
container.getGauge(name).set(value);
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
index 3ed250cf7089..0c4766bb2c0b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -63,7 +63,6 @@ default Counter getPerWorkerCounter(MetricName metricName) {
* Return the {@link Histogram} that should be used for implementing the given {@code metricName}
* in this container.
*/
- // Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType);
default Histogram getHistogram(MetricName metricName, HistogramData.BucketType bucketType) {
throw new RuntimeException("Histogram metric is not supported yet.");
}
@@ -71,13 +70,10 @@ default Histogram getHistogram(MetricName metricName, HistogramData.BucketType b
* Return the {@link Histogram} that should be used for implementing the given per-worker {@code
* metricName} in this container.
*/
- Histogram getPerWorkerHistogram(
- MetricName metricName, HistogramData.BucketType bucketType) ;
- // default Histogram getPerWorkerHistogram(
- // MetricName metricName, HistogramData.BucketType bucketType) {
- // return NoOpHistogram.getInstance();
- // }
-
+ default Histogram getPerWorkerHistogram(
+ MetricName metricName, HistogramData.BucketType bucketType) {
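+ // No-op by default, so containers that do not track per-worker histograms silently drop updates.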
+ return NoOpHistogram.getInstance();
+ }
/** Return the cumulative values for any metrics in this container as MonitoringInfos. */
default Iterable<MonitoringInfo> getMonitoringInfos() {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
index e606a54d3a3b..3421bb4afc85 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -16,6 +16,7 @@
* limitations under the License.
*/
package org.apache.beam.sdk.metrics;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
index dc280e7e0a3a..3baf1532b30f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -29,9 +29,6 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Base2Exponent;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram.BucketOptions.Linear;
/**
* A histogram that supports estimated percentile with linear interpolation.
@@ -63,7 +60,7 @@ public class HistogramData implements Serializable {
/**
* Create a histogram.
*
- * @param bucketType a bucket type for a new histogram instance. does all of this need to be sent?
+ * @param bucketType a bucket type for a new histogram instance.
*/
public HistogramData(BucketType bucketType) {
this.bucketType = bucketType;
@@ -77,48 +74,37 @@ public HistogramData(BucketType bucketType) {
this.sumOfSquaredDeviations = 0;
}
- public HistogramData(org.apache.beam.runners.dataflow.worker.windmill.Windmill.Histogram histogramProto) {
- // HistogramData newHist = null;
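+ // Reconstructs a HistogramData from the Dataflow service's DataflowHistogramValue payload: bucket type first, then per-bucket counts.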
+ public HistogramData(
+ com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto) {
int numBuckets;
- if(histogramProto.getBucketOptions().hasLinear()){
+ if (histogramProto.getBucketOptions().getLinear() != null) {
double start = histogramProto.getBucketOptions().getLinear().getStart();
double width = histogramProto.getBucketOptions().getLinear().getWidth();
numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
this.bucketType = LinearBuckets.of(start, width, numBuckets);
- int idx = 0;
-
this.buckets = new long[bucketType.getNumBuckets()];
- // populate with bucket counts with mean type for now, not used to determine equality
- for (long val: histogramProto.getBucketCountsList()){
- this.buckets[idx] = val; // is this valid?
- if (!(idx == 0 || idx == bucketType.getNumBuckets()-1 )){
- LOG.info("xxx {} {}", val, idx);
- this.numBoundedBucketRecords+= val;
- }
+
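+ // Copy the per-bucket counts; each count also contributes to the running total used by getTotalCount().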
+ int idx = 0;
+ for (long val : histogramProto.getBucketCounts()) {
+ this.buckets[idx] = val;
+ this.numBoundedBucketRecords += val;
idx++;
}
- // update with counts
} else {
- // assume exp, add handling for wrong type later
+ // Assume it's an exponential histogram if it's not linear.
int scale = histogramProto.getBucketOptions().getExponential().getScale();
numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets();
- int idx = 0;
-
this.bucketType = ExponentialBuckets.of(scale, numBuckets);
this.buckets = new long[bucketType.getNumBuckets()];
- // populate with bucket counts with mean type for now, not used to determine equality
- for (long val: histogramProto.getBucketCountsList()){
- this.buckets[idx] = val; // is this valid?
- if (!(idx == 0 || idx == bucketType.getNumBuckets()-1 )){
- this.numBoundedBucketRecords+= val;
- }
+ int idx = 0;
+ for (long val : histogramProto.getBucketCounts()) {
+ this.buckets[idx] = val;
+ this.numBoundedBucketRecords += val;
idx++;
}
}
- LOG.info("xxx numBoundedBucketRecords when creating from proto {}", numBoundedBucketRecords);
}
-
public BucketType getBucketType() {
return this.bucketType;
}
@@ -137,8 +123,6 @@ public static HistogramData linear(double start, double width, int numBuckets) {
return new HistogramData(LinearBuckets.of(start, width, numBuckets));
}
-
-
/**
* Returns a histogram object with exponential boundaries. The input parameter {@code scale}
* determines a coefficient 'base' which species bucket boundaries.
@@ -263,9 +247,7 @@ public synchronized void record(double value) {
recordBottomRecordsValue(value);
} else {
buckets[bucketType.getBucketIndex(value)]++;
- if (!(bucketType.getBucketIndex(value) == 0 || bucketType.getBucketIndex(value) == buckets.length -1 )){
- numBoundedBucketRecords++;
- }
+ numBoundedBucketRecords++;
}
updateStatistics(value);
}
@@ -317,19 +299,17 @@ public synchronized long getTotalCount() {
return numBoundedBucketRecords + numTopRecords + numBottomRecords;
}
- public HistogramData combine(HistogramData value) {
- // reutrn new hist for now, ignore the old, though this is incorrect.
-
- return value;
- }
-
- // same as HistogramData, but doesn't reset
public HistogramData extractResult() {
HistogramData other = new HistogramData(this.getBucketType());
other.update(this);
return other;
}
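+ /** Merges {@code value} into this histogram in place and returns this instance. */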
+ public HistogramData combine(HistogramData value) {
+ this.update(value);
+ return this;
+ }
+
public synchronized String getPercentileString(String elemType, String unit) {
return String.format(
"Total number of %s: %s, P99: %.0f %s, P90: %.0f %s, P50: %.0f %s",
@@ -351,6 +331,10 @@ public synchronized long getCount(int bucketIndex) {
return buckets[bucketIndex];
}
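+ /** Returns the internal per-bucket counts array (not a copy); callers should not mutate it. */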
+ public synchronized long[] getBucketCount() {
+ return buckets;
+ }
+
public synchronized long getTopBucketCount() {
return numTopRecords;
}
@@ -635,15 +619,43 @@ public double getRangeTo() {
// Note: equals() and hashCode() are implemented by the AutoValue.
}
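+ // Fallback bucket type for histogram payloads whose bucketing scheme is not recognized; all accessors return 0.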
+ @AutoValue
+ public abstract static class UnsupportedBuckets implements BucketType {
+
+ public static UnsupportedBuckets of() {
+ return new AutoValue_HistogramData_UnsupportedBuckets(0);
+ }
+
+ @Override
+ public int getBucketIndex(double value) {
+ return 0;
+ }
+
+ @Override
+ public double getBucketSize(int index) {
+ return 0;
+ }
+
+ @Override
+ public double getAccumulatedBucketSize(int index) {
+ return 0;
+ }
+
+ @Override
+ public double getRangeFrom() {
+ return 0;
+ }
+
+ @Override
+ public double getRangeTo() {
+ return 0;
+ }
+ }
+
@Override
public synchronized boolean equals(@Nullable Object object) {
if (object instanceof HistogramData) {
HistogramData other = (HistogramData) object;
- LOG.info("xxx {}, {}, {}", numBoundedBucketRecords == other.numBoundedBucketRecords, numBoundedBucketRecords, other.numBoundedBucketRecords);
- LOG.info("xxx {}", numTopRecords == other.numTopRecords);
- LOG.info("xxx {}", numBottomRecords == other.numBottomRecords);
- LOG.info("xxx {}", Arrays.equals(buckets, other.buckets));
-
synchronized (other) {
return Objects.equals(bucketType, other.bucketType)
&& numBoundedBucketRecords == other.numBoundedBucketRecords
diff --git a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java
index a3277de7f97b..a76fb59c7d10 100644
--- a/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java
+++ b/sdks/java/extensions/avro/src/test/java/org/apache/beam/sdk/extensions/avro/coders/AvroCoderTest.java
@@ -1,1164 +1,1164 @@
-// /*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-// package org.apache.beam.sdk.extensions.avro.coders;
-
-// import static org.hamcrest.MatcherAssert.assertThat;
-// import static org.hamcrest.Matchers.containsString;
-// import static org.hamcrest.Matchers.equalTo;
-// import static org.junit.Assert.assertArrayEquals;
-// import static org.junit.Assert.assertEquals;
-// import static org.junit.Assert.assertTrue;
-// import static org.junit.Assert.fail;
-
-// import com.esotericsoftware.kryo.Kryo;
-// import com.esotericsoftware.kryo.io.Input;
-// import com.esotericsoftware.kryo.io.Output;
-// import com.esotericsoftware.kryo.serializers.JavaSerializer;
-// import java.io.ByteArrayInputStream;
-// import java.io.ByteArrayOutputStream;
-// import java.io.ObjectInputStream;
-// import java.io.ObjectOutputStream;
-// import java.nio.ByteBuffer;
-// import java.util.ArrayList;
-// import java.util.Collection;
-// import java.util.HashSet;
-// import java.util.LinkedHashMap;
-// import java.util.List;
-// import java.util.Map;
-// import java.util.Objects;
-// import java.util.SortedMap;
-// import java.util.SortedSet;
-// import java.util.TreeMap;
-// import java.util.TreeSet;
-// import org.apache.avro.AvroRuntimeException;
-// import org.apache.avro.Schema;
-// import org.apache.avro.SchemaBuilder;
-// import org.apache.avro.generic.GenericData;
-// import org.apache.avro.generic.GenericRecord;
-// import org.apache.avro.io.DatumReader;
-// import org.apache.avro.reflect.AvroName;
-// import org.apache.avro.reflect.AvroSchema;
-// import org.apache.avro.reflect.ReflectData;
-// import org.apache.avro.reflect.Stringable;
-// import org.apache.avro.reflect.Union;
-// import org.apache.avro.specific.SpecificData;
-// import org.apache.avro.specific.SpecificDatumReader;
-// import org.apache.avro.specific.SpecificRecord;
-// import org.apache.avro.util.Utf8;
-// import org.apache.beam.sdk.coders.Coder.Context;
-// import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-// import org.apache.beam.sdk.coders.DefaultCoder;
-// import org.apache.beam.sdk.coders.SerializableCoder;
-// import org.apache.beam.sdk.extensions.avro.io.AvroDatumFactory;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvro;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroConversion;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroConversionFactory;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroFactory;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestAvroNested;
-// import org.apache.beam.sdk.extensions.avro.schemas.TestEnum;
-// import org.apache.beam.sdk.extensions.avro.schemas.fixed4;
-// import org.apache.beam.sdk.testing.CoderProperties;
-// import org.apache.beam.sdk.testing.InterceptingUrlClassLoader;
-// import org.apache.beam.sdk.testing.NeedsRunner;
-// import org.apache.beam.sdk.testing.PAssert;
-// import org.apache.beam.sdk.testing.TestPipeline;
-// import org.apache.beam.sdk.transforms.Create;
-// import org.apache.beam.sdk.transforms.DoFn;
-// import org.apache.beam.sdk.transforms.ParDo;
-// import org.apache.beam.sdk.util.CoderUtils;
-// import org.apache.beam.sdk.util.InstanceBuilder;
-// import org.apache.beam.sdk.util.SerializableUtils;
-// import org.apache.beam.sdk.values.PCollection;
-// import org.apache.beam.sdk.values.TypeDescriptor;
-// import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-// import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-// import org.checkerframework.checker.nullness.qual.Nullable;
-// import org.hamcrest.Description;
-// import org.hamcrest.Matcher;
-// import org.hamcrest.Matchers;
-// import org.hamcrest.TypeSafeMatcher;
-// import org.joda.time.DateTime;
-// import org.joda.time.DateTimeZone;
-// import org.joda.time.LocalDate;
-// import org.junit.Rule;
-// import org.junit.Test;
-// import org.junit.experimental.categories.Category;
-// import org.junit.runner.RunWith;
-// import org.junit.runners.JUnit4;
-// import org.objenesis.strategy.StdInstantiatorStrategy;
-
-// /** Tests for {@link AvroCoder}. */
-// @RunWith(JUnit4.class)
-// public class AvroCoderTest {
-
-// public static final DateTime DATETIME_A =
-// new DateTime().withDate(1994, 10, 31).withZone(DateTimeZone.UTC);
-// public static final DateTime DATETIME_B =
-// new DateTime().withDate(1997, 4, 25).withZone(DateTimeZone.UTC);
-// private static final TestAvroNested AVRO_NESTED_SPECIFIC_RECORD = new TestAvroNested(true, 42);
-// private static final TestAvro AVRO_SPECIFIC_RECORD =
-// TestAvroFactory.newInstance(
-// true,
-// 43,
-// 44L,
-// 44.1f,
-// 44.2d,
-// "mystring",
-// ByteBuffer.wrap(new byte[] {1, 2, 3, 4}),
-// new fixed4(new byte[] {1, 2, 3, 4}),
-// new LocalDate(1979, 3, 14),
-// new DateTime().withDate(1979, 3, 14).withTime(1, 2, 3, 4),
-// TestEnum.abc,
-// AVRO_NESTED_SPECIFIC_RECORD,
-// ImmutableList.of(AVRO_NESTED_SPECIFIC_RECORD, AVRO_NESTED_SPECIFIC_RECORD),
-// ImmutableMap.of("k1", AVRO_NESTED_SPECIFIC_RECORD, "k2", AVRO_NESTED_SPECIFIC_RECORD));
-
-// private static final String VERSION_AVRO = Schema.class.getPackage().getImplementationVersion();
-
-// @DefaultCoder(AvroCoder.class)
-// private static class Pojo {
-// public String text;
-// public int count;
-
-// @AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
-// public DateTime timestamp;
-
-// // Empty constructor required for Avro decoding.
-// @SuppressWarnings("unused")
-// public Pojo() {}
-
-// public Pojo(String text, int count, DateTime timestamp) {
-// this.text = text;
-// this.count = count;
-// this.timestamp = timestamp;
-// }
-
-// @Override
-// public boolean equals(@Nullable Object other) {
-// if (this == other) {
-// return true;
-// }
-// if (other == null || getClass() != other.getClass()) {
-// return false;
-// }
-// Pojo that = (Pojo) other;
-// return this.count == that.count
-// && Objects.equals(this.text, that.text)
-// && Objects.equals(this.timestamp, that.timestamp);
-// }
-
-// @Override
-// public int hashCode() {
-// return Objects.hash(text, count, timestamp);
-// }
-
-// @Override
-// public String toString() {
-// return "Pojo{"
-// + "text='"
-// + text
-// + '\''
-// + ", count="
-// + count
-// + ", timestamp="
-// + timestamp
-// + '}';
-// }
-// }
-
-// private static class GetTextFn extends DoFn<Pojo, String> {
-// @ProcessElement
-// public void processElement(ProcessContext c) {
-// c.output(c.element().text);
-// }
-// }
-
-// @Rule public TestPipeline pipeline = TestPipeline.create();
-
-// @Test
-// public void testAvroCoderEncoding() throws Exception {
-// AvroCoder<Pojo> coder = AvroCoder.of(Pojo.class);
-// CoderProperties.coderSerializable(coder);
-// AvroCoder<Pojo> copy = SerializableUtils.clone(coder);
-
-// Pojo pojo = new Pojo("foo", 3, DATETIME_A);
-// Pojo equalPojo = new Pojo("foo", 3, DATETIME_A);
-// Pojo otherPojo = new Pojo("bar", -19, DATETIME_B);
-// CoderProperties.coderConsistentWithEquals(coder, pojo, equalPojo);
-// CoderProperties.coderConsistentWithEquals(copy, pojo, equalPojo);
-// CoderProperties.coderConsistentWithEquals(coder, pojo, otherPojo);
-// CoderProperties.coderConsistentWithEquals(copy, pojo, otherPojo);
-// }
-
-// /**
-// * Tests that {@link AvroCoder} works around issues in Avro where cache classes might be from the
-// * wrong ClassLoader, causing confusing "Cannot cast X to X" error messages.
-// */
-// @SuppressWarnings("ReturnValueIgnored")
-// @Test
-// public void testTwoClassLoaders() throws Exception {
-// ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
-// ClassLoader loader1 =
-// new InterceptingUrlClassLoader(contextClassLoader, AvroCoderTestPojo.class.getName());
-// ClassLoader loader2 =
-// new InterceptingUrlClassLoader(contextClassLoader, AvroCoderTestPojo.class.getName());
-
-// Class<?> pojoClass1 = loader1.loadClass(AvroCoderTestPojo.class.getName());
-// Class<?> pojoClass2 = loader2.loadClass(AvroCoderTestPojo.class.getName());
-
-// Object pojo1 = InstanceBuilder.ofType(pojoClass1).withArg(String.class, "hello").build();
-// Object pojo2 = InstanceBuilder.ofType(pojoClass2).withArg(String.class, "goodbye").build();
-
-// // Confirm incompatibility
-// try {
-// pojoClass2.cast(pojo1);
-// fail("Expected ClassCastException; without it, this test is vacuous");
-// } catch (ClassCastException e) {
-// // g2g
-// }
-
-// // The first coder is expected to populate the Avro SpecificData cache
-// // The second coder is expected to be corrupted if the caching is done wrong.
-// AvroCoder