diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java index 427b03429d22..a2b506408ee0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.bigquery; import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.Map; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; @@ -216,6 +217,13 @@ public interface BigQueryOptions void setEnableStorageReadApiV2(Boolean value); + @Description( + "A map with string labels to be passed to BigQuery export, query and other jobs. " + + "See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration") + Map getJobLabelsMap(); + + void setJobLabelsMap(Map value); + /** BQ endpoint to use. If unspecified, uses the default endpoint. */ @JsonIgnore @Hidden diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 97ba11b42ac6..432f31e81c90 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -221,19 +221,26 @@ static class JobServiceImpl implements BigQueryServices.JobService { private final ApiErrorExtractor errorExtractor; private final Bigquery client; private final BigQueryIOMetadata bqIOMetadata; + private final Map jobLabels; @VisibleForTesting JobServiceImpl(Bigquery client) { + this(client, new HashMap<>()); + } + + @VisibleForTesting + JobServiceImpl(Bigquery client, Map jobLabels) { this.errorExtractor = new ApiErrorExtractor(); this.client = client; this.bqIOMetadata = BigQueryIOMetadata.create(); + this.jobLabels = new HashMap<>(jobLabels); } @VisibleForTesting JobServiceImpl(BigQueryOptions options) { - this.errorExtractor = new ApiErrorExtractor(); - this.client = newBigQueryClient(options).build(); - this.bqIOMetadata = BigQueryIOMetadata.create(); + this( + newBigQueryClient(options).build(), + options.getJobLabelsMap() != null ? options.getJobLabelsMap() : new HashMap<>()); } @VisibleForTesting @@ -251,14 +258,13 @@ Bigquery getClient() { @Override public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException { - Map labelMap = new HashMap<>(); Job job = new Job() .setJobReference(jobRef) .setConfiguration( new JobConfiguration() .setLoad(loadConfig) - .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels))); startJob(job, errorExtractor, client); } @@ -273,14 +279,13 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) public void startLoadJob( JobReference jobRef, JobConfigurationLoad loadConfig, AbstractInputStreamContent stream) throws InterruptedException, IOException { - Map labelMap = new HashMap<>(); Job job = new Job() .setJobReference(jobRef) .setConfiguration( new JobConfiguration() .setLoad(loadConfig) - .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels))); startJobStream(job, stream, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff()); } @@ -294,14 +299,13 @@ public void startLoadJob( @Override public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException { - Map labelMap = new HashMap<>(); Job job = new Job() .setJobReference(jobRef) .setConfiguration( new JobConfiguration() .setExtract(extractConfig) - .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels))); startJob(job, errorExtractor, client); } @@ -315,14 +319,13 @@ public void startExtractJob(JobReference jobRef, JobConfigurationExtract extract @Override public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) throws IOException, InterruptedException { - Map labelMap = new HashMap<>(); Job job = new Job() .setJobReference(jobRef) .setConfiguration( new JobConfiguration() .setQuery(queryConfig) - .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels))); startJob(job, errorExtractor, client); } @@ -336,14 +339,13 @@ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig @Override public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) throws IOException, InterruptedException { - Map labelMap = new HashMap<>(); Job job = new Job() .setJobReference(jobRef) .setConfiguration( new JobConfiguration() .setCopy(copyConfig) - .setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap))); + .setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels))); startJob(job, errorExtractor, client); }