Skip to content

Commit

Permalink
feat(docs): Restore doc about non CDI decorator and Quarkus 3.8
Browse files Browse the repository at this point in the history
Also restore the corresponding integration test that was testing this use case.
Whilst doing that noticed that the QuarkusTest is becoming flaky again.
So reintroducing the `quarkus.test.flat-class-path=true` in `impl` so noone will be impacted by this flakiness anymore.

Also noting the `quarkus-3.8` branch did not have a build, whilst it holds the 2.0x branch which we will need to build in parallel.
  • Loading branch information
edeweerd1A committed Oct 11, 2024
1 parent bda540b commit c03349c
Show file tree
Hide file tree
Showing 18 changed files with 772 additions and 11 deletions.
4 changes: 2 additions & 2 deletions .github/project.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
release:
current-version: "2.0.1"
next-version: "2.0.0-SNAPSHOT"
current-version: "3.0.1"
next-version: "3.0.0-SNAPSHOT"
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ on:
push:
branches:
- "main"
- "quarkus-3.8"
paths-ignore:
- '.gitignore'
- 'CODEOWNERS'
Expand Down
54 changes: 45 additions & 9 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ public class PojoProcessor extends ContextualProcessor<String, SamplePojo, Strin
}
}
----

<1> Your Processor is declared with the annotation as for a regular processor.
<2> The handled value type, in this example, is a simple POJO, nothing fancy.
<3> Same POJO value in the _process()_ method.
Expand Down Expand Up @@ -423,6 +424,7 @@ public class PingerService {
}
}
----

<1> Define the method to retry with `org.eclipse.microprofile.faulttolerance.Retry` annotation

.application.properties
Expand Down Expand Up @@ -484,7 +486,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g
| The number of times a Punctuator's execution failed with an exception since the start of the microservice.
|===


.Dead Letter Queue Metrics
[options="header",cols="30%,20%,40%"]
|===
Expand All @@ -501,7 +502,6 @@ This list includes the additional metrics, on top of the Kafka Streams and the g
| The number of messages sent to global DLQ.
|===


== A comparison between Reactive Messaging Kafka and Kafka Streams

These two technologies can be used to create streaming microservices to be used in Event-Driven architecture applications.
Expand Down Expand Up @@ -536,6 +536,7 @@ The purpose of increasing concurrency is to be able to cope with streaming micro
return api.remoteCall();
}
----

<1> `@Incoming` is declaring this method as a subscriber for the channel named `ping-events`
<2> `@Outgoing` is declaring this method as a producer for the channel named `pong-events`
<3> `@io.smallrye.reactive.messaging.annotations.Blocking` Indicates that this method is running out of the processing thread, inside a worker thread and the order of the messages is not important.
Expand Down Expand Up @@ -640,6 +641,7 @@ public class PingProcessor extends ContextualProcessor<String, Ping, String, Pin
}
}
----

<1> Your Processor is declared with the annotation as for a regular processor.
<2> The definition and initialization of your state store.

Expand Down Expand Up @@ -688,9 +690,9 @@ The extension proposes some capabilities to customize more finely the behaviour
=== Processor decorator

The following decoration layer is already extensively used in this extension's source code and allows to use composition around the main processor class you have to define.
Example of a new decorator:
Depending on the version of Quarkus you are using, the pattern differs:

.ProcessorDecorator.java
.ProcessorDecorator.java with Quarkus 3.2 or 3.11.0+
[source,java]
----
@Decorator // <1>
Expand Down Expand Up @@ -719,6 +721,7 @@ public class ProcessorDecorator<KIn, VIn, KOut, VOut> implements Processor<KIn,
}
}
----

<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI
<2> Force the instantiation of the decorator with the Priority annotation.
Indeed, otherwise the decorator is not taken into account by Quarkus.
Expand Down Expand Up @@ -759,23 +762,54 @@ The priority is to be set based on the priorities of the existing decorators whi
----
<3> The decorator should have the same generics declaration `<KIn, VIn, KOut, VOut>` as the `Processor<KIn, VIn, KOut, VOut>` interface that it implements
<4> Delegate reference to use when decorating methods.
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate
passthrough decorated methods that this Decorator class won't decorate.
It is annotated with lombok's https://projectlombok.org/features/experimental/Delegate[Delegate] annotation to generate passthrough decorated methods that this Decorator class won't decorate.
The selection is done through a blacklist of method signatures gathered in a private `Excludes` interface declared at the end of the class.
<5> Injection constructor which must have a delegate argument annotated with the `Delegate` annotation from CDI.
You can also, as a regular CDI bean, inject any another CDI bean reference to be used in this decorator.
<6> Example of decorated method, here the main `process` method of `Processor` API of Kafka Streams.

Such a decorator will automatically been taken into account by CDI through the combination of `Decorator` and `Priority` annotations.
.ProcessorDecorator.java for Quarkus 3.8 -> 3.10
[source,java]
----
@Dependent // <1>
@Priority(150) // <2>
public class ProcessorDecorator extends AbstractProcessorDecorator { // <3>
@Override
public void process(Record record) { // <4>
// use bean before
getDelegate().process(record);
// use bean after
}
}
----

<1> We have to mark the bean `Dependent` so it is instantiated at every use.
Indeed, `KStreamProcessorSupplier` needs to return a new `Processor` instance everytime it is called, by Kafka Streams' specification.
<2> We add a `Priority`, with same pattern as a CDI decorator.
<3> We remove the generic types from the class signature, because CDI does not like generics in beans.
<4> Example of override of process method and call to underlying decorator.

Such a decorator will automatically been taken into account by CDI.
The priority will control at which point your decorator will be called among all other decorators.

[CAUTION]
====
We noticed with a new integration-test that is using a custom serde, that usage of custom CDI `Decorator` is causing microservices to randomly crash at startup.
This happens for specific versions of Quarkus.
Known impacted versions are 3.8.x, 3.9.x and 3.10.x.
The 3.2 LTS and upcoming 3.15 LTS versions do not suffer from this symptom.
The **only** solution found was to remove usage of `@Decorator` for `Processor` decorators for microservices based on Quarkus 3.8 LTS.
This change will be reverted in quarkus-kafka-streams-processor 3.0.
This is probably the https://github.com/quarkusio/quarkus/pull/41258[PR] on Quarkus side that has fixed the issue in Quarkus 3.11.
====

=== Producer interceptor

Kafka Streams already has the notion of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
Kafka Streams already has the concept of a https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html[ProducerInterceptor].
But as the rest of Kafka Streams SPI, it is based on a class name and a default constructor for instantiation.
It does not support CDI resolution.

This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumentated through CDI.
This is why this extension's API defines a `ProducerOnSendInterceptor` interface that is instrumented through CDI.
Example of usage:

.MyProducerInterceptor.java
Expand All @@ -796,6 +830,7 @@ public class HeaderAddingProducerInterceptor implements ProducerOnSendIntercepto
}
}
----

<1> Producer interceptors are discovered by CDI by the `ApplicationScoped` annotation
<2> The interceptor class should extend `ProducerOnSendInterceptor`.
`ProducerOnSendInterceptor` extends `ProducerInterceptor<byte[], byte[]>` and overrides some of its methods with default implementations to exempt their forced implementations further down the line.
Expand Down Expand Up @@ -847,6 +882,7 @@ public class CdiRequestContextPunctuatorDecorator implements DecoratedPunctuator
}
}
----

<1> Decorator annotation to profit from the {cdi-spec}/cdi-spec.html#decorators[decorator] feature of CDI
<2> Force the instantiation of the decorator with the Priority annotation.
Indeed, otherwise the decorator is not taken into account by Quarkus.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ kafka-streams.internal.leave.group.on.close=true
# Deactivate exposure of metrics through JMX beans
# It is still adding a mxBean in AppInfoParser though
kafka-streams.auto.include.jmx.reporter=false
# without this, because we are using CDI Decorators with generics, QuarkusTests start to be flaky
# To be removed in 3.16 with this fix? https://github.com/quarkusio/quarkus/pull/43245
quarkus.test.flat-class-path=true
18 changes: 18 additions & 0 deletions integration-tests/custom-serde/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Sample with multiple TopologyConfigCustomizers

EDA to EDA stateless microservice implementation using [KafkaStreams](https://kafka.apache.org/documentation/streams/)

## Introduction

This module showcases the implementation of a
[KafkaStream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with multiple [ConfigurationCustomizer](../../api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/configuration/ConfigurationCustomizer.java) instances.

## Quarkus Dev mode

The sample is fully working with the Quarkus Dev mode that allows to
modify the code and have a hot replacement when the file is saved. It
can be used also to launch the application.

```
$> mvn clean install quarkus:dev
```
184 changes: 184 additions & 0 deletions integration-tests/custom-serde/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-integration-tests</artifactId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-kafka-streams-processor-custom-serde-sample</artifactId>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- Microprofile APIs -->
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.microprofile.config</groupId>
<artifactId>microprofile-config-api</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<!-- Quarkus extensions -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>

<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-protobuf-binding</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>de.sven-jacobs</groupId>
<artifactId>loremipsum</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.daniel-shuy</groupId>
<artifactId>kafka-protobuf-serde</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-test-framework</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit c03349c

Please sign in to comment.