diff --git a/config/config.yaml b/config/config.yaml index 9835654318..6c95431fd1 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -125,6 +125,10 @@ preprocessorConfig: numStreamThreads: ${KAFKA_STREAM_THREADS:-2} processingGuarantee: ${KAFKA_STREAM_PROCESSING_GUARANTEE:-at_least_once} additionalProps: ${KAFKA_ADDITIONAL_PROPS:-} + kafkaConfig: + kafkaTopic: ${KAFKA_TOPIC:-test-topic} + kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} + additionalProps: ${KAFKA_ADDITIONAL_PROPS:-} serverConfig: serverPort: ${KALDB_PREPROCESSOR_SERVER_PORT:-8086} @@ -136,5 +140,4 @@ preprocessorConfig: dataTransformer: ${PREPROCESSOR_TRANSFORMER:-json} rateLimiterMaxBurstSeconds: ${PREPROCESSOR_RATE_LIMITER_MAX_BURST_SECONDS:-1} kafkaPartitionStickyTimeoutMs: ${KAFKA_PARTITION_STICKY_TIMEOUT_MS:-0} - bootstrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092} useBulkApi: ${KALDB_PREPROCESSOR_USE_BULK_API:-false} diff --git a/kaldb/pom.xml b/kaldb/pom.xml index 0421035554..2a666aff8d 100644 --- a/kaldb/pom.xml +++ b/kaldb/pom.xml @@ -687,6 +687,14 @@ false + + + + com.google.googlejavaformat + google-java-format + 1.19.1 + + diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java new file mode 100644 index 0000000000..baf3686fbd --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestApi.java @@ -0,0 +1,105 @@ +package com.slack.kaldb.bulkIngestApi; + +import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR; +import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS; + +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.server.annotation.Post; +import com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser; +import com.slack.service.murron.trace.Trace; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Timer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class is responsible for defining the http endpoint behavior for the bulk ingest. It is + * expected to handle appropriate rate limiting, error handling, and submit the parsed messages to + * Kafka for ingestion. 
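+ *
+ * <p>A minimal sketch of how a client might exercise this endpoint, assuming the default
+ * preprocessor server port (8086) from config/config.yaml and Armeria's WebClient; this snippet
+ * is illustrative only:
+ *
+ * <pre>{@code
+ * // requires com.linecorp.armeria.client.WebClient and
+ * // com.linecorp.armeria.common.AggregatedHttpResponse
+ * WebClient client = WebClient.of("http://localhost:8086");
+ * String ndjson =
+ *     "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n"
+ *         + "{\"service_name\": \"test\", \"message\": \"hello\"}\n";
+ * AggregatedHttpResponse response = client.post("/_bulk", ndjson).aggregate().join();
+ * }</pre>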
+ */ +public class BulkIngestApi { + private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class); + private final BulkIngestKafkaProducer bulkIngestKafkaProducer; + private final DatasetRateLimitingService datasetRateLimitingService; + private final MeterRegistry meterRegistry; + private final Counter incomingByteTotal; + private final Timer bulkIngestTimer; + private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "kaldb_preprocessor_incoming_byte"; + private final String BULK_INGEST_TIMER = "kaldb_preprocessor_bulk_ingest"; + + public BulkIngestApi( + BulkIngestKafkaProducer bulkIngestKafkaProducer, + DatasetRateLimitingService datasetRateLimitingService, + MeterRegistry meterRegistry) { + + this.bulkIngestKafkaProducer = bulkIngestKafkaProducer; + this.datasetRateLimitingService = datasetRateLimitingService; + this.meterRegistry = meterRegistry; + this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL); + this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER); + } + + @Post("/_bulk") + public HttpResponse addDocument(String bulkRequest) { + // 1. Kaldb does not support the concept of "updates". It's always an add. + // 2. The "index" is used as the span name + CompletableFuture future = new CompletableFuture<>(); + Timer.Sample sample = Timer.start(meterRegistry); + future.thenRun(() -> sample.stop(bulkIngestTimer)); + + try { + byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8); + incomingByteTotal.increment(bulkRequestBytes.length); + Map> docs = BulkApiRequestParser.parseRequest(bulkRequestBytes); + + // todo - our rate limiter doesn't have a way to acquire permits across multiple + // datasets + // so today as a limitation we reject any request that has documents against + // multiple indexes + // We think most indexing requests will be against 1 index + if (docs.keySet().size() > 1) { + BulkIngestResponse response = + new BulkIngestResponse(0, 0, "request must contain only 1 unique index"); + future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response)); + return HttpResponse.of(future); + } + + for (Map.Entry> indexDocs : docs.entrySet()) { + final String index = indexDocs.getKey(); + if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) { + BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded"); + future.complete(HttpResponse.ofJson(TOO_MANY_REQUESTS, response)); + return HttpResponse.of(future); + } + } + + // todo - explore the possibility of using the blocking task executor backed by virtual + // threads to fulfill this + Thread.ofVirtual() + .start( + () -> { + try { + BulkIngestResponse response = + bulkIngestKafkaProducer.submitRequest(docs).getResponse(); + future.complete(HttpResponse.ofJson(response)); + } catch (InterruptedException e) { + LOG.error("Request failed ", e); + future.complete( + HttpResponse.ofJson( + INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage()))); + } + }); + } catch (Exception e) { + LOG.error("Request failed ", e); + BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage()); + future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response)); + } + + return HttpResponse.of(future); + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java new file mode 100644 index 0000000000..651c49b6c4 --- /dev/null +++ 
b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducer.java @@ -0,0 +1,298 @@ +package com.slack.kaldb.bulkIngestApi; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE; +import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener; +import com.slack.kaldb.metadata.dataset.DatasetMetadata; +import com.slack.kaldb.metadata.dataset.DatasetMetadataStore; +import com.slack.kaldb.preprocessor.PreprocessorService; +import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.util.RuntimeHalterImpl; +import com.slack.kaldb.writer.KafkaUtils; +import com.slack.service.murron.trace.Trace; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.TimeoutException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BulkIngestKafkaProducer extends AbstractExecutionThreadService { + private static final Logger LOG = LoggerFactory.getLogger(BulkIngestKafkaProducer.class); + + private final KafkaProducer kafkaProducer; + + private final KafkaClientMetrics kafkaMetrics; + + private final KaldbConfigs.KafkaConfig kafkaConfig; + + private final DatasetMetadataStore datasetMetadataStore; + private final KaldbMetadataStoreChangeListener datasetListener = + (_) -> cacheSortedDataset(); + + protected List throughputSortedDatasets; + + private final BlockingQueue pendingRequests; + + private final Integer producerSleepMs; + + public static final String FAILED_SET_RESPONSE_COUNTER = + "bulk_ingest_producer_failed_set_response"; + private final Counter failedSetResponseCounter; + public static final String STALL_COUNTER = "bulk_ingest_producer_stall_counter"; + private final Counter stallCounter; + + public static final String BATCH_SIZE_GAUGE = "bulk_ingest_producer_batch_size"; + private final AtomicInteger batchSizeGauge; + + private static final Set OVERRIDABLE_CONFIGS = + Set.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); + + public BulkIngestKafkaProducer( + final DatasetMetadataStore datasetMetadataStore, + final KaldbConfigs.PreprocessorConfig preprocessorConfig, + final PrometheusMeterRegistry meterRegistry) { + + this.kafkaConfig = preprocessorConfig.getKafkaConfig(); + + checkArgument( + !kafkaConfig.getKafkaBootStrapServers().isEmpty(), + "Kafka bootstrapServers must be provided"); + + 
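+    // These values are supplied by the new preprocessorConfig.kafkaConfig block in
+    // config/config.yaml, e.g.:
+    //   kafkaConfig:
+    //     kafkaTopic: ${KAFKA_TOPIC:-test-topic}
+    //     kafkaBootStrapServers: ${KAFKA_BOOTSTRAP_SERVERS:-localhost:9092}
+    //     additionalProps: ${KAFKA_ADDITIONAL_PROPS:-}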
checkArgument(!kafkaConfig.getKafkaTopic().isEmpty(), "Kafka topic must be provided"); + + this.datasetMetadataStore = datasetMetadataStore; + this.pendingRequests = new LinkedBlockingQueue<>(); + + // todo - consider making this a configurable value or removing the config + this.producerSleepMs = + Integer.parseInt(System.getProperty("kalDb.bulkIngest.producerSleepMs", "50")); + + // since we use a new transaction ID every time we start a preprocessor there can be some zombie + // transactions? + // I think they will remain in kafka till they expire. They should never be readable if the + // consumer sets isolation.level as "read_committed" + // see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/ + this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString()); + this.kafkaMetrics = new KafkaClientMetrics(kafkaProducer); + this.kafkaMetrics.bindTo(meterRegistry); + + this.failedSetResponseCounter = meterRegistry.counter(FAILED_SET_RESPONSE_COUNTER); + this.stallCounter = meterRegistry.counter(STALL_COUNTER); + this.batchSizeGauge = meterRegistry.gauge(BATCH_SIZE_GAUGE, new AtomicInteger(0)); + + this.kafkaProducer.initTransactions(); + } + + private void cacheSortedDataset() { + // we sort the datasets to rank from which dataset do we start matching candidate service names + // in the future we can change the ordering from sort to something else + this.throughputSortedDatasets = + datasetMetadataStore.listSync().stream() + .sorted(Comparator.comparingLong(DatasetMetadata::getThroughputBytes).reversed()) + .toList(); + } + + @Override + protected void startUp() throws Exception { + cacheSortedDataset(); + datasetMetadataStore.addListener(datasetListener); + } + + @Override + protected void run() throws Exception { + while (isRunning()) { + List requests = new ArrayList<>(); + pendingRequests.drainTo(requests); + batchSizeGauge.set(requests.size()); + if (requests.isEmpty()) { + try { + stallCounter.increment(); + Thread.sleep(producerSleepMs); + } catch (InterruptedException e) { + return; + } + } else { + produceDocumentsAndCommit(requests); + } + } + } + + @Override + protected void shutDown() throws Exception { + datasetMetadataStore.removeListener(datasetListener); + + kafkaProducer.close(); + if (kafkaMetrics != null) { + kafkaMetrics.close(); + } + } + + public BulkIngestRequest submitRequest(Map> inputDocs) { + BulkIngestRequest request = new BulkIngestRequest(inputDocs); + pendingRequests.add(request); + return request; + } + + protected Map produceDocumentsAndCommit( + List requests) { + Map responseMap = new HashMap<>(); + + // KafkaProducer kafkaProducer = null; + try { + // kafkaProducer = kafkaProducerPool.borrowObject(); + kafkaProducer.beginTransaction(); + for (BulkIngestRequest request : requests) { + responseMap.put(request, produceDocuments(request.getInputDocs(), kafkaProducer)); + } + kafkaProducer.commitTransaction(); + // kafkaProducerPool.returnObject(kafkaProducer); + } catch (TimeoutException te) { + LOG.error("Commit transaction timeout", te); + // the commitTransaction waits till "max.block.ms" after which it will time out + // in that case we cannot call abort exception because that throws the following error + // "Cannot attempt operation `abortTransaction` because the previous + // call to `commitTransaction` timed out and must be retried" + // so for now we just restart the preprocessor + new RuntimeHalterImpl() + .handleFatal( + new Throwable( + "KafkaProducer needs to shutdown as we don't have retry yet and we 
cannot call abortTxn on timeout", + te)); + } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { + // We can't recover from these exceptions, so our only option is to close the producer and + // exit. + new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e)); + } catch (Exception e) { + LOG.warn("failed transaction with error", e); + if (kafkaProducer != null) { + try { + kafkaProducer.abortTransaction(); + // kafkaProducerPool.returnObject(kafkaProducer); + } catch (ProducerFencedException err) { + LOG.error("Could not abort transaction", err); + } + } + + for (BulkIngestRequest request : requests) { + responseMap.put( + request, + new BulkIngestResponse( + 0, + request.getInputDocs().values().stream().mapToInt(List::size).sum(), + e.getMessage())); + } + } + + for (Map.Entry entry : responseMap.entrySet()) { + BulkIngestRequest key = entry.getKey(); + BulkIngestResponse value = entry.getValue(); + if (!key.setResponse(value)) { + LOG.warn("Failed to add result to the bulk ingest request, consumer thread went away?"); + failedSetResponseCounter.increment(); + } + } + return responseMap; + } + + @SuppressWarnings("FutureReturnValueIgnored") + private BulkIngestResponse produceDocuments( + Map> indexDocs, KafkaProducer kafkaProducer) { + int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum(); + + // we cannot create a generic pool of producers because the kafka API expects the transaction ID + // to be a property while creating the producer object. + for (Map.Entry> indexDoc : indexDocs.entrySet()) { + String index = indexDoc.getKey(); + + // call once per batch and use the same partition for better batching + // todo - this probably shouldn't be tied to the transaction batching logic? + int partition = getPartition(index); + + // since there isn't a dataset provisioned for this service/index we will not index this set + // of docs + if (partition < 0) { + LOG.warn("index=" + index + " does not have a provisioned dataset associated with it"); + continue; + } + + // KafkaProducer does not allow creating multiple transactions from a single object - + // rightfully so. 
+ // Till we fix the producer design to allow for multiple /_bulk requests to be able to + // write to the same txn + // we will limit producing documents 1 thread at a time + for (Trace.Span doc : indexDoc.getValue()) { + ProducerRecord producerRecord = + new ProducerRecord<>(kafkaConfig.getKafkaTopic(), partition, index, doc.toByteArray()); + + // we intentionally suppress FutureReturnValueIgnored here in errorprone - this is because + // we wrap this in a transaction, which is responsible for flushing all of the pending + // messages + kafkaProducer.send(producerRecord); + } + } + + return new BulkIngestResponse(totalDocs, 0, ""); + } + + private KafkaProducer createKafkaTransactionProducer(String transactionId) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getKafkaBootStrapServers()); + props.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + props.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArraySerializer"); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId); + + // don't override the properties that we have already set explicitly using named properties + for (Map.Entry additionalProp : + kafkaConfig.getAdditionalPropsMap().entrySet()) { + props = + KafkaUtils.maybeOverrideProps( + props, + additionalProp.getKey(), + additionalProp.getValue(), + OVERRIDABLE_CONFIGS.contains(additionalProp.getKey())); + } + return new KafkaProducer<>(props); + } + + private int getPartition(String index) { + for (DatasetMetadata datasetMetadata : throughputSortedDatasets) { + String serviceNamePattern = datasetMetadata.getServiceNamePattern(); + + if (serviceNamePattern.equals(MATCH_ALL_SERVICE) + || serviceNamePattern.equals(MATCH_STAR_SERVICE) + || index.equals(serviceNamePattern)) { + List partitions = PreprocessorService.getActivePartitionList(datasetMetadata); + return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())); + } + } + // We don't have a provisioned service for this index + return -1; + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestRequest.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestRequest.java new file mode 100644 index 0000000000..8b47a0310b --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestRequest.java @@ -0,0 +1,32 @@ +package com.slack.kaldb.bulkIngestApi; + +import com.slack.service.murron.trace.Trace; +import java.util.List; +import java.util.Map; +import java.util.concurrent.SynchronousQueue; + +/** + * Wrapper object to enable building a bulk request and awaiting on an asynchronous response to be + * populated. As this uses a synchronous queue internally, it expects a thread to already be waiting + * on getResponse when setResponse is invoked with the result data. 
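+ *
+ * <p>A minimal illustration of the expected handshake (illustrative only; variable names are
+ * hypothetical):
+ *
+ * <pre>{@code
+ * BulkIngestRequest request = bulkIngestKafkaProducer.submitRequest(docs);
+ * // consumer thread: blocks until the producer loop publishes a response
+ * BulkIngestResponse response = request.getResponse();
+ * // producer thread: setResponse() returns false if no thread is waiting in getResponse()
+ * }</pre>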
+ */
+public class BulkIngestRequest {
+  private final Map<String, List<Trace.Span>> inputDocs;
+  private final SynchronousQueue<BulkIngestResponse> internalResponse = new SynchronousQueue<>();
+
+  protected BulkIngestRequest(Map<String, List<Trace.Span>> inputDocs) {
+    this.inputDocs = inputDocs;
+  }
+
+  Map<String, List<Trace.Span>> getInputDocs() {
+    return inputDocs;
+  }
+
+  boolean setResponse(BulkIngestResponse response) {
+    return internalResponse.offer(response);
+  }
+
+  public BulkIngestResponse getResponse() throws InterruptedException {
+    return internalResponse.take();
+  }
+}
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestResponse.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestResponse.java
new file mode 100644
index 0000000000..fa1947ae56
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/BulkIngestResponse.java
@@ -0,0 +1,4 @@
+package com.slack.kaldb.bulkIngestApi;
+
+/** Metadata object for the result of a bulk ingest request */
+public record BulkIngestResponse(int totalDocs, long failedDocs, String errorMsg) {}
diff --git a/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/DatasetRateLimitingService.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/DatasetRateLimitingService.java
new file mode 100644
index 0000000000..c8cd900aba
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/DatasetRateLimitingService.java
@@ -0,0 +1,74 @@
+package com.slack.kaldb.bulkIngestApi;
+
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener;
+import com.slack.kaldb.metadata.dataset.DatasetMetadata;
+import com.slack.kaldb.metadata.dataset.DatasetMetadataStore;
+import com.slack.kaldb.preprocessor.PreprocessorRateLimiter;
+import com.slack.kaldb.proto.config.KaldbConfigs;
+import com.slack.service.murron.trace.Trace;
+import io.micrometer.core.instrument.Timer;
+import io.micrometer.prometheus.PrometheusMeterRegistry;
+import java.util.List;
+import java.util.function.BiPredicate;
+
+/**
+ * Guava service that maintains a rate limiting object consistent with the value stored in the
+ * dataset metadata store.
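+ *
+ * <p>Illustrative usage, assuming a dataset has been provisioned for the "test" index:
+ *
+ * <pre>{@code
+ * if (!datasetRateLimitingService.tryAcquire("test", docs)) {
+ *   // the caller is expected to reply with HTTP 429 (TOO_MANY_REQUESTS), as BulkIngestApi does
+ * }
+ * }</pre>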
+ */ +public class DatasetRateLimitingService extends AbstractIdleService { + private final DatasetMetadataStore datasetMetadataStore; + private final KaldbMetadataStoreChangeListener datasetListener = + (_) -> updateRateLimiter(); + + private final PreprocessorRateLimiter rateLimiter; + private BiPredicate> rateLimiterPredicate; + + private final PrometheusMeterRegistry meterRegistry; + public static final String RATE_LIMIT_RELOAD_TIMER = + "preprocessor_dataset_rate_limit_reload_timer"; + private final Timer rateLimitReloadtimer; + + public DatasetRateLimitingService( + DatasetMetadataStore datasetMetadataStore, + KaldbConfigs.PreprocessorConfig preprocessorConfig, + PrometheusMeterRegistry meterRegistry) { + this.datasetMetadataStore = datasetMetadataStore; + this.meterRegistry = meterRegistry; + + this.rateLimiter = + new PreprocessorRateLimiter( + meterRegistry, + preprocessorConfig.getPreprocessorInstanceCount(), + preprocessorConfig.getRateLimiterMaxBurstSeconds(), + true); + + this.rateLimitReloadtimer = meterRegistry.timer(RATE_LIMIT_RELOAD_TIMER); + } + + private void updateRateLimiter() { + Timer.Sample sample = Timer.start(meterRegistry); + try { + List datasetMetadataList = datasetMetadataStore.listSync(); + this.rateLimiterPredicate = rateLimiter.createBulkIngestRateLimiter(datasetMetadataList); + } finally { + // TODO: re-work this so that we can add success/failure tags and capture them + sample.stop(rateLimitReloadtimer); + } + } + + @Override + protected void startUp() throws Exception { + updateRateLimiter(); + datasetMetadataStore.addListener(datasetListener); + } + + @Override + protected void shutDown() throws Exception { + datasetMetadataStore.removeListener(datasetListener); + } + + public boolean tryAcquire(String index, List value) { + return rateLimiterPredicate.test(index, value); + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkApiRequestParser.java b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java similarity index 79% rename from kaldb/src/main/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkApiRequestParser.java rename to kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java index 891e056ab1..527ecb77c5 100644 --- a/kaldb/src/main/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkApiRequestParser.java +++ b/kaldb/src/main/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParser.java @@ -1,10 +1,9 @@ -package com.slack.kaldb.preprocessor.ingest; +package com.slack.kaldb.bulkIngestApi.opensearch; import com.google.protobuf.ByteString; import com.slack.kaldb.writer.SpanFormatter; import com.slack.service.murron.trace.Trace; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.ArrayList; @@ -22,13 +21,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class OpenSearchBulkApiRequestParser { +/** + * This class uses the Opensearch libraries to parse the bulk ingest request into documents which + * can be inserted into Kafka. The goal of this is to leverage Opensearch where possible, while + * preventing opensearch abstractions from leaking further into KalDB. 
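+ *
+ * <p>A short sketch of the parsing contract (the NDJSON payload shown here is illustrative):
+ *
+ * <pre>{@code
+ * byte[] body =
+ *     "{\"index\": {\"_index\": \"test\", \"_id\": \"1\"}}\n{\"field1\": \"value1\"}\n"
+ *         .getBytes(StandardCharsets.UTF_8);
+ * Map<String, List<Trace.Span>> docs = BulkApiRequestParser.parseRequest(body);
+ * // docs is keyed by index name ("test"), with one Trace.Span per indexed document
+ * }</pre>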
+ */ +public class BulkApiRequestParser { - public static final Logger LOG = LoggerFactory.getLogger(OpenSearchBulkApiRequestParser.class); + private static final Logger LOG = LoggerFactory.getLogger(BulkApiRequestParser.class); - private static String SERVICE_NAME_KEY = "service_name"; + private static final String SERVICE_NAME_KEY = "service_name"; - public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { + public static Map> parseRequest(byte[] postBody) throws IOException { + return convertIndexRequestToTraceFormat(parseBulkRequest(postBody)); + } + + protected static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { ZonedDateTime timestamp = (ZonedDateTime) ingestDocument @@ -69,9 +77,9 @@ public static Trace.Span fromIngestDocument(IngestDocument ingestDocument) { return spanBuilder.build(); } - // key - index. value - list of docs to be indexed - public static Map> convertIndexRequestToTraceFormat( + protected static Map> convertIndexRequestToTraceFormat( List indexRequests) { + // key - index. value - list of docs to be indexed Map> indexDocs = new HashMap<>(); for (IndexRequest indexRequest : indexRequests) { @@ -81,7 +89,7 @@ public static Map> convertIndexRequestToTraceFormat( } IngestDocument ingestDocument = convertRequestToDocument(indexRequest); List docs = indexDocs.computeIfAbsent(index, key -> new ArrayList<>()); - docs.add(OpenSearchBulkApiRequestParser.fromIngestDocument(ingestDocument)); + docs.add(BulkApiRequestParser.fromIngestDocument(ingestDocument)); } return indexDocs; } @@ -101,12 +109,11 @@ protected static IngestDocument convertRequestToDocument(IndexRequest indexReque // and transform it } - public static List parseBulkRequest(String postBody) throws IOException { + protected static List parseBulkRequest(byte[] postBody) throws IOException { List indexRequests = new ArrayList<>(); BulkRequest bulkRequest = new BulkRequest(); // calls parse under the hood - byte[] bytes = postBody.getBytes(StandardCharsets.UTF_8); - bulkRequest.add(bytes, 0, bytes.length, null, MediaTypeRegistry.JSON); + bulkRequest.add(postBody, 0, postBody.length, null, MediaTypeRegistry.JSON); List> requests = bulkRequest.requests(); for (DocWriteRequest request : requests) { if (request.opType() == DocWriteRequest.OpType.INDEX) { diff --git a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/BulkIngestResponse.java b/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/BulkIngestResponse.java deleted file mode 100644 index 8d40b9d867..0000000000 --- a/kaldb/src/main/java/com/slack/kaldb/elasticsearchApi/BulkIngestResponse.java +++ /dev/null @@ -1,3 +0,0 @@ -package com.slack.kaldb.elasticsearchApi; - -public record BulkIngestResponse(int totalDocs, long failedDocs, String errorMsg) {} diff --git a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java index 1e87100fbd..5351ec9b01 100644 --- a/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java +++ b/kaldb/src/main/java/com/slack/kaldb/preprocessor/PreprocessorService.java @@ -2,7 +2,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; -import static com.slack.kaldb.writer.kafka.KaldbKafkaConsumer.maybeOverride; import com.google.common.util.concurrent.AbstractService; import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener; @@ -10,6 +9,7 @@ import 
com.slack.kaldb.metadata.dataset.DatasetMetadataStore; import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.writer.KafkaUtils; import com.slack.service.murron.trace.Trace; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; @@ -313,7 +313,9 @@ protected static Properties makeKafkaStreamsProps( // don't override any property we already set for (Map.Entry additionalProp : kafkaStreamConfig.getAdditionalPropsMap().entrySet()) { - maybeOverride(props, additionalProp.getKey(), additionalProp.getValue(), false); + props = + KafkaUtils.maybeOverrideProps( + props, additionalProp.getKey(), additionalProp.getValue(), false); } return props; diff --git a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java index 4e2a94ab15..8c93374235 100644 --- a/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java +++ b/kaldb/src/main/java/com/slack/kaldb/server/Kaldb.java @@ -5,6 +5,9 @@ import com.google.common.util.concurrent.ServiceManager; import com.slack.kaldb.blobfs.BlobFs; import com.slack.kaldb.blobfs.s3.S3CrtBlobFs; +import com.slack.kaldb.bulkIngestApi.BulkIngestApi; +import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer; +import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService; import com.slack.kaldb.chunkManager.CachingChunkManager; import com.slack.kaldb.chunkManager.IndexingChunkManager; import com.slack.kaldb.clusterManager.ClusterHpaMetricService; @@ -388,12 +391,16 @@ private static Set getServices( KaldbConfigs.NodeRole.PREPROCESSOR, List.of(datasetMetadataStore))); if (preprocessorConfig.getUseBulkApi()) { - // TODO: using an armeria service that is also a guava service does not look elegant - // explore ways where we can control the lifecycle without the need for a guava service here - OpenSearchBulkIngestApi openSearchBulkApiService = - new OpenSearchBulkIngestApi(datasetMetadataStore, preprocessorConfig, meterRegistry); + BulkIngestKafkaProducer bulkIngestKafkaProducer = + new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry); + services.add(bulkIngestKafkaProducer); + DatasetRateLimitingService datasetRateLimitingService = + new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry); + services.add(datasetRateLimitingService); + + BulkIngestApi openSearchBulkApiService = + new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry); armeriaServiceBuilder.withAnnotatedService(openSearchBulkApiService); - services.add(openSearchBulkApiService); } else { PreprocessorService preprocessorService = new PreprocessorService(datasetMetadataStore, preprocessorConfig, meterRegistry); diff --git a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java b/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java deleted file mode 100644 index 32aaf4b760..0000000000 --- a/kaldb/src/main/java/com/slack/kaldb/server/OpenSearchBulkIngestApi.java +++ /dev/null @@ -1,300 +0,0 @@ -package com.slack.kaldb.server; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR; -import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS; -import static com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_ALL_SERVICE; -import static 
com.slack.kaldb.metadata.dataset.DatasetMetadata.MATCH_STAR_SERVICE; -import static com.slack.kaldb.preprocessor.PreprocessorService.CONFIG_RELOAD_TIMER; -import static com.slack.kaldb.preprocessor.PreprocessorService.INITIALIZE_RATE_LIMIT_WARM; -import static com.slack.kaldb.preprocessor.PreprocessorService.filterValidDatasetMetadata; -import static com.slack.kaldb.preprocessor.PreprocessorService.sortDatasetsOnThroughput; - -import com.google.common.util.concurrent.AbstractService; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.server.annotation.Blocking; -import com.linecorp.armeria.server.annotation.Post; -import com.slack.kaldb.elasticsearchApi.BulkIngestResponse; -import com.slack.kaldb.metadata.core.KaldbMetadataStoreChangeListener; -import com.slack.kaldb.metadata.dataset.DatasetMetadata; -import com.slack.kaldb.metadata.dataset.DatasetMetadataStore; -import com.slack.kaldb.preprocessor.PreprocessorRateLimiter; -import com.slack.kaldb.preprocessor.PreprocessorService; -import com.slack.kaldb.preprocessor.ingest.OpenSearchBulkApiRequestParser; -import com.slack.kaldb.proto.config.KaldbConfigs; -import com.slack.kaldb.util.RuntimeHalterImpl; -import com.slack.service.murron.trace.Trace; -import io.micrometer.core.instrument.Timer; -import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; -import io.micrometer.prometheus.PrometheusMeterRegistry; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiPredicate; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.errors.AuthorizationException; -import org.apache.kafka.common.errors.OutOfOrderSequenceException; -import org.apache.kafka.common.errors.ProducerFencedException; -import org.apache.kafka.common.errors.TimeoutException; -import org.opensearch.action.index.IndexRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * batching is important - if we send one doc a time we will create a transaction per request which - * is expensive - */ -public class OpenSearchBulkIngestApi extends AbstractService { - - private static final Logger LOG = LoggerFactory.getLogger(OpenSearchBulkIngestApi.class); - private final PrometheusMeterRegistry meterRegistry; - - private final KaldbConfigs.PreprocessorConfig preprocessorConfig; - private final DatasetMetadataStore datasetMetadataStore; - - private final KaldbMetadataStoreChangeListener datasetListener = - (datasetMetadata) -> load(); - - private final PreprocessorRateLimiter rateLimiter; - private BiPredicate> rateLimiterPredicate; - protected List throughputSortedDatasets; - - private final Timer configReloadTimer; - - private final KafkaProducer kafkaProducer; - private final KafkaClientMetrics kafkaMetrics; - - private final ReentrantLock lockTransactionalProducer = new ReentrantLock(); - - @Override - protected void doStart() { - try { - LOG.info("Starting OpenSearchBulkIngestApi service"); - load(); - datasetMetadataStore.addListener(datasetListener); - LOG.info("OpenSearchBulkIngestAPI service started"); - notifyStarted(); - } catch (Throwable t) { - notifyFailed(t); - } - } - - @Override - protected void doStop() { - try { - LOG.info("Stopping OpenSearchBulkIngestApi service"); - datasetMetadataStore.removeListener(datasetListener); - kafkaProducer.close(); - 
if (kafkaMetrics != null) { - kafkaMetrics.close(); - } - LOG.info("OpenSearchBulkIngestApi service closed"); - notifyStopped(); - } catch (Throwable t) { - notifyFailed(t); - } - } - - public void load() { - Timer.Sample sample = Timer.start(meterRegistry); - try { - List datasetMetadataList = datasetMetadataStore.listSync(); - // only attempt to register stream processing on valid dataset configurations - List datasetMetadataToProcesses = - filterValidDatasetMetadata(datasetMetadataList); - - checkState(!datasetMetadataToProcesses.isEmpty(), "dataset metadata list must not be empty"); - - this.throughputSortedDatasets = sortDatasetsOnThroughput(datasetMetadataToProcesses); - this.rateLimiterPredicate = - rateLimiter.createBulkIngestRateLimiter(datasetMetadataToProcesses); - } catch (Exception e) { - notifyFailed(e); - } finally { - // TODO: re-work this so that we can add success/failure tags and capture them - sample.stop(configReloadTimer); - } - } - - public OpenSearchBulkIngestApi( - DatasetMetadataStore datasetMetadataStore, - KaldbConfigs.PreprocessorConfig preprocessorConfig, - PrometheusMeterRegistry meterRegistry) { - this(datasetMetadataStore, preprocessorConfig, meterRegistry, INITIALIZE_RATE_LIMIT_WARM); - } - - public OpenSearchBulkIngestApi( - DatasetMetadataStore datasetMetadataStore, - KaldbConfigs.PreprocessorConfig preprocessorConfig, - PrometheusMeterRegistry meterRegistry, - boolean initializeRateLimitWarm) { - - checkArgument( - !preprocessorConfig.getBootstrapServers().isEmpty(), - "Kafka bootstrapServers must be provided"); - - checkArgument( - !preprocessorConfig.getDownstreamTopic().isEmpty(), - "Kafka downstreamTopic must be provided"); - - this.datasetMetadataStore = datasetMetadataStore; - this.preprocessorConfig = preprocessorConfig; - this.meterRegistry = meterRegistry; - this.rateLimiter = - new PreprocessorRateLimiter( - meterRegistry, - preprocessorConfig.getPreprocessorInstanceCount(), - preprocessorConfig.getRateLimiterMaxBurstSeconds(), - initializeRateLimitWarm); - - this.configReloadTimer = meterRegistry.timer(CONFIG_RELOAD_TIMER); - - // since we use a new transaction ID every time we start a preprocessor there can be some zombie - // transactions? - // I think they will remain in kafka till they expire. They should never be readable if the - // consumer sets isolation.level as "read_committed" - // see "zombie fencing" https://www.confluent.io/blog/transactions-apache-kafka/ - this.kafkaProducer = createKafkaTransactionProducer(UUID.randomUUID().toString()); - kafkaMetrics = new KafkaClientMetrics(kafkaProducer); - kafkaMetrics.bindTo(meterRegistry); - this.kafkaProducer.initTransactions(); - } - - /** - * 1. Kaldb does not support the concept of "updates". It's always an add 2. 
The "index" is used - * as the span name - */ - @Blocking - @Post("/_bulk") - public HttpResponse addDocument(String bulkRequest) { - try { - List indexRequests = - OpenSearchBulkApiRequestParser.parseBulkRequest(bulkRequest); - Map> docs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); - // our rate limiter doesn't have a way to acquire permits across multiple datasets - // so today as a limitation we reject any request that has documents against multiple indexes - // We think most indexing requests will be against 1 index - if (docs.keySet().size() > 1) { - BulkIngestResponse response = - new BulkIngestResponse(0, 0, "request must contain only 1 unique index"); - return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response); - } - - for (Map.Entry> indexDocs : docs.entrySet()) { - final String index = indexDocs.getKey(); - if (!rateLimiterPredicate.test(index, indexDocs.getValue())) { - BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded"); - return HttpResponse.ofJson(TOO_MANY_REQUESTS, response); - } - } - BulkIngestResponse response = produceDocuments(docs); - return HttpResponse.ofJson(response); - } catch (Exception e) { - LOG.error("Request failed ", e); - BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage()); - return HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response); - } - } - - @SuppressWarnings("FutureReturnValueIgnored") - public BulkIngestResponse produceDocuments(Map> indexDocs) { - int totalDocs = indexDocs.values().stream().mapToInt(List::size).sum(); - - // we cannot create a generic pool of producers because the kafka API expects the transaction ID - // to be a property while creating the producer object. - for (Map.Entry> indexDoc : indexDocs.entrySet()) { - String index = indexDoc.getKey(); - // call once per batch and use the same partition for better batching - int partition = getPartition(index); - - // since there isn't a dataset provisioned for this service/index we will not index this set - // of docs - if (partition < 0) { - LOG.warn("index=" + index + " does not have a provisioned dataset associated with it"); - continue; - } - - // KafkaProducer does not allow creating multiple transactions from a single object - - // rightfully so. 
- // Till we fix the producer design to allow for multiple /_bulk requests to be able to - // write to the same txn - // we will limit producing documents 1 thread at a time - lockTransactionalProducer.lock(); - try { - kafkaProducer.beginTransaction(); - for (Trace.Span doc : indexDoc.getValue()) { - ProducerRecord producerRecord = - new ProducerRecord<>( - preprocessorConfig.getDownstreamTopic(), partition, index, doc.toByteArray()); - - // we intentionally supress FutureReturnValueIgnored here in errorprone - this is because - // we wrap this in a transaction, which is responsible for flushing all of the pending - // messages - kafkaProducer.send(producerRecord); - } - kafkaProducer.commitTransaction(); - } catch (TimeoutException te) { - LOG.error("Commit transaction timeout", te); - // the commitTransaction waits till "max.block.ms" after which it will time out - // in that case we cannot call abort exception because that throws the following error - // "Cannot attempt operation `abortTransaction` because the previous - // call to `commitTransaction` timed out and must be retried" - // so for now we just restart the preprocessor - new RuntimeHalterImpl() - .handleFatal( - new Throwable( - "KafkaProducer needs to shutdown as we don't have retry yet and we cannot call abortTxn on timeout", - te)); - } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { - // We can't recover from these exceptions, so our only option is to close the producer and - // exit. - new RuntimeHalterImpl().handleFatal(new Throwable("KafkaProducer needs to shutdown ", e)); - } catch (Exception e) { - LOG.warn("failed transaction with error", e); - try { - kafkaProducer.abortTransaction(); - } catch (ProducerFencedException err) { - LOG.error("Could not abort transaction", err); - } - return new BulkIngestResponse(0, totalDocs, e.getMessage()); - } finally { - lockTransactionalProducer.unlock(); - } - } - - return new BulkIngestResponse(totalDocs, 0, ""); - } - - private int getPartition(String index) { - for (DatasetMetadata datasetMetadata : throughputSortedDatasets) { - String serviceNamePattern = datasetMetadata.getServiceNamePattern(); - - if (serviceNamePattern.equals(MATCH_ALL_SERVICE) - || serviceNamePattern.equals(MATCH_STAR_SERVICE) - || index.equals(serviceNamePattern)) { - List partitions = PreprocessorService.getActivePartitionList(datasetMetadata); - return partitions.get(ThreadLocalRandom.current().nextInt(partitions.size())); - } - } - // We don't have a provisioned service for this index - return -1; - } - - private KafkaProducer createKafkaTransactionProducer(String transactionId) { - Properties props = new Properties(); - props.put("bootstrap.servers", preprocessorConfig.getBootstrapServers()); - props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); - props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - props.put("transactional.id", transactionId); - props.put("linger.ms", 250); - props.put("max.block.ms", "10000"); - props.put("compression.type", "snappy"); - return new KafkaProducer<>(props); - } -} diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java b/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java new file mode 100644 index 0000000000..7c351a6f00 --- /dev/null +++ b/kaldb/src/main/java/com/slack/kaldb/writer/KafkaUtils.java @@ -0,0 +1,35 @@ +package com.slack.kaldb.writer; + +import com.google.common.annotations.VisibleForTesting; +import 
java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Shared kafka functions for producers, consumers, and stream applications */ +public class KafkaUtils { + private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class); + + @VisibleForTesting + public static Properties maybeOverrideProps( + Properties inputProps, String key, String value, boolean override) { + Properties changedProps = (Properties) inputProps.clone(); + String userValue = changedProps.getProperty(key); + if (userValue != null) { + if (override) { + LOG.warn( + String.format( + "Property %s is provided but will be overridden from %s to %s", + key, userValue, value)); + changedProps.setProperty(key, value); + } else { + LOG.warn( + String.format( + "Property %s is provided but won't be overridden from %s to %s", + key, userValue, value)); + } + } else { + changedProps.setProperty(key, value); + } + return changedProps; + } +} diff --git a/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java b/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java index 368cd158c7..7e151df9d5 100644 --- a/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java +++ b/kaldb/src/main/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumer.java @@ -9,6 +9,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.slack.kaldb.proto.config.KaldbConfigs; import com.slack.kaldb.server.KaldbConfig; +import com.slack.kaldb.writer.KafkaUtils; import com.slack.kaldb.writer.LogMessageWriterImpl; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.MeterRegistry; @@ -82,40 +83,16 @@ public static Properties makeKafkaConsumerProps(KaldbConfigs.KafkaConfig kafkaCo // don't override the properties that we have already set explicitly using named properties for (Map.Entry additionalProp : kafkaConfig.getAdditionalPropsMap().entrySet()) { - maybeOverride( - props, - additionalProp.getKey(), - additionalProp.getValue(), - OVERRIDABLE_CONFIGS.contains(additionalProp.getKey())); + props = + KafkaUtils.maybeOverrideProps( + props, + additionalProp.getKey(), + additionalProp.getValue(), + OVERRIDABLE_CONFIGS.contains(additionalProp.getKey())); } return props; } - @VisibleForTesting - public static boolean maybeOverride( - Properties props, String key, String value, boolean override) { - boolean overridden = false; - String userValue = props.getProperty(key); - if (userValue != null) { - if (override) { - LOG.warn( - String.format( - "Property %s is provided but will be overridden from %s to %s", - key, userValue, value)); - props.setProperty(key, value); - overridden = true; - } else { - LOG.warn( - String.format( - "Property %s is provided but won't be overridden from %s to %s", - key, userValue, value)); - } - } else { - props.setProperty(key, value); - } - return overridden; - } - private KafkaConsumer kafkaConsumer; private final TopicPartition topicPartition; diff --git a/kaldb/src/main/proto/kaldb_configs.proto b/kaldb/src/main/proto/kaldb_configs.proto index 17b1656939..21ac233489 100644 --- a/kaldb/src/main/proto/kaldb_configs.proto +++ b/kaldb/src/main/proto/kaldb_configs.proto @@ -266,8 +266,8 @@ message PreprocessorConfig { // more docs on PreprocessorPartitioner#getDatasetPartitionSuppliers int32 kafka_partition_sticky_timeout_ms = 8; - // this value needs to be set if the bulk API is used to bootstrap the producer kafka + // Kafka config needs to be set if the bulk API is used to bootstrap the producer 
kafka // we plan on moving everything to the bulk API and removing KafkaStreamConfig in the future - string bootstrap_servers = 9; + KafkaConfig kafka_config = 9; bool use_bulk_api = 10; } diff --git a/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java new file mode 100644 index 0000000000..3035a09180 --- /dev/null +++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/BulkIngestKafkaProducerTest.java @@ -0,0 +1,264 @@ +package com.slack.kaldb.bulkIngestApi; + +import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import brave.Tracing; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.slack.kaldb.metadata.core.CuratorBuilder; +import com.slack.kaldb.metadata.dataset.DatasetMetadata; +import com.slack.kaldb.metadata.dataset.DatasetMetadataStore; +import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata; +import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.testlib.TestKafkaServer; +import com.slack.service.murron.trace.Trace; +import io.micrometer.prometheus.PrometheusConfig; +import io.micrometer.prometheus.PrometheusMeterRegistry; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.curator.test.TestingServer; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class BulkIngestKafkaProducerTest { + private static final Logger LOG = LoggerFactory.getLogger(BulkIngestKafkaProducerTest.class); + private static PrometheusMeterRegistry meterRegistry; + private static AsyncCuratorFramework curatorFramework; + private static KaldbConfigs.PreprocessorConfig preprocessorConfig; + private static DatasetMetadataStore datasetMetadataStore; + private static TestingServer zkServer; + private static TestKafkaServer kafkaServer; + + private BulkIngestKafkaProducer bulkIngestKafkaProducer; + + static String INDEX_NAME = "testtransactionindex"; + + private static String DOWNSTREAM_TOPIC = "test-transaction-topic-out"; + + @BeforeEach + public void bootstrapCluster() throws Exception { + Tracing.newBuilder().build(); + meterRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + + zkServer = new TestingServer(); + KaldbConfigs.ZookeeperConfig zkConfig = + KaldbConfigs.ZookeeperConfig.newBuilder() + .setZkConnectString(zkServer.getConnectString()) + .setZkPathPrefix("testZK") + .setZkSessionTimeoutMs(1000) + .setZkConnectionTimeoutMs(1000) + .setSleepBetweenRetriesMs(1000) + .build(); + curatorFramework = CuratorBuilder.build(meterRegistry, zkConfig); + + kafkaServer = new TestKafkaServer(); + kafkaServer.createTopicWithPartitions(DOWNSTREAM_TOPIC, 5); + + KaldbConfigs.ServerConfig serverConfig = + KaldbConfigs.ServerConfig.newBuilder() + .setServerPort(8080) + .setServerAddress("localhost") + .build(); + 
KaldbConfigs.KafkaConfig kafkaConfig = + KaldbConfigs.KafkaConfig.newBuilder() + .setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get()) + .setKafkaTopic(DOWNSTREAM_TOPIC) + .build(); + preprocessorConfig = + KaldbConfigs.PreprocessorConfig.newBuilder() + .setKafkaConfig(kafkaConfig) + .setUseBulkApi(true) + .setServerConfig(serverConfig) + .setPreprocessorInstanceCount(1) + .setRateLimiterMaxBurstSeconds(1) + .build(); + + datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); + DatasetMetadata datasetMetadata = + new DatasetMetadata( + INDEX_NAME, + "owner", + 1, + List.of(new DatasetPartitionMetadata(1, Long.MAX_VALUE, List.of("0"))), + INDEX_NAME); + // Create an entry while init. Update the entry on every test run + datasetMetadataStore.createSync(datasetMetadata); + + bulkIngestKafkaProducer = + new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry); + bulkIngestKafkaProducer.startAsync(); + bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION); + } + + @Test + public void testDocumentInKafkaTransactionError() throws Exception { + KafkaConsumer kafkaConsumer = getTestKafkaConsumer(); + + // we want to inject a failure in the second doc and test if the abort transaction works and we + // don't index the first document + Trace.Span doc1 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error1")).build(); + Trace.Span doc2 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error2")).build(); + Trace.Span doc3 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error3")).build(); + Trace.Span doc4 = spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error4")).build()); + when(doc4.toByteArray()).thenThrow(new RuntimeException("exception")); + Trace.Span doc5 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error5")).build(); + + Map> indexDocs = + Map.of(INDEX_NAME, List.of(doc1, doc2, doc3, doc4, doc5)); + + BulkIngestRequest request1 = new BulkIngestRequest(indexDocs); + Thread.ofVirtual() + .start( + () -> { + try { + // because of the synchronous queue, we need someone consuming the response before + // we attempt to set it + request1.getResponse(); + } catch (InterruptedException ignored) { + } + }); + BulkIngestResponse responseObj = + (BulkIngestResponse) + bulkIngestKafkaProducer + .produceDocumentsAndCommit(List.of(request1)) + .values() + .toArray()[0]; + assertThat(responseObj.totalDocs()).isEqualTo(0); + assertThat(responseObj.failedDocs()).isEqualTo(5); + assertThat(responseObj.errorMsg()).isNotNull(); + + await() + .until( + () -> { + @SuppressWarnings("OptionalGetWithoutIsPresent") + long partitionOffset = + (Long) + kafkaConsumer + .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0))) + .values() + .stream() + .findFirst() + .get(); + LOG.debug( + "Current partitionOffset - {}. 
expecting offset to be less than 5", + partitionOffset); + return partitionOffset > 0 && partitionOffset < 5; + }); + + ConsumerRecords records = + kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS)); + + assertThat(records.count()).isEqualTo(0); + + long currentPartitionOffset = + (Long) + kafkaConsumer + .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0))) + .values() + .stream() + .findFirst() + .get(); + + Trace.Span doc6 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error6")).build(); + Trace.Span doc7 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error7")).build(); + Trace.Span doc8 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error8")).build(); + Trace.Span doc9 = + spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error9")).build()); + Trace.Span doc10 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error10")).build(); + + indexDocs = Map.of(INDEX_NAME, List.of(doc6, doc7, doc8, doc9, doc10)); + + BulkIngestRequest request2 = new BulkIngestRequest(indexDocs); + Thread.ofVirtual() + .start( + () -> { + try { + // because of the synchronous queue, we need someone consuming the response before + // we attempt to set it + request2.getResponse(); + } catch (InterruptedException ignored) { + } + }); + responseObj = + (BulkIngestResponse) + bulkIngestKafkaProducer + .produceDocumentsAndCommit(List.of(request2)) + .values() + .toArray()[0]; + assertThat(responseObj.totalDocs()).isEqualTo(5); + assertThat(responseObj.failedDocs()).isEqualTo(0); + assertThat(responseObj.errorMsg()).isNotNull(); + + // 5 docs. 1 control batch. initial offset was 1 after the first failed batch + validateOffset(kafkaConsumer, currentPartitionOffset + 5 + 1); + records = kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS)); + + assertThat(records.count()).isEqualTo(5); + records.forEach( + record -> + LOG.info( + "Trace= + " + TraceSpanParserSilenceError(record.value()).getId().toStringUtf8())); + + // close the kafka consumer used in the test + kafkaConsumer.close(); + } + + public KafkaConsumer getTestKafkaConsumer() { + // used to verify the message exist on the downstream topic + Properties properties = kafkaServer.getBroker().consumerConfig(); + properties.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); + properties.put("isolation.level", "read_committed"); + KafkaConsumer kafkaConsumer = new KafkaConsumer(properties); + kafkaConsumer.subscribe(List.of(DOWNSTREAM_TOPIC)); + return kafkaConsumer; + } + + public void validateOffset(KafkaConsumer kafkaConsumer, long expectedOffset) { + await() + .until( + () -> { + @SuppressWarnings("OptionalGetWithoutIsPresent") + long partitionOffset = + (Long) + kafkaConsumer + .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0))) + .values() + .stream() + .findFirst() + .get(); + LOG.debug( + "Current partitionOffset - {}. 
expecting offset to be - {}", + partitionOffset, + expectedOffset); + return partitionOffset == expectedOffset; + }); + } + + private static Trace.Span TraceSpanParserSilenceError(byte[] data) { + try { + return Trace.Span.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + return Trace.Span.newBuilder().build(); + } + } +} diff --git a/kaldb/src/test/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkRequestTest.java b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java similarity index 69% rename from kaldb/src/test/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkRequestTest.java rename to kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java index 62b3141f38..c25d76a548 100644 --- a/kaldb/src/test/java/com/slack/kaldb/preprocessor/ingest/OpenSearchBulkRequestTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/bulkIngestApi/opensearch/BulkApiRequestParserTest.java @@ -1,11 +1,13 @@ -package com.slack.kaldb.preprocessor.ingest; +package com.slack.kaldb.bulkIngestApi.opensearch; +import static com.slack.kaldb.bulkIngestApi.opensearch.BulkApiRequestParser.convertRequestToDocument; import static org.assertj.core.api.Assertions.assertThat; import com.google.common.io.Resources; import com.slack.service.murron.trace.Trace; import java.io.IOException; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; @@ -15,26 +17,27 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.ingest.IngestDocument; -public class OpenSearchBulkRequestTest { +public class BulkApiRequestParserTest { - private String getRawQueryString(String filename) throws IOException { + private byte[] getRawQueryBytes(String filename) throws IOException { return Resources.toString( - Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)), - Charset.defaultCharset()); + Resources.getResource(String.format("opensearchRequest/bulk/%s.ndjson", filename)), + Charset.defaultCharset()) + .getBytes(StandardCharsets.UTF_8); } @Test public void testSimpleIndexRequest() throws Exception { - String rawRequest = getRawQueryString("index_simple"); + byte[] rawRequest = getRawQueryBytes("index_simple"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); assertThat(indexRequests.get(0).index()).isEqualTo("test"); assertThat(indexRequests.get(0).id()).isEqualTo("1"); assertThat(indexRequests.get(0).sourceAsMap().size()).isEqualTo(2); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -52,12 +55,12 @@ public void testSimpleIndexRequest() throws Exception { @Test public void testIndexNoFields() throws Exception { - String rawRequest = getRawQueryString("index_no_fields"); + byte[] rawRequest = getRawQueryBytes("index_no_fields"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + 
BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -75,12 +78,12 @@ public void testIndexNoFields() throws Exception { @Test public void testIndexNoFieldsNoId() throws Exception { - String rawRequest = getRawQueryString("index_no_fields_no_id"); + byte[] rawRequest = getRawQueryBytes("index_no_fields_no_id"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -98,29 +101,29 @@ public void testIndexNoFieldsNoId() throws Exception { @Test public void testIndexEmptyRequest() throws Exception { - String rawRequest = getRawQueryString("index_empty_request"); + byte[] rawRequest = getRawQueryBytes("index_empty_request"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(0); } @Test public void testOtherBulkRequests() throws Exception { - String rawRequest = getRawQueryString("non_index"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + byte[] rawRequest = getRawQueryBytes("non_index"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(0); } @Test public void testIndexRequestWithSpecialChars() throws Exception { - String rawRequest = getRawQueryString("index_request_with_special_chars"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + byte[] rawRequest = getRawQueryBytes("index_request_with_special_chars"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("index_name").size()).isEqualTo(1); @@ -138,12 +141,12 @@ public void testIndexRequestWithSpecialChars() throws Exception { @Test public void testBulkRequests() throws Exception { - String rawRequest = getRawQueryString("bulk_requests"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + byte[] rawRequest = getRawQueryBytes("bulk_requests"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(1); assertThat(indexDocs.get("test").size()).isEqualTo(1); @@ -161,12 +164,12 @@ public void testBulkRequests() throws Exception { @Test public void testUpdatesAgainstTwoIndexes() throws Exception { - String rawRequest = getRawQueryString("two_indexes"); - 
List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + byte[] rawRequest = getRawQueryBytes("two_indexes"); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(2); Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); + BulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); assertThat(indexDocs.keySet().size()).isEqualTo(2); assertThat(indexDocs.get("test1").size()).isEqualTo(1); assertThat(indexDocs.get("test2").size()).isEqualTo(1); @@ -177,14 +180,13 @@ public void testUpdatesAgainstTwoIndexes() throws Exception { @Test public void testTraceSpanGeneratedTimestamp() throws IOException { - String rawRequest = getRawQueryString("index_simple"); + byte[] rawRequest = getRawQueryBytes("index_simple"); - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(rawRequest); + List indexRequests = BulkApiRequestParser.parseBulkRequest(rawRequest); assertThat(indexRequests.size()).isEqualTo(1); - IngestDocument ingestDocument = - OpenSearchBulkApiRequestParser.convertRequestToDocument(indexRequests.get(0)); - Trace.Span span = OpenSearchBulkApiRequestParser.fromIngestDocument(ingestDocument); + IngestDocument ingestDocument = convertRequestToDocument(indexRequests.get(0)); + Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument); // timestamp is in microseconds based on the trace.proto definition Instant ingestDocumentTime = diff --git a/kaldb/src/test/java/com/slack/kaldb/server/OpenSearchBulkIngestApiTest.java b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java similarity index 55% rename from kaldb/src/test/java/com/slack/kaldb/server/OpenSearchBulkIngestApiTest.java rename to kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java index 3581237c96..c112d35bc0 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/OpenSearchBulkIngestApiTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/BulkIngestApiTest.java @@ -5,32 +5,34 @@ import static com.linecorp.armeria.common.HttpStatus.TOO_MANY_REQUESTS; import static com.slack.kaldb.server.KaldbConfig.DEFAULT_START_STOP_DURATION; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.awaitility.Awaitility.await; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import brave.Tracing; -import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.linecorp.armeria.common.AggregatedHttpResponse; -import com.slack.kaldb.elasticsearchApi.BulkIngestResponse; +import com.slack.kaldb.bulkIngestApi.BulkIngestApi; +import com.slack.kaldb.bulkIngestApi.BulkIngestKafkaProducer; +import com.slack.kaldb.bulkIngestApi.BulkIngestResponse; +import com.slack.kaldb.bulkIngestApi.DatasetRateLimitingService; import com.slack.kaldb.metadata.core.CuratorBuilder; import com.slack.kaldb.metadata.dataset.DatasetMetadata; import com.slack.kaldb.metadata.dataset.DatasetMetadataStore; import com.slack.kaldb.metadata.dataset.DatasetPartitionMetadata; -import com.slack.kaldb.preprocessor.PreprocessorRateLimiter; -import com.slack.kaldb.preprocessor.ingest.OpenSearchBulkApiRequestParser; import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.testlib.MetricsUtil; import com.slack.kaldb.testlib.TestKafkaServer; import com.slack.kaldb.util.JsonUtil; import com.slack.service.murron.trace.Trace; import 
io.micrometer.prometheus.PrometheusConfig; import io.micrometer.prometheus.PrometheusMeterRegistry; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.List; -import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import org.apache.curator.test.TestingServer; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -40,21 +42,21 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.opensearch.action.index.IndexRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class OpenSearchBulkIngestApiTest { - - private static final Logger LOG = LoggerFactory.getLogger(OpenSearchBulkIngestApi.class); - +public class BulkIngestApiTest { + private static final Logger LOG = LoggerFactory.getLogger(BulkIngestApi.class); private static PrometheusMeterRegistry meterRegistry; private static AsyncCuratorFramework curatorFramework; private static KaldbConfigs.PreprocessorConfig preprocessorConfig; private static DatasetMetadataStore datasetMetadataStore; private static TestingServer zkServer; private static TestKafkaServer kafkaServer; - private OpenSearchBulkIngestApi openSearchBulkAPI; + private BulkIngestApi bulkApi; + + private BulkIngestKafkaProducer bulkIngestKafkaProducer; + private DatasetRateLimitingService datasetRateLimitingService; static String INDEX_NAME = "testindex"; @@ -84,14 +86,18 @@ public void bootstrapCluster() throws Exception { .setServerPort(8080) .setServerAddress("localhost") .build(); + KaldbConfigs.KafkaConfig kafkaConfig = + KaldbConfigs.KafkaConfig.newBuilder() + .setKafkaBootStrapServers(kafkaServer.getBroker().getBrokerList().get()) + .setKafkaTopic(DOWNSTREAM_TOPIC) + .build(); preprocessorConfig = KaldbConfigs.PreprocessorConfig.newBuilder() - .setBootstrapServers(kafkaServer.getBroker().getBrokerList().get()) + .setKafkaConfig(kafkaConfig) .setUseBulkApi(true) .setServerConfig(serverConfig) .setPreprocessorInstanceCount(1) .setRateLimiterMaxBurstSeconds(1) - .setDownstreamTopic(DOWNSTREAM_TOPIC) .build(); datasetMetadataStore = new DatasetMetadataStore(curatorFramework, true); @@ -105,18 +111,29 @@ public void bootstrapCluster() throws Exception { // Create an entry while init. Update the entry on every test run datasetMetadataStore.createSync(datasetMetadata); - openSearchBulkAPI = - new OpenSearchBulkIngestApi(datasetMetadataStore, preprocessorConfig, meterRegistry, false); + datasetRateLimitingService = + new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry); + bulkIngestKafkaProducer = + new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry); - openSearchBulkAPI.startAsync(); - openSearchBulkAPI.awaitRunning(DEFAULT_START_STOP_DURATION); + datasetRateLimitingService.startAsync(); + bulkIngestKafkaProducer.startAsync(); + + datasetRateLimitingService.awaitRunning(DEFAULT_START_STOP_DURATION); + bulkIngestKafkaProducer.awaitRunning(DEFAULT_START_STOP_DURATION); + + bulkApi = new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry); } // I looked at making this a @BeforeEach. it's possible if you annotate a test with a @Tag and // pass throughputBytes. // However, decided not to go with that because it involved hardcoding the throughput bytes // when defining the test. 
We need it to be dynamic based on the size of the docs - public void updateDatasetThroughput(int throughputBytes) throws Exception { + public void updateDatasetThroughput(int throughputBytes) { + double timerCount = + MetricsUtil.getTimerCount( + DatasetRateLimitingService.RATE_LIMIT_RELOAD_TIMER, meterRegistry); + // dataset metadata already exists. Update with the throughput value DatasetMetadata datasetMetadata = new DatasetMetadata( @@ -127,17 +144,13 @@ public void updateDatasetThroughput(int throughputBytes) throws Exception { INDEX_NAME); datasetMetadataStore.updateSync(datasetMetadata); - // Need to wait until we've verified the rate limit has been correctly loaded + // Need to wait until the rate limit has been loaded await() .until( () -> - openSearchBulkAPI.throughputSortedDatasets.stream() - .filter(datasetMetadata1 -> datasetMetadata1.name.equals(INDEX_NAME)) - .findFirst(), - (storedDatasetMetadata) -> { - //noinspection OptionalGetWithoutIsPresent - return storedDatasetMetadata.get().getThroughputBytes() == throughputBytes; - }); + MetricsUtil.getTimerCount( + DatasetRateLimitingService.RATE_LIMIT_RELOAD_TIMER, meterRegistry) + > timerCount); } @AfterEach @@ -145,9 +158,13 @@ public void updateDatasetThroughput(int throughputBytes) throws Exception { // Instead of calling stop from every test and ensuring it's part of a finally block we just call // the shutdown code with the @AfterEach annotation public void shutdownOpenSearchAPI() throws Exception { - if (openSearchBulkAPI != null) { - openSearchBulkAPI.stopAsync(); - openSearchBulkAPI.awaitTerminated(DEFAULT_START_STOP_DURATION); + if (datasetRateLimitingService != null) { + datasetRateLimitingService.stopAsync(); + datasetRateLimitingService.awaitTerminated(DEFAULT_START_STOP_DURATION); + } + if (bulkIngestKafkaProducer != null) { + bulkIngestKafkaProducer.stopAsync(); + bulkIngestKafkaProducer.awaitTerminated(DEFAULT_START_STOP_DURATION); } kafkaServer.close(); curatorFramework.unwrap().close(); @@ -178,19 +195,10 @@ public void testBulkApiBasic() throws Exception { { "index": {"_index": "testindex", "_id": "1"} } { "field1" : "value1" } """; - // get num bytes that can be used to create the dataset. When we make 2 successive calls the - // second one should fail - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(request1); - Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); - assertThat(indexDocs.keySet().size()).isEqualTo(1); - assertThat(indexDocs.get("testindex").size()).isEqualTo(1); - assertThat(indexDocs.get("testindex").get(0).getId().toStringUtf8()).isEqualTo("1"); - int throughputBytes = PreprocessorRateLimiter.getSpanBytes(indexDocs.get("testindex")); - updateDatasetThroughput(throughputBytes); + updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length); // test with empty causes a parse exception - AggregatedHttpResponse response = openSearchBulkAPI.addDocument("{}\n").aggregate().join(); + AggregatedHttpResponse response = bulkApi.addDocument("{}\n").aggregate().join(); assertThat(response.status().isSuccess()).isEqualTo(false); assertThat(response.status().code()).isEqualTo(INTERNAL_SERVER_ERROR.code()); BulkIngestResponse responseObj = @@ -200,20 +208,49 @@ public void testBulkApiBasic() throws Exception { // test with request1 twice. 
first one should succeed, second one will fail because of rate // limiter - response = openSearchBulkAPI.addDocument(request1).aggregate().join(); - assertThat(response.status().isSuccess()).isEqualTo(true); - assertThat(response.status().code()).isEqualTo(OK.code()); - responseObj = JsonUtil.read(response.contentUtf8(), BulkIngestResponse.class); - assertThat(responseObj.totalDocs()).isEqualTo(1); - assertThat(responseObj.failedDocs()).isEqualTo(0); - - response = openSearchBulkAPI.addDocument(request1).aggregate().join(); - assertThat(response.status().isSuccess()).isEqualTo(false); - assertThat(response.status().code()).isEqualTo(TOO_MANY_REQUESTS.code()); - responseObj = JsonUtil.read(response.contentUtf8(), BulkIngestResponse.class); - assertThat(responseObj.totalDocs()).isEqualTo(0); - assertThat(responseObj.failedDocs()).isEqualTo(0); - assertThat(responseObj.errorMsg()).isEqualTo("rate limit exceeded"); + CompletableFuture response1 = + bulkApi + .addDocument(request1) + .aggregate() + .thenApply( + httpResponse -> { + assertThat(httpResponse.status().isSuccess()).isEqualTo(true); + assertThat(httpResponse.status().code()).isEqualTo(OK.code()); + BulkIngestResponse httpResponseObj = null; + try { + httpResponseObj = + JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class); + } catch (IOException e) { + fail("", e); + } + assertThat(httpResponseObj.totalDocs()).isEqualTo(1); + assertThat(httpResponseObj.failedDocs()).isEqualTo(0); + return httpResponse; + }); + + CompletableFuture response2 = + bulkApi + .addDocument(request1) + .aggregate() + .thenApply( + httpResponse -> { + assertThat(httpResponse.status().isSuccess()).isEqualTo(false); + assertThat(httpResponse.status().code()).isEqualTo(TOO_MANY_REQUESTS.code()); + BulkIngestResponse httpResponseObj = null; + try { + httpResponseObj = + JsonUtil.read(httpResponse.contentUtf8(), BulkIngestResponse.class); + } catch (IOException e) { + fail("", e); + } + assertThat(httpResponseObj.totalDocs()).isEqualTo(0); + assertThat(httpResponseObj.failedDocs()).isEqualTo(0); + assertThat(httpResponseObj.errorMsg()).isEqualTo("rate limit exceeded"); + return httpResponse; + }); + + await().until(response1::isDone); + await().until(response2::isDone); // test with multiple indexes String request2 = @@ -223,7 +260,7 @@ public void testBulkApiBasic() throws Exception { { "index": {"_index": "testindex2", "_id": "1"} } { "field1" : "value1" } """; - response = openSearchBulkAPI.addDocument(request2).aggregate().join(); + response = bulkApi.addDocument(request2).aggregate().join(); assertThat(response.status().isSuccess()).isEqualTo(false); assertThat(response.status().code()).isEqualTo(INTERNAL_SERVER_ERROR.code()); responseObj = JsonUtil.read(response.contentUtf8(), BulkIngestResponse.class); @@ -241,17 +278,11 @@ public void testDocumentInKafkaSimple() throws Exception { { "index": {"_index": "testindex", "_id": "2"} } { "field1" : "value2" } """; - List indexRequests = OpenSearchBulkApiRequestParser.parseBulkRequest(request1); - Map> indexDocs = - OpenSearchBulkApiRequestParser.convertIndexRequestToTraceFormat(indexRequests); - assertThat(indexDocs.keySet().size()).isEqualTo(1); - assertThat(indexDocs.get("testindex").size()).isEqualTo(2); - int throughputBytes = PreprocessorRateLimiter.getSpanBytes(indexDocs.get("testindex")); - updateDatasetThroughput(throughputBytes); + updateDatasetThroughput(request1.getBytes(StandardCharsets.UTF_8).length); KafkaConsumer kafkaConsumer = getTestKafkaConsumer(); - AggregatedHttpResponse 
response = openSearchBulkAPI.addDocument(request1).aggregate().join(); + AggregatedHttpResponse response = bulkApi.addDocument(request1).aggregate().join(); assertThat(response.status().isSuccess()).isEqualTo(true); assertThat(response.status().code()).isEqualTo(OK.code()); BulkIngestResponse responseObj = @@ -279,89 +310,6 @@ record -> kafkaConsumer.close(); } - @Test - public void testDocumentInKafkaTransactionError() throws Exception { - updateDatasetThroughput(100_1000); - - KafkaConsumer kafkaConsumer = getTestKafkaConsumer(); - - // we want to inject a failure in the second doc and test if the abort transaction works and we - // don't index the first document - Trace.Span doc1 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error1")).build(); - Trace.Span doc2 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error2")).build(); - Trace.Span doc3 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error3")).build(); - Trace.Span doc4 = spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error4")).build()); - when(doc4.toByteArray()).thenThrow(new RuntimeException("exception")); - Trace.Span doc5 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("error5")).build(); - - Map> indexDocs = - Map.of("testindex", List.of(doc1, doc2, doc3, doc4, doc5)); - - BulkIngestResponse responseObj = openSearchBulkAPI.produceDocuments(indexDocs); - assertThat(responseObj.totalDocs()).isEqualTo(0); - assertThat(responseObj.failedDocs()).isEqualTo(5); - assertThat(responseObj.errorMsg()).isNotNull(); - - await() - .until( - () -> { - @SuppressWarnings("OptionalGetWithoutIsPresent") - long partitionOffset = - (Long) - kafkaConsumer - .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0))) - .values() - .stream() - .findFirst() - .get(); - LOG.debug( - "Current partitionOffset - {}. expecting offset to be less than 5", - partitionOffset); - return partitionOffset > 0 && partitionOffset < 5; - }); - - ConsumerRecords records = - kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS)); - - assertThat(records.count()).isEqualTo(0); - - long currentPartitionOffset = - (Long) - kafkaConsumer - .endOffsets(List.of(new TopicPartition(DOWNSTREAM_TOPIC, 0))) - .values() - .stream() - .findFirst() - .get(); - - Trace.Span doc6 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error6")).build(); - Trace.Span doc7 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error7")).build(); - Trace.Span doc8 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error8")).build(); - Trace.Span doc9 = - spy(Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error9")).build()); - Trace.Span doc10 = Trace.Span.newBuilder().setId(ByteString.copyFromUtf8("no_error10")).build(); - - indexDocs = Map.of("testindex", List.of(doc6, doc7, doc8, doc9, doc10)); - - responseObj = openSearchBulkAPI.produceDocuments(indexDocs); - assertThat(responseObj.totalDocs()).isEqualTo(5); - assertThat(responseObj.failedDocs()).isEqualTo(0); - assertThat(responseObj.errorMsg()).isNotNull(); - - // 5 docs. 1 control batch. 
initial offset was 1 after the first failed batch - validateOffset(kafkaConsumer, currentPartitionOffset + 5 + 1); - records = kafkaConsumer.poll(Duration.of(10, ChronoUnit.SECONDS)); - - assertThat(records.count()).isEqualTo(5); - records.forEach( - record -> - LOG.info( - "Trace= + " + TraceSpanParserSilenceError(record.value()).getId().toStringUtf8())); - - // close the kafka consumer used in the test - kafkaConsumer.close(); - } - public void validateOffset(KafkaConsumer kafkaConsumer, long expectedOffset) { await() .until( diff --git a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java index 9f72902287..adb80f0fea 100644 --- a/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java +++ b/kaldb/src/test/java/com/slack/kaldb/server/KaldbConfigTest.java @@ -295,7 +295,11 @@ public void testParseKaldbJsonConfigFile() throws IOException { assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(false); - assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092"); + + final KaldbConfigs.KafkaConfig preprocessorKafkaConfig = + config.getPreprocessorConfig().getKafkaConfig(); + assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092"); + assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic"); final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085); @@ -466,8 +470,12 @@ public void testParseKaldbYamlConfigFile() throws IOException { assertThat(preprocessorConfig.getDataTransformer()).isEqualTo("api_log"); assertThat(preprocessorConfig.getRateLimiterMaxBurstSeconds()).isEqualTo(2); + final KaldbConfigs.KafkaConfig preprocessorKafkaConfig = + config.getPreprocessorConfig().getKafkaConfig(); + assertThat(preprocessorKafkaConfig.getKafkaBootStrapServers()).isEqualTo("localhost:9092"); + assertThat(preprocessorKafkaConfig.getKafkaTopic()).isEqualTo("test-topic"); + assertThat(preprocessorConfig.getUseBulkApi()).isEqualTo(true); - assertThat(preprocessorConfig.getBootstrapServers()).isEqualTo("localhost:9092"); final KaldbConfigs.ServerConfig preprocessorServerConfig = preprocessorConfig.getServerConfig(); assertThat(preprocessorServerConfig.getServerPort()).isEqualTo(8085); diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java new file mode 100644 index 0000000000..e3e425044b --- /dev/null +++ b/kaldb/src/test/java/com/slack/kaldb/writer/KafkaUtilsTest.java @@ -0,0 +1,48 @@ +package com.slack.kaldb.writer; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.slack.kaldb.proto.config.KaldbConfigs; +import com.slack.kaldb.testlib.TestKafkaServer; +import com.slack.kaldb.writer.kafka.KaldbKafkaConsumer; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.jupiter.api.Test; + +class KafkaUtilsTest { + public static final String TEST_KAFKA_CLIENT_GROUP = "test_kaldb_consumer"; + + @Test + public void testOverridingProperties() { + KaldbConfigs.KafkaConfig kafkaConfig = + KaldbConfigs.KafkaConfig.newBuilder() + .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC) + .setKafkaTopicPartition("0") + 
.setKafkaBootStrapServers("bootstrap_server")
+            .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
+            .setEnableKafkaAutoCommit("true")
+            .setKafkaAutoCommitInterval("5000")
+            .setKafkaSessionTimeout("5000")
+            .build();
+
+    Properties properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
+    assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+        .isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
+
+    kafkaConfig =
+        KaldbConfigs.KafkaConfig.newBuilder()
+            .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
+            .setKafkaTopicPartition("0")
+            .setKafkaBootStrapServers("bootstrap_server")
+            .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
+            .setEnableKafkaAutoCommit("true")
+            .setKafkaAutoCommitInterval("5000")
+            .setKafkaSessionTimeout("5000")
+            .putAdditionalProps(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "test_serializer")
+            .build();
+
+    properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
+    assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
+        .isEqualTo("test_serializer");
+  }
+}
diff --git a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java
index fae04c0700..2185beb1bf 100644
--- a/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java
+++ b/kaldb/src/test/java/com/slack/kaldb/writer/kafka/KaldbKafkaConsumerTest.java
@@ -117,40 +117,6 @@ public void tearDown() throws Exception {
     metricsRegistry.close();
   }
 
-  @Test
-  public void testOverridingProperties() {
-    KaldbConfigs.KafkaConfig kafkaConfig =
-        KaldbConfigs.KafkaConfig.newBuilder()
-            .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
-            .setKafkaTopicPartition("0")
-            .setKafkaBootStrapServers("bootstrap_server")
-            .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
-            .setEnableKafkaAutoCommit("true")
-            .setKafkaAutoCommitInterval("5000")
-            .setKafkaSessionTimeout("5000")
-            .build();
-
-    Properties properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
-    assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
-        .isEqualTo("org.apache.kafka.common.serialization.StringDeserializer");
-
-    kafkaConfig =
-        KaldbConfigs.KafkaConfig.newBuilder()
-            .setKafkaTopic(TestKafkaServer.TEST_KAFKA_TOPIC)
-            .setKafkaTopicPartition("0")
-            .setKafkaBootStrapServers("bootstrap_server")
-            .setKafkaClientGroup(TEST_KAFKA_CLIENT_GROUP)
-            .setEnableKafkaAutoCommit("true")
-            .setKafkaAutoCommitInterval("5000")
-            .setKafkaSessionTimeout("5000")
-            .putAdditionalProps(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "test_serializer")
-            .build();
-
-    properties = KaldbKafkaConsumer.makeKafkaConsumerProps(kafkaConfig);
-    assertThat(properties.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
-        .isEqualTo("test_serializer");
-  }
-
   @Test
   public void testGetEndOffsetForPartition() throws Exception {
     EphemeralKafkaBroker broker = kafkaServer.getBroker();
diff --git a/kaldb/src/test/resources/test_config.json b/kaldb/src/test/resources/test_config.json
index fa675fa2a8..1d1f4e6833 100644
--- a/kaldb/src/test/resources/test_config.json
+++ b/kaldb/src/test/resources/test_config.json
@@ -142,6 +142,10 @@
       "numStreamThreads": 2,
       "processingGuarantee": "at_least_once"
     },
+    "kafkaConfig": {
+      "kafkaTopic": "test-topic",
+      "kafkaBootStrapServers": "localhost:9092"
+    },
     "serverConfig": {
       "serverPort": 8085,
       "serverAddress": "localhost",
diff --git a/kaldb/src/test/resources/test_config.yaml b/kaldb/src/test/resources/test_config.yaml
index 00c96a30da..e5c62cd740 100644
--- a/kaldb/src/test/resources/test_config.yaml
+++ b/kaldb/src/test/resources/test_config.yaml
@@ -114,6 +114,10 @@ preprocessorConfig:
     applicationId: kaldb_preprocessor
     numStreamThreads: 2
     processingGuarantee: at_least_once
+  kafkaConfig:
+    kafkaTopic: test-topic
+    kafkaBootStrapServers: "localhost:9092"
+
   serverConfig:
     serverPort: 8085
     serverAddress: localhost
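
For anyone wiring this up outside the tests: the bulk-ingest path is now assembled from three components (BulkIngestKafkaProducer, DatasetRateLimitingService, BulkIngestApi) that read their Kafka settings from the nested preprocessorConfig.kafkaConfig block rather than the removed top-level bootstrapServers/downstreamTopic fields. The sketch below is illustrative only and mirrors the setup used in BulkIngestApiTest above; datasetMetadataStore, meterRegistry, and the HTTP server registration are assumed to exist elsewhere and are placeholders, not part of this change.

// Illustrative wiring sketch -- mirrors BulkIngestApiTest; placeholder objects noted below.
KaldbConfigs.KafkaConfig kafkaConfig =
    KaldbConfigs.KafkaConfig.newBuilder()
        .setKafkaTopic("test-topic") // preprocessorConfig.kafkaConfig.kafkaTopic
        .setKafkaBootStrapServers("localhost:9092") // preprocessorConfig.kafkaConfig.kafkaBootStrapServers
        .build();

KaldbConfigs.PreprocessorConfig preprocessorConfig =
    KaldbConfigs.PreprocessorConfig.newBuilder()
        .setKafkaConfig(kafkaConfig) // replaces the removed setBootstrapServers/setDownstreamTopic
        .setUseBulkApi(true)
        .setPreprocessorInstanceCount(1)
        .setRateLimiterMaxBurstSeconds(1)
        .build();

// datasetMetadataStore and meterRegistry are assumed to be constructed elsewhere (placeholders).
BulkIngestKafkaProducer bulkIngestKafkaProducer =
    new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry);
DatasetRateLimitingService datasetRateLimitingService =
    new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);

// Both services are started with startAsync()/awaitRunning(...) before serving traffic,
// exactly as the test setup above does.
bulkIngestKafkaProducer.startAsync();
datasetRateLimitingService.startAsync();

// The HTTP-facing piece; addDocument(...) handles the bulk request body.
BulkIngestApi bulkApi =
    new BulkIngestApi(bulkIngestKafkaProducer, datasetRateLimitingService, meterRegistry);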