Skip to content

Commit

Permalink
fix: ensure that all outstanding futures have completed in FinishBundle (
Browse files Browse the repository at this point in the history
#32454)

* fix: ensure that all outstanding futures have completed in FinishBundle

Previously this was done as a side effect of batcher.close. Now we explicitly ensure that all element futures have been resolved and that any handler errors are propagated.

* simplify

* refactor

* use a simpler queue

* cleanup

* format

* fix warning
  • Loading branch information
igorbernstein2 authored Sep 16, 2024
1 parent 1297459 commit 6514136
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@
import com.google.protobuf.ByteString;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
Expand Down Expand Up @@ -1341,6 +1348,8 @@ private static class BigtableWriterFn
// Assign serviceEntry in startBundle and clear it in tearDown.
@Nullable private BigtableServiceEntry serviceEntry;

private transient Queue<CompletableFuture<?>> outstandingWrites;

BigtableWriterFn(
BigtableServiceFactory factory,
BigtableConfig bigtableConfig,
Expand Down Expand Up @@ -1376,25 +1385,41 @@ public void startBundle(StartBundleContext c) throws IOException {
}

badRecords = new ConcurrentLinkedQueue<>();
outstandingWrites = new ArrayDeque<>();
}

/**
 * Submits one element's mutations to the Bigtable writer and tracks the resulting future.
 *
 * <p>Before submitting, futures that have already completed are removed from the head of
 * {@code outstandingWrites} (surfacing any handler error), and {@code checkForFailures()} is
 * invoked. After submitting, the per-bundle record counter and the per-window element count are
 * updated.
 *
 * @param c the process context supplying the element (row key and its mutations)
 * @param window the window the element belongs to; used as the key in {@code seenWindows}
 * @throws Exception if draining completed futures, the failure check, or submitting the write
 *     throws
 */
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
// Release already-completed futures first so the queue does not grow for the whole bundle.
drainCompletedElementFutures();
// NOTE(review): checkForFailures() is not visible in this excerpt; presumably it rethrows
// write failures recorded by earlier callbacks - confirm.
checkForFailures();
KV<ByteString, Iterable<Mutation>> record = c.element();
Instant writeStart = Instant.now();
// Reset before the write so the check below only sees throttling recorded for this element.
// NOTE(review): where pendingThrottlingMsecs is assigned is not visible here - confirm.
pendingThrottlingMsecs = 0;
// NOTE(review): this listing is a diff rendered without +/- markers. The whenComplete(...)
// statement directly below looks like the pre-change form of the handle(...) chain that
// follows it; if both were live the record would be submitted twice. Confirm against the
// actual file.
bigtableWriter
.writeRecord(record)
.whenComplete(handleMutationException(record, window, writeStart));
CompletableFuture<Void> f =
bigtableWriter
.writeRecord(record)
// handle(...) yields a new stage whose outcome is that of the handler itself, so an
// unexpected exception thrown inside the handler is captured by the returned future
.handle(handleMutationException(record, window, writeStart));
// Remember the future so that later calls (and bundle finish) can observe its outcome.
outstandingWrites.add(f);
if (pendingThrottlingMsecs > 0) {
throttlingMsecs.inc(pendingThrottlingMsecs);
}
++recordsWritten;
// Count elements per window; a missing entry is treated as zero.
seenWindows.compute(window, (key, count) -> (count != null ? count : 0) + 1);
}

private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
/**
 * Removes every already-completed future from the head of {@code outstandingWrites}.
 *
 * <p>Keeps the queue from growing without bound over a bundle. Draining stops at the first
 * future that is not yet done, so later futures that happen to be complete stay queued until
 * the ones ahead of them finish.
 *
 * @throws ExecutionException if a drained future completed exceptionally, i.e. the completion
 *     handler itself failed
 * @throws InterruptedException declared by {@link Future#get()}; the future is already done
 *     when {@code get()} is called
 */
private void drainCompletedElementFutures() throws ExecutionException, InterruptedException {
  while (true) {
    Future<?> head = outstandingWrites.peek();
    if (head == null || !head.isDone()) {
      // Queue is empty or the oldest write is still in flight: nothing more to drain.
      return;
    }
    // get() on a completed future returns immediately and rethrows any handler failure.
    outstandingWrites.remove().get();
  }
}

private BiFunction<MutateRowResponse, Throwable, Void> handleMutationException(
KV<ByteString, Iterable<Mutation>> record, BoundedWindow window, Instant writeStart) {
return (MutateRowResponse result, Throwable exception) -> {
if (exception != null) {
Expand Down Expand Up @@ -1429,6 +1454,7 @@ private BiConsumer<MutateRowResponse, Throwable> handleMutationException(
}
}
}
return null;
};
}

Expand Down Expand Up @@ -1476,6 +1502,18 @@ public void finishBundle(FinishBundleContext c) throws Exception {
throw e;
}
}

// Sanity check: ensure that all element futures are resolved. This should already be the
// case once bigtableWriter.close() finishes.
try {
CompletableFuture.allOf(outstandingWrites.toArray(new CompletableFuture<?>[0]))
.get(1, TimeUnit.MINUTES);
} catch (TimeoutException e) {
throw new IllegalStateException(
"Unexpected timeout waiting for element future to resolve after the writer was closed",
e);
}

// add the excessive amount to throttling metrics if elapsed time > target latency
if (throttleReportThresMsecs > 0) {
long excessTime =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.io.Serializable;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.values.KV;

Expand All @@ -42,7 +42,7 @@ interface Writer {
*
* @throws IOException if there is an error submitting the write.
*/
CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
CompletableFuture<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -552,8 +551,8 @@ public void close() throws IOException {
}

@Override
public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
throws IOException {
public CompletableFuture<MutateRowResponse> writeRecord(
KV<ByteString, Iterable<Mutation>> record) throws IOException {

com.google.cloud.bigtable.data.v2.models.Mutation mutation =
com.google.cloud.bigtable.data.v2.models.Mutation.fromProtoUnsafe(record.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1921,8 +1921,8 @@ public FakeBigtableWriter(String tableId) {
}

@Override
public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
throws IOException {
public CompletableFuture<MutateRowResponse> writeRecord(
KV<ByteString, Iterable<Mutation>> record) throws IOException {
service.verifyTableExists(tableId);
Map<ByteString, ByteString> table = service.getTable(tableId);
ByteString key = record.getKey();
Expand Down Expand Up @@ -1954,8 +1954,8 @@ public FailureBigtableWriter(
}

@Override
public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
throws IOException {
public CompletableFuture<MutateRowResponse> writeRecord(
KV<ByteString, Iterable<Mutation>> record) throws IOException {
if (failureOptions.getFailAtWriteRecord()) {
throw new IOException("Fake IOException in writeRecord()");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
Expand All @@ -60,6 +61,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -863,7 +865,8 @@ public void testWrite() throws IOException {
.build())
.build();

underTest.writeRecord(KV.of(key, ImmutableList.of(mutation)));
CompletableFuture<MutateRowResponse> unusedElementFuture =
underTest.writeRecord(KV.of(key, ImmutableList.of(mutation)));

verify(mockBatcher).add(captor.capture());

Expand Down

0 comments on commit 6514136

Please sign in to comment.