From eb1852a6bc1fbfe61c76a55699cab1eb129d3e79 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Wed, 10 May 2023 19:52:23 -0400 Subject: [PATCH] Bigtable: add an experiments flag to enable client side metrics (#26607) --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 7 +++++++ .../io/gcp/bigtable/BigtableServiceFactory.java | 16 ++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 5a9dc94859c..88e94f03140 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -247,6 +247,13 @@ * .withStartTime(startTime)); * } * + *

Enable client side metrics

+ * + *

Client side metrics can be enabled with an experiments flag when you run the pipeline: + * --experiments=bigtable_enable_client_side_metrics. These metrics can provide additional insights + * to your job. You can read more about client side metrics in this documentation: + * https://cloud.google.com/bigtable/docs/client-side-metrics. + * *

Permissions

* *

Permission requirements depend on the {@link PipelineRunner} that is used to execute the diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java index b909f667c1a..a9db4943c65 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java @@ -28,6 +28,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,9 @@ class BigtableServiceFactory implements Serializable { private static final ConcurrentHashMap refCounts = new ConcurrentHashMap<>(); private static final Object lock = new Object(); + private static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS = + "bigtable_enable_client_side_metrics"; + @AutoValue abstract static class ConfigId implements Serializable { @@ -115,6 +119,12 @@ BigtableServiceEntry getServiceForReading( } BigtableDataSettings settings = BigtableConfigTranslator.translateReadToVeneerSettings(config, opts, pipelineOptions); + + if (ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_CLIENT_SIDE_METRICS)) { + LOG.info("Enabling client side metrics"); + BigtableDataSettings.enableBuiltinMetrics(); + } + BigtableService service; if (opts.getWaitTimeout() != null) { service = new BigtableServiceImpl(settings, opts.getWaitTimeout()); @@ -155,6 +165,12 @@ BigtableServiceEntry getServiceForWriting( BigtableDataSettings settings = BigtableConfigTranslator.translateWriteToVeneerSettings(config, opts, pipelineOptions); + + if (ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_CLIENT_SIDE_METRICS)) { + LOG.info("Enabling client side metrics"); + BigtableDataSettings.enableBuiltinMetrics(); + } + BigtableService service = new BigtableServiceImpl(settings); entry = BigtableServiceEntry.create(configId, service); entries.put(configId.id(), entry);