Merge pull request #26 from vinted/feat/handle-closing-writers
feat: only close writers on flush
gintarasm authored Apr 4, 2024
2 parents 79df424 + 837ce1a commit 9905530
Showing 2 changed files with 19 additions and 20 deletions.
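In short, the commit stops recreateStreamWriter from closing a stale StreamWriter inline and instead parks it in a writersToClose queue; an overridden flush(boolean) drains that queue after super.flush(flush) returns, and close() is left to close only the writers still registered in streamMap. Below is a minimal, self-contained sketch of that deferred-close pattern; the Writer interface and DeferredCloseSink class are hypothetical stand-ins, not the sink's actual types.

import java.util.ArrayDeque;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the deferred-close pattern; names are illustrative, not the real sink API.
interface Writer {
    void close() throws Exception;
}

class DeferredCloseSink {
    private final Queue<Writer> writersToClose = new ArrayDeque<>();
    private final Map<String, Writer> streamMap = new ConcurrentHashMap<>();

    // Mirrors recreateStreamWriter: park the old writer instead of closing it inline.
    void recreate(String stream, Writer replacement) {
        Writer old = streamMap.put(stream, replacement);
        if (old != null) {
            writersToClose.add(old);
        }
    }

    // Mirrors flush(boolean): once pending requests are flushed, drain the queue
    // and close each retired writer, logging (not rethrowing) close failures.
    void flush() {
        while (writersToClose.peek() != null) {
            Writer writer = writersToClose.poll();
            try {
                writer.close();
            } catch (Exception e) {
                System.err.println("Could not close unused writer: " + e);
            }
        }
    }

    // Mirrors close(): only the currently active writers are left to close.
    void close() {
        streamMap.values().forEach(w -> {
            try {
                w.close();
            } catch (Exception e) {
                System.err.println("Could not close writer: " + e);
            }
        });
    }
}

In the real sink, flush(boolean) calls super.flush(flush) before draining the queue, so pending append requests complete before their retired writer is released.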
@@ -38,6 +38,7 @@ public class AsyncBigQuerySinkWriter<A> extends AsyncSinkWriter<Rows<A>, StreamR

     private final Executor appendExecutor;
 
+    protected transient Queue<StreamWriter> writersToClose = new ArrayDeque<>();
     protected transient Map<String, StreamWriter> streamMap = new ConcurrentHashMap<>();
 
     public AsyncBigQuerySinkWriter(ExecutorProvider executorProvider, AsyncClientProvider clientProvider, ElementConverter<Rows<A>, StreamRequest> elementConverter, Sink.InitContext context, AsyncSinkWriterConfiguration configuration, Collection<BufferedRequestState<StreamRequest>> bufferedRequestStates) {
@@ -94,11 +95,7 @@ protected final void recreateStreamWriter(String traceId, String streamName, Str
         streamMap.replaceAll((key, writer) -> {
             var newWriter = writer;
             if (writer.getWriterId().equals(writerId)) {
-                try {
-                    writer.close();
-                } catch (Exception e) {
-                    logger.trace("Trace-id {} Could not close writer for {}", traceId, streamName);
-                }
+                writersToClose.add(writer);
                 newWriter = this.clientProvider.getWriter(streamName, table);
                 registerInflightMetric(newWriter);
             }
@@ -230,21 +227,23 @@ protected long getSizeInBytes(StreamRequest StreamRequest) {
     }
 
     @Override
-    public void close() {
-        logger.info("Closing BigQuery write stream");
-        try {
-            flush(true);
-            streamMap.values().forEach(stream -> {
-                try {
-                    stream.close();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            });
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+    public void flush(boolean flush) throws InterruptedException {
+        super.flush(flush);
+
+        while (writersToClose.peek() != null) {
+            var writer = writersToClose.poll();
+            try {
+                writer.close();
+            } catch (Exception e) {
+                logger.error("Could not close unused writer", e);
+            }
         }
     }
 
+    @Override
+    public void close() {
+        logger.info("Closing BigQuery write stream");
+        streamMap.values().forEach(StreamWriter::close);
+    }


@@ -65,7 +65,7 @@ public void shouldRecreateWriterAndRetryFailingWithMaximumRequestCallbackWaitTim

     @Test
     public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam FlinkTest.PipelineRunner runner, @FlinkTest.FlinkParam MockAsyncProtoClientProvider mockClientProvider) throws Exception {
-        mockClientProvider.givenStreamIsFinalized(stream);
+        mockClientProvider.givenFailingAppendWithStatus(Status.UNKNOWN);
 
         assertThatThrownBy(() -> {
             runner
@@ -76,7 +76,7 @@ public void shouldFailAndNotRetryWhenUnknownErrorReceived(@FlinkTest.FlinkParam
         }).isInstanceOf(JobExecutionException.class);
 
 
-        verify(mockClientProvider.getMockProtoWriter(), times(2)).append(any());
+        verify(mockClientProvider.getMockProtoWriter(), times(1)).append(any());
     }
 
     @Test
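The test change follows the same behavior: the stub now makes the append fail with Status.UNKNOWN, which the test name says the sink must not retry, so exactly one append attempt is expected instead of two. Below is a minimal Mockito-style sketch of that kind of assertion; FakeProtoWriter is a hypothetical stand-in for the mocked proto writer, not the project's test fixture.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class NoRetryVerificationSketch {

    // Hypothetical stand-in for the proto writer the test mocks.
    interface FakeProtoWriter {
        void append(Object rows);
    }

    public static void main(String[] args) {
        FakeProtoWriter writer = mock(FakeProtoWriter.class);

        // Simulate the sink making a single append attempt that fails terminally
        // (the updated test stubs Status.UNKNOWN) and not retrying afterwards.
        writer.append(new Object());

        // The updated expectation: exactly one append call, no retry.
        verify(writer, times(1)).append(any());
    }
}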
