diff --git a/lib/sneakers/worker.rb b/lib/sneakers/worker.rb index 7ea61d4..425f2c3 100644 --- a/lib/sneakers/worker.rb +++ b/lib/sneakers/worker.rb @@ -74,6 +74,11 @@ def process_work(delivery_info, metadata, msg, handler) block_to_call = middlewares.reverse.reduce(app) do |mem, h| h[:class].new(mem, *h[:args]) end + + # send acknowledgement right away if job is marked as idempotent + if @should_ack && deserialized_msg['is_idempotent'] + handler.acknowledge(delivery_info, metadata, msg) + end res = block_to_call.call(deserialized_msg, delivery_info, metadata, handler) end rescue SignalException, SystemExit @@ -89,7 +94,10 @@ def process_work(delivery_info, metadata, msg, handler) if @should_ack case res # note to future-self. never acknowledge multiple (multiple=true) messages under threads. - when :ack then handler.acknowledge(delivery_info, metadata, msg) + when :ack + if !deserialized_msg['is_idempotent'] + handler.acknowledge(delivery_info, metadata, msg) + end when :error then handler.error(delivery_info, metadata, msg, error) when :reject then handler.reject(delivery_info, metadata, msg) when :requeue then handler.reject(delivery_info, metadata, msg, true)