Commit

portable bigquery destinations

ahmedabu98 committed Nov 5, 2024
1 parent 770cf50 commit 255c9de

Showing 7 changed files with 263 additions and 138 deletions.
1 change: 1 addition & 0 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -198,6 +198,7 @@ task integrationTest(type: Test, dependsOn: processTestResources) {
         "--runner=DirectRunner",
         "--project=${gcpProject}",
         "--tempRoot=${gcpTempRoot}",
+        "--tempLocation=${gcpTempRoot}",
         "--firestoreDb=${firestoreDb}",
         "--firestoreHost=${firestoreHost}",
         "--bigtableChangeStreamInstanceId=${bigtableChangeStreamInstanceId}",
@@ -27,6 +27,8 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryWriteConfiguration;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.PortableBigQueryDestinations;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
@@ -88,19 +90,20 @@ protected static class BigQueryWriteSchemaTransform extends SchemaTransform {
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       PCollection<Row> rowPCollection = input.getSinglePCollection();
-      BigQueryIO.Write<Row> write = toWrite();
+      BigQueryIO.Write<Row> write = toWrite(rowPCollection.getSchema());
       rowPCollection.apply(write);

       return PCollectionRowTuple.empty(input.getPipeline());
     }

-    BigQueryIO.Write<Row> toWrite() {
+    BigQueryIO.Write<Row> toWrite(Schema schema) {
+      PortableBigQueryDestinations dynamicDestinations =
+          new PortableBigQueryDestinations(schema, configuration);
       BigQueryIO.Write<Row> write =
           BigQueryIO.<Row>write()
-              .to(configuration.getTable())
+              .to(dynamicDestinations)
               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
-              .withFormatFunction(BigQueryUtils.toTableRow())
-              .useBeamSchema();
+              .withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));

       if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
         CreateDisposition createDisposition =
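The new PortableBigQueryDestinations class is one of the seven changed files, but its diff is not expanded on this page. Below is a rough sketch of what its call sites imply: the DynamicDestinations base class and getFilterFormatFunction(boolean) come from this commit, the routing and schema methods mirror the RowDynamicDestinations class deleted further down, and everything else is an assumption. The real constructor takes a BigQueryWriteConfiguration rather than the plain table spec used here.

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Optional;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.ValueInSingleWindow;

/** Sketch only; inferred from this commit's call sites, not the actual source. */
class PortableBigQueryDestinationsSketch extends DynamicDestinations<Row, String> {
  private final Schema rowSchema;
  private final String fixedTable; // null means route on each row's "destination" field

  PortableBigQueryDestinationsSketch(Schema rowSchema, String fixedTable) {
    this.rowSchema = rowSchema;
    this.fixedTable = fixedTable;
  }

  @Override
  public String getDestination(ValueInSingleWindow<Row> element) {
    // Fixed table spec, or the row's "destination" field for dynamic destinations.
    return Optional.ofNullable(fixedTable)
        .orElseGet(() -> element.getValue().getString("destination"));
  }

  @Override
  public TableDestination getTable(String destination) {
    return new TableDestination(destination, null);
  }

  @Override
  public TableSchema getSchema(String destination) {
    return BigQueryUtils.toTableSchema(rowSchema);
  }

  // Presumably also applies the new keep/drop/only field filtering from
  // BigQueryWriteConfiguration (hence "Filter"); that part is omitted here.
  SerializableFunction<Row, TableRow> getFilterFormatFunction(boolean fetchNestedRecord) {
    return row -> BigQueryUtils.toTableRow(fetchNestedRecord ? row.getRow("record") : row);
  }
}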
@@ -21,13 +21,10 @@
 import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

-import com.google.api.services.bigquery.model.TableConstraints;
-import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.service.AutoService;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -36,9 +33,7 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
-import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
@@ -58,7 +53,6 @@
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.joda.time.Duration;
@@ -180,52 +174,6 @@ private static class NoOutputDoFn<T> extends DoFn<T, Row> {
     public void process(ProcessContext c) {}
   }

-  private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
-    final Schema schema;
-    final String fixedDestination;
-    final List<String> primaryKey;
-
-    RowDynamicDestinations(Schema schema) {
-      this.schema = schema;
-      this.fixedDestination = null;
-      this.primaryKey = null;
-    }
-
-    public RowDynamicDestinations(
-        Schema schema, String fixedDestination, List<String> primaryKey) {
-      this.schema = schema;
-      this.fixedDestination = fixedDestination;
-      this.primaryKey = primaryKey;
-    }
-
-    @Override
-    public String getDestination(ValueInSingleWindow<Row> element) {
-      return Optional.ofNullable(fixedDestination)
-          .orElseGet(() -> element.getValue().getString("destination"));
-    }
-
-    @Override
-    public TableDestination getTable(String destination) {
-      return new TableDestination(destination, null);
-    }
-
-    @Override
-    public TableSchema getSchema(String destination) {
-      return BigQueryUtils.toTableSchema(schema);
-    }
-
-    @Override
-    public TableConstraints getTableConstraints(String destination) {
-      return Optional.ofNullable(this.primaryKey)
-          .filter(pk -> !pk.isEmpty())
-          .map(
-              pk ->
-                  new TableConstraints()
-                      .setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
-          .orElse(null);
-    }
-  }
-
   @Override
   public PCollectionRowTuple expand(PCollectionRowTuple input) {
     // Check that the input exists
@@ -327,13 +275,6 @@ public Row getConfigurationRow() {
     }
   }

-  void validateDynamicDestinationsExpectedSchema(Schema schema) {
-    checkArgument(
-        schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
-        "When writing to dynamic destinations, we expect Row Schema with a "
-            + "\"destination\" string field and a \"record\" Row field.");
-  }
-
   BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
     Method writeMethod =
         configuration.getUseAtLeastOnceSemantics() != null
@@ -344,21 +285,37 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
     BigQueryIO.Write<Row> write =
         BigQueryIO.<Row>write()
             .withMethod(writeMethod)
-            .withFormatFunction(BigQueryUtils.toTableRow())
             .withWriteDisposition(WriteDisposition.WRITE_APPEND);

-    // in case CDC writes are configured we validate and include them in the configuration
-    if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
-      write = validateAndIncludeCDCInformation(write, schema);
-    } else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
-      validateDynamicDestinationsExpectedSchema(schema);
+    Schema rowSchema = schema;
+    boolean fetchNestedRecord = false;
+    if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
+      validateDynamicDestinationsSchema(schema);
+      rowSchema = schema.getField("record").getType().getRowSchema();
+      fetchNestedRecord = true;
+    }
+    if (Boolean.TRUE.equals(configuration.getUseCdcWrites())) {
+      validateCdcSchema(schema);
+      rowSchema = schema.getField("record").getType().getRowSchema();
+      fetchNestedRecord = true;
       write =
           write
-              .to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
-              .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));
-    } else {
-      write = write.to(configuration.getTable()).useBeamSchema();
+              .withPrimaryKey(configuration.getPrimaryKey())
+              .withRowMutationInformationFn(
+                  row ->
+                      RowMutationInformation.of(
+                          RowMutationInformation.MutationType.valueOf(
+                              row.getRow(ROW_PROPERTY_MUTATION_INFO)
+                                  .getString(ROW_PROPERTY_MUTATION_TYPE)),
+                          row.getRow(ROW_PROPERTY_MUTATION_INFO)
+                              .getString(ROW_PROPERTY_MUTATION_SQN)));
     }
+    PortableBigQueryDestinations dynamicDestinations =
+        new PortableBigQueryDestinations(rowSchema, configuration);
+    write =
+        write
+            .to(dynamicDestinations)
+            .withFormatFunction(dynamicDestinations.getFilterFormatFunction(fetchNestedRecord));

     if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
       CreateDisposition createDisposition =
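The refactor above collapses the old three-way branch into one PortableBigQueryDestinations sink. For reference, when the table is set to DYNAMIC_DESTINATIONS, each input row must wrap its payload; a minimal sketch of such a row, with made-up field values, matching the "destination"/"record" shape that validateDynamicDestinationsSchema (next hunk) enforces:

// Hypothetical input row for the DYNAMIC_DESTINATIONS path: a "destination" table
// spec plus a nested "record" payload that is unwrapped before writing.
Schema recordSchema = Schema.builder().addInt64Field("id").addStringField("name").build();
Schema inputSchema =
    Schema.builder()
        .addStringField("destination")
        .addRowField("record", recordSchema)
        .build();
Row payload = Row.withSchema(recordSchema).addValues(7L, "jane").build();
Row input =
    Row.withSchema(inputSchema)
        .addValues("my_project:my_dataset.my_table", payload)
        .build();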
@@ -381,19 +338,25 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
     return write;
   }

-  BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
-      BigQueryIO.Write<Row> write, Schema schema) {
+  void validateDynamicDestinationsSchema(Schema schema) {
+    checkArgument(
+        schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
+        "When writing to dynamic destinations, we expect Row Schema with a "
+            + "\"destination\" string field and a \"record\" Row field.");
+  }
+
+  private void validateCdcSchema(Schema schema) {
     checkArgument(
         schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")),
         "When writing using CDC functionality, we expect Row Schema with a "
             + "\""
             + ROW_PROPERTY_MUTATION_INFO
             + "\" Row field and a \"record\" Row field.");

-    Schema rowSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();
+    Schema mutationSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();

     checkArgument(
-        rowSchema.equals(ROW_SCHEMA_MUTATION_INFO),
+        mutationSchema != null && mutationSchema.equals(ROW_SCHEMA_MUTATION_INFO),
         "When writing using CDC functionality, we expect a \""
             + ROW_PROPERTY_MUTATION_INFO
             + "\" field of Row type with schema:\n"
@@ -402,31 +365,7 @@ BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
             + "Received \""
             + ROW_PROPERTY_MUTATION_INFO
             + "\" field with schema:\n"
-            + rowSchema.toString());
-
-    String tableDestination = null;
-
-    if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
-      validateDynamicDestinationsExpectedSchema(schema);
-    } else {
-      tableDestination = configuration.getTable();
-    }
-
-    return write
-        .to(
-            new RowDynamicDestinations(
-                schema.getField("record").getType().getRowSchema(),
-                tableDestination,
-                configuration.getPrimaryKey()))
-        .withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
-        .withPrimaryKey(configuration.getPrimaryKey())
-        .withRowMutationInformationFn(
-            row ->
-                RowMutationInformation.of(
-                    RowMutationInformation.MutationType.valueOf(
-                        row.getRow(ROW_PROPERTY_MUTATION_INFO)
-                            .getString(ROW_PROPERTY_MUTATION_TYPE)),
-                    row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN)));
+            + mutationSchema);
   }
  }
}
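To satisfy validateCdcSchema, each element carries a mutation-info Row alongside the record. A hedged sketch of such an input schema follows; the literal field names stand in for the ROW_PROPERTY_MUTATION_INFO, ROW_PROPERTY_MUTATION_TYPE, and ROW_PROPERTY_MUTATION_SQN constants, whose exact values are defined elsewhere in this file and are assumptions here:

// Assumed field names; the provider's constants may spell them differently.
Schema mutationInfoSchema =
    Schema.builder()
        .addStringField("mutation_type")           // e.g. "UPSERT" or "DELETE"
        .addStringField("change_sequence_number")  // orders CDC changes per key
        .build();
Schema cdcInputSchema =
    Schema.builder()
        .addRowField("row_mutation_info", mutationInfoSchema)
        .addRowField("record", recordSchema) // payload schema, as in the sketch above
        .build();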
@@ -18,20 +18,18 @@
 package org.apache.beam.sdk.io.gcp.bigquery.providers;

 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

 import com.google.auto.value.AutoValue;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;

 /**
  * Configuration for writing to BigQuery with SchemaTransforms. Used by {@link
@@ -68,11 +66,6 @@ public void validate() {
         !Strings.isNullOrEmpty(this.getTable()),
         invalidConfigMessage + "Table spec for a BigQuery Write must be specified.");

-    // if we have an input table spec, validate it
-    if (!this.getTable().equals(DYNAMIC_DESTINATIONS)) {
-      checkNotNull(BigQueryHelpers.parseTableSpec(this.getTable()));
-    }
-
     // validate create and write dispositions
     String createDisposition = getCreateDisposition();
     if (createDisposition != null && !createDisposition.isEmpty()) {
@@ -186,6 +179,21 @@ public static Builder builder() {
   @Nullable
   public abstract List<String> getPrimaryKey();

+  @SchemaFieldDescription(
+      "A list of field names to keep in the input record. All other fields are dropped before writing. "
+          + "Is mutually exclusive with 'drop' and 'only'.")
+  public abstract @Nullable List<String> getKeep();
+
+  @SchemaFieldDescription(
+      "A list of field names to drop from the input record before writing. "
+          + "Is mutually exclusive with 'keep' and 'only'.")
+  public abstract @Nullable List<String> getDrop();
+
+  @SchemaFieldDescription(
+      "The name of a single record field that should be written. "
+          + "Is mutually exclusive with 'keep' and 'drop'.")
+  public abstract @Nullable String getOnly();
+
   /** Builder for {@link BigQueryWriteConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
@@ -212,6 +220,12 @@ public abstract static class Builder {

     public abstract Builder setPrimaryKey(List<String> pkColumns);

+    public abstract Builder setKeep(List<String> keep);
+
+    public abstract Builder setDrop(List<String> drop);
+
+    public abstract Builder setOnly(String only);
+
     /** Builds a {@link BigQueryWriteConfiguration} instance. */
     public abstract BigQueryWriteConfiguration build();
   }
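A hedged usage sketch of the new filtering knobs. Only the setters shown in this diff are known to exist; setTable is assumed from the getTable accessor validated earlier, and keep/drop/only are mutually exclusive per the field descriptions above:

// Hypothetical configuration that writes only two fields of each input record.
BigQueryWriteConfiguration config =
    BigQueryWriteConfiguration.builder()
        .setTable("my_project:my_dataset.my_table") // assumed setter (getter shown above)
        .setKeep(Arrays.asList("id", "name"))       // every other field is dropped
        .build();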