-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(impl): Support custom span creation in Processor
Probably a very narrow use case. But it could happen. Along the way of a [recent PR](#109) by @nreant1A trying to make sure: 1. Baggage is properly propagated 2. Custom span creationg is supported in process method First added 1 QuarkusTest in impl. The implementation was: 1. extracting the parent context (but not making it current, by which baggage were lost) 2. creating a "process" span and making that a current span 3. early on reinjecting the new span context ids in the message before it starts being processed, relying on the fact that output message are created by modification of the incoming one: `ping.withValue(...)` The issue we have: baggage cannot be modified in the processor, and any other context modification in the processor is basically ignored (creating a custom span for instance). Solution: move the injection of the contextual data (traceparent, tracestate, baggage) at message production by implementing a TracingProducerInterceptor. The OpenTelemetryExtension for JUnit 5 has bean copy pasted so we can inject the W3CBaggagePropagator. Indeed the original class is final, with private constructor. A fix upstream will be done shortly.
- Loading branch information
1 parent
50cb5f9
commit a8591ce
Showing
8 changed files
with
689 additions
and
82 deletions.
There are no files selected for viewing
34 changes: 34 additions & 0 deletions
34
...arkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
/*- | ||
* #%L | ||
* Quarkus Kafka Streams Processor | ||
* %% | ||
* Copyright (C) 2024 Amadeus s.a.s. | ||
* %% | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* #L% | ||
*/ | ||
package io.quarkiverse.kafkastreamsprocessor.api.decorator.producer; | ||
|
||
/** | ||
* Priorities of the producer interceptors the framework provides. | ||
*/ | ||
public final class ProducerInterceptorPriorities { | ||
/** | ||
* Priority of the interceptor that will inject the tracing headers for propagation. | ||
*/ | ||
public static final int TRACING = 100; | ||
|
||
private ProducerInterceptorPriorities() { | ||
|
||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
61 changes: 61 additions & 0 deletions
61
...quarkiverse/kafkastreamsprocessor/impl/decorator/producer/TracingProducerInterceptor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
/*- | ||
* #%L | ||
* Quarkus Kafka Streams Processor | ||
* %% | ||
* Copyright (C) 2024 Amadeus s.a.s. | ||
* %% | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
* #L% | ||
*/ | ||
package io.quarkiverse.kafkastreamsprocessor.impl.decorator.producer; | ||
|
||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
|
||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
|
||
import io.opentelemetry.api.OpenTelemetry; | ||
import io.opentelemetry.context.Context; | ||
import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerInterceptorPriorities; | ||
import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor; | ||
import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; | ||
|
||
/** | ||
* Producer interceptor that injects the tracing headers for propagation. | ||
*/ | ||
@ApplicationScoped | ||
public class TracingProducerInterceptor implements ProducerOnSendInterceptor { | ||
private final OpenTelemetry openTelemetry; | ||
|
||
private final KafkaTextMapSetter kafkaTextMapSetter; | ||
|
||
@Inject | ||
public TracingProducerInterceptor(OpenTelemetry openTelemetry, KafkaTextMapSetter kafkaTextMapSetter) { | ||
this.openTelemetry = openTelemetry; | ||
this.kafkaTextMapSetter = kafkaTextMapSetter; | ||
} | ||
|
||
@Override | ||
public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) { | ||
openTelemetry.getPropagators().getTextMapPropagator().fields().forEach(record.headers()::remove); | ||
openTelemetry.getPropagators() | ||
.getTextMapPropagator() | ||
.inject(Context.current(), record.headers(), kafkaTextMapSetter); | ||
return record; | ||
} | ||
|
||
@Override | ||
public int priority() { | ||
return ProducerInterceptorPriorities.TRACING; | ||
} | ||
} |
Oops, something went wrong.