From 2af06e174e529b1812f52569a68de31c4fe30e04 Mon Sep 17 00:00:00 2001 From: edeweerd Date: Fri, 11 Oct 2024 15:47:55 +0200 Subject: [PATCH] fix(impl): Support custom span creation in Processor Probably a very narrow use case. But it could happen. Along the way of a [recent PR](https://github.com/quarkiverse/quarkus-kafka-streams-processor/pull/109) by @nreant1A trying to make sure: 1. Baggage is properly propagated 2. Custom span creation is supported in process method First added 1 QuarkusTest in impl. The implementation was: 1. extracting the parent context (but not making it current, by which baggage was lost) 2. creating a "process" span and making that a current span 3. early on reinjecting the new span context ids in the message before it starts being processed, relying on the fact that output messages are created by modification of the incoming one: `ping.withValue(...)` The issue we have: baggage cannot be modified in the processor, and any other context modification in the processor is basically ignored (creating a custom span for instance). Solution: move the injection of the contextual data (traceparent, tracestate, baggage) at message production by implementing a TracingProducerInterceptor. The OpenTelemetryExtension for JUnit 5 has been copy-pasted so we can inject the W3CBaggagePropagator. Indeed the original class is final, with private constructor. A fix upstream will be done shortly. Also the KStreamTopologyDriverTest started failing in tests related to tracing. Actually in tests based on TopologyTestDriver, the ProducerOnSendInterceptors are not called. The solution is to transform those test methods into ones using the Kafka Devservices. 
--- .../ProducerInterceptorPriorities.java | 34 +++ .../decorator/processor/TracingDecorator.java | 109 +++++----- .../producer/TracingProducerInterceptor.java | 61 ++++++ .../impl/KStreamTopologyDriverTest.java | 97 --------- ...racingBaggageAndCustomSpanQuarkusTest.java | 192 +++++++++++++++++ .../impl/TracingQuarkusTest.java | 204 ++++++++++++++++++ .../processor/TracingDecoratorTest.java | 38 ++-- .../TracingProducerInterceptorTest.java | 108 ++++++++++ .../impl/utils/KafkaHeaderUtils.java | 45 ++++ .../OpenTelemetryWithBaggageExtension.java | 198 +++++++++++++++++ 10 files changed, 904 insertions(+), 182 deletions(-) create mode 100644 api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java create mode 100644 impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingBaggageAndCustomSpanQuarkusTest.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingQuarkusTest.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java new file mode 100644 index 0000000..a5613ec --- /dev/null +++ b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java @@ -0,0 +1,34 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 
2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.api.decorator.producer; + +/** + * Priorities of the producer interceptors the framework provides. + */ +public final class ProducerInterceptorPriorities { + /** + * Priority of the interceptor that will inject the tracing headers for propagation. + */ + public static final int TRACING = 100; + + private ProducerInterceptorPriorities() { + + } +} diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java index 5597a68..b30dfd5 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java @@ -46,7 +46,6 @@ import com.google.protobuf.util.JsonFormat; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.StatusCode; @@ -59,7 +58,6 @@ import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; import 
io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter; -import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import lombok.extern.slf4j.Slf4j; /** @@ -85,11 +83,6 @@ public class TracingDecorator extends AbstractProcessorDecorator { */ private final KafkaTextMapGetter textMapGetter; - /** - * Injects Context in the Kafka headers of a message - */ - private final KafkaTextMapSetter textMapSetter; - /** * The tracer instance to create spans */ @@ -117,27 +110,22 @@ public class TracingDecorator extends AbstractProcessorDecorator { * The {@link OpenTelemetry} configured by Quarkus * @param textMapGetter * Extracts Context from the Kafka headers of a message - * @param textMapSetter - * Injects Context in the Kafka headers of a message * @param tracer * The tracer instance to create spans * @param configuration * The TopologyConfiguration after customization. */ @Inject - public TracingDecorator(OpenTelemetry openTelemetry, - KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer, + public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, Tracer tracer, TopologyConfigurationImpl configuration) { - this(openTelemetry, textMapGetter, textMapSetter, tracer, - configuration.getProcessorPayloadType().getName(), + this(openTelemetry, textMapGetter, tracer, configuration.getProcessorPayloadType().getName(), JsonFormat.printer()); } - public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, + public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) { this.openTelemetry = openTelemetry; this.textMapGetter = textMapGetter; - this.textMapSetter = textMapSetter; this.tracer = tracer; this.applicationName = applicationName; this.jsonPrinter = jsonPrinter; @@ -168,52 +156,55 @@ public void init(final ProcessorContext context) { 
public void process(Record record) { SpanBuilder spanBuilder = tracer.spanBuilder(applicationName); final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator(); - - // going through all propagation field names defined in the OTel configuration - // we look if any of them has been set with a non-null value in the headers of the incoming message - Context extractedContext = null; - if (propagator.fields() - .stream() - .map(record.headers()::lastHeader) - .anyMatch(Objects::nonNull)) { - // if that is the case, let's extract a Context initialized with the parent trace id, span id - // and baggage present as headers in the incoming message - extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter); - // use the context as parent span for the built span - spanBuilder.setParent(extractedContext); - // we clean the headers to avoid their propagation in any outgoing message (knowing that by - // default Kafka Streams copies all headers of the incoming message into any outgoing message) - propagator.fields().forEach(record.headers()::remove); - } - Span span = spanBuilder.startSpan(); - // baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span - // in opentelemetry) and actually lost as kafka headers are cleaned - try (Scope ignored = (extractedContext != null) ? 
Baggage.fromContext(extractedContext).makeCurrent() : Scope.noop(); - Scope scope = span.makeCurrent()) { - try { - // now that the context has been set to the new started child span of this microservice, we replace - // the headers in the incoming message so when an outgoing message is produced with the copied - // header values it already has the span id from this new child span - propagator.inject(Context.current(), record.headers(), textMapSetter); - getDelegate().process(record); - span.setStatus(StatusCode.OK); - } catch (KafkaException e) { - // we got a Kafka exception, we record the exception in the span, log but rethrow the exception - // with the idea that it will be caught by one of the DLQ in error management - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - logInputMessageMetadata(record); - throw e; - } catch (RuntimeException e) { // NOSONAR - // very last resort, even the DLQs are not working, then we still record the exception and - // log the message but do not rethrow the exception otherwise we'd end up in an infinite loop - log.error("Runtime error caught while processing the message", e); - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - logInputMessageMetadata(record); + Scope parentScope = null; + + try { + // going through all propagation field names defined in the OTel configuration + // we look if any of them has been set with a non-null value in the headers of the incoming message + if (propagator.fields() + .stream() + .map(record.headers()::lastHeader) + .anyMatch(Objects::nonNull)) { + // if that is the case, let's extract a Context initialized with the parent trace id, span id + // and baggage present as headers in the incoming message + Context extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter); + // use the context as parent span for the built span + spanBuilder.setParent(extractedContext); + // we clean the headers to avoid their 
propagation in any outgoing message (knowing that by + // default Kafka Streams copies all headers of the incoming message into any outgoing message) + propagator.fields().forEach(record.headers()::remove); + // we make the parent context current to not loose the baggage + parentScope = extractedContext.makeCurrent(); + } + Span span = spanBuilder.startSpan(); + // baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span + // in opentelemetry) and actually lost as kafka headers are cleaned + try (Scope ignored = span.makeCurrent()) { + try { + getDelegate().process(record); + span.setStatus(StatusCode.OK); + } catch (KafkaException e) { + // we got a Kafka exception, we record the exception in the span, log but rethrow the exception + // with the idea that it will be caught by one of the DLQ in error management + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); + logInputMessageMetadata(record); + throw e; + } catch (RuntimeException e) { // NOSONAR + // very last resort, even the DLQs are not working, then we still record the exception and + // log the message but do not rethrow the exception otherwise we'd end up in an infinite loop + log.error("Runtime error caught while processing the message", e); + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); + logInputMessageMetadata(record); + } + } finally { + span.end(); } } finally { - span.end(); + if (parentScope != null) { + parentScope.close(); + } } } diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java new file mode 100644 index 0000000..5950eb3 --- /dev/null +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java @@ -0,0 +1,61 @@ +/*- + * #%L + * Quarkus Kafka 
Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl.decorator.producer; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerInterceptorPriorities; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; + +/** + * Producer interceptor that injects the tracing headers for propagation. 
+ */ +@ApplicationScoped +public class TracingProducerInterceptor implements ProducerOnSendInterceptor { + private final OpenTelemetry openTelemetry; + + private final KafkaTextMapSetter kafkaTextMapSetter; + + @Inject + public TracingProducerInterceptor(OpenTelemetry openTelemetry, KafkaTextMapSetter kafkaTextMapSetter) { + this.openTelemetry = openTelemetry; + this.kafkaTextMapSetter = kafkaTextMapSetter; + } + + @Override + public ProducerRecord onSend(ProducerRecord record) { + openTelemetry.getPropagators().getTextMapPropagator().fields().forEach(record.headers()::remove); + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), record.headers(), kafkaTextMapSetter); + return record; + } + + @Override + public int priority() { + return ProducerInterceptorPriorities.TRACING; + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java index 3da5f76..4f89615 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/KStreamTopologyDriverTest.java @@ -19,25 +19,16 @@ */ package io.quarkiverse.kafkastreamsprocessor.impl; -import static io.opentelemetry.sdk.testing.assertj.TracesAssert.assertThat; import static io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping.newBuilder; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasEntry; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; import java.util.Set; import jakarta.enterprise.inject.Alternative; import jakarta.inject.Inject; -import 
org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; @@ -47,24 +38,13 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Record; -import org.apache.kafka.streams.test.TestRecord; -import org.hamcrest.MatcherAssert; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.sdk.OpenTelemetrySdk; import io.quarkiverse.kafkastreamsprocessor.api.Processor; -import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; -import io.quarkiverse.kafkastreamsprocessor.impl.utils.TestSpanExporter; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig; @@ -91,15 +71,6 @@ public class KStreamTopologyDriverTest { TestOutputTopic testOutputTopic; - @Inject - Tracer tracer; - - @Inject - OpenTelemetry openTelemetry; - - @Inject - TestSpanExporter testSpanExporter; - @BeforeEach public void setup() { Properties config = new Properties(); @@ -111,19 +82,6 @@ public void setup() { testOutputTopic = testDriver.createOutputTopic(kStreamsProcessorConfig.output().topic().get(), new StringDeserializer(), new 
KafkaProtobufDeserializer<>(Ping.parser())); - - clearSpans(); - } - - @AfterEach - public void clearOtel() { - clearSpans(); - } - - private void clearSpans() { - // force a flush to make sure there are no remaining spans still in the buffers - ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); - testSpanExporter.getSpans().clear(); } @Test @@ -140,61 +98,6 @@ public void shouldManageRuntimeException() { assertTrue(testOutputTopic.isEmpty()); } - @Test - public void tracingShouldBePropagatedW3C() { - Span parentSpan = tracer.spanBuilder("parent").startSpan(); - try (Scope parentScope = parentSpan.makeCurrent()) { - - Ping input = newBuilder().setMessage("world").build(); - RecordHeaders headers = new RecordHeaders(); - openTelemetry.getPropagators() - .getTextMapPropagator() - .inject(Context.current(), headers, kafkaTextMapSetter); - TestRecord record = new TestRecord("key", input, headers); - testInputTopic.pipeInput(record); - MatcherAssert.assertThat(toMap(testOutputTopic.readRecord().getHeaders()), - hasEntry(equalTo(KafkaStreamsProcessorHeaders.W3C_TRACE_ID), - containsString(parentSpan.getSpanContext().getTraceId()))); - } finally { - parentSpan.end(); - } - } - - private static Map toMap(Iterable
headers) { - Map result = new HashMap<>(); - for (Header h : headers) { - result.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)); - } - return result; - } - - @Test - public void spanShouldBeCreatedW3C() { - - Span parentSpan = tracer.spanBuilder("parent").startSpan(); - try (Scope ignored = parentSpan.makeCurrent()) { - Ping input = newBuilder().setMessage("world").build(); - RecordHeaders headers = new RecordHeaders(); - openTelemetry.getPropagators() - .getTextMapPropagator() - .inject(Context.current(), headers, kafkaTextMapSetter); - TestRecord record = new TestRecord<>("key", input, headers); - - testInputTopic.pipeInput(record); - testOutputTopic.readRecord(); - } finally { - parentSpan.end(); - } - - ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); - - assertThat(testSpanExporter.getSpans()).hasTracesSatisfyingExactly( - trace -> trace.hasSpansSatisfyingExactly( - span -> span.hasSpanId(parentSpan.getSpanContext().getSpanId()), - span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId()) - .hasParentSpanId(parentSpan.getSpanContext().getSpanId()))); - } - @Processor @Alternative @Slf4j diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingBaggageAndCustomSpanQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingBaggageAndCustomSpanQuarkusTest.java new file mode 100644 index 0000000..f3df39f --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingBaggageAndCustomSpanQuarkusTest.java @@ -0,0 +1,192 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl; + +import static io.quarkiverse.kafkastreamsprocessor.impl.utils.KafkaHeaderUtils.getHeader; +import static io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping.newBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import jakarta.enterprise.inject.Alternative; +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; +import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; + +import io.opentelemetry.api.OpenTelemetry; +import 
io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.assertj.TracesAssert; +import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.TestSpanExporter; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; +import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import lombok.extern.slf4j.Slf4j; + +@QuarkusTest +@TestProfile(TracingBaggageAndCustomSpanQuarkusTest.TestProfile.class) +public class TracingBaggageAndCustomSpanQuarkusTest { + @ConfigProperty(name = "kafkastreamsprocessor.input.topic") + String senderTopic; + + @ConfigProperty(name = "kafkastreamsprocessor.output.topic") + String consumerTopic; + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrapServers; + + KafkaProducer producer; + + KafkaConsumer consumer; + + @Inject + OpenTelemetry openTelemetry; + + @Inject + KafkaTextMapSetter kafkaTextMapSetter; + + @Inject + Tracer tracer; + + @Inject + TestSpanExporter testSpanExporter; + + @BeforeEach + public void setup() { + producer = new KafkaProducer<>(KafkaTestUtils.producerProps(bootstrapServers), new StringSerializer(), + new KafkaProtobufSerializer<>()); + Map consumerProps = KafkaTestUtils.consumerProps(bootstrapServers, "test", "true"); + consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), + new KafkaProtobufDeserializer<>(PingMessage.Ping.parser())); + consumer.subscribe(List.of(consumerTopic)); + + clearSpans(); + } + + private void clearSpans() { + // force a flush to make sure there are no 
remaining spans still in the buffers + ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); + testSpanExporter.getSpans().clear(); + } + + @AfterEach + public void tearDown() throws Exception { + clearSpans(); + producer.close(); + consumer.close(); + } + + @Test + public void baggageModifiedPropagatedAndExtraSpanCreated() { + ConsumerRecord receivedRecord; + Span span = tracer.spanBuilder("god").startSpan(); + try (Scope ignored = span.makeCurrent(); + Scope ignored2 = Baggage.current().toBuilder().put("key1", "value1").build().makeCurrent()) { + PingMessage.Ping input = PingMessage.Ping.newBuilder().setMessage("world").build(); + RecordHeaders recordHeaders = new RecordHeaders(); + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), recordHeaders, kafkaTextMapSetter); + ProducerRecord sentRecord = new ProducerRecord<>(senderTopic, 0, "key", input, + recordHeaders); + + producer.send(sentRecord); + producer.flush(); + + receivedRecord = KafkaTestUtils.getSingleRecord(consumer, consumerTopic); + + assertThat(getHeader(receivedRecord, "baggage"), containsString("key1=value1")); + assertThat(getHeader(receivedRecord, "baggage"), containsString("key2=value2")); + } finally { + span.end(); + } + + ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); + + TracesAssert.assertThat(testSpanExporter.getSpans()) + .hasTracesSatisfyingExactly( + trace -> trace.hasSpansSatisfyingExactly( + s -> s.hasSpanId(span.getSpanContext().getSpanId()), + s -> s.hasTraceId(span.getSpanContext().getTraceId()) + .hasParentSpanId(span.getSpanContext().getSpanId()), + s -> s.hasTraceId(span.getSpanContext().getTraceId()) + .hasAttribute(AttributeKey.stringKey("an-attribute"), "a-value") + .hasSpanId( + new String(receivedRecord.headers().lastHeader("traceparent").value(), + StandardCharsets.UTF_8) + .split("-")[2]))); + } + + @Processor + @Alternative + @Slf4j + public static class TestProcessor extends ContextualProcessor 
{ + @Inject + Tracer tracer; + + @Override + public void process(Record record) { + Span span = tracer.spanBuilder("custom span") + .setAttribute("an-attribute", "a-value") + .startSpan(); + try (Scope ignored2 = span.makeCurrent(); + Scope ignored = Baggage.current().toBuilder().put("key2", "value2").build().makeCurrent()) { + context().forward(record); + } finally { + span.end(); + } + } + } + + public static class TestProfile implements QuarkusTestProfile { + @Override + public Set> getEnabledAlternatives() { + return Set.of(TestProcessor.class); + } + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingQuarkusTest.java new file mode 100644 index 0000000..66445d4 --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/TracingQuarkusTest.java @@ -0,0 +1,204 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl; + +import static io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping.newBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import jakarta.enterprise.inject.Alternative; +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; +import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.assertj.TracesAssert; +import 
io.quarkiverse.kafkastreamsprocessor.api.Processor; +import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.TestSpanExporter; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; +import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import lombok.extern.slf4j.Slf4j; + +@QuarkusTest +@TestProfile(TracingQuarkusTest.TestProfile.class) +public class TracingQuarkusTest { + @ConfigProperty(name = "kafkastreamsprocessor.input.topic") + String senderTopic; + + @ConfigProperty(name = "kafkastreamsprocessor.output.topic") + String consumerTopic; + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrapServers; + + KafkaProducer producer; + + KafkaConsumer consumer; + + @Inject + OpenTelemetry openTelemetry; + + @Inject + KafkaTextMapSetter kafkaTextMapSetter; + + @Inject + Tracer tracer; + + @Inject + TestSpanExporter testSpanExporter; + + @BeforeEach + public void setup() { + producer = new KafkaProducer<>(KafkaTestUtils.producerProps(bootstrapServers), new StringSerializer(), + new KafkaProtobufSerializer<>()); + Map consumerProps = KafkaTestUtils.consumerProps(bootstrapServers, "test", "true"); + consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), + new KafkaProtobufDeserializer<>(PingMessage.Ping.parser())); + consumer.subscribe(List.of(consumerTopic)); + + clearSpans(); + } + + private void clearSpans() { + // force a flush to make sure there are no remaining spans still in the buffers + ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); + testSpanExporter.getSpans().clear(); + } + + @AfterEach + public void tearDown() throws Exception { + clearSpans(); + producer.close(); + consumer.close(); + } + + @Test + public void tracingShouldBePropagatedW3C() 
{ + Span parentSpan = tracer.spanBuilder("parent").startSpan(); + try (Scope parentScope = parentSpan.makeCurrent()) { + PingMessage.Ping input = newBuilder().setMessage("world").build(); + RecordHeaders headers = new RecordHeaders(); + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), headers, kafkaTextMapSetter); + ProducerRecord record = new ProducerRecord<>(senderTopic, 0, "key", input, headers); + producer.send(record); + + ConsumerRecord singleRecord = KafkaTestUtils.getSingleRecord(consumer, consumerTopic, + Duration.ofSeconds(5)); + MatcherAssert.assertThat(toMap(singleRecord.headers()), + hasEntry(equalTo(KafkaStreamsProcessorHeaders.W3C_TRACE_ID), + containsString(parentSpan.getSpanContext().getTraceId()))); + } finally { + parentSpan.end(); + } + } + + private static Map toMap(Iterable
headers) { + Map result = new HashMap<>(); + for (Header h : headers) { + result.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)); + } + return result; + } + + @Test + public void spanShouldBeCreatedW3C() { + + Span parentSpan = tracer.spanBuilder("parent").startSpan(); + try (Scope ignored = parentSpan.makeCurrent()) { + PingMessage.Ping input = newBuilder().setMessage("world").build(); + RecordHeaders headers = new RecordHeaders(); + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), headers, kafkaTextMapSetter); + ProducerRecord record = new ProducerRecord<>(senderTopic, 0, "key", input, headers); + producer.send(record); + + KafkaTestUtils.getSingleRecord(consumer, consumerTopic, Duration.ofSeconds(5000)); + } finally { + parentSpan.end(); + } + + ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); + + TracesAssert.assertThat(testSpanExporter.getSpans()).hasTracesSatisfyingExactly( + trace -> trace.hasSpansSatisfyingExactly( + span -> span.hasSpanId(parentSpan.getSpanContext().getSpanId()), + span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId()) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()))); + } + + @Processor + @Alternative + @Slf4j + public static class TestProcessor extends ContextualProcessor { + @Inject + Tracer tracer; + + @Override + public void process(Record record) { + context().forward(record); + } + } + + public static class TestProfile implements QuarkusTestProfile { + @Override + public Set> getEnabledAlternatives() { + return Set.of(TestProcessor.class); + } + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java index 6e137cc..da964df 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java +++ 
b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java @@ -73,7 +73,6 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.baggage.Baggage; -import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanId; @@ -82,16 +81,12 @@ import io.opentelemetry.api.trace.TraceId; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; import io.opentelemetry.sdk.trace.IdGenerator; import io.quarkiverse.kafkastreamsprocessor.impl.TestException; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.OpenTelemetryWithBaggageExtension; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; @@ -112,7 +107,7 @@ public class TracingDecoratorTest { private static final java.util.logging.Logger rootLogger = LogManager.getLogManager().getLogger("io.quarkiverse"); @RegisterExtension - static final OpenTelemetryExtension otel = OpenTelemetryExtension.create(); + static final OpenTelemetryWithBaggageExtension otel = OpenTelemetryWithBaggageExtension.create(); TracingDecorator decorator; @@ -140,7 +135,7 @@ public void setUp() { rootLogger.addHandler(inMemoryLogHandler); 
rootLogger.setLevel(Level.DEBUG); when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class); - decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter, + decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.setDelegate(kafkaProcessor); decorator.init(processorContext); @@ -208,7 +203,7 @@ public void shouldCleanMDCAndScopeInCaseOfException() { .build(), 0L, headers); decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, - kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.setDelegate(new ThrowExceptionProcessor()); decorator.init(processorContext); @@ -310,7 +305,7 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable { Processor kafkaProcessor = mock(Processor.class); ProcessorContext processorContext = mock(ProcessorContext.class); TracingDecorator decorator = new TracingDecorator(GlobalOpenTelemetry.get(), kafkaTextMapGetter, - kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.setDelegate(kafkaProcessor); decorator.init(processorContext); @@ -329,24 +324,15 @@ void shouldPropagateOpentelemetryW3CBaggage() { Headers headers = new RecordHeaders().add(W3C_TRACE_ID, TRACE_PARENT.getBytes()) .add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes()); Record record = new Record<>(null, Ping.newBuilder().setMessage("blabla").build(), 0L, headers); + decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + decorator.setDelegate(new LogOpentelemetryBaggageProcessor()); 
+ decorator.init(processorContext); - // the opentelemetry injected and used throughout the unit tests in this class is not configured with W3C baggage propagator - // as coming from the OpenTelemetryExtension, so we need to create a new one with baggage propagator. - try (OpenTelemetrySdk openTelemetryWithBaggageSdk = OpenTelemetrySdk.builder() - .setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), - W3CBaggagePropagator.getInstance()))) - .build()) { - decorator = new TracingDecorator(openTelemetryWithBaggageSdk, - kafkaTextMapGetter, kafkaTextMapSetter, openTelemetryWithBaggageSdk.getTracer("test"), PROCESSOR_NAME, - jsonPrinter); - decorator.setDelegate(new LogOpentelemetryBaggageProcessor()); - decorator.init(processorContext); - - decorator.process(record); + decorator.process(record); - assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key1 value1")))); - assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key2 value2")))); - } + assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key1 value1")))); + assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key2 value2")))); } @Slf4j diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java new file mode 100644 index 0000000..632e05e --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java @@ -0,0 +1,108 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl.decorator.producer; + +import static io.quarkiverse.kafkastreamsprocessor.impl.utils.KafkaHeaderUtils.getHeader; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.nullValue; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.trace.IdGenerator; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.OpenTelemetryWithBaggageExtension; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; + +class TracingProducerInterceptorTest { + @RegisterExtension + static final OpenTelemetryWithBaggageExtension otel = OpenTelemetryWithBaggageExtension.create(); + + KafkaTextMapSetter setter = new KafkaTextMapSetter(); + + TracingProducerInterceptor interceptor = new TracingProducerInterceptor(otel.getOpenTelemetry(), setter); + + @Test + void 
testTraceInjected() { + ProducerRecord record = new ProducerRecord<>("topic", 0, "key".getBytes(), + "value".getBytes(), new RecordHeaders().add("traceparent", "ll".getBytes(StandardCharsets.UTF_8)) + .add("tracestate", "state".getBytes(StandardCharsets.UTF_8)) + .add("baggage", "baggage".getBytes(StandardCharsets.UTF_8))); + Span span = otel.getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + try (Scope ignored = span.makeCurrent()) { + ProducerRecord output = interceptor.onSend(record); + + assertThat(getHeader(output, "traceparent"), containsString(span.getSpanContext().getSpanId())); + assertThat(getHeader(output, "tracestate"), nullValue()); + assertThat(getHeader(output, "baggage"), nullValue()); + } finally { + span.end(); + } + + } + + @Test + void testStateAndBaggageAreInjected() { + ProducerRecord record = new ProducerRecord<>("topic", 0, "key".getBytes(), + "value".getBytes(), new RecordHeaders().add("traceparent", "ll".getBytes(StandardCharsets.UTF_8)) + .add("tracestate", "state".getBytes(StandardCharsets.UTF_8)) + .add("baggage", "baggage".getBytes(StandardCharsets.UTF_8))); + Tracer tracer = otel.getOpenTelemetry().getTracer("test"); + TraceState state = TraceState.builder() + .put("foo", "bar") + .put("baz", "42") + .build(); + SpanContext withTraceState = SpanContext.create(IdGenerator.random().generateTraceId(), + IdGenerator.random().generateSpanId(), + TraceFlags.getSampled(), state); + Span span = tracer.spanBuilder("span") + .setParent(Context.root().with(Span.wrap(withTraceState))) + .startSpan(); + Baggage baggage = Baggage.builder() + .put("picky", "frown") + .put("abandoned", "ship") + .build(); + try (Scope ignored = span.makeCurrent(); + Scope ignored2 = baggage.makeCurrent()) { + ProducerRecord output = interceptor.onSend(record); + + assertThat(getHeader(output, "traceparent"), containsString(span.getSpanContext().getTraceId())); + assertThat(getHeader(output, "tracestate"), 
both(containsString("foo=bar")).and(containsString("baz=42"))); + assertThat(getHeader(output, "baggage"), + both(containsString("picky=frown")).and(containsString("abandoned=ship"))); + } finally { + span.end(); + } + } + +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java new file mode 100644 index 0000000..7337c67 --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java @@ -0,0 +1,45 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl.utils; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +public final class KafkaHeaderUtils { + public static String getHeader(ProducerRecord record, String key) { + return getHeader(record.headers(), key); + } + + public static String getHeader(ConsumerRecord record, String key) { + return getHeader(record.headers(), key); + } + + public static String getHeader(Headers headers, String key) { + Header header = headers.lastHeader(key); + return header == null ? 
null : new String(header.value(), StandardCharsets.UTF_8); + } + + private KafkaHeaderUtils() { + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java new file mode 100644 index 0000000..e3dd889 --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java @@ -0,0 +1,198 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + * + * This file is a modified version of the OpenTelemetryExtension.java file from the OpenTelemetry Java SDK. + */ + +package io.quarkiverse.kafkastreamsprocessor.impl.utils; + +import static io.opentelemetry.sdk.testing.assertj.TracesAssert.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import 
io.opentelemetry.sdk.testing.assertj.TracesAssert; +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; + +/** + * A JUnit5 extension which sets up the {@link OpenTelemetrySdk} for testing, resetting state between tests. + * + *
<pre>{@code
 + * // class CoolTest {
 + * // @RegisterExtension
 + * // static final OpenTelemetryWithBaggageExtension otelTesting = OpenTelemetryWithBaggageExtension.create();
 + * //
 + * // private final Tracer tracer = otelTesting.getOpenTelemetry().getTracer("test");
 + * // private final Meter meter = otelTesting.getOpenTelemetry().getMeter("test");
 + * //
 + * // @Test
 + * // void test() {
 + * // tracer.spanBuilder("name").startSpan().end();
 + * // assertThat(otelTesting.getSpans()).containsExactly(expected);
 + * //
 + * // LongCounter counter = meter.counterBuilder("counter-name").build();
 + * // counter.add(1);
 + * // assertThat(otelTesting.getMetrics()).satisfiesExactlyInAnyOrder(metricData -> {});
 + * // }
 + * // }
 + * }</pre>
+ */ +public class OpenTelemetryWithBaggageExtension implements BeforeEachCallback, BeforeAllCallback, AfterAllCallback { + + /** + * Returns a {@link OpenTelemetryWithBaggageExtension} with a default SDK initialized with an in-memory span exporter + * and W3C trace context propagation. + */ + public static OpenTelemetryWithBaggageExtension create() { + InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); + + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + + SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + InMemoryLogRecordExporter logRecordExporter = InMemoryLogRecordExporter.create(); + + SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder() + .addLogRecordProcessor(SimpleLogRecordProcessor.create(logRecordExporter)) + .build(); + + OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create( + // Here we inject the baggage propagator as well + TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), + W3CBaggagePropagator.getInstance()))) + .setTracerProvider(tracerProvider) + .setMeterProvider(meterProvider) + .setLoggerProvider(loggerProvider) + .build(); + + return new OpenTelemetryWithBaggageExtension(openTelemetry, spanExporter, metricReader, logRecordExporter); + } + + private final OpenTelemetrySdk openTelemetry; + private final InMemorySpanExporter spanExporter; + private final InMemoryMetricReader metricReader; + private final InMemoryLogRecordExporter logRecordExporter; + + private OpenTelemetryWithBaggageExtension( + OpenTelemetrySdk openTelemetry, + InMemorySpanExporter spanExporter, + InMemoryMetricReader metricReader, + InMemoryLogRecordExporter logRecordExporter) { + this.openTelemetry = openTelemetry; + this.spanExporter = spanExporter; + 
this.metricReader = metricReader; + this.logRecordExporter = logRecordExporter; + } + + /** Returns the {@link OpenTelemetrySdk} created by this extension. */ + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + /** Returns all the exported {@link SpanData} so far. */ + public List getSpans() { + return spanExporter.getFinishedSpanItems(); + } + + /** + * Returns the current {@link MetricData} in {@link AggregationTemporality#CUMULATIVE} format. + * + * @since 1.15.0 + */ + public List getMetrics() { + return new ArrayList<>(metricReader.collectAllMetrics()); + } + + /** + * Returns all the exported {@link LogRecordData} so far. + * + * @since 1.32.0 + */ + public List getLogRecords() { + return new ArrayList<>(logRecordExporter.getFinishedLogRecordItems()); + } + + /** + * Returns a {@link TracesAssert} for asserting on the currently exported traces. This method requires AssertJ to be + * on the classpath. + */ + public TracesAssert assertTraces() { + return assertThat(spanExporter.getFinishedSpanItems()); + } + + /** + * Clears the collected exported {@link SpanData}. Consider making your test smaller instead of manually clearing + * state using this method. + */ + public void clearSpans() { + spanExporter.reset(); + } + + /** + * Clears all registered metric instruments, such that {@link #getMetrics()} is empty. + * + * @since 1.15.0 + */ + public void clearMetrics() { + SdkMeterProviderUtil.resetForTest(openTelemetry.getSdkMeterProvider()); + } + + /** + * Clears the collected exported {@link LogRecordData}. Consider making your test smaller instead of manually clearing + * state using this method. 
+ * + * @since 1.32.0 + */ + public void clearLogRecords() { + logRecordExporter.reset(); + } + + @Override + public void beforeEach(ExtensionContext context) { + clearSpans(); + clearMetrics(); + clearLogRecords(); + } + + @Override + public void beforeAll(ExtensionContext context) { + GlobalOpenTelemetry.resetForTest(); + GlobalOpenTelemetry.set(openTelemetry); + } + + @Override + public void afterAll(ExtensionContext context) { + GlobalOpenTelemetry.resetForTest(); + openTelemetry.close(); + } +}