Transaction batching Kafka producer #735

Merged · 18 commits · Jan 8, 2024
5 changes: 4 additions & 1 deletion config/config.yaml
@@ -125,6 +125,10 @@ preprocessorConfig:
numStreamThreads: ${KAFKA_STREAM_THREADS:-2}
processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}
kafkaConfig:
kafkaTopic: ${KAFKA_TOPIC:-test-topic}
kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}

serverConfig:
serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086}
@@ -136,5 +140,4 @@ preprocessorConfig:
dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json}
rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1}
kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0}
bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false}
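The new `kafkaConfig` block carries exactly the values a producer needs at startup: a target topic, bootstrap servers, and passthrough properties. As a rough illustration of how such values could back a transaction-batching producer (a minimal sketch using the standard Kafka client API, not the PR's actual wiring; the class name and transactional id below are assumptions):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerSketch {
  public static KafkaProducer<String, byte[]> build(String bootstrapServers) {
    Properties props = new Properties();
    // bootstrapServers would come from preprocessorConfig.kafkaConfig.kafkaBootStrapServers
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    // A transactional.id is required before transactions can be used; this id is
    // purely illustrative and is not the scheme used by the PR.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "kaldb-preprocessor-producer");

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
    producer.initTransactions(); // must be called once before beginTransaction()
    return producer;
  }
}
```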
8 changes: 8 additions & 0 deletions kaldb/pom.xml
@@ -687,6 +687,14 @@
<skipSortingImports>false</skipSortingImports>
<style>google</style>
</configuration>
<dependencies>
<!-- Remove after https://github.com/spotify/fmt-maven-plugin/pull/185 merged -->
<dependency>
<groupId>com.google.googlejavaformat</groupId>
<artifactId>google-java-format</artifactId>
<version>1.19.1</version>
</dependency>
</dependencies>
<executions>
<execution>
<goals>
105 changes: 105 additions & 0 deletions kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java
@@ -0,0 +1,105 @@
package com.slack.kaldb.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is responsible for defining the http endpoint behavior for the bulk ingest. It is
* expected to handle appropriate rate limiting, error handling, and submit the parsed messages to
* Kafka for ingestion.
*/
public class BulkIngestApi {
private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class);
private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
private final DatasetRateLimitingService datasetRateLimitingService;
private final MeterRegistry meterRegistry;
private final Counter incomingByteTotal;
private final Timer bulkIngestTimer;
private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte";
private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest";

public BulkIngestApi(
BulkIngestKafkaProducer bulkIngestKafkaProducer,
DatasetRateLimitingService datasetRateLimitingService,
MeterRegistry meterRegistry) {

this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
this.datasetRateLimitingService = datasetRateLimitingService;
this.meterRegistry = meterRegistry;
this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
}

@Post("/_bulk")
public HttpResponse addDocument(String bulkRequest) {
// 1. Kaldb does not support the concept of "updates". It's always an add.
// 2. The "index" is used as the span name
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
Timer.Sample sample = Timer.start(meterRegistry);
future.thenRun(() -> sample.stop(bulkIngestTimer));

try {
byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
incomingByteTotal.increment(bulkRequestBytes.length);
Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes);

// todo - our rate limiter doesn't have a way to acquire permits across multiple
// datasets
// so today as a limitation we reject any request that has documents against
// multiple indexes
// We think most indexing requests will be against 1 index
if (docs.keySet().size() > 1) {
BulkIngestResponse response =
new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
return HttpResponse.of(future);
}

for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
final String index = indexDocs.getKey();
if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response));
return HttpResponse.of(future);
}
}

// todo - explore the possibility of using the blocking task executor backed by virtual
// threads to fulfill this
Thread.ofVirtual()
.start(
() -> {
try {
BulkIngestResponse response =
bulkIngestKafkaProducer.submitRequest(docs).getResponse();
future.complete(HttpResponse.ofJson(response));
} catch (InterruptedException e) {
LOG.error("Request failed ", e);
future.complete(
HttpResponse.ofJson(
INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
}
});
} catch (Exception e) {
LOG.error("Request failed ", e);
BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
}

return HttpResponse.of(future);
}
}
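For reference, the endpoint accepts the standard bulk-API body that `BulkApiRequestParser` parses: an action/metadata line followed by a document line. A minimal client sketch using Armeria's `WebClient` might look like the following; the host, port, and index name are assumptions for illustration, not values taken from this PR.

```java
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;

public class BulkIngestClientSketch {
  public static void main(String[] args) {
    // Hypothetical preprocessor address; KALDB_PREPROCESSOR_SERVER_PORT defaults to 8086 above.
    WebClient client = WebClient.of("http://localhost:8086");

    // Two-line bulk format: an action/metadata line, then the document source.
    // The index name here is illustrative only.
    String bulkBody =
        "{\"index\": {\"_index\": \"test\"}}\n"
            + "{\"service_name\": \"example\", \"message\": \"hello\"}\n";

    AggregatedHttpResponse response = client.post("/_bulk", bulkBody).aggregate().join();
    System.out.println(response.status() + " " + response.contentUtf8());
  }
}
```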