Merge remote-tracking branch 'origin/master' into subscription-stream-control
svroonland committed Oct 10, 2024
2 parents 5fec195 + a0f45ca commit 699e6e8
Showing 16 changed files with 619 additions and 109 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchs.yml
@@ -58,7 +58,7 @@ jobs:
# - "-rf json": Format type for machine-readable results. JSON
# - "-foe true": Should JMH fail immediately if any benchmark had experienced an unrecoverable error?. True
# - "-to 60": 1 minute timeout per iteration
run: sbt "zioKafkaBench/Jmh/run -wi 5 -i 5 -r 1 -w 1 -t 1 -to 60 -rf json -foe true"
run: sbt "zioKafkaBench/Jmh/run -wi 5 -i 5 -r 1 -w 1 -t 1 -to 120 -rf json -foe true"

- name: Download previous benchmark data
uses: actions/cache@v3
2 changes: 1 addition & 1 deletion .github/workflows/scala-steward.yml
@@ -13,7 +13,7 @@ jobs:
name: Scala Steward
steps:
- name: Scala Steward
uses: scala-steward-org/scala-steward-action@v2.65.0
uses: scala-steward-org/scala-steward-action@v2.70.0
with:
github-app-id: ${{ secrets.SCALA_STEWARD_GITHUB_APP_ID }}
github-app-installation-id: ${{ secrets.SCALA_STEWARD_GITHUB_APP_INSTALLATION_ID }}
10 changes: 8 additions & 2 deletions README.md
@@ -19,13 +19,19 @@ Kafka has a mature Java client for producing and consuming events, but it has a
In order to use this library, we need to add the following lines to our `build.sbt` file:

```scala
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.8.0"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.8.0" % Test
libraryDependencies += "dev.zio" %% "zio-kafka" % "2.8.2"
libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "2.8.2" % Test
```

Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots.
[Browse here](https://oss.sonatype.org/content/repositories/snapshots/dev/zio/zio-kafka_3/) to find available versions.
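
To depend on a snapshot you also need the snapshots resolver; a minimal sketch for `build.sbt`, assuming an sbt version that provides `Resolver.sonatypeOssRepos` (sbt 1.7 or later):

```scala
// Make snapshot versions resolvable (the version itself is whatever you pick from the listing above).
resolvers ++= Resolver.sonatypeOssRepos("snapshots")
```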

When using `zio-kafka-testkit` together with Scala 3, you also need to add the following to your `build.sbt` file:

```scala
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
```

## Example

Let's write a simple Kafka producer and consumer using ZIO Kafka with ZIO Streams. Before anything else, we need a running instance of Kafka. We can get one by saving the following docker-compose script in a `docker-compose.yml` file and running `docker-compose up`:
22 changes: 11 additions & 11 deletions build.sbt
@@ -11,16 +11,16 @@ import MimaSettings.mimaSettings
*/
lazy val binCompatVersionToCompare = None // Some("2.8.0")

lazy val kafkaVersion = "3.7.1"
lazy val embeddedKafkaVersion = "3.7.0" // Should be the same as kafkaVersion, except for the patch part
lazy val kafkaVersion = "3.8.0"
lazy val embeddedKafkaVersion = "3.8.0" // Should be the same as kafkaVersion, except for the patch part

lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.6"
lazy val logback = "ch.qos.logback" % "logback-classic" % "1.5.9"

enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin)

lazy val _scala213 = "2.13.14"
lazy val _scala3 = "3.3.3"
lazy val _scala213 = "2.13.15"
lazy val _scala3 = "3.3.4"

inThisBuild(
List(
@@ -158,7 +158,7 @@ lazy val zioKafkaTest =
libraryDependencies ++= Seq(
kafkaClients,
logback % Test,
"dev.zio" %% "zio-logging-slf4j" % "2.3.0" % Test
"dev.zio" %% "zio-logging-slf4j" % "2.3.1" % Test
) ++ `embedded-kafka`.value
)

@@ -180,13 +180,13 @@ lazy val zioKafkaExample =
.settings(run / fork := false)
.settings(
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.1.6",
"dev.zio" %% "zio-kafka" % "2.8.0",
"dev.zio" %% "zio-logging-slf4j2" % "2.3.0",
"dev.zio" %% "zio" % "2.1.9",
"dev.zio" %% "zio-kafka" % "2.8.2",
"dev.zio" %% "zio-logging-slf4j2" % "2.3.1",
"io.github.embeddedkafka" %% "embedded-kafka" % embeddedKafkaVersion,
logback,
"dev.zio" %% "zio-kafka-testkit" % "2.8.0" % Test,
"dev.zio" %% "zio-test" % "2.1.6" % Test
"dev.zio" %% "zio-kafka-testkit" % "2.8.2" % Test,
"dev.zio" %% "zio-test" % "2.1.9" % Test
),
// Scala 3 compiling fails with:
// [error] Modules were resolved with conflicting cross-version suffixes in ProjectRef(uri("file:/home/runner/work/zio-kafka/zio-kafka/"), "zioKafkaExample"):
@@ -3,7 +3,7 @@ id: example-of-consuming-producing-and-committing-offsets
title: "Example of Consuming, Producing and Committing Offsets"
---

This example shows how to consume messages from topic `topic_a` and produce transformed messages to `topic_b`, after which consumer offsets are committed. Processing is done in chunks using `ZStreamChunk` for more efficiency.
This example shows how to consume messages from topic `topic_a` and produce transformed messages to `topic_b`, after which the consumer offsets are committed. Processing is done in chunks using `ZStreamChunk` for more efficiency. Please note: the zio-kafka consumer does not support automatic offset committing and therefore ignores the Kafka consumer setting `enable.auto.commit=true`. Offsets should be committed manually using the provided commit methods, typically after processing messages or at other appropriate points in the application logic.
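
As a quick orientation before the full example below, here is a minimal sketch of that commit pattern, assuming string keys and values; `processRecord` is a hypothetical placeholder for your own processing logic:

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

// Hypothetical processing step; replace with your own logic.
def processRecord(record: CommittableRecord[String, String]): Task[Unit] = ???

// Consume from `topic_a`, process each record, and commit the offsets in batches.
val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
  Consumer
    .plainStream(Subscription.topics("topic_a"), Serde.string, Serde.string)
    .mapZIO(record => processRecord(record).as(record.offset))
    .aggregateAsync(Consumer.offsetBatches)
    .mapZIO(_.commit)
    .runDrain
```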

```scala
import zio.ZLayer
6 changes: 6 additions & 0 deletions docs/index.md
@@ -26,6 +26,12 @@ libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "@VERSION@" % Test
Snapshots are available on Sonatype's snapshot repository https://oss.sonatype.org/content/repositories/snapshots.
[Browse here](https://oss.sonatype.org/content/repositories/snapshots/dev/zio/zio-kafka_3/) to find available versions.

When using `zio-kafka-testkit` together with Scala 3, you also need to add the following to your `build.sbt` file:

```scala
excludeDependencies += "org.scala-lang.modules" % "scala-collection-compat_2.13"
```

## Example

Let's write a simple Kafka producer and consumer using ZIO Kafka with ZIO Streams. Before anything else, we need a running instance of Kafka. We can get one by saving the following docker-compose script in a `docker-compose.yml` file and running `docker-compose up`:
131 changes: 123 additions & 8 deletions docs/serialization-and-deserialization.md
@@ -1,24 +1,63 @@
---
id: serialization-and-deserialization
title: "Serialization And Deserialization"
title: "Serialization and Deserialization"
---

Zio-kafka deserializes incoming data and serializes outgoing data (both keys and values), converting between
byte arrays and any other type. This works by providing a key and value `Deserializer` while constructing a
`Consumer`, and a key and value `Serializer` while constructing the `Producer`.

A `Serde` combines a `Deserializer` and a `Serializer`. Common serdes are provided in the `Serdes` object, e.g.
`Serdes.byteArray`, `Serdes.string` and `Serdes.long`. A serde can be converted to other serdes, or you can create a
custom serde by implementing the `Serde` trait directly.
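
For example, here is a minimal sketch of how serdes are plugged in on both sides (the topic name and the `Long` value type are just for illustration):

```scala
import zio.kafka.consumer._
import zio.kafka.producer.Producer
import zio.kafka.serde._

// Deserialize keys as String and values as Long while consuming ...
val consumed =
  Consumer.plainStream(Subscription.topics("events"), Serdes.string, Serdes.long)

// ... and serialize them the same way while producing.
val produced =
  Producer.produce("events", "some-key", 42L, Serdes.string, Serdes.long)
```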

This document contains:

- Handling failures in a serde
- How to create a custom serde
- How to create and use a custom serde that wraps invalid data
- How to do deserialization in the consumer stream
- A warning about using `mapZIO`

## Handling failures in a serde

Ideally, a serde cannot fail while serializing or deserializing. This is for example the case with the provided
`Serdes.byteArray` and `Serdes.string`. It is not the case for any serde that needs to handle invalid input
(for example `Serdes.long`), or for a serde that needs to do a remote lookup.

By default, a consumer stream will fail if it encounters a deserialization error in the serde. Unfortunately, the
resulting failure might not clearly indicate that the cause is in the serde.

There are two solutions for improving this:

- Wrap the result of the serde in a `Try` with the `Serde.asTry` method.
- Use `Serdes.byteArray` and put the deserialization code in the consumer stream, or do the serialization before
  handing the data to zio-kafka. This way you can handle failures any way you want.

Both approaches are discussed further below.

## Custom Data Type Serdes

Serializers and deserializers (serdes) for custom data types can be constructed from scratch or by converting existing serdes. For example, to create a serde for an `Instant`:
Serializers and deserializers for custom data types can be created from scratch, or by converting existing
serdes. For example, to create a serde for an `Instant` from a serde for a `Long`:

```scala
import java.time.Instant
import zio.kafka.serde._

val instantSerde: Serde[Any, Instant] = Serde.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli)
val instantSerde: Serde[Any, Instant] =
Serdes.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli)
```

## Handling deserialization failures
To handle missing data (an empty key or value), you can use the `Serde.asOption` transformer. For example:
`Serdes.string.asOption`. This results in a `None` if the key or value is empty, and in a `Some(string)` otherwise.
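
A small sketch of how such a serde could be used in a consumer stream (the topic name is just for illustration):

```scala
import zio.kafka.consumer._
import zio.kafka.serde._

// The value serde yields None for an empty value and Some(string) otherwise.
val optionalValues =
  Consumer
    .plainStream(Subscription.topics("topic_with_optional_values"), Serdes.string, Serdes.string.asOption)
    .map(record => record.record.value().getOrElse("<no value>"))
```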

The default behavior for a consumer stream when encountering a deserialization failure is to fail the stream. In many cases you may want to handle this situation differently, e.g. by skipping the message that failed to deserialize or by executing an alternative effect. For this purpose, any `Deserializer[T]` for some type `T` can be easily converted into a `Deserializer[Try[T]]` where deserialization failures are converted to a `Failure` using the `asTry` method.
## Custom serdes that wrap invalid data

Below is an example of skipping messages that fail to deserialize. The offset is passed downstream to be committed.
Any `Deserializer[A]` for a given type `A` can be converted into a `Deserializer[Try[A]]` where deserialization
failures are converted to a `Failure` using the `asTry` method. (Method `asTry` is also available on `Serde`.)

Below is an example of skipping records that fail to deserialize. The offset is passed downstream to be committed.

```scala
import zio._, stream._
@@ -28,11 +67,13 @@ import scala.util.{Try, Success, Failure}

val consumer = ZLayer.scoped(Consumer.make(consumerSettings))

val keySerde = Serdes.string
val valueSerde = Serdes.string.asTry // <-- using `.asTry`
val stream = Consumer
.plainStream(Subscription.topics("topic150"), Serde.string, Serde.string.asTry)
.plainStream(Subscription.topics("topic150"), keySerde, valueSerde)

stream
.mapZIO { record =>
.mapZIO { record => // ⚠️ see section about `mapZIO` below!
val tryValue: Try[String] = record.record.value()
val offset: Offset = record.offset

@@ -50,3 +91,77 @@ stream
.runDrain
.provideSomeLayer(consumer)
```

## Deserialization in the consumer stream

In this approach we provide zio-kafka with the `Serdes.byteArray` serde (which is a pass-through serde) and do the
deserialization in the consumer stream. The deserialization can be done with regular ZIO operators.

This approach provides more freedom at the cost of having to write more code. It also allows for optimizations such as
operating on chunks of records (see next section), and more contextual failure handling.

Here is an example:

```scala
import zio._, stream._
import zio.kafka.consumer._
import zio.kafka.serde._

val consumer = ZLayer.scoped(Consumer.make(consumerSettings))

val stream = Consumer
  .plainStream(Subscription.topics("topic150"), Serdes.byteArray, Serdes.byteArray)

def deserialize(value: Array[Byte]): ZIO[Any, Throwable, Message] = ???

stream
.mapZIO { record => // ⚠️ see section about `mapZIO` below!
val value: Array[Byte] = record.record.value()
val offset: Offset = record.offset

deserialize(value)
// possible action to take if deserialization fails:
.recoverWith(_ => someEffect(value))
.flatMap(processMessage)
.as(offset)
}
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.runDrain
.provideSomeLayer(consumer)
```

## A warning about `mapZIO`

Be careful with using `mapZIO` as it breaks the chunking structure of the stream (or more precisely, the resulting
stream has chunks with a single element). Throughput can be considerably lower than with the chunking structure intact.

If your application requirements allow all elements of a chunk to be processed in one go, then you can use one of these
techniques to preserve the chunking structure:

### Use `chunksWith`

Use `chunksWith` when you have a single processing step that needs to work on a chunk.

```scala
def f(a: A): ZIO[R, E, B]

stream // ZStream[R, E, A]
.chunksWith { stream => stream.mapZIO(f) } // ZStream[R, E, B]
```

### Expose chunking structure with `chunks`

Use `chunks` when you have multiple processing steps that can all work on a chunk at a time. Since `chunks` exposes the
chunking structure explicitly, the program can no longer accidentally break the chunking structure (unless
`flattenChunks` is also used).

```scala
def f(a: A): ZIO[R, E, B]
def g(b: B): ZIO[R, E, C]

stream // ZStream[R, E, A]
.chunks // ZStream[R, E, Chunk[A]]
.mapZIO { chunk => ZIO.foreach(chunk)(f) } // ZStream[R, E, Chunk[B]]
.mapZIO { chunk => ZIO.foreach(chunk)(g) } // ZStream[R, E, Chunk[C]]
.flattenChunks // ZStream[R, E, C]
```
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.10.1
sbt.version=1.10.2
8 changes: 4 additions & 4 deletions project/plugins.sbt
@@ -4,9 +4,9 @@ addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion)
addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion)
addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion)

addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1")
addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.10.0")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.3")
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.13.0")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.2")
addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.10.4")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.4")

resolvers ++= Resolver.sonatypeOssRepos("public")
@@ -0,0 +1,73 @@
package zio.kafka.bench

import io.github.embeddedkafka.EmbeddedKafka
import org.apache.kafka.clients.producer.ProducerRecord
import org.openjdk.jmh.annotations._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.producer
import zio.stream.ZStream
import zio.{ Chunk, ZIO, ZLayer }

import java.util.concurrent.TimeUnit

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
class ProducerBenchmark extends ZioBenchmark[Kafka with Producer] {
val topic1 = "topic1"
val nrPartitions = 6
val nrMessages = 500
val kvs: List[(String, String)] = List.tabulate(nrMessages)(i => (s"key$i", s"msg$i"))
val records: Chunk[ProducerRecord[String, String]] = Chunk.fromIterable(kvs.map { case (k, v) =>
new ProducerRecord(topic1, k, v)
})

override protected def bootstrap: ZLayer[Any, Nothing, Kafka with Producer] =
ZLayer.make[Kafka with Producer](Kafka.embedded, producer).orDie

override def initialize: ZIO[Kafka with Producer, Throwable, Any] = for {
_ <- ZIO.succeed(EmbeddedKafka.deleteTopics(List(topic1))).ignore
_ <- ZIO.succeed(EmbeddedKafka.createCustomTopic(topic1, partitions = nrPartitions))
} yield ()

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
def produceChunkSeq(): Any = runZIO {
// Produce 30 chunks sequentially
Producer.produceChunk(records, Serde.string, Serde.string).repeatN(29)
}

@Benchmark
@BenchmarkMode(Array(Mode.AverageTime))
def produceChunkPar(): Any = runZIO {
// Produce 30 chunks of which 4 run in parallel
ZStream
.range(0, 30, 1)
.mapZIOParUnordered(4) { _ =>
Producer.produceChunk(records, Serde.string, Serde.string)
}
.runDrain
}

@Benchmark
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
def produceSingleRecordSeq(): Any = runZIO {
// Produce 100 records sequentially
Producer.produce(topic1, "key", "value", Serde.string, Serde.string).repeatN(99)
}

@Benchmark
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
def produceSingleRecordPar(): Any = runZIO {
// Produce 100 records of which 4 run in parallel
ZStream
.range(0, 100, 1)
.mapZIOParUnordered(4) { _ =>
Producer.produce(topic1, "key", "value", Serde.string, Serde.string)
}
.runDrain
}
}
