Skip to content

Commit

Permalink
use different proto
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Oct 28, 2024
1 parent 6887687 commit 7401920
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 371 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ message MonitoringInfoSpecs {
]
}];

//import com.google.api.services.dataflow.model.PerWorkerMetrics;
API_REQUEST_LATENCIES = 20 [(monitoring_info_spec) = {
urn: "beam:metric:io:api_request_latencies:v1",
type: "beam:metrics:histogram_int64:v1",
Expand Down Expand Up @@ -587,9 +586,7 @@ message MonitoringInfoTypeUrns {
SET_STRING_TYPE = 11 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:set_string:v1"];

// Encoding: <iter><value1><value2>...<valueN></iter>
// - iter: beam:coder:iterable:v1
// - valueX: beam:coder:stringutf8:v1
// Represents histograms
PER_WORKER_HISTOGRAM = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:per_worker_histogram_int64:v1"];

Expand Down
7 changes: 3 additions & 4 deletions runners/core-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ test {
}
}

// def google_api_services_dataflow = library.java.google_api_services_dataflow

dependencies {
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:job-management", configuration: "shadow")
// implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill") // need histogram proto
provided library.java.google_api_services_dataflow

implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.joda_time
implementation library.java.vendored_grpc_1_60_1
implementation library.java.slf4j_api
implementation library.java.jackson_core
implementation library.java.jackson_databind
// implementation library.java.proto_google_common_protos
implementation library.java.google_cloud_dataflow_java_proto_library_all

testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
Expand All @@ -62,5 +62,4 @@ dependencies {
provided library.java.google_cloud_dataflow_java_proto_library_all
testImplementation library.java.google_cloud_dataflow_java_proto_library_all
testImplementation(library.java.google_api_services_dataflow)
implementation project(path: ":runners:google-cloud-dataflow-java:worker:windmill", configuration: "shadow") // need histogram proto
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of {@link org.apache.beam.sdk.metrics.MetricResults}, which takes static
Expand All @@ -40,7 +38,6 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class DefaultMetricResults extends MetricResults {
private static final Logger LOG = LoggerFactory.getLogger(DefaultMetricResults.class);

private final Iterable<MetricResult<Long>> counters;
private final Iterable<MetricResult<DistributionResult>> distributions;
Expand All @@ -54,7 +51,6 @@ public DefaultMetricResults(
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<HistogramData>> perWorkerHistograms) {
LOG.info("xxx does this get here? DefaultMetricResults ");
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,15 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {

private MetricsMap<MetricName, GaugeCell> gauges = new MetricsMap<>(GaugeCell::new);

// Should it be a cell Instead?
// Can this be a regular histogram instead of a cell'? see
// dirty state acts as being lock free, commits only non dirty metrics.
// also of type DISTRIBUTION_INT64_TYPE
// refactor to use Lock free histograms? later?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> perWorkerHistograms =
new MetricsMap<>(HistogramCell::new);

private MetricsMap<MetricName, StringSetCell> stringSets = new MetricsMap<>(StringSetCell::new);

// assume the same bucket type?
private MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> histograms =
new MetricsMap<>(HistogramCell::new);

private MetricsContainerImpl(@Nullable String stepName, boolean isProcessWide) {
LOG.info("xxx create metric container {}: isProcessWide {}", stepName, isProcessWide);
this.stepName = stepName;
this.isProcessWide = isProcessWide;
}
Expand All @@ -127,7 +120,6 @@ public MetricsContainerImpl(@Nullable String stepName) {
* collecting processWide metrics for HarnessMonitoringInfoRequest/Response.
*/
public static MetricsContainerImpl createProcessWideContainer() {
LOG.info("xxx create createProcessWideContainer");
return new MetricsContainerImpl(null, true);
}

Expand Down Expand Up @@ -182,16 +174,8 @@ public DistributionCell getDistribution(MetricName metricName) {
@Override
public HistogramCell getPerWorkerHistogram(
MetricName metricName, HistogramData.BucketType bucketType) {
// LOG.info("xxx stepName {}, getPerWorkerHistogram metric {}", stepName, metricName.getName());
// if not enabled, return a no op container from parent class
// if (!enablePerWorkerMetrics) {
// // will be a no op
// return null;
// // return MetricsContainer.super.getPerWorkerHistogram(metricName, bucketType);
// }
// return no op histogram instead
HistogramCell val = perWorkerHistograms.get(KV.of(metricName, bucketType));
return val; // no null chceks for the others
return val;
}

/**
Expand Down Expand Up @@ -897,7 +881,6 @@ public static MetricsContainerImpl deltaContainer(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}

// treat per worker histograms differently
for (Map.Entry<KV<MetricName, HistogramData.BucketType>, HistogramCell> cell :
curr.perWorkerHistograms.entries()) {
HistogramData.BucketType bt = cell.getKey().getValue();
Expand Down
Loading

0 comments on commit 7401920

Please sign in to comment.