Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrentModificationException in the release effect #1238

Open
TobiasPfeifer opened this issue May 8, 2024 · 2 comments
Open

ConcurrentModificationException in the release effect #1238

TobiasPfeifer opened this issue May 8, 2024 · 2 comments

Comments

@TobiasPfeifer
Copy link

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: zio-default-blocking-4, id: 164) otherThread(id: 59)
  		at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484)
  		at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2343)
  		at zio.kafka.consumer.internal.ConsumerAccess$.$anonfun$make$5(ConsumerAccess.scala:62)
  		at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  		at zio.ZIOCompanionVersionSpecific.$anonfun$attempt$1(ZIOCompanionVersionSpecific.scala:100)
  		at zio.kafka.consumer.internal.ConsumerAccess.make(ConsumerAccess.scala:62)
  		at zio.kafka.consumer.internal.ConsumerAccess.make(ConsumerAccess.scala:61)

It seems that the KafkaConsumer must be closed from the same thread which is not guaranteed with ZIO.acquireRelease AFAIK

I´m providing the layers like this:

effect.provide(
          Kafka.embedded,
          KafkaTestUtils.producer,
          KafkaTestUtils.consumer(workerName)
)
@erikvanoosten
Copy link
Collaborator

erikvanoosten commented May 15, 2024

Hi @TobiasPfeifer. Thanks for the bug report.
How often do you see this? Which zio version are you using? Which zio-kafka version?

The problem is that the consumer is still in use when the release effect (closing the underlying java consumer) is executed. My guess is that during shutdown, the task that is using the consumer is not interrupted.

There is some funky code in ConsumerAccess.withConsumerNoPermit that should manage the interruption. @svroonland do remember how this works? It reminds me of the non-deterministic shutdown we were looking at recently.

@svroonland
Copy link
Collaborator

It's not that funky, it just forks it so it can interrupt it by calling wakeup() ;)

I agree with your idea that something is still using the consumer at the time when close() is called. I would expect everything to be using ZIO scopes where necessary, so all resources should be released in a proper order. Makes me wonder if some forked fiber is not interrupted, perhaps in the user's code.

@TobiasPfeifer Could you share some code that generates this error?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants