Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add error handling for bqio #30081

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -1681,7 +1681,11 @@ public CompositeBehavior enterCompositeTransform(Node node) {
String rootBigQueryTransform = "";
if (transform.getClass().equals(StorageApiLoads.class)) {
StorageApiLoads<?, ?> storageLoads = (StorageApiLoads<?, ?>) transform;
failedTag = storageLoads.getFailedRowsTag();
// If the storage load is directing exceptions to an error handler, we don't need to
// warn for unconsumed rows
if (!storageLoads.usesErrorHandler()) {
failedTag = storageLoads.getFailedRowsTag();
}
// For storage API the transform that outputs failed rows is nested one layer below
// BigQueryIO.
rootBigQueryTransform = node.getEnclosingNode().getFullName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,13 @@ public static class ErrorSinkTransform
}
}
}

public static class EchoErrorTransform
extends PTransform<PCollection<BadRecord>, PCollection<BadRecord>> {

@Override
public PCollection<BadRecord> expand(PCollection<BadRecord> input) {
return input;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,15 @@ class AvroRowWriter<AvroT, T> extends BigQueryRowWriter<T> {
}

@Override
public void write(T element) throws IOException {
public void write(T element) throws IOException, BigQueryRowSerializationException {
AvroWriteRequest<T> writeRequest = new AvroWriteRequest<>(element, schema);
writer.append(toAvroRecord.apply(writeRequest));
AvroT serializedRequest;
try {
serializedRequest = toAvroRecord.apply(writeRequest);
} catch (Exception e) {
throw new BigQueryRowSerializationException(e);
}
writer.append(serializedRequest);
}

public Schema getSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -57,6 +58,9 @@
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
Expand All @@ -77,6 +81,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand Down Expand Up @@ -161,6 +166,8 @@ class BatchLoads<DestinationT, ElementT>
private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
private final @Nullable String kmsKey;
private final String tempDataset;
private final BadRecordRouter badRecordRouter;
private final ErrorHandler<BadRecord, ?> badRecordErrorHandler;
private Coder<TableDestination> tableDestinationCoder;

// The maximum number of times to retry failed load or copy jobs.
Expand All @@ -180,7 +187,9 @@ class BatchLoads<DestinationT, ElementT>
@Nullable String kmsKey,
boolean clusteringEnabled,
boolean useAvroLogicalTypes,
String tempDataset) {
String tempDataset,
BadRecordRouter badRecordRouter,
ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
Expand All @@ -207,6 +216,8 @@ class BatchLoads<DestinationT, ElementT>
this.tempDataset = tempDataset;
this.tableDestinationCoder =
clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
this.badRecordRouter = badRecordRouter;
this.badRecordErrorHandler = badRecordErrorHandler;
}

void setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
Expand Down Expand Up @@ -601,9 +612,13 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
unwrittedRecordsTag,
maxNumWritersPerBundle,
maxFileSize,
rowWriterFactory))
rowWriterFactory,
input.getCoder(),
badRecordRouter))
.withSideInputs(tempFilePrefix)
.withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
.withOutputTags(
writtenFilesTag,
TupleTagList.of(ImmutableList.of(unwrittedRecordsTag, BAD_RECORD_TAG))));
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
writeBundlesTuple
.get(writtenFilesTag)
Expand All @@ -612,6 +627,8 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
writeBundlesTuple
.get(unwrittedRecordsTag)
.setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));
badRecordErrorHandler.addErrorCollection(
writeBundlesTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));

// If the bundles contain too many output tables to be written inline to files (due to memory
// limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
Expand Down Expand Up @@ -680,62 +697,92 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
// parallelize properly. We also ensure that the files are written if a threshold number of
// records are ready. Dynamic sharding is achieved via the withShardedKey() option provided by
// GroupIntoBatches.
return input
.apply(
GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(byteSize)
.withMaxBufferingDuration(maxBufferingDuration)
.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
IterableCoder.of(elementCoder)))
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
KV<DestinationT, Iterable<ElementT>>>() {
@Override
public KV<DestinationT, Iterable<ElementT>> apply(
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
tempFilePrefix, maxFileSize, rowWriterFactory))
.withSideInputs(tempFilePrefix))
TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
PCollectionTuple writeResults =
input
.apply(
GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withByteSize(byteSize)
.withMaxBufferingDuration(maxBufferingDuration)
.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
IterableCoder.of(elementCoder)))
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
KV<DestinationT, Iterable<ElementT>>>() {
@Override
public KV<DestinationT, Iterable<ElementT>> apply(
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
tempFilePrefix,
maxFileSize,
rowWriterFactory,
badRecordRouter,
successfulResultsTag,
elementCoder))
.withSideInputs(tempFilePrefix)
.withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));
badRecordErrorHandler.addErrorCollection(
writeResults.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));

return writeResults
.get(successfulResultsTag)
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}

private PCollection<Result<DestinationT>> writeShardedRecords(
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
PCollectionView<String> tempFilePrefix) {
return shardedRecords
.apply("GroupByDestination", GroupByKey.create())
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
KV<DestinationT, Iterable<ElementT>>>() {
@Override
public KV<DestinationT, Iterable<ElementT>> apply(
KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
.withSideInputs(tempFilePrefix))
TupleTag<Result<DestinationT>> successfulResultsTag = new TupleTag<>();
PCollectionTuple writeResults =
shardedRecords
.apply("GroupByDestination", GroupByKey.create())
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
KV<DestinationT, Iterable<ElementT>>>() {
@Override
public KV<DestinationT, Iterable<ElementT>> apply(
KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<>(
tempFilePrefix,
maxFileSize,
rowWriterFactory,
badRecordRouter,
successfulResultsTag,
elementCoder))
.withSideInputs(tempFilePrefix)
.withOutputTags(successfulResultsTag, TupleTagList.of(BAD_RECORD_TAG)));

badRecordErrorHandler.addErrorCollection(
writeResults
.get(BAD_RECORD_TAG)
.setCoder(BadRecord.getCoder(shardedRecords.getPipeline())));
johnjcasey marked this conversation as resolved.
Show resolved Hide resolved

return writeResults
.get(successfulResultsTag)
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}

Expand Down
Loading
Loading