diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fa9360..d4f6637 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Blocking consume won't log errors anymore. [#47](https://github.com/cloudamqp/amqp-client.cr/pull/47) + ## [1.2.6] - 2024-08-23 ### Added diff --git a/src/amqp-client/channel.cr b/src/amqp-client/channel.cr index e9b3698..061c05e 100644 --- a/src/amqp-client/channel.cr +++ b/src/amqp-client/channel.cr @@ -411,7 +411,7 @@ class AMQP::Client deliveries = ::Channel(DeliverMessage).new(@prefetch_count.to_i32) @consumers[ok.consumer_tag] = deliveries work_pool.times do |i| - spawn consume(ok.consumer_tag, deliveries, done, i, blk), + spawn consume(ok.consumer_tag, deliveries, done, i, !block, blk), same_thread: i.zero?, # only force put the first fiber on same thread name: "AMQPconsumer##{ok.consumer_tag} ##{i}" end @@ -429,13 +429,15 @@ class AMQP::Client ok.consumer_tag end - private def consume(consumer_tag, deliveries, done, i, blk) + private def consume(consumer_tag, deliveries, done, i, log_errors, blk) Log.context.set channel_id: @id.to_i, consumer: consumer_tag, worker: i while msg = deliveries.receive? begin blk.call(msg) rescue ex - Log.error(exception: ex) { "Uncaught exception in consumer, closing channel" } + if log_errors + Log.error(exception: ex) { "Uncaught exception in consumer, closing channel" } + end close("Uncaught exception in consumer #{consumer_tag}", 500) rescue nil done.send(ex) rescue nil return