From 33d269828c08f633f507962848745dc9c66e56ed Mon Sep 17 00:00:00 2001 From: Aleks Clark Date: Fri, 16 Dec 2022 13:35:37 -0600 Subject: [PATCH 1/4] Ensure batch callbacks aren't run unexpectedly and fully document new behavior --- README.md | 36 ++++++++++++++++++++++++- lib/sidekiq/batch.rb | 61 ++++++++++++++++++++++++++++++++++--------- sidekiq-batch.gemspec | 2 +- 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 7526bc2..31649ba 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,41 @@ Or install it yourself as: ## Usage -Sidekiq Batch is drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage. +Sidekiq Batch is MOSTLY a drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage. + +## Caveats/Gotchas + +Consider the following workflow: + + * Batch Z created + * Worker A queued in batch Z + * Worker A starts Worker B in batch Z + * Worker B completes *before* worker A does + * Worker A completes + +In the standard configuration, the `on(:success)` and `on(:complete)` callbacks will be triggered when Worker B completes. +This configuration is the default, simply for legacy reasons. This gem adds the following option to the sidekiq.yml options: + +```yaml +:batch_push_interval: 0 +``` + +When this value is *absent* (aka legacy), Worker A will only push the increment of batch jobs (aka Worker B) *when it completes* + +When this value is set to `0`, Worker A will increment the count as soon as `WorkerB.perform_async` is called + +When this value is a positive number, Worker A will wait a maximum of value-seconds before pushing the increment to redis, or until it's done, whichever comes first. + +This comes into play if Worker A is queueing thousands of WorkerB jobs, or has some other reason for WorkerB to complete beforehand. + +If you are queueing many WorkerB jobs, it is recommended to set this value to something like `3` to avoid thousands of calls to redis, and call WorkerB like so: +```ruby +WorkerB.perform_in(4.seconds, some, args) +``` +this will ensure that the batch callback does not get triggered until WorkerA *and* the last WorkerB job complete. + +If WorkerA is just slow for whatever reason, setting to `0` will update the batch status immediately so that the callbacks don't fire. + ## Contributing diff --git a/lib/sidekiq/batch.rb b/lib/sidekiq/batch.rb index fb2aba7..222f7cb 100644 --- a/lib/sidekiq/batch.rb +++ b/lib/sidekiq/batch.rb @@ -20,7 +20,10 @@ def initialize(existing_bid = nil) @initialized = false @created_at = Time.now.utc.to_f @bidkey = "BID-" + @bid.to_s - @ready_to_queue = [] + @queued_jids = [] + @pending_jids = [] + @incremental_push = Sidekiq.options.keys.include?(:batch_push_interval) + @batch_push_interval = Sidekiq.options[:batch_push_interval] end def description=(description) @@ -59,20 +62,24 @@ def jobs begin if !@existing && !@initialized - parent_bid = Thread.current[:batch].bid if Thread.current[:batch] + parent_bid, Thread.current[:parent_id] = Thread.current[:batch].bid if Thread.current[:batch] Sidekiq.redis do |r| r.multi do |pipeline| pipeline.hset(@bidkey, "created_at", @created_at) - pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid pipeline.expire(@bidkey, BID_EXPIRE_TTL) + if parent_bid + pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s) + pipeline.hincrby("BID-#{parent_bid}", "children", 1) + end end end @initialized = true end - @ready_to_queue = [] + @queued_jids = [] + @pending_jids = [] begin parent = Thread.current[:batch] @@ -81,34 +88,62 @@ def jobs ensure Thread.current[:batch] = parent end - - return [] if @ready_to_queue.size == 0 + conditional_redis_increment!(true) + return [] if @queued_jids.size == 0 Sidekiq.redis do |r| r.multi do |pipeline| if parent_bid - pipeline.hincrby("BID-#{parent_bid}", "children", 1) - pipeline.hincrby("BID-#{parent_bid}", "total", @ready_to_queue.size) pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL) end - pipeline.hincrby(@bidkey, "pending", @ready_to_queue.size) - pipeline.hincrby(@bidkey, "total", @ready_to_queue.size) pipeline.expire(@bidkey, BID_EXPIRE_TTL) - pipeline.sadd(@bidkey + "-jids", [@ready_to_queue]) + pipeline.sadd(@bidkey + "-jids", [@queued_jids]) pipeline.expire(@bidkey + "-jids", BID_EXPIRE_TTL) end end - @ready_to_queue + @queued_jids ensure Thread.current[:bid_data] = bid_data end end def increment_job_queue(jid) - @ready_to_queue << jid + @queued_jids << jid + @pending_jids << jid + conditional_redis_increment! + end + + def conditional_redis_increment!(force=false) + if should_increment? || force + parent_bid = Thread.current[:parent_id] + Sidekiq.redis do |r| + r.multi do |pipeline| + if parent_bid + pipeline.hincrby("BID-#{parent_bid}", "total", @pending_jids.length) + pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL) + end + + pipeline.hincrby(@bidkey, "pending", @pending_jids.length) + pipeline.hincrby(@bidkey, "total", @pending_jids.length) + pipeline.expire(@bidkey, BID_EXPIRE_TTL) + end + end + @pending_jids = [] + end + end + + def should_increment? + return false unless @incremental_push + return true if @batch_push_interval == 0 || @queued_jids.length == 1 + + @last_increment ||= Time.now.to_f + if @last_increment + @batch_push_interval > Time.now.to_f + @last_increment = Time.now.to_f + return true + end end def invalidate_all diff --git a/sidekiq-batch.gemspec b/sidekiq-batch.gemspec index c5be436..79269d5 100644 --- a/sidekiq-batch.gemspec +++ b/sidekiq-batch.gemspec @@ -19,7 +19,7 @@ Gem::Specification.new do |spec| spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) } spec.require_paths = ["lib"] - spec.add_dependency "sidekiq", ">= 3" + spec.add_dependency "sidekiq", ">= 3", "<7" spec.add_development_dependency "bundler", "~> 2.1" spec.add_development_dependency "rake", "~> 13.0" From a0fb2056a301a4b59b7d6493b5db559c6b7205a5 Mon Sep 17 00:00:00 2001 From: Aleks Clark Date: Fri, 16 Dec 2022 13:43:16 -0600 Subject: [PATCH 2/4] minor perf tweak --- lib/sidekiq/batch.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/sidekiq/batch.rb b/lib/sidekiq/batch.rb index 222f7cb..0c09fee 100644 --- a/lib/sidekiq/batch.rb +++ b/lib/sidekiq/batch.rb @@ -138,10 +138,10 @@ def conditional_redis_increment!(force=false) def should_increment? return false unless @incremental_push return true if @batch_push_interval == 0 || @queued_jids.length == 1 - - @last_increment ||= Time.now.to_f - if @last_increment + @batch_push_interval > Time.now.to_f - @last_increment = Time.now.to_f + now = Time.now.to_f + @last_increment ||= now + if @last_increment + @batch_push_interval > now + @last_increment = now return true end end From 4d90b326e09162b175cfda1021687bf1d3c9b055 Mon Sep 17 00:00:00 2001 From: Aleks Clark Date: Fri, 16 Dec 2022 13:46:14 -0600 Subject: [PATCH 3/4] fix instance of no queued jids --- lib/sidekiq/batch.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/sidekiq/batch.rb b/lib/sidekiq/batch.rb index 0c09fee..c10db5e 100644 --- a/lib/sidekiq/batch.rb +++ b/lib/sidekiq/batch.rb @@ -88,8 +88,9 @@ def jobs ensure Thread.current[:batch] = parent end - conditional_redis_increment!(true) + return [] if @queued_jids.size == 0 + conditional_redis_increment!(true) Sidekiq.redis do |r| r.multi do |pipeline| From c9babda4f1f73622d4aa31bfe98b3f42fab71a13 Mon Sep 17 00:00:00 2001 From: Aleks Clark Date: Sat, 17 Dec 2022 15:11:29 -0600 Subject: [PATCH 4/4] fix some parent issues --- lib/sidekiq/batch.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/sidekiq/batch.rb b/lib/sidekiq/batch.rb index c10db5e..187397a 100644 --- a/lib/sidekiq/batch.rb +++ b/lib/sidekiq/batch.rb @@ -62,7 +62,7 @@ def jobs begin if !@existing && !@initialized - parent_bid, Thread.current[:parent_id] = Thread.current[:batch].bid if Thread.current[:batch] + parent_bid = Thread.current[:batch].bid if Thread.current[:batch] Sidekiq.redis do |r| r.multi do |pipeline| @@ -84,9 +84,11 @@ def jobs begin parent = Thread.current[:batch] Thread.current[:batch] = self + Thread.current[:parent_bid] = parent_bid yield ensure Thread.current[:batch] = parent + Thread.current[:parent_bid] = nil end return [] if @queued_jids.size == 0 @@ -119,7 +121,7 @@ def increment_job_queue(jid) def conditional_redis_increment!(force=false) if should_increment? || force - parent_bid = Thread.current[:parent_id] + parent_bid = Thread.current[:parent_bid] Sidekiq.redis do |r| r.multi do |pipeline| if parent_bid