add ability to encode and decode histogram data to portable runners
Naireen committed Nov 5, 2024
1 parent 14be0f6 commit 3a39975
Showing 6 changed files with 223 additions and 0 deletions.
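The substance of the change: MonitoringInfoEncodings gains encodeInt64Histogram and decodeInt64Histogram, backed by a new HistogramData constructor that rebuilds a histogram from the decoded value. A minimal round-trip sketch, using only names and values that appear in the diff below:

import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;

// Five linear buckets of width 5 starting at 0, as in the new tests.
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);
HistogramData histogram = new HistogramData(buckets);
histogram.record(5, 10, 15, 20);

// Encode to a JSON payload and decode it back; equality holds per the new tests.
ByteString payload = MonitoringInfoEncodings.encodeInt64Histogram(histogram);
HistogramData roundTripped = MonitoringInfoEncodings.decodeInt64Histogram(payload);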
2 changes: 2 additions & 0 deletions runners/core-java/build.gradle
@@ -42,6 +42,7 @@ dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
implementation library.java.google_api_services_dataflow
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
@@ -52,5 +53,6 @@ dependencies {
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.slf4j_api
testImplementation library.java.google_api_services_dataflow
testRuntimeOnly library.java.slf4j_simple
}
101 changes: 101 additions & 0 deletions runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodings.java
@@ -17,19 +17,34 @@
*/
package org.apache.beam.runners.core.metrics;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.dataflow.model.Base2Exponent;
import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;

// TODO(naireenhussain): Refactor out DataflowHistogramValue to be runner agnostic, and rename to
// remove Dataflow reference.

/** A set of functions used to encode and decode common monitoring info types. */
public class MonitoringInfoEncodings {
private static final Coder<Long> VARINT_CODER = VarLongCoder.of();
@@ -163,4 +178,90 @@ public static double decodeDoubleCounter(ByteString payload) {
throw new RuntimeException(e);
}
}

/** Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
try {
int numberOfBuckets = inputHistogram.getBucketType().getNumBuckets();

DataflowHistogramValue outputHistogram = new DataflowHistogramValue();

if (inputHistogram.getBucketType() instanceof HistogramData.LinearBuckets) {
HistogramData.LinearBuckets buckets =
(HistogramData.LinearBuckets) inputHistogram.getBucketType();
Linear linear = new Linear();
linear.setNumberOfBuckets(numberOfBuckets);
linear.setWidth(buckets.getWidth());
linear.setStart(buckets.getStart());
outputHistogram.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (inputHistogram.getBucketType() instanceof HistogramData.ExponentialBuckets) {
HistogramData.ExponentialBuckets buckets =
(HistogramData.ExponentialBuckets) inputHistogram.getBucketType();
Base2Exponent base2Exp = new Base2Exponent();
base2Exp.setNumberOfBuckets(numberOfBuckets);
base2Exp.setScale(buckets.getScale());
outputHistogram.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else {
throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
}

outputHistogram.setCount(inputHistogram.getTotalCount());

List<Long> bucketCounts = new ArrayList<>();
Arrays.stream(inputHistogram.getBucketCount()).forEach(bucketCounts::add);
outputHistogram.setBucketCounts(bucketCounts);

ObjectMapper objectMapper = new ObjectMapper();
String jsonString = objectMapper.writeValueAsString(outputHistogram);

return ByteString.copyFromUtf8(jsonString);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/** Decodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}. */
public static HistogramData decodeInt64Histogram(ByteString payload) {
try {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
JsonNode jsonNode = objectMapper.readTree(payload.toStringUtf8()); // payload is UTF-8 JSON
DataflowHistogramValue newHist = new DataflowHistogramValue();
newHist.setCount(jsonNode.get("count").asLong());

List<Long> bucketCounts = new ArrayList<>();
Iterator<JsonNode> itr = jsonNode.get("bucketCounts").iterator();
while (itr.hasNext()) {
Long item = itr.next().asLong();
bucketCounts.add(item);
}
newHist.setBucketCounts(bucketCounts);

if (jsonNode.get("bucketOptions").has("linear")) {
Linear linear = new Linear();
JsonNode linearNode = jsonNode.get("bucketOptions").get("linear");
linear.setNumberOfBuckets(linearNode.get("numberOfBuckets").asInt());
linear.setWidth(linearNode.get("width").asDouble());
linear.setStart(linearNode.get("start").asDouble());
newHist.setBucketOptions(new BucketOptions().setLinear(linear));
} else if (jsonNode.get("bucketOptions").has("exponential")) {
Base2Exponent base2Exp = new Base2Exponent();
JsonNode expNode = jsonNode.get("bucketOptions").get("exponential");
base2Exp.setNumberOfBuckets(expNode.get("numberOfBuckets").asInt());
base2Exp.setScale(expNode.get("scale").asInt());
newHist.setBucketOptions(new BucketOptions().setExponential(base2Exp));
} else {
throw new RuntimeException("Unable to parse histogram, bucket is not recognized");
}
return new HistogramData(newHist);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
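Since the encoder serializes DataflowHistogramValue with Jackson, the payload is plain UTF-8 JSON, and the decoder reads only the count, bucketCounts, and bucketOptions fields it knows, so unrecognized fields are ignored. A hedged sketch of decoding a hand-written linear payload (field names are taken from the decoder above; the JSON shown is an assumption about Jackson's output, not captured from a run):

import org.apache.beam.runners.core.metrics.MonitoringInfoEncodings;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;

// Hand-written payload for LinearBuckets.of(0, 5, 5) after record(5, 10, 15, 20).
// Field order is illustrative; Jackson may emit the fields in a different order.
// Exponential payloads use {"exponential":{"numberOfBuckets":N,"scale":S}} instead.
String json =
    "{\"count\":4,"
        + "\"bucketCounts\":[0,1,1,1,1],"
        + "\"bucketOptions\":{\"linear\":{\"numberOfBuckets\":5,\"width\":5.0,\"start\":0.0}}}";
HistogramData decoded =
    MonitoringInfoEncodings.decodeInt64Histogram(ByteString.copyFromUtf8(json));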
44 changes: 44 additions & 0 deletions runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MonitoringInfoEncodingsTest.java
@@ -21,26 +21,38 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.junit.Assert.assertEquals;

import java.util.Collections;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link MonitoringInfoEncodings}. */
@RunWith(JUnit4.class)
public class MonitoringInfoEncodingsTest {
@Rule
public ExpectedLogs monitoringInfoCodingsExpectedLogs =
ExpectedLogs.none(MonitoringInfoEncodings.class);

@Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testInt64DistributionEncoding() {
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
@@ -105,4 +117,36 @@ public void testDoubleCounterEncoding() {
assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
}

@Test
public void testHistogramInt64EncodingLinearHist() {
HistogramData.BucketType buckets = HistogramData.LinearBuckets.of(0, 5, 5);

HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(5, 10, 15, 20);
ByteString payload = encodeInt64Histogram(inputHistogram);

assertEquals(inputHistogram, decodeInt64Histogram(payload));
}

@Test
public void testHistogramInt64EncodingExpHist() {
HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 10);
HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(2, 4, 8, 16, 32);
ByteString payload = encodeInt64Histogram(inputHistogram);
assertEquals(inputHistogram, decodeInt64Histogram(payload));
}

@Test
public void testHistogramInt64EncodingUnsupportedBucket() {
thrown.expect(Exception.class);
thrown.expectMessage("Unable to parse histogram, bucket is not recognized");

HistogramData.BucketType buckets = HistogramData.UnsupportedBuckets.of();

HistogramData inputHistogram = new HistogramData(buckets);
inputHistogram.record(2, 4, 8, 16, 32);
encodeInt64Histogram(inputHistogram);
}
}
1 change: 1 addition & 0 deletions sdks/java/core/build.gradle
@@ -74,6 +74,7 @@ dependencies {
// antlr is used to generate code from sdks/java/core/src/main/antlr/
permitUnusedDeclared library.java.antlr
// Required to load constants from the model, e.g. max timestamp for global window
provided library.java.google_api_services_dataflow
shadow project(path: ":model:pipeline", configuration: "shadow")
shadow project(path: ":model:fn-execution", configuration: "shadow")
shadow project(path: ":model:job-management", configuration: "shadow")
74 changes: 74 additions & 0 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java
@@ -74,6 +74,42 @@ public HistogramData(BucketType bucketType) {
this.sumOfSquaredDeviations = 0;
}

/**
* Create a histogram from a DataflowHistogramValue proto.
*
* @param histogramProto the DataflowHistogramValue proto used to populate the histogram's stats.
*/
public HistogramData(
com.google.api.services.dataflow.model.DataflowHistogramValue histogramProto) {
if (histogramProto.getBucketOptions().getLinear() != null) {
double start = histogramProto.getBucketOptions().getLinear().getStart();
double width = histogramProto.getBucketOptions().getLinear().getWidth();
int numBuckets = histogramProto.getBucketOptions().getLinear().getNumberOfBuckets();
this.bucketType = LinearBuckets.of(start, width, numBuckets);
} else {
// Assume the histogram is exponential if it is not linear.
int scale = histogramProto.getBucketOptions().getExponential().getScale();
int numBuckets = histogramProto.getBucketOptions().getExponential().getNumberOfBuckets();
this.bucketType = ExponentialBuckets.of(scale, numBuckets);
}

// The bucket counts are copied the same way for both bucket types.
this.buckets = new long[bucketType.getNumBuckets()];
int idx = 0;
for (long val : histogramProto.getBucketCounts()) {
this.buckets[idx] = val;
this.numBoundedBucketRecords += val;
idx++;
}
}

public BucketType getBucketType() {
return this.bucketType;
}
@@ -293,6 +329,10 @@ public synchronized long getTopBucketCount() {
return numTopRecords;
}

/** Returns the count of values recorded in each bounded bucket. */
public synchronized long[] getBucketCount() {
return buckets;
}

public synchronized double getTopBucketMean() {
return numTopRecords == 0 ? 0 : topRecordsSum / numTopRecords;
}
@@ -573,6 +613,40 @@ public double getRangeTo() {
// Note: equals() and hashCode() are implemented by the AutoValue.
}

// Used for testing unsupported bucket formats.
@AutoValue
public abstract static class UnsupportedBuckets implements BucketType {

public static UnsupportedBuckets of() {
return new AutoValue_HistogramData_UnsupportedBuckets(0);
}

@Override
public int getBucketIndex(double value) {
return 0;
}

@Override
public double getBucketSize(int index) {
return 0;
}

@Override
public double getAccumulatedBucketSize(int index) {
return 0;
}

@Override
public double getRangeFrom() {
return 0;
}

@Override
public double getRangeTo() {
return 0;
}
}

@Override
public synchronized boolean equals(@Nullable Object object) {
if (object instanceof HistogramData) {
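The new constructor can also be exercised directly. A sketch with hypothetical values, assembling the value by hand via the generated model's chained setters:

import com.google.api.services.dataflow.model.BucketOptions;
import com.google.api.services.dataflow.model.DataflowHistogramValue;
import com.google.api.services.dataflow.model.Linear;
import java.util.Arrays;
import org.apache.beam.sdk.util.HistogramData;

// Hypothetical values: rebuild a HistogramData from a hand-assembled proto.
Linear linear = new Linear().setNumberOfBuckets(5).setWidth(5.0).setStart(0.0);
DataflowHistogramValue proto =
    new DataflowHistogramValue()
        .setCount(4L)
        .setBucketCounts(Arrays.asList(0L, 1L, 1L, 1L, 1L))
        .setBucketOptions(new BucketOptions().setLinear(linear));
HistogramData histogram = new HistogramData(proto);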
1 change: 1 addition & 0 deletions sdks/java/harness/build.gradle
@@ -30,6 +30,7 @@ dependencies {
provided project(path: ":model:pipeline", configuration: "shadow")
provided project(path: ":sdks:java:core", configuration: "shadow")
provided project(path: ":sdks:java:transform-service:launcher")
provided library.java.google_api_services_dataflow
provided library.java.avro
provided library.java.jackson_databind
provided library.java.joda_time
