Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jobLabelsMap parameter to BigQueryOptions #31698

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Map;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
Expand Down Expand Up @@ -216,6 +217,13 @@ public interface BigQueryOptions

void setEnableStorageReadApiV2(Boolean value);

@Description(
"A map with string labels to be passed to BigQuery export, query and other jobs. "
+ "See: https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration")
Map<String, String> getJobLabelsMap();

void setJobLabelsMap(Map<String, String> value);

/** BQ endpoint to use. If unspecified, uses the default endpoint. */
@JsonIgnore
@Hidden
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,19 +221,26 @@ static class JobServiceImpl implements BigQueryServices.JobService {
private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
private final BigQueryIOMetadata bqIOMetadata;
private final Map<String, String> jobLabels;

@VisibleForTesting
JobServiceImpl(Bigquery client) {
this(client, new HashMap<>());
}

@VisibleForTesting
JobServiceImpl(Bigquery client, Map<String, String> jobLabels) {
this.errorExtractor = new ApiErrorExtractor();
this.client = client;
this.bqIOMetadata = BigQueryIOMetadata.create();
this.jobLabels = new HashMap<>(jobLabels);
}

@VisibleForTesting
JobServiceImpl(BigQueryOptions options) {
this.errorExtractor = new ApiErrorExtractor();
this.client = newBigQueryClient(options).build();
this.bqIOMetadata = BigQueryIOMetadata.create();
this(
newBigQueryClient(options).build(),
options.getJobLabelsMap() != null ? options.getJobLabelsMap() : new HashMap<>());
}

@VisibleForTesting
Expand All @@ -251,14 +258,13 @@ Bigquery getClient() {
@Override
public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setLoad(loadConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
Abacn marked this conversation as resolved.
Show resolved Hide resolved
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels)));
startJob(job, errorExtractor, client);
}

Expand All @@ -273,14 +279,13 @@ public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
public void startLoadJob(
JobReference jobRef, JobConfigurationLoad loadConfig, AbstractInputStreamContent stream)
throws InterruptedException, IOException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setLoad(loadConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels)));
startJobStream(job, stream, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff());
}

Expand All @@ -294,14 +299,13 @@ public void startLoadJob(
@Override
public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setExtract(extractConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels)));
startJob(job, errorExtractor, client);
}

Expand All @@ -315,14 +319,13 @@ public void startExtractJob(JobReference jobRef, JobConfigurationExtract extract
@Override
public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
throws IOException, InterruptedException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setQuery(queryConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels)));
startJob(job, errorExtractor, client);
}

Expand All @@ -336,14 +339,13 @@ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig
@Override
public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig)
throws IOException, InterruptedException {
Map<String, String> labelMap = new HashMap<>();
Job job =
new Job()
.setJobReference(jobRef)
.setConfiguration(
new JobConfiguration()
.setCopy(copyConfig)
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(labelMap)));
.setLabels(this.bqIOMetadata.addAdditionalJobLabels(this.jobLabels)));
startJob(job, errorExtractor, client);
}

Expand Down
Loading