diff --git a/.github/workflows/benchs.yml b/.github/workflows/benchs.yml index 782f1d8a7..f6439ec0e 100644 --- a/.github/workflows/benchs.yml +++ b/.github/workflows/benchs.yml @@ -58,7 +58,7 @@ jobs: # - "-rf json": Format type for machine-readable results. JSON # - "-foe true": Should JMH fail immediately if any benchmark had experienced an unrecoverable error?. True # - "-to 60": 1 minute timeout per iteration - run: sbt "zioKafkaBench/Jmh/run -wi 5 -i 5 -r 1 -w 1 -t 1 -to 60 -rf json -foe true" + run: sbt "zioKafkaBench/Jmh/run -wi 5 -i 5 -r 1 -w 1 -t 1 -to 120 -rf json -foe true" - name: Download previous benchmark data uses: actions/cache@v3 diff --git a/.github/workflows/scala-steward.yml b/.github/workflows/scala-steward.yml index b5475716e..26dab1611 100644 --- a/.github/workflows/scala-steward.yml +++ b/.github/workflows/scala-steward.yml @@ -13,7 +13,7 @@ jobs: name: Scala Steward steps: - name: Scala Steward - uses: scala-steward-org/scala-steward-action@v2.65.0 + uses: scala-steward-org/scala-steward-action@v2.70.0 with: github-app-id: ${{ secrets.SCALA_STEWARD_GITHUB_APP_ID }} github-app-installation-id: ${{ secrets.SCALA_STEWARD_GITHUB_APP_INSTALLATION_ID }} diff --git a/README.md b/README.md index a9c998814..656234a7c 100644 --- a/README.md +++ b/README.md @@ -19,13 +19,19 @@ Kafka has a mature Java client for producing and consuming events, but it has a In order to use this library, we need to add the following line in our `build.sbt` file: ```scala -libraryDependencies += "dev.zio" %% "zio-kafka" % "2.8.0" -libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.8.0" % Test +libraryDependencies += "dev.zio" %% "zio-kafka" % "2.8.2" +libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.8.2" % Test ``` Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots. [Browse here](https://oss.sonatype.org/content/repositories/snapshots/dev/zio/zio-kafka_3/) to find available versions. +For `zio-kafka-testkit` together with Scala 3, you also need to add the following to your `build.sbt` file: + +```scala +excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" +``` + ## Example Let's write a simple Kafka producer and consumer using ZIO Kafka with ZIO Streams. Before everything, we need a running instance of Kafka. 
We can do that by saving the following docker-compose script in the `docker-compose.yml` file and run `docker-compose up`: diff --git a/build.sbt b/build.sbt index 81360bf3c..ba61620d2 100644 --- a/build.sbt +++ b/build.sbt @@ -11,16 +11,16 @@ import MimaSettings.mimaSettings */ lazy val binCompatVersionToCompare = None // Some("2.8.0") -lazy val kafkaVersion = "3.7.1" -lazy val embeddedKafkaVersion = "3.7.0" // Should be the same as kafkaVersion, except for the patch part +lazy val kafkaVersion = "3.8.0" +lazy val embeddedKafkaVersion = "3.8.0" // Should be the same as kafkaVersion, except for the patch part lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion -lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.6" +lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.9" enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) -lazy val _scala213 = "2.13.14" -lazy val _scala3 = "3.3.3" +lazy val _scala213 = "2.13.15" +lazy val _scala3 = "3.3.4" inThisBuild( List( @@ -158,7 +158,7 @@ lazy val zioKafkaTest = libraryDependencies ++= Seq( kafkaClients, logback % Test, - "dev.zio" %% "zio-logging-slf4j" % "2.3.0" % Test + "dev.zio" %% "zio-logging-slf4j" % "2.3.1" % Test ) ++ `embedded-kafka`.value ) @@ -180,13 +180,13 @@ lazy val zioKafkaExample = .settings(run / fork := false) .settings( libraryDependencies ++= Seq( - "dev.zio" %% "zio" % "2.1.6", - "dev.zio" %% "zio-kafka" % "2.8.0", - "dev.zio" %% "zio-logging-slf4j2" % "2.3.0", + "dev.zio" %% "zio" % "2.1.9", + "dev.zio" %% "zio-kafka" % "2.8.2", + "dev.zio" %% "zio-logging-slf4j2" % "2.3.1", "io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion, logback, - "dev.zio" %% "zio-kafka-testkit" % "2.8.0" % Test, - "dev.zio" %% "zio-test" % "2.1.6" % Test + "dev.zio" %% "zio-kafka-testkit" % "2.8.2" % Test, + "dev.zio" %% "zio-test" % "2.1.9" % Test ), // Scala 3 compiling fails with: // [error] Modules were resolved with conflicting cross-version suffixes in ProjectRef(uri("file:/home/runner/work/zio-kafka/zio-kafka/"), "zioKafkaExample"): diff --git a/docs/example-of-consuming-producing-and-committing-offsets.md b/docs/example-of-consuming-producing-and-committing-offsets.md index b1a2c4485..30f1f1073 100644 --- a/docs/example-of-consuming-producing-and-committing-offsets.md +++ b/docs/example-of-consuming-producing-and-committing-offsets.md @@ -3,7 +3,7 @@ id: example-of-consuming-producing-and-committing-offsets title: "Example of Consuming, Producing and Committing Offsets" --- -This example shows how to consume messages from topic `topic_a` and produce transformed messages to `topic_b`, after which consumer offsets are committed. Processing is done in chunks using `ZStreamChunk` for more efficiency. +This example shows how to consume messages from topic `topic_a` and produce transformed messages to `topic_b`, after which consumer offsets are committed. Processing is done in chunks using `ZStreamChunk` for more efficiency. Please note: ZIO consumer does not support automatic offset committing. As a result, it ignores the Kafka consumer setting `enable.auto.commit=true`. Developers should manually commit offsets using the provided commit methods, typically after processing messages or at appropriate points in their application logic. 
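To make the note above concrete, a minimal manual-commit sketch could look like the following; `consumerSettings` and `processRecord` are placeholders and not part of this change, and the file's own example shows the complete flow:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Minimal sketch, assuming `consumerSettings` and `processRecord` are defined elsewhere:
// consume from `topic_a`, process each record, then commit offsets in batches.
def processRecord(record: CommittableRecord[String, String]): Task[Unit] = ???
def consumerSettings: ConsumerSettings = ???

val consumeAndCommit: Task[Unit] =
  Consumer
    .plainStream(Subscription.topics("topic_a"), Serde.string, Serde.string)
    .mapZIO(record => processRecord(record).as(record.offset))
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commit)
    .runDrain
    .provideLayer(ZLayer.scoped(Consumer.make(consumerSettings)))
```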
```scala import zio.ZLayer diff --git a/docs/index.md b/docs/index.md index 0dc59fdf2..1c8e97d3a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,6 +26,12 @@ libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "@VERSION@" % Test Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots. [Browse here](https://oss.sonatype.org/content/repositories/snapshots/dev/zio/zio-kafka_3/) to find available versions. +For `zio-kafka-testkit` together with Scala 3, you also need to add the following to your `build.sbt` file: + +```scala +excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13" +``` + ## Example Let's write a simple Kafka producer and consumer using ZIO Kafka with ZIO Streams. Before everything, we need a running instance of Kafka. We can do that by saving the following docker-compose script in the `docker-compose.yml` file and run `docker-compose up`: diff --git a/docs/serialization-and-deserialization.md b/docs/serialization-and-deserialization.md index e3d29308b..a789352d3 100644 --- a/docs/serialization-and-deserialization.md +++ b/docs/serialization-and-deserialization.md @@ -1,24 +1,63 @@ --- id: serialization-and-deserialization -title: "Serialization And Deserialization" +title: "Serialization and Deserialization" --- +Zio-kafka deserializes incoming data and serializes outgoing data (both keys and values), converting byte arrays to any +other type and back. This works by providing a key and value `Deserializer` while constructing a `Consumer`, +and a key and value `Serializer` while constructing a `Producer`. + +A `Serde` combines a `Deserializer` and a `Serializer`. Common serdes are provided in the `Serdes` object, e.g. +`Serdes.byteArray`, `Serdes.string` and `Serdes.long`. A serde can be converted to other serdes, or you can create a +custom serde by implementing the `Serde` trait directly. + +This document contains: + +- Handling failures in a serde +- How to create a custom serde +- How to create and use a custom serde that wraps invalid data +- How to do deserialization in the consumer stream +- A warning about using `mapZIO` + +## Handling failures in a serde + +Ideally, a serde cannot fail while serializing or deserializing. This is for example the case with the provided +`Serdes.byteArray` and `Serdes.string`. This is not the case for any serde that needs to handle invalid input +(for example `Serdes.long`), or a serde that needs to do a remote lookup. + +By default, a consumer stream will fail if it encounters a deserialization error in the serde. Unfortunately, the +resulting failure might not clearly indicate that the cause is in the serde. + +There are two solutions for improving this: + +- Wrap the result of the serde in a `Try` with the `Serde.asTry` method. +- Use `Serdes.byteArray` and put the deserialization code in the consumer stream, or do the serialization before handing the + data to zio-kafka. This way you can handle failures any way you want. + +Both approaches are discussed further below. + ## Custom Data Type Serdes -Serializers and deserializers (serdes) for custom data types can be constructed from scratch or by converting existing serdes. For example, to create a serde for an `Instant`: +Serializers and deserializers for custom data types can be created from scratch, or by converting existing +serdes.
For example, to create a serde for an `Instant` from a serde for a `Long`: ```scala import java.time.Instant import zio.kafka.serde._ -val instantSerde: Serde[Any, Instant] = Serde.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli) +val instantSerde: Serde[Any, Instant] = + Serdes.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli) ``` -## Handling deserialization failures +To handle missing data (an empty key or value), you can use the `Serde.asOption` transformer. For example: +`Serdes.string.asOption`. This results in a `None` if the key or value is empty, and in a `Some(string)` otherwise. -The default behavior for a consumer stream when encountering a deserialization failure is to fail the stream. In many cases you may want to handle this situation differently, e.g. by skipping the message that failed to deserialize or by executing an alternative effect. For this purpose, any `Deserializer[T]` for some type `T` can be easily converted into a `Deserializer[Try[T]]` where deserialization failures are converted to a `Failure` using the `asTry` method. +## Custom serdes that wrap invalid data -Below is an example of skipping messages that fail to deserialize. The offset is passed downstream to be committed. +Any `Deserializer[A]` for a given type `A` can be converted into a `Deserializer[Try[A]]` where deserialization +failures are converted to a `Failure` using the `asTry` method. (Method `asTry` is also available on `Serde`.) + +Below is an example of skipping records that fail to deserialize. The offset is passed downstream to be committed. ```scala import zio._, stream._ @@ -28,11 +67,13 @@ import scala.util.{Try, Success, Failure} val consumer = ZLayer.scoped(Consumer.make(consumerSettings)) +val keySerde = Serdes.string +val valueSerde = Serdes.string.asTry // <-- using `.asTry` val stream = Consumer - .plainStream(Subscription.topics("topic150"), Serde.string, Serde.string.asTry) + .plainStream(Subscription.topics("topic150"), keySerde, valueSerde) stream - .mapZIO { record => + .mapZIO { record => // ⚠️ see section about `mapZIO` below! val tryValue: Try[String] = record.record.value() val offset: Offset = record.offset @@ -50,3 +91,77 @@ stream .runDrain .provideSomeLayer(consumer) ``` + +## Deserialization in the consumer stream + +In this approach we provide zio-kafka with the `Serdes.byteArray` serde (which is a pass-through serde) and do the +deserialization in the consumer stream. The deserialization can be done with regular ZIO operators. + +This approach provides more freedom at the cost of having to write more code. It also allows for optimizations such as +operating on chunks of records (see next section), and more contextual failure handling. + +Here is an example: + +```scala +import zio._, stream._ +import zio.kafka.consumer._ +import zio.kafka.serde._ + +val consumer = ZLayer.scoped(Consumer.make(consumerSettings)) + +val stream = Consumer + .plainStream(Subscription.topics("topic150"), Serdes.byteArray, Serdes.byteArray) + +def deserialize(value: Array[Byte]): ZIO[Any, Throwable, Message] = ??? + +stream + .mapZIO { record => // ⚠️ see section about `mapZIO` below!
+ val value: Array[Byte] = record.record.value() + val offset: Offset = record.offset + + deserialize(value) + // possible action to take if deserialization fails: + .catchAll(_ => someEffect(value)) + .flatMap(processMessage) + .as(offset) + } + .aggregateAsync(Consumer.offsetBatches) + .mapZIO(_.commit) + .runDrain + .provideSomeLayer(consumer) +``` + +## A warning about `mapZIO` + +Be careful when using `mapZIO`, as it breaks the chunking structure of the stream (or more precisely, the resulting +stream has chunks with a single element). Throughput can be considerably lower than with the chunking structure intact. + +If your application requirements allow all elements of a chunk to be processed in one go, then you can use one of these +techniques to preserve the chunking structure: + +### Use `chunksWith` + +Use `chunksWith` when you have a single processing step that needs to work on a chunk. + +```scala +def f(a: A): ZIO[R, E, B] + +stream // ZStream[R, E, A] + .chunksWith { stream => stream.mapZIO(f) } // ZStream[R, E, B] +``` + +### Expose chunking structure with `chunks` + +Use `chunks` when you have multiple processing steps that can all work on a chunk at a time. Since `chunks` exposes the +chunking structure explicitly, the program can no longer accidentally break the chunking structure (unless +`flattenChunks` is also used). + +```scala +def f(a: A): ZIO[R, E, B] +def g(b: B): ZIO[R, E, C] + +stream // ZStream[R, E, A] + .chunks // ZStream[R, E, Chunk[A]] + .mapZIO { chunk => ZIO.foreach(chunk)(f) } // ZStream[R, E, Chunk[B]] + .mapZIO { chunk => ZIO.foreach(chunk)(g) } // ZStream[R, E, Chunk[C]] + .flattenChunks // ZStream[R, E, C] +``` diff --git a/project/build.properties b/project/build.properties index ee4c672cd..0b699c305 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.10.1 +sbt.version=1.10.2 diff --git a/project/plugins.sbt b/project/plugins.sbt index 53aa8e900..a2a45ec4b 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,9 +4,9 @@ addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion) addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion) -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") -addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1") -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.10.0") -addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.3") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.13.0") +addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2") +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.10.4") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.4") resolvers ++= Resolver.sonatypeOssRepos("public") diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala new file mode 100644 index 000000000..a6c66e278 --- /dev/null +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ProducerBenchmark.scala @@ -0,0 +1,73 @@ +package zio.kafka.bench + +import io.github.embeddedkafka.EmbeddedKafka +import org.apache.kafka.clients.producer.ProducerRecord +import org.openjdk.jmh.annotations._ +import zio.kafka.producer.Producer +import zio.kafka.serde.Serde +import zio.kafka.testkit.Kafka +import zio.kafka.testkit.KafkaTestUtils.producer +import zio.stream.ZStream +import zio.{ Chunk, ZIO, ZLayer } + +import java.util.concurrent.TimeUnit + +@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MILLISECONDS) +class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] { + val topic1 = "topic1" + val nrPartitions = 6 + val nrMessages = 500 + val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i")) + val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) => + new ProducerRecord(topic1, k, v) + }) + + override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] = + ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie + + override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for { + _ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore + _ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions)) + } yield () + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + def produceChunkSeq(): Any = runZIO { + // Produce 30 chunks sequentially + Producer.produceChunk(records, Serde.string, Serde.string).repeatN(29) + } + + @Benchmark + @BenchmarkMode(Array(Mode.AverageTime)) + def produceChunkPar(): Any = runZIO { + // Produce 30 chunks of which 4 run in parallel + ZStream + .range(0, 30, 1) + .mapZIOParUnordered(4) { _ => + Producer.produceChunk(records, Serde.string, Serde.string) + } + .runDrain + } + + @Benchmark + @BenchmarkMode(Array(Mode.Throughput)) + @OutputTimeUnit(TimeUnit.SECONDS) + def produceSingleRecordSeq(): Any = runZIO { + // Produce 100 records sequentially + Producer.produce(topic1, "key", "value", Serde.string, Serde.string).repeatN(99) + } + + @Benchmark + @BenchmarkMode(Array(Mode.Throughput)) + @OutputTimeUnit(TimeUnit.SECONDS) + def produceSingleRecordPar(): Any = runZIO { + // Produce 100 records of which 4 run in parallel + ZStream + .range(0, 100, 1) + .mapZIOParUnordered(4) { _ => + Producer.produce(topic1, "key", "value", Serde.string, Serde.string) + } + .runDrain + } +} diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala index 94213604f..126ed41fa 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopSpec.scala @@ -5,7 +5,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{ AuthenticationException, AuthorizationException } import zio._ import zio.kafka.consumer.{ ConsumerSettings, Subscription } -import zio.kafka.consumer.diagnostics.Diagnostics +import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment import zio.metrics.{ MetricState, Metrics } import zio.stream.{ Take, ZStream } @@ -20,6 +20,7 @@ object RunloopSpec extends ZIOSpecDefault { private type PartitionsHub = Hub[Take[Throwable, PartitionAssignment]] private val tp10 = new TopicPartition("t1", 0) + private val tp11 = new TopicPartition("t1", 1) private val key123 = "123".getBytes private val consumerSettings = ConsumerSettings(List("bootstrap")) @@ -27,7 +28,7 @@ override def spec: Spec[TestEnvironment with Scope, Any] = suite("RunloopSpec")( test("runloop creates a new partition stream and polls for new records") { - withRunloop { (mockConsumer, partitionsHub, runloop) => + withRunloop() { (mockConsumer, partitionsHub, runloop) => mockConsumer.schedulePollTask { () => mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L)).asJava)
mockConsumer.rebalance(Seq(tp10).asJava) @@ -52,8 +53,48 @@ ) } }, + test( + "runloop does not start a new stream for a partition that is revoked right after assignment within the same RebalanceEvent" + ) { + Diagnostics.SlidingQueue.make(100).flatMap { diagnostics => + withRunloop(diagnostics) { (mockConsumer, partitionsHub, runloop) => + mockConsumer.schedulePollTask { () => + mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L), tp11 -> Long.box(0L)).asJava) + mockConsumer.rebalance(Seq(tp10, tp11).asJava) + mockConsumer.rebalance(Seq(tp10).asJava) + mockConsumer.addRecord(makeConsumerRecord(tp10, key123)) + } + for { + streamStream <- ZStream.fromHubScoped(partitionsHub) + _ <- runloop.addSubscription(Subscription.Topics(Set(tp10, tp11).map(_.topic()))) + _ <- streamStream + .map(_.exit) + .flattenExitOption + .flattenChunks + .take(1) + .mapZIO { case (_, stream) => + stream.runHead + } + .runDrain + diagnosticEvents <- diagnostics.queue.takeAll + rebalanceEvents = + diagnosticEvents.collect { case rebalanceEvent: DiagnosticEvent.Rebalance => + rebalanceEvent + } + } yield assertTrue( + rebalanceEvents.length == 1, + rebalanceEvents.head == DiagnosticEvent.Rebalance( + revoked = Set(tp11), + assigned = Set(tp10), + lost = Set.empty, + ended = Set.empty + ) + ) + } + } + }, test("runloop retries poll upon AuthorizationException and AuthenticationException") { - withRunloop { (mockConsumer, partitionsHub, runloop) => + withRunloop() { (mockConsumer, partitionsHub, runloop) => mockConsumer.schedulePollTask { () => mockConsumer.updateEndOffsets(Map(tp10 -> Long.box(0L)).asJava) mockConsumer.rebalance(Seq(tp10).asJava) @@ -90,7 +131,7 @@ } ) @@ withLiveClock - private def withRunloop( + private def withRunloop(diagnostics: Diagnostics = Diagnostics.NoOp)( f: (BinaryMockConsumer, PartitionsHub, Runloop) => ZIO[Scope, Throwable, TestResult] ): ZIO[Scope, Throwable, TestResult] = ZIO.scoped { @@ -105,7 +146,7 @@ consumerSettings, 100.millis, 100.millis, - Diagnostics.NoOp, + diagnostics, consumerAccess, partitionsHub ) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala new file mode 100644 index 000000000..5fe6fa203 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/producer/ProducerSpec.scala @@ -0,0 +1,228 @@ +package zio.kafka.producer + +import org.apache.kafka.clients.producer +import org.apache.kafka.clients.producer.{ MockProducer, ProducerRecord, RecordMetadata } +import org.apache.kafka.common.KafkaException +import org.apache.kafka.common.errors.AuthenticationException +import org.apache.kafka.common.serialization.ByteArraySerializer +import zio._ +import zio.test.TestAspect.withLiveClock +import zio.test._ + +import java.util.concurrent.Future +import java.util.concurrent.atomic.AtomicBoolean + +object ProducerSpec extends ZIOSpecDefault { + + private object TestKeyValueSerializer extends ByteArraySerializer + + private class BinaryMockProducer(autoComplete: Boolean) + extends MockProducer[Array[Byte], Array[Byte]]( + autoComplete, + TestKeyValueSerializer, + TestKeyValueSerializer + ) { + + private val nextSendAllowed = new AtomicBoolean(autoComplete) + + override def send( + record: ProducerRecord[Array[Byte], Array[Byte]], + callback: producer.Callback + ): Future[RecordMetadata] = { + awaitSendAllowed() + val sendResult =
super.send(record, callback) + nextSendAllowed.set(autoComplete) + + sendResult + } + + def allowNextSendAndAwaitSendCompletion(): Unit = { + allowNextSend() + awaitSendCompletion() + } + + def allowNextSend(): Unit = + nextSendAllowed.set(true) + + def awaitSendAllowed(): Unit = + awaitSendCondition(true) + + def awaitSendCompletion(): Unit = + awaitSendCondition(false) + + private def awaitSendCondition(expectedCondition: Boolean): Unit = { + var awaitingSendCondition = true + while (awaitingSendCondition) + awaitingSendCondition = expectedCondition != nextSendAllowed.get() + } + + } + + override def spec: Spec[TestEnvironment with Scope, Any] = + suite("Producer")( + suite("produceChunkAsyncWithFailures")( + test("successfully produces chunk of records") { + withProducer() { (_, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + for { + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results.forall(_.isRight) + ) + } + }, + test("omits sending further records in chunk in case the first send call fails") { + withProducer() { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + for { + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results.head.isLeft, + results.head.left.forall(_.getMessage == testAuthenticationExceptionMessage), + results.tail.forall(_ == Left(Producer.PublishOmittedException)) + ) + } + }, + test("provides correct results in case last send call fails") { + withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + val mockJavaProducerBehaviour = ZIO.succeed { + // Send calls behaviours + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + mockJavaProducer.allowNextSend() + // Send callbacks behaviours + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + } + for { + _ <- mockJavaProducerBehaviour.forkScoped + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results.init.forall(_.isRight), + results.last.isLeft, + results.last.left.forall(_.getMessage == testAuthenticationExceptionMessage) + ) + } + }, + test("omits sending further records in chunk and provides correct results in case middle send call fails") { + withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + 
makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + val mockJavaProducerBehaviour = ZIO.succeed { + // Send calls behaviours + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + mockJavaProducer.allowNextSend() + // Send callbacks behaviours + mockJavaProducer.completeNext() + mockJavaProducer.completeNext() + } + for { + _ <- mockJavaProducerBehaviour.forkScoped + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results(0).isRight, + results(1).isRight, + results(2).left.forall(_.getMessage == testAuthenticationExceptionMessage), + results(3) == Left(Producer.PublishOmittedException), + results(4) == Left(Producer.PublishOmittedException) + ) + } + }, + test( + "omits sending further records in chunk and provides correct results in case second publication to broker fails along with middle send call failure" + ) { + withProducer(autoCompleteProducerRequests = false) { (mockJavaProducer, producer) => + val recordsToSend = Chunk( + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord(), + makeProducerRecord() + ) + val testAuthenticationExceptionMessage = "test authentication exception" + val testKafkaExceptionMessage = "unexpected broker exception" + val mockJavaProducerBehaviour = ZIO.succeed { + // Send calls behaviours + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.allowNextSendAndAwaitSendCompletion() + mockJavaProducer.sendException = new AuthenticationException(testAuthenticationExceptionMessage) + mockJavaProducer.allowNextSend() + // Send callbacks behaviours + mockJavaProducer.completeNext() + mockJavaProducer.errorNext(new KafkaException(testKafkaExceptionMessage)) + } + for { + _ <- mockJavaProducerBehaviour.forkScoped + results <- producer.produceChunkAsyncWithFailures(recordsToSend).flatten + } yield assertTrue( + results.length == recordsToSend.length, + results(0).isRight, + results(1).isLeft, + results(1).left.forall(_.getMessage == testKafkaExceptionMessage), + results(2).left.forall(_.getMessage == testAuthenticationExceptionMessage), + results(3) == Left(Producer.PublishOmittedException), + results(4) == Left(Producer.PublishOmittedException) + ) + } + } + ) + ) @@ withLiveClock + + private def withProducer(autoCompleteProducerRequests: Boolean = true)( + producerTest: (BinaryMockProducer, Producer) => ZIO[Scope, Throwable, TestResult] + ): ZIO[Scope, Throwable, TestResult] = + ZIO.scoped { + val mockJavaProducer = new BinaryMockProducer(autoCompleteProducerRequests) + + Producer + .fromJavaProducer(mockJavaProducer, ProducerSettings()) + .flatMap(producerTest(mockJavaProducer, _)) + } + + private def makeProducerRecord( + topic: String = "testTopic", + key: String = "key", + value: String = "value" + ): ProducerRecord[Array[Byte], Array[Byte]] = + new ProducerRecord[Array[Byte], Array[Byte]](topic, key.getBytes, value.getBytes) + +} diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala index 0524c8804..77e025293 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/Kafka.scala @@ -1,6 +1,5 @@ 
package zio.kafka.testkit -import _root_.kafka.server.KafkaConfig import io.github.embeddedkafka.{ EmbeddedK, EmbeddedKafka, EmbeddedKafkaConfig } import zio._ @@ -78,23 +77,23 @@ object Kafka { embeddedWithBrokerProps( ports => Map( - "group.min.session.timeout.ms" -> "500", - "group.initial.rebalance.delay.ms" -> "0", - "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer", - "super.users" -> "User:ANONYMOUS", - "ssl.client.auth" -> "required", - "ssl.enabled.protocols" -> "TLSv1.2", - "ssl.truststore.type" -> "JKS", - "ssl.keystore.type" -> "JKS", - "ssl.truststore.location" -> KafkaTestUtils.trustStoreFile.getAbsolutePath, - "ssl.truststore.password" -> "123456", - "ssl.keystore.location" -> KafkaTestUtils.keyStoreFile.getAbsolutePath, - "ssl.keystore.password" -> "123456", - "ssl.key.password" -> "123456", - KafkaConfig.InterBrokerListenerNameProp -> "SSL", - KafkaConfig.ListenersProp -> s"SSL://localhost:${ports.kafkaPort}", - KafkaConfig.AdvertisedListenersProp -> s"SSL://localhost:${ports.kafkaPort}", - KafkaConfig.ZkConnectionTimeoutMsProp -> s"${30.second.toMillis}" + "group.min.session.timeout.ms" -> "500", + "group.initial.rebalance.delay.ms" -> "0", + "authorizer.class.name" -> "kafka.security.authorizer.AclAuthorizer", + "super.users" -> "User:ANONYMOUS", + "ssl.client.auth" -> "required", + "ssl.enabled.protocols" -> "TLSv1.2", + "ssl.truststore.type" -> "JKS", + "ssl.keystore.type" -> "JKS", + "ssl.truststore.location" -> KafkaTestUtils.trustStoreFile.getAbsolutePath, + "ssl.truststore.password" -> "123456", + "ssl.keystore.location" -> KafkaTestUtils.keyStoreFile.getAbsolutePath, + "ssl.keystore.password" -> "123456", + "ssl.key.password" -> "123456", + "inter.broker.listener.name" -> "SSL", + "listeners" -> s"SSL://localhost:${ports.kafkaPort}", + "advertised.listeners" -> s"SSL://localhost:${ports.kafkaPort}", + "zookeeper.connection.timeout.ms" -> s"${30.second.toMillis}" ), customBrokerProps ) diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index eb0caa125..1d408a0ef 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -1046,6 +1046,14 @@ object AdminClient { override def asJava: JConsumerGroupState = JConsumerGroupState.EMPTY } + case object Assigning extends ConsumerGroupState { + override def asJava: JConsumerGroupState = JConsumerGroupState.ASSIGNING + } + + case object Reconciling extends ConsumerGroupState { + override def asJava: JConsumerGroupState = JConsumerGroupState.RECONCILING + } + def apply(state: JConsumerGroupState): ConsumerGroupState = state match { case JConsumerGroupState.UNKNOWN => ConsumerGroupState.Unknown @@ -1054,6 +1062,8 @@ object AdminClient { case JConsumerGroupState.STABLE => ConsumerGroupState.Stable case JConsumerGroupState.DEAD => ConsumerGroupState.Dead case JConsumerGroupState.EMPTY => ConsumerGroupState.Empty + case JConsumerGroupState.ASSIGNING => ConsumerGroupState.Assigning + case JConsumerGroupState.RECONCILING => ConsumerGroupState.Reconciling } } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 02fa83719..c8524b708 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -847,12 +847,17 @@ object Runloop { ): RebalanceEvent = copy( wasInvoked 
= true, + assignedTps = assignedTps -- revoked, revokedTps = revokedTps ++ revoked, endedStreams = this.endedStreams ++ endedStreams ) def onLost(lost: Set[TopicPartition]): RebalanceEvent = - copy(wasInvoked = true, lostTps = lostTps ++ lost) + copy( + wasInvoked = true, + assignedTps = assignedTps -- lost, + lostTps = lostTps ++ lost + ) } private object RebalanceEvent { diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index e4dc34496..bc77c6469 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -8,7 +8,9 @@ import zio.kafka.serde.Serializer import zio.kafka.utils.SslHelper import zio.stream.{ ZPipeline, ZStream } +import java.util.concurrent.atomic.AtomicInteger import scala.jdk.CollectionConverters._ +import scala.util.control.{ NoStackTrace, NonFatal } trait Producer { @@ -102,6 +104,9 @@ trait Producer { /** * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time * penalty for each chunk. + * + * When publishing any of the records fails, the whole batch fails even though some records might have been published. + * Use [[produceChunkAsyncWithFailures]] to get results per record. */ def produceChunk( records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]] @@ -110,6 +115,9 @@ trait Producer { /** * Produces a chunk of records. See [[produceChunkAsync(records*]] for version that allows to avoid round-trip-time * penalty for each chunk. + * + * When publishing any of the records fails, the whole batch fails even though some records might have been published. + * Use [[produceChunkAsyncWithFailures]] to get results per record. */ def produceChunk[R, K, V]( records: Chunk[ProducerRecord[K, V]], @@ -126,6 +134,9 @@ trait Producer { * It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal * the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the * entire chunk. + * + * When publishing any of the records fails, the whole batch fails even though some records might have been published. + * Use [[produceChunkAsyncWithFailures]] to get results per record. */ def produceChunkAsync( records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]] @@ -140,6 +151,9 @@ trait Producer { * It is possible that for chunks that exceed the producer's internal buffer size, the outer layer will also signal * the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the * entire chunk. + * + * When publishing any of the records fails, the whole batch fails even though some records might have been published. + * Use [[produceChunkAsyncWithFailures]] to get results per record. */ def produceChunkAsync[R, K, V]( records: Chunk[ProducerRecord[K, V]], @@ -160,6 +174,9 @@ trait Producer { * This variant of `produceChunkAsync` more accurately reflects that individual records within the Chunk can fail to * publish, rather than the failure being at the level of the Chunk. * + * When an attempt to send a record to the buffer for publication fails, the following records in the chunk are not + * published. This is indicated with a [[Producer.PublishOmittedException]]. + * * This variant does not accept serializers as they may also fail independently of each record and this is not * reflected in the return type.
*/ @@ -185,6 +202,10 @@ trait Producer { } object Producer { + case object PublishOmittedException + extends RuntimeException("Publish omitted due to a publish error for a previous record in the chunk") + with NoStackTrace + val live: RLayer[ProducerSettings, Producer] = ZLayer.scoped { for { @@ -218,12 +239,13 @@ object Producer { settings: ProducerSettings ): ZIO[Scope, Throwable, Producer] = for { + runtime <- ZIO.runtime[Any] sendQueue <- Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]( settings.sendBufferSize ) - producer = new ProducerLive(javaProducer, sendQueue) - _ <- producer.sendFromQueue.forkScoped + producer = new ProducerLive(javaProducer, runtime, sendQueue) + _ <- ZIO.blocking(producer.sendFromQueue).forkScoped } yield producer /** @@ -359,6 +381,7 @@ object Producer { private[producer] final class ProducerLive( private[producer] val p: JProducer[Array[Byte], Array[Byte]], + runtime: Runtime[Any], sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])] ) extends Producer { @@ -455,63 +478,68 @@ private[producer] final class ProducerLive( override def metrics: Task[Map[MetricName, Metric]] = ZIO.attemptBlocking(p.metrics().asScala.toMap) /** - * Currently sending has the following characteristics: - * - You can submit many chunks, they get buffered in the send queue. - * - A chunk only gets send after the previous chunk completes (completes means that the callbacks for each record - * was invoked). - * - The records in a chunk are send in one go, in order. Records for the same partition have a high chance that - * they land in the same batch (which is good for compression). - * - Record ordering is retained and guaranteed between chunks. - * - Record ordering is retained and guaranteed within a chunk (per partition) unless `retries` has been enabled - * (see https://kafka.apache.org/documentation/#producerconfigs_retries). + * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost, + * therefore this stream is run on the blocking thread pool */ val sendFromQueue: ZIO[Any, Nothing, Any] = - ZIO.runtime[Any].flatMap { runtime => - // Calls to 'send' may block when updating metadata or when communication with the broker is (temporarily) lost, - // therefore this stream is run on the blocking thread pool. 
- ZIO.blocking { - ZStream - .fromQueueWithShutdown(sendQueue) - .mapZIO { case (serializedRecords, done) => - sendChunk(runtime, serializedRecords) - .flatMap(done.succeed(_)) + ZStream + .fromQueueWithShutdown(sendQueue) + .mapZIO { case (serializedRecords, done) => + ZIO.succeed { + val recordsLength = serializedRecords.length + val sentRecordsCounter = new AtomicInteger(0) + val recordsIterator: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex + val sentResults: Array[Either[Throwable, RecordMetadata]] = + new Array[Either[Throwable, RecordMetadata]](recordsLength) + + @inline def insertSentResult(resultIndex: Int, sentResult: Either[Throwable, RecordMetadata]): Unit = { + // Updating sentResults[resultIndex] here is safe, + // because only a single update for every resultIndex of sentResults is performed + sentResults.update(resultIndex, sentResult) + + // Reading from sentRecordsCounter guarantees fully updated version of sentResults + // will be visible and used to complete done promise + if (sentRecordsCounter.incrementAndGet() == recordsLength) { + val sentResultsChunk = Chunk.fromArray(sentResults) + + Unsafe.unsafe { implicit u => + val _ = runtime.unsafe.run(done.succeed(sentResultsChunk)) + } + } } - .runDrain - } - } - private def sendChunk( - runtime: Runtime[Any], - serializedRecords: Chunk[ByteRecord] - ): ZIO[Any, Nothing, Chunk[Either[Throwable, RecordMetadata]]] = - for { - promises <- ZIO.foreach(serializedRecords)(sendRecord(runtime)) - results <- ZIO.foreach(promises)(_.await.either) - } yield results - - private def sendRecord( - runtime: Runtime[Any] - )(record: ByteRecord): ZIO[Any, Nothing, Promise[Throwable, RecordMetadata]] = { - def unsafeRun(f: => ZIO[Any, Nothing, Any]): Unit = { - val _ = Unsafe.unsafe(implicit u => runtime.unsafe.run(f)) - } - - for { - done <- Promise.make[Throwable, RecordMetadata] - _ <- ZIO - .attempt[Any] { - p.send( - record, - (metadata: RecordMetadata, err: Exception) => - unsafeRun { - if (err == null) done.succeed(metadata) - else done.fail(err) - } - ) - } - .catchAll(err => done.fail(err)) - } yield done - } + var previousSendCallsSucceed = true + + recordsIterator.foreach { case (record: ByteRecord, recordIndex: Int) => + if (previousSendCallsSucceed) { + try { + val _ = p.send( + record, + (metadata: RecordMetadata, err: Exception) => + insertSentResult( + recordIndex, + if (err eq null) Right(metadata) else Left(err) + ) + ) + } catch { + case NonFatal(err) => + previousSendCallsSucceed = false + + insertSentResult( + recordIndex, + Left(err) + ) + } + } else { + insertSentResult( + recordIndex, + Left(Producer.PublishOmittedException) + ) + } + } + } + } + .runDrain private def serialize[R, K, V]( r: ProducerRecord[K, V], @@ -522,5 +550,4 @@ private[producer] final class ProducerLive( key <- keySerializer.serialize(r.topic, r.headers, r.key()) value <- valueSerializer.serialize(r.topic, r.headers, r.value()) } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers) - }
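As an illustration of the per-record results documented for `produceChunkAsyncWithFailures` in the Producer changes above, a caller might inspect them roughly as sketched below; `publishAndLog` and the log messages are hypothetical and not part of this change:

```scala
import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.Producer

// Sketch only: walk the per-record results of `produceChunkAsyncWithFailures`.
// A Left(Producer.PublishOmittedException) marks a record that was never sent
// because an earlier record in the same chunk failed.
def publishAndLog(
  records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
): ZIO[Producer, Throwable, Unit] =
  ZIO
    .serviceWithZIO[Producer](_.produceChunkAsyncWithFailures(records))
    .flatten
    .flatMap { results =>
      ZIO.foreachDiscard(results.zipWithIndex) {
        case (Right(metadata), _) =>
          ZIO.logDebug(s"Record published to ${metadata.topic()} at offset ${metadata.offset()}")
        case (Left(Producer.PublishOmittedException), i) =>
          ZIO.logWarning(s"Record $i was skipped because an earlier record in the chunk failed")
        case (Left(error), i) =>
          ZIO.logError(s"Record $i failed to publish: $error")
      }
    }
```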