Merge pull request #69 from eezyinc/fix-success-callbacks
Ensure batch callbacks aren't run unexpectedly
nglx authored Feb 1, 2023
2 parents 663debd + c9babda commit a761df6
Showing 3 changed files with 85 additions and 13 deletions.
36 changes: 35 additions & 1 deletion README.md
@@ -32,7 +32,41 @@ Or install it yourself as:

## Usage

Sidekiq Batch is drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.
Sidekiq Batch is MOSTLY a drop-in replacement for the API from Sidekiq PRO. See https://github.com/mperham/sidekiq/wiki/Batches for usage.

## Caveats/Gotchas

Consider the following workflow:

* Batch Z created
* Worker A queued in batch Z
* Worker A starts Worker B in batch Z
* Worker B completes *before* worker A does
* Worker A completes

In the default configuration, the `on(:success)` and `on(:complete)` callbacks are triggered when Worker B completes, even though Worker A is still running.
This behavior is the default only for legacy reasons. To change it, this gem adds the following option to the sidekiq.yml options:

```yaml
:batch_push_interval: 0
```
When this value is *absent* (the legacy behavior), Worker A only pushes the increment for the batch jobs it queued (here, Worker B) *when it completes*.

When this value is set to `0`, Worker A increments the count as soon as `WorkerB.perform_async` is called.

When this value is a positive number, Worker A waits at most that many seconds before pushing the increment to Redis, or pushes when it finishes, whichever comes first.
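The three cases described above can be sketched as a single decision rule. This is a minimal illustration of the documented behavior, not the gem's own code: `push_now?` and its parameters are hypothetical names, and timestamps are plain floats in seconds.

```ruby
# Hypothetical helper, not part of sidekiq-batch: decides whether the count of
# newly queued jobs should be pushed to Redis right now.
def push_now?(interval, last_push_at, now)
  return false if interval.nil?   # option absent (legacy): push only when the job finishes
  return true if interval.zero?   # 0: push on every perform_async call
  now - last_push_at >= interval  # positive: push once the interval has elapsed
end

push_now?(nil, 0.0, 10.0) # => false
push_now?(0, 0.0, 0.1)    # => true
push_now?(3, 0.0, 2.0)    # => false
push_now?(3, 0.0, 3.5)    # => true
```

In every mode, a final push still happens when the worker finishes, so no queued job goes uncounted.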

This matters if Worker A queues thousands of WorkerB jobs, or if WorkerB can otherwise complete before Worker A does.

If you are queueing many WorkerB jobs, set this value to something like `3` to avoid thousands of calls to Redis, and schedule WorkerB with a delay longer than the interval, like so:
```ruby
WorkerB.perform_in(4.seconds, some, args)
```
Because the 4-second delay exceeds the 3-second push interval, the increment reaches Redis before any WorkerB job runs, so the batch callback is not triggered until WorkerA *and* the last WorkerB job complete.

If WorkerA is simply slow, setting the value to `0` updates the batch status immediately, so the callbacks don't fire prematurely.
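To see why the callbacks can fire early, the workflow at the top of this section can be replayed against a toy in-memory counter. This is only a sketch under simplifying assumptions: `ToyBatch` and `run_workflow` are hypothetical names, no Sidekiq or Redis is involved, and the real gem tracks more state than a single pending count.

```ruby
# Toy in-memory model of a batch's pending counter.
class ToyBatch
  attr_reader :pending, :callback_fired_after

  def initialize
    @pending = 0
    @callback_fired_after = nil
  end

  # Record newly queued jobs in the shared counter.
  def push(count)
    @pending += count
  end

  # A job finished: decrement, and remember the first job that brought the
  # counter to zero (that is the moment the callbacks would fire).
  def complete(job_name)
    @pending -= 1
    @callback_fired_after ||= job_name if @pending.zero?
  end
end

# Replays the workflow and returns the job whose completion fired the callback.
def run_workflow(immediate_push:)
  batch = ToyBatch.new
  batch.push(1)                        # Worker A queued in batch Z
  batch.push(1) if immediate_push      # interval 0: B is counted as soon as A queues it
  batch.complete("WorkerB")            # Worker B completes before Worker A
  batch.push(1) unless immediate_push  # legacy: A pushes B's increment only when it finishes
  batch.complete("WorkerA")
  batch.callback_fired_after
end

run_workflow(immediate_push: false) # => "WorkerB"
run_workflow(immediate_push: true)  # => "WorkerA"
```

In the legacy ordering the counter reaches zero while Worker A is still running. With an immediate push it reaches zero only after the last job completes.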


## Contributing

60 changes: 49 additions & 11 deletions lib/sidekiq/batch.rb
@@ -20,7 +20,10 @@ def initialize(existing_bid = nil)
@initialized = false
@created_at = Time.now.utc.to_f
@bidkey = "BID-" + @bid.to_s
@ready_to_queue = []
@queued_jids = []
@pending_jids = []
@incremental_push = Sidekiq.options.keys.include?(:batch_push_interval)
@batch_push_interval = Sidekiq.options[:batch_push_interval]
end

def description=(description)
@@ -64,51 +67,86 @@ def jobs
Sidekiq.redis do |r|
r.multi do |pipeline|
pipeline.hset(@bidkey, "created_at", @created_at)
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
if parent_bid
pipeline.hset(@bidkey, "parent_bid", parent_bid.to_s)
pipeline.hincrby("BID-#{parent_bid}", "children", 1)
end
end
end

@initialized = true
end

@ready_to_queue = []
@queued_jids = []
@pending_jids = []

begin
parent = Thread.current[:batch]
Thread.current[:batch] = self
Thread.current[:parent_bid] = parent_bid
yield
ensure
Thread.current[:batch] = parent
Thread.current[:parent_bid] = nil
end

return [] if @ready_to_queue.size == 0
return [] if @queued_jids.size == 0
conditional_redis_increment!(true)

Sidekiq.redis do |r|
r.multi do |pipeline|
if parent_bid
pipeline.hincrby("BID-#{parent_bid}", "children", 1)
pipeline.hincrby("BID-#{parent_bid}", "total", @ready_to_queue.size)
pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
end

pipeline.hincrby(@bidkey, "pending", @ready_to_queue.size)
pipeline.hincrby(@bidkey, "total", @ready_to_queue.size)
pipeline.expire(@bidkey, BID_EXPIRE_TTL)

pipeline.sadd(@bidkey + "-jids", [@ready_to_queue])
pipeline.sadd(@bidkey + "-jids", [@queued_jids])
pipeline.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
end
end

@ready_to_queue
@queued_jids
ensure
Thread.current[:bid_data] = bid_data
end
end

def increment_job_queue(jid)
@ready_to_queue << jid
@queued_jids << jid
@pending_jids << jid
conditional_redis_increment!
end

def conditional_redis_increment!(force=false)
if should_increment? || force
parent_bid = Thread.current[:parent_bid]
Sidekiq.redis do |r|
r.multi do |pipeline|
if parent_bid
pipeline.hincrby("BID-#{parent_bid}", "total", @pending_jids.length)
pipeline.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
end

pipeline.hincrby(@bidkey, "pending", @pending_jids.length)
pipeline.hincrby(@bidkey, "total", @pending_jids.length)
pipeline.expire(@bidkey, BID_EXPIRE_TTL)
end
end
@pending_jids = []
end
end

def should_increment?
return false unless @incremental_push
return true if @batch_push_interval == 0 || @queued_jids.length == 1
now = Time.now.to_f
@last_increment ||= now
if @last_increment + @batch_push_interval > now
@last_increment = now
return true
end
end

def invalidate_all
2 changes: 1 addition & 1 deletion sidekiq-batch.gemspec
@@ -19,7 +19,7 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^exe/}) { |f| File.basename(f) }
spec.require_paths = ["lib"]

spec.add_dependency "sidekiq", ">= 3"
spec.add_dependency "sidekiq", ">= 3", "<7"

spec.add_development_dependency "bundler", "~> 2.1"
spec.add_development_dependency "rake", "~> 13.0"
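
The tightened `sidekiq` constraint in the gemspec hunk above can be checked with RubyGems' own `Gem::Requirement` class. This is a small sketch: the version strings are arbitrary samples chosen for illustration, not versions named by the commit.

```ruby
require "rubygems"

# Same constraint strings as the new gemspec line.
requirement = Gem::Requirement.new(">= 3", "<7")

# Arbitrary sample versions on both sides of each bound.
%w[2.17.8 3.0.0 6.5.12 7.0.0].each do |version|
  allowed = requirement.satisfied_by?(Gem::Version.new(version))
  puts "sidekiq #{version}: #{allowed ? 'allowed' : 'rejected'}"
end
```

Versions below 3 and any 7.x release fall outside the range, so Bundler would refuse to resolve them alongside this gem.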