Skip to content

Commit

Permalink
Merge pull request #23 from vinted/feat/async-writer
Browse files Browse the repository at this point in the history
Feat/async writer
  • Loading branch information
gintarasm authored Mar 18, 2024
2 parents ca9a289 + 638fb28 commit 8065b79
Show file tree
Hide file tree
Showing 12 changed files with 1,005 additions and 85 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.vinted.flink.bigquery.metric;

import java.util.concurrent.atomic.AtomicInteger;

public class AsyncBigQueryStreamMetrics {

private long batchCount = 0;
private double batchSizeInMb = 0.0;
private long splitBatchCount = 0;
private int timeoutCount = 0;
private final AtomicInteger inflightRequests = new AtomicInteger(0);

public void incSplitCount() {
splitBatchCount += 1;
}
public void updateSize(long sizeInBytes) {
batchSizeInMb = sizeInBytes / 1000000.0;
}

public long getBatchCount() {
return batchCount;
}

public void setBatchCount(long batchCount) {
this.batchCount = batchCount;
}

public double getBatchSizeInMb() {
return batchSizeInMb;
}

public long getSplitBatchCount() {
return splitBatchCount;
}

public int getTimeoutCount() {
return timeoutCount;
}

public void incrementTimeoutCount() {
this.timeoutCount++;
}

public int getInflightRequests() {
return inflightRequests.get();
}

public void incrementInflightRequests() {
this.inflightRequests.incrementAndGet();
}

public void decrementInflightRequests() {
this.inflightRequests.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.vinted.flink.bigquery.sink.async;

import com.vinted.flink.bigquery.model.Rows;
import com.vinted.flink.bigquery.serializer.RowValueSerializer;
import com.vinted.flink.bigquery.sink.ExecutorProvider;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.base.sink.writer.strategy.RateLimitingStrategy;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;

public class AsyncBigQuerySink<A> extends AsyncSinkBase<Rows<A>, StreamRequest> {
private final AsyncClientProvider provider;
private final RateLimitingStrategy strategy;

private final ExecutorProvider executorProvider;

public static <A> AsyncBigQuerySinkBuilder<A> builder() {
return new AsyncBigQuerySinkBuilder<>();
}

protected AsyncBigQuerySink(ExecutorProvider executorProvider, AsyncClientProvider provider, RateLimitingStrategy rateLimitingStrategy, ElementConverter<Rows<A>, StreamRequest> elementConverter, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes) {
super(elementConverter, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes);
this.executorProvider = executorProvider;
this.provider = provider;
this.strategy = rateLimitingStrategy;
}

@Override
public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> createWriter(InitContext initContext) throws IOException {
return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.setRateLimitingStrategy(strategy)
.build(),
List.of()
);
}

@Override
public StatefulSinkWriter<Rows<A>, BufferedRequestState<StreamRequest>> restoreWriter(InitContext initContext, Collection<BufferedRequestState<StreamRequest>> collection) throws IOException {
return new AsyncBigQuerySinkWriter<>(executorProvider, provider, this.getElementConverter(), initContext,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.setRateLimitingStrategy(strategy)
.build(),
collection
);
}

@Override
public SimpleVersionedSerializer<BufferedRequestState<StreamRequest>> getWriterStateSerializer() {
return new StreamRequestSerializer();
}

static public class AsyncBigQuerySinkBuilder<A> extends AsyncSinkBaseBuilder<Rows<A>, StreamRequest, AsyncBigQuerySinkBuilder<A>> {
private static final int DEFAULT_MAX_BATCH_SIZE = 1;
private static final int DEFAULT_IN_FLIGHT_REQUESTS = 4;
private static final int DEFAULT_MAX_BUFFERED_REQUESTS = DEFAULT_MAX_BATCH_SIZE + 1;
private static final int DEFAULT_MAX_BATCH_SIZE_IN_BYTES = 500000000; //500MB

private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = Duration.ofSeconds(10).toMillis();

private static final long DEFAULT_MAX_RECORD_SIZE_IN_BYTES = 10000000;
private AsyncClientProvider provider;

private RowValueSerializer<A> serializer;

private RateLimitingStrategy strategy = null;

private ExecutorProvider executorProvider = () -> Executors.newFixedThreadPool(4);

public AsyncBigQuerySinkBuilder<A> setClientProvider(AsyncClientProvider provider) {
this.provider = provider;
return this;
}

public AsyncBigQuerySinkBuilder<A> setRowSerializer(RowValueSerializer<A> serializer) {
this.serializer = serializer;
return this;
}

public AsyncBigQuerySinkBuilder<A> setRateLimitStrategy(RateLimitingStrategy strategy) {
this.strategy = strategy;
return this;
}

public AsyncBigQuerySinkBuilder<A> setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return this;
}

@Override
public AsyncSinkBase<Rows<A>, StreamRequest> build() {
if (getMaxBatchSize() == null) {
setMaxBatchSize(DEFAULT_MAX_BATCH_SIZE);
}

if (getMaxInFlightRequests() == null) {
setMaxInFlightRequests(DEFAULT_IN_FLIGHT_REQUESTS);
}

if (getMaxBufferedRequests() == null) {
setMaxBufferedRequests(DEFAULT_MAX_BUFFERED_REQUESTS);
}

if (getMaxBatchSizeInBytes() == null) {
setMaxBatchSizeInBytes(DEFAULT_MAX_BATCH_SIZE_IN_BYTES);
}

if (getMaxTimeInBufferMS() == null) {
setMaxTimeInBufferMS(DEFAULT_MAX_TIME_IN_BUFFER_MS);
}

if (getMaxRecordSizeInBytes() == null) {
setMaxRecordSizeInBytes(DEFAULT_MAX_RECORD_SIZE_IN_BYTES);
}

return new AsyncBigQuerySink<>(
this.executorProvider,
this.provider,
this.strategy,
new ProtoElementConverter<>(this.serializer, this.provider.writeSettings().getRetryCount()),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes()
);
}
}
}
Loading

0 comments on commit 8065b79

Please sign in to comment.