
reactor-kafka activation/deactivation is not consistent #3837

Open
jackshirazi opened this issue Oct 9, 2024 · 1 comment

Comments

@jackshirazi
Contributor

Describe the bug

See https://discuss.elastic.co/t/reactor-parallel-streams-causes-apm-log-spam-and-possibly-missed-spans/366890. In short, when reactor-kafka is used with parallel(), the agent activates and deactivates the Kafka transactions inconsistently.

Steps to reproduce

See https://discuss.elastic.co/t/reactor-parallel-streams-causes-apm-log-spam-and-possibly-missed-spans/366890

Expected behavior

No related errors or warnings are logged.

Note that #3830 prevents the error stack trace, but the related warnings show that the issue still occurs elsewhere.

@jackshirazi jackshirazi added the bug Bugs label Oct 9, 2024
@JonasKunz
Contributor

Managed to reproduce using the following code:

<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core</artifactId>
	<version>3.5.11</version>
</dependency>
<dependency>
	<groupId>org.testcontainers</groupId>
	<artifactId>kafka</artifactId>
	<version>1.20.3</version>
</dependency>
<dependency>
	<groupId>io.projectreactor.kafka</groupId>
	<artifactId>reactor-kafka</artifactId>
	<version>1.3.23</version>
</dependency>
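import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

public class ReactorKafkaParallelRepro {

    public static void main(String[] args) throws Exception {

        // Start a single-node Kafka broker in Docker via Testcontainers
        KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka").withTag("7.1.0"));
        kafka.withCreateContainerCmdModifier(cmd -> cmd.getHostConfig().withMemory(4096 * 1024 * 1024L));
        kafka.start();
        String bootstrapServers = kafka.getBootstrapServers();

        // Plain Kafka producer used to publish the test records (Map.of instead of Guava's ImmutableMap,
        // since Guava is not among the declared dependencies)
        Map<String, Object> producerProps = Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
                ProducerConfig.LINGER_MS_CONFIG, 50
        );
        KafkaProducer<String, String> producer = new KafkaProducer<>(
                producerProps,
                new StringSerializer(),
                new StringSerializer()
        );

        // Consumer configuration: keys and values are both Strings
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, String>create(consumerProps)
                        .subscription(Collections.singleton("my-topic"));

        CountDownLatch doneLatch = new CountDownLatch(1);
        KafkaReceiver.create(receiverOptions)
                .receiveAutoAck()
                // Each polled batch is split into 2 rails that run on Schedulers.parallel()
                .concatMap(flux ->
                        flux
                                .parallel(2)
                                .runOn(Schedulers.parallel())
                                .map(rec -> {
                                    doSleep(1500); // simulate slow per-record processing
                                    return rec;
                                })
                )
                .subscribe(rec -> {
                    System.out.println("Processed " + rec.value());
                    doneLatch.countDown();
                });

        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>("my-topic", "foo", "bar" + i))
                    .get();
        }
        doneLatch.await();
        producer.close();
        kafka.close();
    }

    private static void doSleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}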
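In the repro, parallel(2).runOn(Schedulers.parallel()) moves each record's map step off the thread that polled it. The stdlib-only sketch below imitates that hop with a two-thread ExecutorService to show that a value stored in a ThreadLocal on the "polling" thread is not visible on the worker threads. It assumes, without confirmation in this thread, that the agent tracks its active transaction per thread; ThreadHopSketch and ACTIVE are hypothetical names, not agent or Reactor APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadHopSketch {

    // Hypothetical stand-in for a per-thread "currently active transaction" slot (assumption).
    static final ThreadLocal<String> ACTIVE = new ThreadLocal<>();

    public static List<String> run() throws Exception {
        // The calling thread plays the role of the Kafka polling thread.
        ACTIVE.set("Kafka record from my-topic");
        List<String> seen = new ArrayList<>();
        seen.add(Thread.currentThread().getName() + " sees " + ACTIVE.get());

        // Two worker threads play the role of runOn(Schedulers.parallel()) with parallelism 2.
        ExecutorService workers = Executors.newFixedThreadPool(2);
        try {
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < 2; i++) {
                // Each worker reads its own ThreadLocal slot, which was never set.
                results.add(workers.submit(() ->
                        Thread.currentThread().getName() + " sees " + ACTIVE.get()));
            }
            for (Future<String> f : results) {
                seen.add(f.get());
            }
        } finally {
            workers.shutdown();
            ACTIVE.remove();
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```

The first line names the calling thread and the record transaction; both worker lines report null. If activation state really is thread-bound, work that hops threads like this can leave activation and deactivation disagreeing about what is active, which would be consistent with the inconsistency reported for parallel().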

The problem seems to lie within our Kafka instrumentation rather than in Reactor:

24-10-24 12:35:24,000 [reactive-kafka-sample-group-1] WARN  co.elastic.apm.agent.impl.transaction.AbstractSpanImpl - End has already been called: 'Kafka record from my-topic' 00-52fd7d71b659a6efaa8a0c6c467994e8-b852e654e3370031-01 (540273f4)
java.lang.Throwable: null
	at co.elastic.apm.agent.impl.transaction.AbstractSpanImpl.end(AbstractSpanImpl.java:582) [elastic-apm-agent-1.52.1-SNAPSHOT.jar:1.52.1-SNAPSHOT]
	at co.elastic.apm.agent.impl.transaction.AbstractSpanImpl.end(AbstractSpanImpl.java:549) [elastic-apm-agent-1.52.1-SNAPSHOT.jar:1.52.1-SNAPSHOT]
	at co.elastic.apm.agent.kafka.KafkaConsumerInstrumentation$KafkaPollEntryAdvice.pollStart(KafkaConsumerInstrumentation.java:75) [elastic-apm-agent-1.52.1-SNAPSHOT.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) [kafka-clients-2.7.1.jar:?]
	at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:366) [reactor-kafka-1.3.23.jar:1.3.23]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) [reactor-core-3.5.11.jar:3.5.11]
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) [reactor-core-3.5.11.jar:3.5.11]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
	at java.lang.Thread.run(Thread.java:833) [?:?]

That code first deactivates and then ends the Kafka transaction. It appears to be invoked twice for the same transaction (deactivating and ending it a second time), which breaks the activation stack.
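To make that failure mode concrete, here is a minimal single-threaded sketch of an activation stack. Span, activate, deactivate, end and finish are hypothetical stand-ins, not the agent's AbstractSpanImpl API. Running the "deactivate, then end" sequence twice for the same record transaction pops an unrelated span off the stack and produces a double-end warning similar to the one in the log above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ActivationStackSketch {

    // Hypothetical stand-in for a transaction/span (not the agent's real class).
    static final class Span {
        final String name;
        boolean ended;
        Span(String name) { this.name = name; }
    }

    // Hypothetical activation stack for a single thread.
    static final Deque<Span> activeStack = new ArrayDeque<>();
    static final List<String> warnings = new ArrayList<>();

    static void activate(Span s) { activeStack.push(s); }

    // Pops whatever is on top and records a warning if it is not the expected span.
    static void deactivate(Span s) {
        Span top = activeStack.poll();
        if (top != s) {
            warnings.add("Deactivating " + s.name + " but top of stack was "
                    + (top == null ? "<empty>" : top.name));
        }
    }

    // Ends the span once; a second call only records a warning.
    static void end(Span s) {
        if (s.ended) {
            warnings.add("End has already been called: " + s.name);
            return;
        }
        s.ended = true;
    }

    // Mirrors the "first deactivate, then end" sequence described above.
    static void finish(Span s) {
        deactivate(s);
        end(s);
    }

    public static void main(String[] args) {
        Span outer = new Span("outer");
        Span record = new Span("Kafka record from my-topic");
        activate(outer);
        activate(record);

        finish(record); // first invocation: stack stays balanced
        finish(record); // second invocation: pops 'outer' and warns about the double end

        System.out.println(warnings);
        System.out.println("stack empty: " + activeStack.isEmpty());
    }
}
```

After the second finish call the stack is empty: outer, which the Kafka code never activated, has been removed, so any later deactivation on that thread is unbalanced too. This is one way warnings could show up away from the Kafka instrumentation itself.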
