Move bulk ingest to non-blocking virtual threads
bryanlb committed Jan 4, 2024
1 parent 412a86f commit e2c2727
Showing 2 changed files with 49 additions and 20 deletions.
@@ -4,7 +4,6 @@
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.annotation.Blocking;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
@@ -14,6 +13,7 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -44,44 +44,59 @@ public BulkIngestApi(
this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
}

@Blocking
@Post("/_bulk")
public HttpResponse addDocument(String bulkRequest) {
// 1. Kaldb does not support the concept of "updates". It's always an add.
// 2. The "index" is used as the span name
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
Timer.Sample sample = Timer.start(meterRegistry);
try {
byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
incomingByteTotal.increment(bulkRequestBytes.length);
Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);

// todo - our rate limiter doesn't have a way to acquire permits across multiple datasets
// so today as a limitation we reject any request that has documents against multiple indexes
// todo - our rate limiter doesn't have a way to acquire permits across multiple
// datasets
// so today as a limitation we reject any request that has documents against
// multiple indexes
// We think most indexing requests will be against 1 index
if (docs.keySet().size() > 1) {
BulkIngestResponse response =
new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
}

for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
final String index = indexDocs.getKey();
if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
return HttpResponse.ofJson(TOO_MANY_REQUESTS, response);
future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
}
}

// getResponse will cause this thread to wait until the batch producer submits or it hits
// Armeria timeout
BulkIngestResponse response = bulkIngestKafkaProducer.submitRequest(docs).getResponse();
return HttpResponse.ofJson(response);
// getResponse will cause this thread to wait until the batch producer submits or it
// hits Armeria timeout
Thread.ofVirtual()
.start(
() -> {
try {
BulkIngestResponse response =
bulkIngestKafkaProducer.submitRequest(docs).getResponse();
future.complete(HttpResponse.ofJson(response));
} catch (InterruptedException e) {
LOG.error("Request failed ", e);
future.complete(
HttpResponse.ofJson(
INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
}
});
} catch (Exception e) {
LOG.error("Request failed ", e);
BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response);
} finally {
sample.stop(bulkIngestTimer);
future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
}
future.thenRun(() -> sample.stop(bulkIngestTimer));

return HttpResponse.of(future);
}
}
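The pattern this commit introduces in the handler above — starting a cheap virtual thread for the blocking getResponse() call and returning an Armeria HttpResponse backed by a CompletableFuture — can be sketched in isolation. The following is a minimal, hypothetical endpoint, not part of the commit: it assumes Java 21 and reuses only calls already visible in the diff (HttpResponse.ofJson, HttpResponse.of(future), Thread.ofVirtual().start); doBlockingWork and the Result record are illustrative placeholders.

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.server.annotation.Post;
import java.util.concurrent.CompletableFuture;

public class NonBlockingEndpointSketch {

  // Illustrative response payload; the real service uses BulkIngestResponse.
  record Result(String message) {}

  @Post("/_example")
  public HttpResponse handle(String body) {
    // The future is completed off the request thread, so no @Blocking annotation is needed.
    CompletableFuture<HttpResponse> future = new CompletableFuture<>();

    // Virtual threads are cheap enough to start one per request and let it block
    // (here on doBlockingWork, in the commit on submitRequest(docs).getResponse()).
    Thread.ofVirtual()
        .start(
            () -> {
              try {
                Result result = doBlockingWork(body);
                future.complete(HttpResponse.ofJson(result));
              } catch (Exception e) {
                future.complete(
                    HttpResponse.ofJson(
                        HttpStatus.INTERNAL_SERVER_ERROR, new Result(e.getMessage())));
              }
            });

    // Armeria sends the response whenever the future completes.
    return HttpResponse.of(future);
  }

  // Placeholder for the blocking call that actually produces the response.
  private Result doBlockingWork(String body) {
    return new Result("ok");
  }
}

HttpResponse.of(future) is what lets the handler return immediately; later future.complete(...) calls are no-ops once the first completion wins.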
@@ -26,9 +26,9 @@
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.pool2.BasePooledObjectFactory;
@@ -76,6 +76,9 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
"bulk_ingest_producer_over_limit_pending";
private final Counter overLimitPendingRequests;

public static final String PENDING_REQUEST_QUEUE_SIZE = "bulk_ingest_pending_request_queue_size";
private AtomicInteger pendingRequestQueueSize;

public static final String STALL_COUNTER = "bulk_ingest_producer_stall_counter";
private final Counter stallCounter;

@@ -86,6 +89,8 @@ public class BulkIngestKafkaProducer extends AbstractExecutionThreadService {
Set.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);

public final int maxBatchSize;

public BulkIngestKafkaProducer(
final DatasetMetadataStore datasetMetadataStore,
final KaldbConfigs.PreprocessorConfig preprocessorConfig,
@@ -103,11 +108,13 @@ public BulkIngestKafkaProducer(

// todo - consider making these a configurable value, or determine a way to derive a reasonable
// value automatically
this.pendingRequests =
new ArrayBlockingQueue<>(
Integer.parseInt(System.getProperty("kalDb.bulkIngest.pendingLimit", "500")));
this.pendingRequests = new LinkedBlockingQueue<>();
// new ArrayBlockingQueue<>(
// Integer.parseInt(System.getProperty("kalDb.bulkIngest.pendingLimit", "500")));
this.producerSleep =
Integer.parseInt(System.getProperty("kalDb.bulkIngest.producerSleep", "2000"));
Integer.parseInt(System.getProperty("kalDb.bulkIngest.producerSleep", "200"));
this.maxBatchSize =
Integer.parseInt(System.getProperty("kalDb.bulkIngest.maxBatchSize", "1000"));

// since we use a new transaction ID every time we start a preprocessor there can be some zombie
// transactions?
@@ -116,7 +123,10 @@ public BulkIngestKafkaProducer(
// see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/
GenericObjectPoolConfig<KafkaProducer<String, byte[]>> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(
Integer.parseInt(System.getProperty("kalDb.bulkIngest.pooledKafkaProducers", "16")));
Integer.parseInt(
System.getProperty(
"kalDb.bulkIngest.pooledKafkaProducers",
String.valueOf(Runtime.getRuntime().availableProcessors()))));
this.kafkaProducerPool =
new GenericObjectPool<>(
new BasePooledObjectFactory<>() {
@@ -147,6 +157,8 @@ public PooledObject<KafkaProducer<String, byte[]>> wrap(
this.overLimitPendingRequests = meterRegistry.counter(OVER_LIMIT_PENDING_REQUESTS);
this.stallCounter = meterRegistry.counter(STALL_COUNTER);
this.batchSizeGauge = meterRegistry.gauge(BATCH_SIZE_GAUGE, new AtomicInteger(0));
this.pendingRequestQueueSize =
meterRegistry.gauge(PENDING_REQUEST_QUEUE_SIZE, new AtomicInteger(0));
}

private void cacheSortedDataset() {
@@ -167,8 +179,10 @@ protected void startUp() throws Exception {
@Override
protected void run() throws Exception {
while (isRunning()) {
pendingRequestQueueSize.set(pendingRequests.size());

List<BulkIngestRequest> requests = new ArrayList<>();
pendingRequests.drainTo(requests);
pendingRequests.drainTo(requests, maxBatchSize);
batchSizeGauge.set(requests.size());
if (requests.isEmpty()) {
try {
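On the producer side, the switch to an unbounded LinkedBlockingQueue drained in capped batches (with the new queue-size gauge) can also be sketched on its own. This is a minimal, hypothetical example, not the commit's class: it assumes Micrometer, and the Request record, metric name, and system property are placeholders standing in for BulkIngestRequest and the kalDb.bulkIngest.* properties shown in the diff.

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchDrainSketch {

  // Stand-in for BulkIngestRequest.
  record Request(String payload) {}

  // Unbounded queue: callers no longer hit a fixed pending limit; the drain cap
  // below bounds the work taken per iteration instead.
  private final BlockingQueue<Request> pendingRequests = new LinkedBlockingQueue<>();

  private final int maxBatchSize =
      Integer.parseInt(System.getProperty("example.maxBatchSize", "1000"));

  private final MeterRegistry meterRegistry = new SimpleMeterRegistry();
  private final AtomicInteger pendingQueueSize =
      meterRegistry.gauge("example_pending_request_queue_size", new AtomicInteger(0));

  void runOnce() throws InterruptedException {
    // Export the queue depth before draining, as the run() loop now does on each pass.
    pendingQueueSize.set(pendingRequests.size());

    // Drain at most maxBatchSize requests so a burst cannot produce an arbitrarily large batch.
    List<Request> batch = new ArrayList<>();
    pendingRequests.drainTo(batch, maxBatchSize);

    if (batch.isEmpty()) {
      // The real loop sleeps for the (now shorter, 200 ms) producerSleep before retrying.
      Thread.sleep(200);
      return;
    }
    // ... hand the batch to the pooled Kafka producer here ...
  }
}

The drain cap takes over from the old fixed queue capacity as the bound on work per iteration; the queue itself no longer rejects incoming requests.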
