diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala index b428e38f0..c433ead32 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala @@ -85,383 +85,384 @@ object ProducerSpec extends ZIOSpecWithKafka { metrics <- Producer.metrics } yield assertTrue(metrics.nonEmpty) }, - test("a simple transaction") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - initialBobAccount = new ProducerRecord(topic, "bob", 0) - - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialBobAccount, Serde.string, Serde.int, None) *> - t.produce(initialAliceAccount, Serde.string, Serde.int, None) + suite("transactions")( + test("a simple transaction") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + initialBobAccount = new ProducerRecord(topic, "bob", 0) + + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialBobAccount, Serde.string, Serde.int, None) *> + t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } } - } - settings <- transactionalConsumerSettings(group, client) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "bob") - } yield record + settings <- transactionalConsumerSettings(group, client) + recordChunk <- ZIO.scoped { + withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + for { + messages <- consumer.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + record = messages.filter(rec => rec.record.key == "bob") + } yield record + } } - } - } yield assertTrue(recordChunk.map(_.value).last == 0) - }, - test("an aborted transaction should not be read") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - initialBobAccount = new ProducerRecord(topic, "bob", 0) - aliceGives20 = new ProducerRecord(topic, "alice", 0) - bobReceives20 = new ProducerRecord(topic, "bob", 20) - - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialBobAccount, Serde.string, Serde.int, None) *> - t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } yield assertTrue(recordChunk.map(_.value).last == 0) + }, + test("an aborted transaction should not be read") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + initialBobAccount = new ProducerRecord(topic, "bob", 0) + aliceGives20 = new ProducerRecord(topic, "alice", 0) + bobReceives20 = new ProducerRecord(topic, "bob", 20) + + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialBobAccount, Serde.string, Serde.int, None) *> + t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } } - } - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(aliceGives20, Serde.string, Serde.int, None) *> - t.produce(bobReceives20, Serde.string, Serde.int, None) *> - t.abort + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(aliceGives20, Serde.string, Serde.int, None) *> + t.produce(bobReceives20, Serde.string, Serde.int, None) *> + t.abort + } + }.catchSome { case UserInitiatedAbort => + ZIO.unit // silences the abort } - }.catchSome { case UserInitiatedAbort => - ZIO.unit // silences the abort - } - settings <- transactionalConsumerSettings(group, client) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "bob") - } yield record + settings <- transactionalConsumerSettings(group, client) + recordChunk <- ZIO.scoped { + withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + for { + messages <- consumer.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + record = messages.filter(rec => rec.record.key == "bob") + } yield record + } } - } - } yield assertTrue(recordChunk.map(_.value).last == 0) - }, - test("serialize concurrent transactions") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - initialBobAccount = new ProducerRecord(topic, "bob", 0) - - transaction1 = ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } yield assertTrue(recordChunk.map(_.value).last == 0) + }, + test("serialize concurrent transactions") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + initialBobAccount = new ProducerRecord(topic, "bob", 0) + + transaction1 = ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } } - } - transaction2 = ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialBobAccount, Serde.string, Serde.int, None) + transaction2 = ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialBobAccount, Serde.string, Serde.int, None) + } } - } - _ <- transaction1 <&> transaction2 - settings <- transactionalConsumerSettings(group, client) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - } yield messages + _ <- transaction1 <&> transaction2 + settings <- transactionalConsumerSettings(group, client) + recordChunk <- ZIO.scoped { + withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + for { + messages <- consumer.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + } yield messages + } } - } - } yield assert(recordChunk.map(_.value))(contains(0) && contains(20)) - }, - test("exception management") { - for { - topic <- randomTopic - initialBobAccount = new ProducerRecord(topic, "bob", 0) - - result <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce( - initialBobAccount, - Serde.string, - Serde.int.contramap((_: Int) => throw new RuntimeException("test")), - None - ) - } - }.unit.exit - } yield assert(result)(dies(hasMessage(equalTo("test")))) - }, - test("interleaving transaction with non-transactional consumer") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - initialBobAccount = new ProducerRecord(topic, "bob", 0) - nonTransactional = new ProducerRecord(topic, "no one", -1) - aliceGives20 = new ProducerRecord(topic, "alice", 0) - - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialBobAccount, Serde.string, Serde.int, None) *> - t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } yield assert(recordChunk.map(_.value))(contains(0) && contains(20)) + }, + test("exception management") { + for { + topic <- randomTopic + initialBobAccount = new ProducerRecord(topic, "bob", 0) + + result <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce( + initialBobAccount, + Serde.string, + Serde.int.contramap((_: Int) => throw new RuntimeException("test")), + None + ) + } + }.unit.exit + } yield assert(result)(dies(hasMessage(equalTo("test")))) + }, + test("interleaving transaction with non-transactional consumer") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + initialBobAccount = new ProducerRecord(topic, "bob", 0) + nonTransactional = new ProducerRecord(topic, "no one", -1) + aliceGives20 = new ProducerRecord(topic, "alice", 0) + + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialBobAccount, Serde.string, Serde.int, None) *> + t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } } - } - assertion <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - for { - _ <- t.produce(aliceGives20, Serde.string, Serde.int, None) - _ <- Producer.produce(nonTransactional, Serde.string, Serde.int) - settings <- consumerSettings(client, Some(group)) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") - } yield record - + assertion <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + for { + _ <- t.produce(aliceGives20, Serde.string, Serde.int, None) + _ <- Producer.produce(nonTransactional, Serde.string, Serde.int) + settings <- consumerSettings(client, Some(group)) + recordChunk <- ZIO.scoped { + withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + for { + messages <- consumer.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + record = messages.filter(rec => rec.record.key == "no one") + } yield record + + } } - } - } yield assertTrue(recordChunk.nonEmpty) + } yield assertTrue(recordChunk.nonEmpty) + } } - } - } yield assertion - }, - test("interleaving transaction with transactional consumer should not be read during transaction") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - initialBobAccount = new ProducerRecord(topic, "bob", 0) - nonTransactional = new ProducerRecord(topic, "no one", -1) - aliceGives20 = new ProducerRecord(topic, "alice", 0) - - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialBobAccount, Serde.string, Serde.int, None) *> - t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } yield assertion + }, + test("interleaving transaction with transactional consumer should not be read during transaction") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + initialBobAccount = new ProducerRecord(topic, "bob", 0) + nonTransactional = new ProducerRecord(topic, "no one", -1) + aliceGives20 = new ProducerRecord(topic, "alice", 0) + + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialBobAccount, Serde.string, Serde.int, None) *> + t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } } - } - assertion <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - for { - _ <- t.produce(aliceGives20, Serde.string, Serde.int, None) - _ <- Producer.produce(nonTransactional, Serde.string, Serde.int) - settings <- transactionalConsumerSettings(group, client) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") - } yield record + assertion <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + for { + _ <- t.produce(aliceGives20, Serde.string, Serde.int, None) + _ <- Producer.produce(nonTransactional, Serde.string, Serde.int) + settings <- transactionalConsumerSettings(group, client) + recordChunk <- ZIO.scoped { + withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + for { + messages <- consumer.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + record = messages.filter(rec => rec.record.key == "no one") + } yield record + } } - } - } yield assertTrue(recordChunk.isEmpty) + } yield assertTrue(recordChunk.isEmpty) + } } - } - } yield assertion - }, - test("interleaving transaction with transactional consumer when aborted") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - initialBobAccount = new ProducerRecord(topic, "bob", 0) - aliceGives20 = new ProducerRecord(topic, "alice", 0) - nonTransactional = new ProducerRecord(topic, "no one", -1) - bobReceives20 = new ProducerRecord(topic, "bob", 20) - - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(initialBobAccount, Serde.string, Serde.int, None) *> - t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } yield assertion + }, + test("interleaving transaction with transactional consumer when aborted") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + initialBobAccount = new ProducerRecord(topic, "bob", 0) + aliceGives20 = new ProducerRecord(topic, "alice", 0) + nonTransactional = new ProducerRecord(topic, "no one", -1) + bobReceives20 = new ProducerRecord(topic, "bob", 20) + + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(initialBobAccount, Serde.string, Serde.int, None) *> + t.produce(initialAliceAccount, Serde.string, Serde.int, None) + } } - } - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce(aliceGives20, Serde.string, Serde.int, None) *> - Producer.produce(nonTransactional, Serde.string, Serde.int) *> - t.produce(bobReceives20, Serde.string, Serde.int, None) *> - t.abort + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce(aliceGives20, Serde.string, Serde.int, None) *> + Producer.produce(nonTransactional, Serde.string, Serde.int) *> + t.produce(bobReceives20, Serde.string, Serde.int, None) *> + t.abort + } + }.catchSome { case UserInitiatedAbort => + ZIO.unit // silences the abort } - }.catchSome { case UserInitiatedAbort => - ZIO.unit // silences the abort - } - settings <- transactionalConsumerSettings(group, client) - recordChunk <- ZIO.scoped { - withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => - for { - messages <- consumer.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") - } yield record + settings <- transactionalConsumerSettings(group, client) + recordChunk <- ZIO.scoped { + withConsumerInt(Topics(Set(topic)), settings).flatMap { consumer => + for { + messages <- consumer.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + record = messages.filter(rec => rec.record.key == "no one") + } yield record + } } - } - } yield assertTrue(recordChunk.nonEmpty) - }, - test("committing offsets after a successful transaction") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0) - - _ <- Producer.produce(initialAliceAccount, Serde.string, Serde.int) - settings <- transactionalConsumerSettings(group, client) - committedOffset <- - ZIO.scoped { - Consumer.make(settings).flatMap { c => - c.subscribe(Topics(Set(topic))) *> - ZIO.scoped { - c - .plainStream(Serde.string, Serde.int) - .toQueue() - .flatMap { q => - val readAliceAccount = for { - messages <- q.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - } yield messages.head - for { - aliceHadMoneyCommittableMessage <- readAliceAccount - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce( - aliceAccountFeesPaid, - Serde.string, - Serde.int, - Some(aliceHadMoneyCommittableMessage.offset) - ) + } yield assertTrue(recordChunk.nonEmpty) + }, + test("committing offsets after a successful transaction") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0) + + _ <- Producer.produce(initialAliceAccount, Serde.string, Serde.int) + settings <- transactionalConsumerSettings(group, client) + committedOffset <- + ZIO.scoped { + Consumer.make(settings).flatMap { c => + c.subscribe(Topics(Set(topic))) *> + ZIO.scoped { + c + .plainStream(Serde.string, Serde.int) + .toQueue() + .flatMap { q => + val readAliceAccount = for { + messages <- q.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + } yield messages.head + for { + aliceHadMoneyCommittableMessage <- readAliceAccount + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce( + aliceAccountFeesPaid, + Serde.string, + Serde.int, + Some(aliceHadMoneyCommittableMessage.offset) + ) + } } - } - aliceTopicPartition = - new TopicPartition(topic, aliceHadMoneyCommittableMessage.partition) - committed <- c.committed(Set(aliceTopicPartition)) - } yield committed(aliceTopicPartition) - } - } + aliceTopicPartition = + new TopicPartition(topic, aliceHadMoneyCommittableMessage.partition) + committed <- c.committed(Set(aliceTopicPartition)) + } yield committed(aliceTopicPartition) + } + } + } } - } - } yield assertTrue(committedOffset.get.offset() == 1L) - }, - test("not committing offsets after a failed transaction") { - import Subscription._ - - for { - topic <- randomTopic - group <- randomGroup - client <- randomClient - - initialAliceAccount = new ProducerRecord(topic, "alice", 20) - aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0) - - _ <- Producer.produce(initialAliceAccount, Serde.string, Serde.int) - settings <- transactionalConsumerSettings(group, client) - committedOffset <- ZIO.scoped { - Consumer.make(settings).flatMap { c => - c.subscribe(Topics(Set(topic))) *> c - .plainStream(Serde.string, Serde.int) - .toQueue() - .flatMap { q => - val readAliceAccount = for { - messages <- q.take - .flatMap(_.done) - .mapError(_.getOrElse(new NoSuchElementException)) - } yield messages.head - for { - aliceHadMoneyCommittableMessage <- readAliceAccount - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - t.produce( - aliceAccountFeesPaid, - Serde.string, - Serde.int, - Some(aliceHadMoneyCommittableMessage.offset) - ) *> - t.abort + } yield assertTrue(committedOffset.get.offset() == 1L) + }, + test("not committing offsets after a failed transaction") { + import Subscription._ + + for { + topic <- randomTopic + group <- randomGroup + client <- randomClient + + initialAliceAccount = new ProducerRecord(topic, "alice", 20) + aliceAccountFeesPaid = new ProducerRecord(topic, "alice", 0) + + _ <- Producer.produce(initialAliceAccount, Serde.string, Serde.int) + settings <- transactionalConsumerSettings(group, client) + committedOffset <- ZIO.scoped { + Consumer.make(settings).flatMap { c => + c.subscribe(Topics(Set(topic))) *> c + .plainStream(Serde.string, Serde.int) + .toQueue() + .flatMap { q => + val readAliceAccount = for { + messages <- q.take + .flatMap(_.done) + .mapError(_.getOrElse(new NoSuchElementException)) + } yield messages.head + for { + aliceHadMoneyCommittableMessage <- readAliceAccount + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + t.produce( + aliceAccountFeesPaid, + Serde.string, + Serde.int, + Some(aliceHadMoneyCommittableMessage.offset) + ) *> + t.abort + } + }.catchSome { case UserInitiatedAbort => + ZIO.unit // silences the abort } - }.catchSome { case UserInitiatedAbort => - ZIO.unit // silences the abort - } - aliceTopicPartition = - new TopicPartition(topic, aliceHadMoneyCommittableMessage.partition) - committed <- c.committed(Set(aliceTopicPartition)) - } yield committed(aliceTopicPartition) - } + aliceTopicPartition = + new TopicPartition(topic, aliceHadMoneyCommittableMessage.partition) + committed <- c.committed(Set(aliceTopicPartition)) + } yield committed(aliceTopicPartition) + } + } } - } - - } yield assert(committedOffset)(isNone) - }, - test("fails if transaction leaks") { - val test = for { - topic <- randomTopic - transactionThief <- Ref.make(Option.empty[Transaction]) - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - transactionThief.set(Some(t)) + } yield assert(committedOffset)(isNone) + }, + test("fails if transaction leaks") { + val test = for { + topic <- randomTopic + transactionThief <- Ref.make(Option.empty[Transaction]) + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + transactionThief.set(Some(t)) + } } - } - t <- transactionThief.get - _ <- t.get.produce(topic, 0, 0, Serde.int, Serde.int, None) - } yield () - assertZIO(test.exit)(failsCause(containsCause(Cause.fail(TransactionLeaked(OffsetBatch.empty))))) - }, - test("fails if transaction leaks in an open transaction") { - val test = for { - topic <- randomTopic - transactionThief <- Ref.make(Option.empty[Transaction]) - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { t => - transactionThief.set(Some(t)) + t <- transactionThief.get + _ <- t.get.produce(topic, 0, 0, Serde.int, Serde.int, None) + } yield () + assertZIO(test.exit)(failsCause(containsCause(Cause.fail(TransactionLeaked(OffsetBatch.empty))))) + }, + test("fails if transaction leaks in an open transaction") { + val test = for { + topic <- randomTopic + transactionThief <- Ref.make(Option.empty[Transaction]) + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { t => + transactionThief.set(Some(t)) + } } - } - t <- transactionThief.get - _ <- ZIO.scoped { - TransactionalProducer.createTransaction.flatMap { _ => - t.get.produce(topic, 0, 0, Serde.int, Serde.int, None) + t <- transactionThief.get + _ <- ZIO.scoped { + TransactionalProducer.createTransaction.flatMap { _ => + t.get.produce(topic, 0, 0, Serde.int, Serde.int, None) + } } - } - } yield () - assertZIO(test.exit)(failsCause(containsCause(Cause.fail(TransactionLeaked(OffsetBatch.empty))))) - } + } yield () + assertZIO(test.exit)(failsCause(containsCause(Cause.fail(TransactionLeaked(OffsetBatch.empty))))) + } + ) ).provideSomeLayerShared[TestEnvironment & Kafka]( (KafkaTestUtils.producer ++ transactionalProducer) .mapError(TestFailure.fail) - ) @@ withLiveClock + ) @@ withLiveClock @@ TestAspect.timeout(2.minutes) @@ TestAspect.sequential } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala index bcbe0b018..3c720bed8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala @@ -11,7 +11,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.{ Metric, MetricName } import zio._ import zio.kafka.serde.Serializer -import zio.stream.ZPipeline +import zio.stream.{ ZPipeline, ZStream } import java.util.concurrent.atomic.AtomicLong import scala.annotation.nowarn @@ -126,7 +126,9 @@ object Producer { private[producer] final case class Live( p: JProducer[Array[Byte], Array[Byte]], - producerSettings: ProducerSettings + producerSettings: ProducerSettings, + runtime: Runtime[Any], + sendQueue: Queue[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])] ) extends Producer { override def produceAsync[R, K, V]( @@ -135,25 +137,10 @@ object Producer { valueSerializer: Serializer[R, V] ): RIO[R, Task[RecordMetadata]] = for { - done <- Promise.make[Throwable, RecordMetadata] + done <- Promise.make[Throwable, Chunk[RecordMetadata]] serializedRecord <- serialize(record, keySerializer, valueSerializer) - runtime <- ZIO.runtime[Any] - _ <- ZIO.attempt { - p.send( - serializedRecord, - new Callback { - def onCompletion(metadata: RecordMetadata, err: Exception): Unit = - Unsafe.unsafe { implicit u => - ( - if (err != null) runtime.unsafe.run(done.fail(err)).getOrThrowFiberFailure(): Unit - else runtime.unsafe.run(done.succeed(metadata)).getOrThrowFiberFailure(): Unit - ): @nowarn("msg=discarded non-Unit value") - () - } - } - ) - } - } yield done.await + _ <- sendQueue.offer((Chunk.single(serializedRecord), done)) + } yield done.await.map(_.head) override def produceChunkAsync[R, K, V]( records: Chunk[ProducerRecord[K, V]], @@ -164,40 +151,50 @@ object Producer { else { for { done <- Promise.make[Throwable, Chunk[RecordMetadata]] - runtime <- ZIO.runtime[Any] - serializedRecords <- ZIO.foreach(records.toSeq)(serialize(_, keySerializer, valueSerializer)) - _ <- ZIO.attempt { - val it: Iterator[(ByteRecord, Int)] = - serializedRecords.iterator.zipWithIndex - val res: Array[RecordMetadata] = new Array[RecordMetadata](records.length) - val count: AtomicLong = new AtomicLong - - while (it.hasNext) { - val (rec, idx): (ByteRecord, Int) = it.next() - - p.send( - rec, - new Callback { - def onCompletion(metadata: RecordMetadata, err: Exception): Unit = - Unsafe.unsafe { implicit u => - ( - if (err != null) runtime.unsafe.run(done.fail(err)).getOrThrowFiberFailure(): Unit - else { - res(idx) = metadata - if (count.incrementAndGet == records.length) - runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure(): Unit - } - ): @nowarn("msg=discarded non-Unit value") - - () - } - } - ) - } - } + serializedRecords <- ZIO.foreach(records)(serialize(_, keySerializer, valueSerializer)) + _ <- sendQueue.offer((serializedRecords, done)) } yield done.await } + /** + * Calls to send may block when updating metadata or when communication with the broker is (temporarily) lost, + * therefore this stream is run on a the blocking thread pool + */ + val sendFromQueue: ZIO[Any, Nothing, Any] = + ZStream + .fromQueueWithShutdown(sendQueue) + .mapZIO { case (serializedRecords, done) => + ZIO.attempt { + val it: Iterator[(ByteRecord, Int)] = serializedRecords.iterator.zipWithIndex + val res: Array[RecordMetadata] = new Array[RecordMetadata](serializedRecords.length) + val count: AtomicLong = new AtomicLong + val length = serializedRecords.length + + while (it.hasNext) { + val (rec, idx): (ByteRecord, Int) = it.next() + + p.send( + rec, + new Callback { + def onCompletion(metadata: RecordMetadata, err: Exception): Unit = + Unsafe.unsafe { implicit u => + (if (err != null) runtime.unsafe.run(done.fail(err)).getOrThrowFiberFailure(): Unit + else { + res(idx) = metadata + if (count.incrementAndGet == length) { + runtime.unsafe.run(done.succeed(Chunk.fromArray(res))).getOrThrowFiberFailure(): Unit + } + }): @nowarn("msg=discarded non-Unit value") + () + } + } + ) + } + } + .foldCauseZIO(done.failCause, _ => ZIO.unit) + } + .runDrain + override def produce[R, K, V]( record: ProducerRecord[K, V], keySerializer: Serializer[R, K], @@ -244,7 +241,7 @@ object Producer { value <- valueSerializer.serialize(r.topic, r.headers, r.value()) } yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers) - private[producer] def close: UIO[Unit] = ZIO.succeed(p.close(producerSettings.closeTimeout)) + private[producer] def close: UIO[Unit] = ZIO.attemptBlocking(p.close(producerSettings.closeTimeout)).orDie } val live: RLayer[ProducerSettings, Producer] = @@ -256,20 +253,21 @@ object Producer { } def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] = - ZIO.acquireRelease { - for { - props <- ZIO.attempt(settings.driverSettings) - rawProducer <- ZIO.attempt( - new KafkaProducer[Array[Byte], Array[Byte]]( - props.asJava, - new ByteArraySerializer(), - new ByteArraySerializer() - ) + for { + props <- ZIO.attempt(settings.driverSettings) + rawProducer <- ZIO.attempt( + new KafkaProducer[Array[Byte], Array[Byte]]( + props.asJava, + new ByteArraySerializer(), + new ByteArraySerializer() ) - } yield Live(rawProducer, settings) - } { producer => - producer.close - } + ) + runtime <- ZIO.runtime[Any] + sendQueue <- + Queue.bounded[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])](settings.sendBufferSize) + producer <- ZIO.acquireRelease(ZIO.succeed(Live(rawProducer, settings, runtime, sendQueue)))(_.close) + _ <- ZIO.blocking(producer.sendFromQueue).forkScoped + } yield producer def withProducerService[R, A]( r: Producer => RIO[R, A] diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala index 60dd82c42..34c08bf75 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/ProducerSettings.scala @@ -7,6 +7,7 @@ import zio.kafka.security.KafkaCredentialStore final case class ProducerSettings( bootstrapServers: List[String], closeTimeout: Duration, + sendBufferSize: Int, properties: Map[String, AnyRef] ) { def driverSettings: Map[String, AnyRef] = @@ -33,9 +34,11 @@ final case class ProducerSettings( def withCredentials(credentialsStore: KafkaCredentialStore): ProducerSettings = withProperties(credentialsStore.properties) + + def withSendBufferSize(sendBufferSize: Int) = copy(sendBufferSize = sendBufferSize) } object ProducerSettings { def apply(bootstrapServers: List[String]): ProducerSettings = - new ProducerSettings(bootstrapServers, 30.seconds, Map()) + new ProducerSettings(bootstrapServers, 30.seconds, 4096, Map.empty) } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 52425dc8d..c38657a77 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -1,7 +1,7 @@ package zio.kafka.producer import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata } import org.apache.kafka.common.errors.InvalidGroupIdException import org.apache.kafka.common.serialization.ByteArraySerializer import zio.Cause.Fail @@ -73,21 +73,25 @@ object TransactionalProducer { } def make(settings: TransactionalProducerSettings): ZIO[Scope, Throwable, TransactionalProducer] = - ZIO.acquireRelease { - for { - props <- ZIO.attempt(settings.producerSettings.driverSettings) - rawProducer <- ZIO.attempt( - new KafkaProducer[Array[Byte], Array[Byte]]( - props.asJava, - new ByteArraySerializer(), - new ByteArraySerializer() - ) + for { + props <- ZIO.attempt(settings.producerSettings.driverSettings) + rawProducer <- ZIO.attempt( + new KafkaProducer[Array[Byte], Array[Byte]]( + props.asJava, + new ByteArraySerializer(), + new ByteArraySerializer() ) - _ <- ZIO.attemptBlocking(rawProducer.initTransactions()) - semaphore <- Semaphore.make(1) - live = Producer.Live(rawProducer, settings.producerSettings) - } yield LiveTransactionalProducer(live, semaphore) - } { producer => - producer.live.close - } + ) + _ <- ZIO.attemptBlocking(rawProducer.initTransactions()) + semaphore <- Semaphore.make(1) + runtime <- ZIO.runtime[Any] + sendQueue <- + Queue.bounded[(Chunk[ByteRecord], Promise[Throwable, Chunk[RecordMetadata]])]( + settings.producerSettings.sendBufferSize + ) + live <- ZIO.acquireRelease( + ZIO.succeed(Producer.Live(rawProducer, settings.producerSettings, runtime, sendQueue)) + )(_.close) + _ <- ZIO.blocking(live.sendFromQueue).forkScoped + } yield LiveTransactionalProducer(live, semaphore) } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala index a43747b2d..ffd7138c2 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducerSettings.scala @@ -18,6 +18,7 @@ object TransactionalProducerSettings { ProducerSettings( bootstrapServers, 30.seconds, + 4096, Map(ProducerConfig.TRANSACTIONAL_ID_CONFIG -> transactionalId) ) ) {} @@ -26,12 +27,14 @@ object TransactionalProducerSettings { bootstrapServers: List[String], closeTimeout: Duration, properties: Map[String, AnyRef], - transactionalId: String + transactionalId: String, + sendBufferSize: Int ): TransactionalProducerSettings = new TransactionalProducerSettings( ProducerSettings( bootstrapServers, closeTimeout, + sendBufferSize, properties.updated(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId) ) ) {}