Skip to content

Commit

Permalink
fix translation
Browse files Browse the repository at this point in the history
  • Loading branch information
reuvenlax committed Aug 28, 2024
1 parent 868dbaa commit ff84ea5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
Expand Down Expand Up @@ -398,6 +399,7 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator<Wri
.addNullableInt32Field("num_file_shards")
.addNullableInt32Field("num_storage_write_api_streams")
.addNullableBooleanField("propagate_successful_storage_api_writes")
.addNullableByteArrayField("propagate_successful_storage_api_writes_predicate")
.addNullableInt32Field("max_files_per_partition")
.addNullableInt64Field("max_bytes_per_partition")
.addNullableLogicalTypeField("triggering_frequency", new NanosDuration())
Expand Down Expand Up @@ -522,6 +524,9 @@ public Row toConfigRow(Write<?> transform) {
fieldValues.put(
"propagate_successful_storage_api_writes",
transform.getPropagateSuccessfulStorageApiWrites());
fieldValues.put(
"propagate_successful_storage_api_writes_predicate",
toByteArray(transform.getPropagateSuccessfulStorageApiWritesPredicate()));
fieldValues.put("max_files_per_partition", transform.getMaxFilesPerPartition());
fieldValues.put("max_bytes_per_partition", transform.getMaxBytesPerPartition());
if (transform.getTriggeringFrequency() != null) {
Expand Down Expand Up @@ -752,6 +757,12 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
builder =
builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
}
byte[] predicate = configRow.getBytes("propagate_successful_storage_api_writes_predicate");
if (predicate != null) {
builder =
builder.setPropagateSuccessfulStorageApiWritesPredicate(
(Predicate<String>) fromByteArray(predicate));
}
Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition");
if (maxFilesPerPartition != null) {
builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public class BigQueryIOTranslationTest {
"getNumStorageWriteApiStreams", "num_storage_write_api_streams");
WRITE_TRANSFORM_SCHEMA_MAPPING.put(
"getPropagateSuccessfulStorageApiWrites", "propagate_successful_storage_api_writes");
WRITE_TRANSFORM_SCHEMA_MAPPING.put(
"getPropagateSuccessfulStorageApiWritesPredicate",
"propagate_successful_storage_api_writes_predicate");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxFilesPerPartition", "max_files_per_partition");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getMaxBytesPerPartition", "max_bytes_per_partition");
WRITE_TRANSFORM_SCHEMA_MAPPING.put("getTriggeringFrequency", "triggering_frequency");
Expand Down

0 comments on commit ff84ea5

Please sign in to comment.