From a8591ce4e0f2ff9e6f58c376953f507c1af0b41f Mon Sep 17 00:00:00 2001 From: edeweerd Date: Fri, 11 Oct 2024 15:47:55 +0200 Subject: [PATCH] fix(impl): Support custom span creation in Processor Probably a very narrow use case. But it could happen. Along the way of a [recent PR](https://github.com/quarkiverse/quarkus-kafka-streams-processor/pull/109) by @nreant1A trying to make sure: 1. Baggage is properly propagated 2. Custom span creationg is supported in process method First added 1 QuarkusTest in impl. The implementation was: 1. extracting the parent context (but not making it current, by which baggage were lost) 2. creating a "process" span and making that a current span 3. early on reinjecting the new span context ids in the message before it starts being processed, relying on the fact that output message are created by modification of the incoming one: `ping.withValue(...)` The issue we have: baggage cannot be modified in the processor, and any other context modification in the processor is basically ignored (creating a custom span for instance). Solution: move the injection of the contextual data (traceparent, tracestate, baggage) at message production by implementing a TracingProducerInterceptor. The OpenTelemetryExtension for JUnit 5 has bean copy pasted so we can inject the W3CBaggagePropagator. Indeed the original class is final, with private constructor. A fix upstream will be done shortly. --- .../ProducerInterceptorPriorities.java | 34 +++ .../decorator/processor/TracingDecorator.java | 106 +++++----- .../producer/TracingProducerInterceptor.java | 61 ++++++ ...emetryBaggageAndCustomSpanQuarkusTest.java | 182 ++++++++++++++++ .../processor/TracingDecoratorTest.java | 37 ++-- .../TracingProducerInterceptorTest.java | 108 ++++++++++ .../impl/utils/KafkaHeaderUtils.java | 45 ++++ .../OpenTelemetryWithBaggageExtension.java | 198 ++++++++++++++++++ 8 files changed, 689 insertions(+), 82 deletions(-) create mode 100644 api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java create mode 100644 impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/OpenTelemetryBaggageAndCustomSpanQuarkusTest.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java create mode 100644 impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java diff --git a/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java new file mode 100644 index 0000000..a5613ec --- /dev/null +++ b/api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java @@ -0,0 +1,34 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.api.decorator.producer; + +/** + * Priorities of the producer interceptors the framework provides. + */ +public final class ProducerInterceptorPriorities { + /** + * Priority of the interceptor that will inject the tracing headers for propagation. + */ + public static final int TRACING = 100; + + private ProducerInterceptorPriorities() { + + } +} diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java index 5c05333..4e79e4c 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java @@ -46,7 +46,6 @@ import com.google.protobuf.util.JsonFormat; import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.StatusCode; @@ -58,7 +57,6 @@ import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter; -import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -92,11 +90,6 @@ public class TracingDecorator implements Processor implements Processor implements Processor delegate, OpenTelemetry openTelemetry, - KafkaTextMapGetter textMapGetter, KafkaTextMapSetter textMapSetter, Tracer tracer, - TopologyConfigurationImpl configuration) { - this(delegate, openTelemetry, textMapGetter, textMapSetter, tracer, - configuration.getProcessorPayloadType().getName(), + KafkaTextMapGetter textMapGetter, Tracer tracer, TopologyConfigurationImpl configuration) { + this(delegate, openTelemetry, textMapGetter, tracer, configuration.getProcessorPayloadType().getName(), JsonFormat.printer()); } @@ -167,52 +156,55 @@ public void init(final ProcessorContext context) { public void process(Record record) { SpanBuilder spanBuilder = tracer.spanBuilder(applicationName); final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator(); - - // going through all propagation field names defined in the OTel configuration - // we look if any of them has been set with a non-null value in the headers of the incoming message - Context extractedContext = null; - if (propagator.fields() - .stream() - .map(record.headers()::lastHeader) - .anyMatch(Objects::nonNull)) { - // if that is the case, let's extract a Context initialized with the parent trace id, span id - // and baggage present as headers in the incoming message - extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter); - // use the context as parent span for the built span - spanBuilder.setParent(extractedContext); - // we clean the headers to avoid their propagation in any outgoing message (knowing that by - // default Kafka Streams copies all headers of the incoming message into any outgoing message) - propagator.fields().forEach(record.headers()::remove); - } - Span span = spanBuilder.startSpan(); - // baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span - // in opentelemetry) and actually lost as kafka headers are cleaned - try (Scope ignored = (extractedContext != null) ? Baggage.fromContext(extractedContext).makeCurrent() : Scope.noop(); - Scope scope = span.makeCurrent()) { - try { - // now that the context has been set to the new started child span of this microservice, we replace - // the headers in the incoming message so when an outgoing message is produced with the copied - // header values it already has the span id from this new child span - propagator.inject(Context.current(), record.headers(), textMapSetter); - delegate.process(record); - span.setStatus(StatusCode.OK); - } catch (KafkaException e) { - // we got a Kafka exception, we record the exception in the span, log but rethrow the exception - // with the idea that it will be caught by one of the DLQ in error management - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - logInputMessageMetadata(record); - throw e; - } catch (RuntimeException e) { // NOSONAR - // very last resort, even the DLQs are not working, then we still record the exception and - // log the message but do not rethrow the exception otherwise we'd end up in an infinite loop - log.error("Runtime error caught while processing the message", e); - span.recordException(e); - span.setStatus(StatusCode.ERROR, e.getMessage()); - logInputMessageMetadata(record); + Scope parentScope = null; + + try { + // going through all propagation field names defined in the OTel configuration + // we look if any of them has been set with a non-null value in the headers of the incoming message + if (propagator.fields() + .stream() + .map(record.headers()::lastHeader) + .anyMatch(Objects::nonNull)) { + // if that is the case, let's extract a Context initialized with the parent trace id, span id + // and baggage present as headers in the incoming message + Context extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter); + // use the context as parent span for the built span + spanBuilder.setParent(extractedContext); + // we clean the headers to avoid their propagation in any outgoing message (knowing that by + // default Kafka Streams copies all headers of the incoming message into any outgoing message) + propagator.fields().forEach(record.headers()::remove); + // we make the parent context current to not loose the baggage + parentScope = extractedContext.makeCurrent(); + } + Span span = spanBuilder.startSpan(); + // baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span + // in opentelemetry) and actually lost as kafka headers are cleaned + try (Scope ignored = span.makeCurrent()) { + try { + delegate.process(record); + span.setStatus(StatusCode.OK); + } catch (KafkaException e) { + // we got a Kafka exception, we record the exception in the span, log but rethrow the exception + // with the idea that it will be caught by one of the DLQ in error management + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); + logInputMessageMetadata(record); + throw e; + } catch (RuntimeException e) { // NOSONAR + // very last resort, even the DLQs are not working, then we still record the exception and + // log the message but do not rethrow the exception otherwise we'd end up in an infinite loop + log.error("Runtime error caught while processing the message", e); + span.recordException(e); + span.setStatus(StatusCode.ERROR, e.getMessage()); + logInputMessageMetadata(record); + } + } finally { + span.end(); } } finally { - span.end(); + if (parentScope != null) { + parentScope.close(); + } } } diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java new file mode 100644 index 0000000..5950eb3 --- /dev/null +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java @@ -0,0 +1,61 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl.decorator.producer; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.apache.kafka.clients.producer.ProducerRecord; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.Context; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerInterceptorPriorities; +import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; + +/** + * Producer interceptor that injects the tracing headers for propagation. + */ +@ApplicationScoped +public class TracingProducerInterceptor implements ProducerOnSendInterceptor { + private final OpenTelemetry openTelemetry; + + private final KafkaTextMapSetter kafkaTextMapSetter; + + @Inject + public TracingProducerInterceptor(OpenTelemetry openTelemetry, KafkaTextMapSetter kafkaTextMapSetter) { + this.openTelemetry = openTelemetry; + this.kafkaTextMapSetter = kafkaTextMapSetter; + } + + @Override + public ProducerRecord onSend(ProducerRecord record) { + openTelemetry.getPropagators().getTextMapPropagator().fields().forEach(record.headers()::remove); + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), record.headers(), kafkaTextMapSetter); + return record; + } + + @Override + public int priority() { + return ProducerInterceptorPriorities.TRACING; + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/OpenTelemetryBaggageAndCustomSpanQuarkusTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/OpenTelemetryBaggageAndCustomSpanQuarkusTest.java new file mode 100644 index 0000000..66182be --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/OpenTelemetryBaggageAndCustomSpanQuarkusTest.java @@ -0,0 +1,182 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl; + +import static io.quarkiverse.kafkastreamsprocessor.impl.utils.KafkaHeaderUtils.getHeader; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import jakarta.enterprise.inject.Alternative; +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufDeserializer; +import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerializer; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.testing.assertj.TracesAssert; +import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.TestSpanExporter; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; +import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.junit.QuarkusTestProfile; +import io.quarkus.test.junit.TestProfile; +import lombok.extern.slf4j.Slf4j; + +@QuarkusTest +@TestProfile(OpenTelemetryBaggageAndCustomSpanQuarkusTest.TestProfile.class) +public class OpenTelemetryBaggageAndCustomSpanQuarkusTest { + @ConfigProperty(name = "kafkastreamsprocessor.input.topic") + String senderTopic; + + @ConfigProperty(name = "kafkastreamsprocessor.output.topic") + String consumerTopic; + + @ConfigProperty(name = "kafka.bootstrap.servers") + String bootstrapServers; + + KafkaProducer producer; + + KafkaConsumer consumer; + + @Inject + OpenTelemetry openTelemetry; + + @Inject + KafkaTextMapSetter kafkaTextMapSetter; + + @Inject + Tracer tracer; + + @Inject + TestSpanExporter spanCaptor; + + @BeforeEach + public void setup() { + producer = new KafkaProducer<>(KafkaTestUtils.producerProps(bootstrapServers), new StringSerializer(), + new KafkaProtobufSerializer<>()); + Map consumerProps = KafkaTestUtils.consumerProps(bootstrapServers, "test", "true"); + consumer = new KafkaConsumer<>(consumerProps, new StringDeserializer(), + new KafkaProtobufDeserializer<>(PingMessage.Ping.parser())); + consumer.subscribe(List.of(consumerTopic)); + } + + @AfterEach + public void tearDown() throws Exception { + producer.close(); + consumer.close(); + } + + @Test + public void baggageModifiedPropagatedAndExtraSpanCreated() { + ConsumerRecord receivedRecord; + Span span = tracer.spanBuilder("god").startSpan(); + try (Scope ignored = span.makeCurrent(); + Scope ignored2 = Baggage.current().toBuilder().put("key1", "value1").build().makeCurrent()) { + PingMessage.Ping input = PingMessage.Ping.newBuilder().setMessage("world").build(); + RecordHeaders recordHeaders = new RecordHeaders(); + openTelemetry.getPropagators() + .getTextMapPropagator() + .inject(Context.current(), recordHeaders, kafkaTextMapSetter); + ProducerRecord sentRecord = new ProducerRecord<>(senderTopic, 0, "key", input, + recordHeaders); + + producer.send(sentRecord); + producer.flush(); + + receivedRecord = KafkaTestUtils.getSingleRecord(consumer, consumerTopic); + + assertThat(getHeader(receivedRecord, "baggage"), containsString("key1=value1")); + assertThat(getHeader(receivedRecord, "baggage"), containsString("key2=value2")); + } finally { + span.end(); + } + + ((OpenTelemetrySdk) openTelemetry).getSdkTracerProvider().forceFlush(); + + TracesAssert.assertThat(spanCaptor.getSpans()) + .hasTracesSatisfyingExactly( + trace -> trace.hasSpansSatisfyingExactly( + s -> s.hasSpanId(span.getSpanContext().getSpanId()), + s -> s.hasTraceId(span.getSpanContext().getTraceId()) + .hasParentSpanId(span.getSpanContext().getSpanId()), + s -> s.hasTraceId(span.getSpanContext().getTraceId()) + .hasAttribute(AttributeKey.stringKey("an-attribute"), "a-value") + .hasSpanId( + new String(receivedRecord.headers().lastHeader("traceparent").value(), + StandardCharsets.UTF_8) + .split("-")[2]))); + } + + @Processor + @Alternative + @Slf4j + public static class TestProcessor extends ContextualProcessor { + @Inject + Tracer tracer; + + @Override + public void process(Record record) { + Span span = tracer.spanBuilder("custom span") + .setAttribute("an-attribute", "a-value") + .startSpan(); + try (Scope ignored2 = span.makeCurrent(); + Scope ignored = Baggage.current().toBuilder().put("key2", "value2").build().makeCurrent()) { + context().forward(record); + } finally { + span.end(); + } + } + } + + public static class TestProfile implements QuarkusTestProfile { + @Override + public Set> getEnabledAlternatives() { + return Set.of(TestProcessor.class); + } + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java index 930b32f..d6b2115 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java @@ -73,7 +73,6 @@ import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.baggage.Baggage; -import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanId; @@ -82,16 +81,12 @@ import io.opentelemetry.api.trace.TraceId; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; import io.opentelemetry.sdk.trace.IdGenerator; import io.quarkiverse.kafkastreamsprocessor.impl.TestException; import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.OpenTelemetryWithBaggageExtension; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import io.quarkiverse.kafkastreamsprocessor.sample.message.PingMessage.Ping; @@ -112,7 +107,7 @@ public class TracingDecoratorTest { private static final java.util.logging.Logger rootLogger = LogManager.getLogManager().getLogger("io.quarkiverse"); @RegisterExtension - static final OpenTelemetryExtension otel = OpenTelemetryExtension.create(); + static final OpenTelemetryWithBaggageExtension otel = OpenTelemetryWithBaggageExtension.create(); TracingDecorator decorator; @@ -140,7 +135,7 @@ public void setUp() { rootLogger.addHandler(inMemoryLogHandler); rootLogger.setLevel(Level.DEBUG); when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class); - decorator = new TracingDecorator<>(kafkaProcessor, otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter, + decorator = new TracingDecorator<>(kafkaProcessor, otel.getOpenTelemetry(), kafkaTextMapGetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.init(processorContext); } @@ -207,7 +202,7 @@ public void shouldCleanMDCAndScopeInCaseOfException() { .build(), 0L, headers); decorator = new TracingDecorator<>(new ThrowExceptionProcessor(), otel.getOpenTelemetry(), kafkaTextMapGetter, - kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.init(processorContext); assertDoesNotThrow(() -> decorator.process(record)); @@ -309,7 +304,7 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable { ProcessorContext processorContext = mock(ProcessorContext.class); TracingDecorator decorator = new TracingDecorator<>( kafkaProcessor, GlobalOpenTelemetry.get(), kafkaTextMapGetter, - kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.init(processorContext); RuntimeException exception = new TestException(); @@ -327,23 +322,15 @@ void shouldPropagateOpentelemetryW3CBaggage() { Headers headers = new RecordHeaders().add(W3C_TRACE_ID, TRACE_PARENT.getBytes()) .add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes()); Record record = new Record<>(null, Ping.newBuilder().setMessage("blabla").build(), 0L, headers); + decorator = new TracingDecorator<>( + new LogOpentelemetryBaggageProcessor(), otel.getOpenTelemetry(), kafkaTextMapGetter, + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + decorator.init(processorContext); - // the opentelemetry injected and used throughout the unit tests in this class is not configured with W3C baggage propagator - // as coming from the OpenTelemetryExtension, so we need to create a new one with baggage propagator. - try (OpenTelemetrySdk openTelemetryWithBaggageSdk = OpenTelemetrySdk.builder() - .setPropagators(ContextPropagators.create(TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), - W3CBaggagePropagator.getInstance()))) - .build()) { - decorator = new TracingDecorator<>(new LogOpentelemetryBaggageProcessor(), openTelemetryWithBaggageSdk, - kafkaTextMapGetter, kafkaTextMapSetter, openTelemetryWithBaggageSdk.getTracer("test"), PROCESSOR_NAME, - jsonPrinter); - decorator.init(processorContext); - - decorator.process(record); + decorator.process(record); - assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key1 value1")))); - assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key2 value2")))); - } + assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key1 value1")))); + assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("baggage: key2 value2")))); } @Slf4j diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java new file mode 100644 index 0000000..632e05e --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptorTest.java @@ -0,0 +1,108 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl.decorator.producer; + +import static io.quarkiverse.kafkastreamsprocessor.impl.utils.KafkaHeaderUtils.getHeader; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.nullValue; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.api.trace.Tracer; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.sdk.trace.IdGenerator; +import io.quarkiverse.kafkastreamsprocessor.impl.utils.OpenTelemetryWithBaggageExtension; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; + +class TracingProducerInterceptorTest { + @RegisterExtension + static final OpenTelemetryWithBaggageExtension otel = OpenTelemetryWithBaggageExtension.create(); + + KafkaTextMapSetter setter = new KafkaTextMapSetter(); + + TracingProducerInterceptor interceptor = new TracingProducerInterceptor(otel.getOpenTelemetry(), setter); + + @Test + void testTraceInjected() { + ProducerRecord record = new ProducerRecord<>("topic", 0, "key".getBytes(), + "value".getBytes(), new RecordHeaders().add("traceparent", "ll".getBytes(StandardCharsets.UTF_8)) + .add("tracestate", "state".getBytes(StandardCharsets.UTF_8)) + .add("baggage", "baggage".getBytes(StandardCharsets.UTF_8))); + Span span = otel.getOpenTelemetry().getTracer("test").spanBuilder("test").startSpan(); + try (Scope ignored = span.makeCurrent()) { + ProducerRecord output = interceptor.onSend(record); + + assertThat(getHeader(output, "traceparent"), containsString(span.getSpanContext().getSpanId())); + assertThat(getHeader(output, "tracestate"), nullValue()); + assertThat(getHeader(output, "baggage"), nullValue()); + } finally { + span.end(); + } + + } + + @Test + void testStateAndBaggageAreInjected() { + ProducerRecord record = new ProducerRecord<>("topic", 0, "key".getBytes(), + "value".getBytes(), new RecordHeaders().add("traceparent", "ll".getBytes(StandardCharsets.UTF_8)) + .add("tracestate", "state".getBytes(StandardCharsets.UTF_8)) + .add("baggage", "baggage".getBytes(StandardCharsets.UTF_8))); + Tracer tracer = otel.getOpenTelemetry().getTracer("test"); + TraceState state = TraceState.builder() + .put("foo", "bar") + .put("baz", "42") + .build(); + SpanContext withTraceState = SpanContext.create(IdGenerator.random().generateTraceId(), + IdGenerator.random().generateSpanId(), + TraceFlags.getSampled(), state); + Span span = tracer.spanBuilder("span") + .setParent(Context.root().with(Span.wrap(withTraceState))) + .startSpan(); + Baggage baggage = Baggage.builder() + .put("picky", "frown") + .put("abandoned", "ship") + .build(); + try (Scope ignored = span.makeCurrent(); + Scope ignored2 = baggage.makeCurrent()) { + ProducerRecord output = interceptor.onSend(record); + + assertThat(getHeader(output, "traceparent"), containsString(span.getSpanContext().getTraceId())); + assertThat(getHeader(output, "tracestate"), both(containsString("foo=bar")).and(containsString("baz=42"))); + assertThat(getHeader(output, "baggage"), + both(containsString("picky=frown")).and(containsString("abandoned=ship"))); + } finally { + span.end(); + } + } + +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java new file mode 100644 index 0000000..7337c67 --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/KafkaHeaderUtils.java @@ -0,0 +1,45 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.impl.utils; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +public final class KafkaHeaderUtils { + public static String getHeader(ProducerRecord record, String key) { + return getHeader(record.headers(), key); + } + + public static String getHeader(ConsumerRecord record, String key) { + return getHeader(record.headers(), key); + } + + public static String getHeader(Headers headers, String key) { + Header header = headers.lastHeader(key); + return header == null ? null : new String(header.value(), StandardCharsets.UTF_8); + } + + private KafkaHeaderUtils() { + } +} diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java new file mode 100644 index 0000000..a05c229 --- /dev/null +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/utils/OpenTelemetryWithBaggageExtension.java @@ -0,0 +1,198 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + * + * This file is a modified version of the OpenTelemetryExtension.java file from the OpenTelemetry Java SDK. + */ + +package io.quarkiverse.kafkastreamsprocessor.impl.utils; + +import static io.opentelemetry.sdk.testing.assertj.TracesAssert.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.logs.SdkLoggerProvider; +import io.opentelemetry.sdk.logs.data.LogRecordData; +import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.internal.SdkMeterProviderUtil; +import io.opentelemetry.sdk.testing.assertj.TracesAssert; +import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; +import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; + +/** + * A JUnit5 extension which sets up the {@link OpenTelemetrySdk} for testing, resetting state between tests. + * + *
{@code
+ * // class CoolTest {
+ * // @RegisterExtension
+ * // static final OpenTelemetryWithBaggageExtension otelTesting = OpenTelemetryWithBaggageExtension.create();
+ * //
+ * // private final Tracer tracer = otelTesting.getOpenTelemetry().getTracer("test");
+ * // private final Meter meter = otelTesting.getOpenTelemetry().getMeter("test");
+ * //
+ * // @Test
+ * // void test() {
+ * // tracer.spanBuilder("name").startSpan().end();
+ * // assertThat(otelTesting.getSpans()).containsExactly(expected);
+ * //
+ * // LongCounter counter = meter.counterBuilder("counter-name").build();
+ * // counter.add(1);
+ * // assertThat(otelTesting.getMetrics()).satisfiesExactlyInAnyOrder(metricData -> {});
+ * // }
+ * // }
+ * }
+ */ +public class OpenTelemetryWithBaggageExtension implements BeforeEachCallback, BeforeAllCallback, AfterAllCallback { + + /** + * Returns a {@link OpenTelemetryWithBaggageExtension} with a default SDK initialized with an in-memory span exporter + * and W3C trace context propagation. + */ + public static OpenTelemetryWithBaggageExtension create() { + InMemorySpanExporter spanExporter = InMemorySpanExporter.create(); + + SdkTracerProvider tracerProvider = SdkTracerProvider.builder() + .addSpanProcessor(SimpleSpanProcessor.create(spanExporter)) + .build(); + + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + + SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + + InMemoryLogRecordExporter logRecordExporter = InMemoryLogRecordExporter.create(); + + SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder() + .addLogRecordProcessor(SimpleLogRecordProcessor.create(logRecordExporter)) + .build(); + + OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create( + // Here we inject the baggage propagator as well + TextMapPropagator.composite(W3CTraceContextPropagator.getInstance(), + W3CBaggagePropagator.getInstance()))) + .setTracerProvider(tracerProvider) + .setMeterProvider(meterProvider) + .setLoggerProvider(loggerProvider) + .build(); + + return new OpenTelemetryWithBaggageExtension(openTelemetry, spanExporter, metricReader, logRecordExporter); + } + + private final OpenTelemetrySdk openTelemetry; + private final InMemorySpanExporter spanExporter; + private final InMemoryMetricReader metricReader; + private final InMemoryLogRecordExporter logRecordExporter; + + private OpenTelemetryWithBaggageExtension( + OpenTelemetrySdk openTelemetry, + InMemorySpanExporter spanExporter, + InMemoryMetricReader metricReader, + InMemoryLogRecordExporter logRecordExporter) { + this.openTelemetry = openTelemetry; + this.spanExporter = spanExporter; + this.metricReader = metricReader; + this.logRecordExporter = logRecordExporter; + } + + /** Returns the {@link OpenTelemetrySdk} created by this extension. */ + public OpenTelemetry getOpenTelemetry() { + return openTelemetry; + } + + /** Returns all the exported {@link SpanData} so far. */ + public List getSpans() { + return spanExporter.getFinishedSpanItems(); + } + + /** + * Returns the current {@link MetricData} in {@link AggregationTemporality#CUMULATIVE} format. + * + * @since 1.15.0 + */ + public List getMetrics() { + return new ArrayList<>(metricReader.collectAllMetrics()); + } + + /** + * Returns all the exported {@link LogRecordData} so far. + * + * @since 1.32.0 + */ + public List getLogRecords() { + return new ArrayList<>(logRecordExporter.getFinishedLogRecordItems()); + } + + /** + * Returns a {@link TracesAssert} for asserting on the currently exported traces. This method requires AssertJ to be + * on the classpath. + */ + public TracesAssert assertTraces() { + return assertThat(spanExporter.getFinishedSpanItems()); + } + + /** + * Clears the collected exported {@link SpanData}. Consider making your test smaller instead of manually clearing + * state using this method. + */ + public void clearSpans() { + spanExporter.reset(); + } + + /** + * Clears all registered metric instruments, such that {@link #getMetrics()} is empty. + * + * @since 1.15.0 + */ + public void clearMetrics() { + SdkMeterProviderUtil.resetForTest(openTelemetry.getSdkMeterProvider()); + } + + /** + * Clears the collected exported {@link LogRecordData}. Consider making your test smaller instead of manually clearing + * state using this method. + * + * @since 1.32.0 + */ + public void clearLogRecords() { + logRecordExporter.reset(); + } + + @Override + public void beforeEach(ExtensionContext context) { + clearSpans(); + clearMetrics(); + clearLogRecords(); + } + + @Override + public void beforeAll(ExtensionContext context) { + GlobalOpenTelemetry.resetForTest(); + GlobalOpenTelemetry.set(openTelemetry); + } + + @Override + public void afterAll(ExtensionContext context) { + GlobalOpenTelemetry.resetForTest(); + openTelemetry.close(); + } +}