diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java index 5b9b80071ecd..efbe97f36522 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; @@ -398,6 +399,7 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator transform) { fieldValues.put( "propagate_successful_storage_api_writes", transform.getPropagateSuccessfulStorageApiWrites()); + fieldValues.put( + "propagate_successful_storage_api_writes_predicate", + toByteArray(transform.getPropagateSuccessfulStorageApiWritesPredicate())); fieldValues.put("max_files_per_partition", transform.getMaxFilesPerPartition()); fieldValues.put("max_bytes_per_partition", transform.getMaxBytesPerPartition()); if (transform.getTriggeringFrequency() != null) { @@ -752,6 +757,12 @@ public Write fromConfigRow(Row configRow, PipelineOptions options) { builder = builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites); } + byte[] predicate = configRow.getBytes("propagate_successful_storage_api_writes_predicate"); + if (predicate != null) { + builder = + builder.setPropagateSuccessfulStorageApiWritesPredicate( + (Predicate) fromByteArray(predicate)); + } Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition"); if (maxFilesPerPartition != null) { builder = builder.setMaxFilesPerPartition(maxFilesPerPartition); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java index 950ebcaafe45..e15258e6ab40 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslationTest.java @@ -105,6 +105,9 @@ public class BigQueryIOTranslationTest { "getNumStorageWriteApiStreams", "num_storage_write_api_streams"); WRITE_TRANSFORM_SCHEMA_MAPPING.put( "getPropagateSuccessfulStorageApiWrites", "propagate_successful_storage_api_writes"); + WRITE_TRANSFORM_SCHEMA_MAPPING.put( + "getPropagateSuccessfulStorageApiWritesPredicate", + "propagate_successful_storage_api_writes_predicate"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxFilesPerPartition", "max_files_per_partition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxBytesPerPartition", "max_bytes_per_partition"); WRITE_TRANSFORM_SCHEMA_MAPPING.put("getTriggeringFrequency", "triggering_frequency");