Skip to content

Commit

Permalink
Merge pull request #22 from vinted/fix/retry-logic
Browse files Browse the repository at this point in the history
Fix/retry logic
  • Loading branch information
gintarasm authored Mar 7, 2024
2 parents 4df9abb + c78b2b2 commit ca9a289
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 90 deletions.
26 changes: 26 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,29 @@ publishing {
}
}

tasks.withType(Test).configureEach {
doFirst {
jvmArgs = [
'--add-exports=java.base/sun.net.util=ALL-UNNAMED',
'--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED',
'--add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED',
'--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED',
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.net=ALL-UNNAMED',
'--add-opens=java.base/java.io=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
'--add-opens=java.base/sun.nio.ch=ALL-UNNAMED',
'--add-opens=java.base/java.lang.reflect=ALL-UNNAMED',
'--add-opens=java.base/java.text=ALL-UNNAMED',
'--add-opens=java.base/java.time=ALL-UNNAMED',
'--add-opens=java.base/java.util=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED',
'--add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED'
]
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
Expand All @@ -10,6 +11,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.Optional;
Expand Down Expand Up @@ -46,13 +48,25 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();
var writer = JsonStreamWriter
var writerBuilder = JsonStreamWriter
.newBuilder(streamName, getTableSchema(table), this.getClient())
.setEnableConnectionPool(this.writerSettings.getEnableConnectionPool())
.setExecutorProvider(executorProvider)
.build();
.setExecutorProvider(executorProvider);

if (writerSettings.getRetrySettings() != null) {
var settings = writerSettings.getRetrySettings();
var retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
.setMaxAttempts(settings.getMaxRetryAttempts())
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
.build();

writerBuilder.setRetrySettings(retrySettings);
}
JsonStreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writer);
return new com.vinted.flink.bigquery.client.JsonStreamWriter<>(serializer, writerBuilder.build());
} catch (Descriptors.DescriptorValidationException | IOException | InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.vinted.flink.bigquery.client;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.storage.v1.*;
Expand All @@ -9,6 +10,7 @@
import com.vinted.flink.bigquery.model.config.WriterSettings;
import com.vinted.flink.bigquery.schema.SchemaTransformer;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import org.threeten.bp.Duration;

import java.io.IOException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -45,6 +47,9 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
var executorProvider = this.writerSettings.getWriterThreads() > 1 ?
FixedExecutorProvider.create(Executors.newScheduledThreadPool(writerSettings.getWriterThreads())) :
BigQueryWriteSettings.defaultExecutorProviderBuilder().build();



var streamWriterBuilder = StreamWriter
.newBuilder(streamName, getClient())
.setMaxInflightRequests(this.writerSettings.getMaxInflightRequests())
Expand All @@ -56,6 +61,19 @@ public BigQueryStreamWriter<A> getWriter(String streamName, TableId table, RowVa
.setLocation(table.getProject())
.setWriterSchema(protoSchema);

if (writerSettings.getRetrySettings() != null) {
var settings = writerSettings.getRetrySettings();
var retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(settings.getInitialRetryDelay().toMillis()))
.setRetryDelayMultiplier(settings.getRetryDelayMultiplier())
.setMaxAttempts(settings.getMaxRetryAttempts())
.setMaxRetryDelay(Duration.ofMillis(settings.getMaxRetryDelay().toMillis()))
.build();

streamWriterBuilder.setRetrySettings(retrySettings);
}

StreamWriter.setMaxRequestCallbackWaitTime(this.writerSettings.getMaxRequestWaitCallbackTime());
return new ProtoStreamWriter<>(serializer, streamWriterBuilder.build());
} catch (IOException | Descriptors.DescriptorValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.vinted.flink.bigquery.model.config;

import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;

import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;

public class WriterRetrySettings implements Serializable {

private Duration initialRetryDelay;
private double retryDelayMultiplier;

private int maxRetryAttempts;

private Duration maxRetryDelay;

public Duration getInitialRetryDelay() {
return initialRetryDelay;
}

public void setInitialRetryDelay(Duration initialRetryDelay) {
this.initialRetryDelay = initialRetryDelay;
}

public double getRetryDelayMultiplier() {
return retryDelayMultiplier;
}

public void setRetryDelayMultiplier(double retryDelayMultiplier) {
this.retryDelayMultiplier = retryDelayMultiplier;
}

public int getMaxRetryAttempts() {
return maxRetryAttempts;
}

public void setMaxRetryAttempts(int maxRetryAttempts) {
this.maxRetryAttempts = maxRetryAttempts;
}

public Duration getMaxRetryDelay() {
return maxRetryDelay;
}

public void setMaxRetryDelay(Duration maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
}
public static WriterRetrySettingsBuilder newBuilder() {
return new WriterRetrySettingsBuilder();
}

public static final class WriterRetrySettingsBuilder implements Serializable {
private Duration initialRetryDelay = Duration.ofMillis(500);
private double retryDelayMultiplier = 1.1;

private int maxRetryAttempts = 5;

private Duration maxRetryDelay = Duration.ofMinutes(1);
private WriterRetrySettingsBuilder() {
}

public WriterRetrySettingsBuilder withInitialRetryDelay(Duration initialRetryDelay) {
this.initialRetryDelay = initialRetryDelay;
return this;
}

public WriterRetrySettingsBuilder withRetryDelayMultiplier(double retryDelayMultiplier) {
this.retryDelayMultiplier = retryDelayMultiplier;
return this;
}

public WriterRetrySettingsBuilder withMaxRetryAttempts(int maxRetryAttempts) {
this.maxRetryAttempts = maxRetryAttempts;
return this;
}

public WriterRetrySettingsBuilder withMaxRetryDelay(Duration maxRetryDelay) {
this.maxRetryDelay = maxRetryDelay;
return this;
}

public WriterRetrySettings build() {
WriterRetrySettings retrySettings = new WriterRetrySettings();
retrySettings.initialRetryDelay = this.initialRetryDelay;
retrySettings.retryDelayMultiplier = this.retryDelayMultiplier;
retrySettings.maxRetryAttempts = this.maxRetryAttempts;
retrySettings.maxRetryDelay = this.maxRetryDelay;
return retrySettings;
}
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ public class WriterSettings implements Serializable {
private Duration timeout;
private int retryCount;
private Duration retryPause;
private Long maxInflightRequests;
private Long maxInflightBytes;
private long maxInflightRequests;
private long maxInflightBytes;
private Duration maxRetryDuration;

private Duration maxRequestWaitCallbackTime;
private Boolean enableConnectionPool;
private boolean enableConnectionPool;

private WriterRetrySettings retrySettings;

public int getStreamsPerTable() {
return streamsPerTable;
Expand Down Expand Up @@ -79,6 +81,14 @@ public void setMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
}

public WriterRetrySettings getRetrySettings() {
return retrySettings;
}

public void setRetrySettings(WriterRetrySettings retrySettings) {
this.retrySettings = retrySettings;
}

public static final class WriterSettingsBuilder implements Serializable {
private int streamsPerTable = 1;
private int writerThreads = 1;
Expand All @@ -91,6 +101,8 @@ public static final class WriterSettingsBuilder implements Serializable {
private Duration maxRequestWaitCallbackTime = Duration.ofMinutes(5);
private Boolean enableConnectionPool = false;

private WriterRetrySettings retrySettings = null;

private WriterSettingsBuilder() {
}

Expand Down Expand Up @@ -135,7 +147,7 @@ public WriterSettingsBuilder withMaxRetryDuration(Duration maxRetryDuration) {
}

public WriterSettingsBuilder withMaxRequestWaitCallbackTime(Duration maxRequestWaitCallbackTime) {
this.maxRequestWaitCallbackTime = maxRetryDuration;
this.maxRequestWaitCallbackTime = maxRequestWaitCallbackTime;
return this;
}

Expand All @@ -144,6 +156,11 @@ public WriterSettingsBuilder withEnableConnectionPool(Boolean enableConnectionPo
return this;
}

public WriterSettingsBuilder withRetrySettings(WriterRetrySettings retrySettings) {
this.retrySettings = retrySettings;
return this;
}

public WriterSettings build() {
WriterSettings writerSettings = new WriterSettings();
writerSettings.writerThreads = this.writerThreads;
Expand All @@ -156,6 +173,7 @@ public WriterSettings build() {
writerSettings.retryPause = this.retryPause;
writerSettings.maxRetryDuration = this.maxRetryDuration;
writerSettings.maxRequestWaitCallbackTime = this.maxRequestWaitCallbackTime;
writerSettings.retrySettings = this.retrySettings;
return writerSettings;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected void writeWithRetry(String traceId, Rows<A> rows, int retryCount) thro
traceId, rows.getStream(), rows.getTable(), rows.getOffset(), rows.getData().size(), retryCount
);
var result = append(traceId, rows);
var callback = new AppendCallBack<>(this, result.writerId, traceId, rows, retryCount);
var callback = new AppendCallBack<>(this, traceId, rows, retryCount);
ApiFutures.addCallback(result.response, callback, appendExecutor);
inflightRequestCount.register();
} catch (AppendException exception) {
Expand Down Expand Up @@ -130,14 +130,12 @@ static class AppendCallBack<A> implements ApiFutureCallback<AppendRowsResponse>
private final BigQueryDefaultSinkWriter<A> parent;
private final Rows<A> rows;

private final String writerId;
private final String traceId;

private final int retryCount;

public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String writerId, String traceId, Rows<A> rows, int retryCount) {
public AppendCallBack(BigQueryDefaultSinkWriter<A> parent, String traceId, Rows<A> rows, int retryCount) {
this.parent = parent;
this.writerId = writerId;
this.traceId = traceId;
this.rows = rows;
this.retryCount = retryCount;
Expand All @@ -155,79 +153,8 @@ public void onSuccess(AppendRowsResponse result) {

@Override
public void onFailure(Throwable t) {
var status = Status.fromThrowable(t);
switch (status.getCode()) {
case INTERNAL:
case CANCELLED:
case FAILED_PRECONDITION:
case DEADLINE_EXCEEDED:
doPauseBeforeRetry();
retryWrite(t, retryCount - 1);
break;
case ABORTED:
case UNAVAILABLE: {
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
break;
}
case INVALID_ARGUMENT:
if (t.getMessage().contains("INVALID_ARGUMENT: MessageSize is too large.")) {
Optional.ofNullable(this.parent.metrics.get(rows.getStream())).ifPresent(BigQueryStreamMetrics::incSplitCount);
logger.warn("Trace-id {} MessageSize is too large. Splitting batch", traceId);
var data = rows.getData();
var first = data.subList(0, data.size() / 2);
var second = data.subList(data.size() / 2, data.size());
try {
this.parent.writeWithRetry(traceId, rows.updateBatch(first, rows.getOffset()), retryCount - 1);
this.parent.writeWithRetry(traceId, rows.updateBatch(second, rows.getOffset() + first.size()), retryCount - 1);
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
} else {
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
case UNKNOWN:
if (t instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException || t.getCause() instanceof Exceptions.MaximumRequestCallbackWaitTimeExceededException) {
logger.info("Trace-id {} request timed out: {}", traceId, t.getMessage());
Optional.ofNullable(this.parent.metrics.get(rows.getStream()))
.ifPresent(BigQueryStreamMetrics::incrementTimeoutCount);
this.parent.recreateStreamWriter(traceId, rows.getStream(), writerId, rows.getTable());
retryWrite(t, retryCount - 1);
} else {
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
break;
default:
logger.error("Trace-id {} Received error {} with status {}", traceId, t.getMessage(), status.getCode());
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
}
this.parent.appendAsyncException = new AppendException(traceId, rows, retryCount, t);
this.parent.inflightRequestCount.arriveAndDeregister();
}

private void retryWrite(Throwable t, int newRetryCount) {
var status = Status.fromThrowable(t);
try {
if (newRetryCount > 0) {
logger.warn("Trace-id {} Recoverable error {}. Retrying {} ...", traceId, status.getCode(), retryCount);
this.parent.writeWithRetry(traceId, rows, newRetryCount);
} else {
logger.error("Trace-id {} Recoverable error {}. No more retries left", traceId, status.getCode(), t);
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, t);
}
} catch (Throwable e) {
this.parent.appendAsyncException = new AppendException(traceId, rows, newRetryCount, e);
}
}

private void doPauseBeforeRetry() {
try {
Thread.sleep(parent.clientProvider.writeSettings().getRetryPause().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Loading

0 comments on commit ca9a289

Please sign in to comment.