Skip to content

Commit

Permalink
Revert stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
svroonland committed Oct 10, 2024
1 parent 699e6e8 commit aafd4ec
Showing 1 changed file with 3 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,12 @@ object TransactionalProducer {
)(p => ZIO.attemptBlocking(p.close(settings.producerSettings.closeTimeout)).orDie)
_ <- ZIO.attemptBlocking(rawProducer.initTransactions())
semaphore <- Semaphore.make(1)
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.producerSettings.sendBufferSize
)
live = new ProducerLive(rawProducer, sendQueue)
_ <- live.sendFromQueue.forkScoped
live = new ProducerLive(rawProducer, runtime, sendQueue)
_ <- ZIO.blocking(live.sendFromQueue).forkScoped
} yield new LiveTransactionalProducer(live, semaphore)
}

0 comments on commit aafd4ec

Please sign in to comment.