Fault-tolerant platform for long-running jobs.
gem install ultra_marathon
Or, with Bundler, add this to your Gemfile:
gem 'ultra_marathon'
The UltraMarathon::AbstractRunner class itself provides the functionality for running complex jobs. It is best used by inheriting from it and customizing the subclass.
A simple DSL, currently consisting of only the run command, is used to specify independent chunks of work. The first argument is the name of the run block. If omitted, the name defaults to :main. Names must be unique within a given runner, so a runner with N run blocks must name at least N - 1 of them explicitly.
class MailRunner < UltraMarathon::AbstractRunner
  # :main run block
  run do
    raise 'boom'
  end
  # :eat run block
  # Must be named because there is already a :main block (one without
  # a name explicitly defined)
  run :eat do
    add_butter
    add_syrup
    eat!
  end
  # Will raise an error, since the first run block is implicitly named :main
  run :main do
    puts 'nope nope nope!'
  end
  # Omitted for brevity
  def add_butter; end
  def add_syrup; end
  def eat!; end
end
# Run the runner:
MailRunner.run!
Note that although the run blocks are defined in the context of the class, they are run within an individual instance. Any helper methods they call should therefore be defined as instance methods.
In this example, the :eat run block will still run even if the :main block is executed first and raises. Errors are caught, and the runner will attempt to complete as many run blocks as it can. You can still inspect those errors with the on_error callback, detailed below.
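The following plain-Ruby sketch makes the naming and error-isolation rules concrete. It does not use UltraMarathon or describe its actual implementation. SketchRunner, PancakeRunner and the duplicate-name ArgumentError are hypothetical. Blocks are registered on the class and evaluated with instance_exec on a fresh instance. Each error is recorded instead of stopping the remaining blocks.

```ruby
# Hypothetical sketch of a run-block DSL; not UltraMarathon's implementation.
class SketchRunner
  # Class-level registry of run blocks (one registry per subclass), keyed by name.
  def self.run_blocks
    @run_blocks ||= {}
  end

  # Register a block. The name defaults to :main and must be unique.
  def self.run(name = :main, &block)
    raise ArgumentError, "run block #{name.inspect} already defined" if run_blocks.key?(name)
    run_blocks[name] = block
  end

  attr_reader :errors

  def initialize
    @errors = {}
  end

  # Evaluate every block on this instance, recording errors instead of stopping.
  def run!
    self.class.run_blocks.each do |name, block|
      begin
        instance_exec(&block)
      rescue StandardError => e
        @errors[name] = e
      end
    end
    self
  end
end

class PancakeRunner < SketchRunner
  attr_reader :eaten

  run { raise 'boom' }      # implicitly named :main
  run(:eat) { @eaten = true }
end

runner = PancakeRunner.new.run!
runner.eaten        # => true
runner.errors.keys  # => [:main]
```

Calling PancakeRunner.run again without a name raises the ArgumentError, mirroring the uniqueness rule.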
Independent blocks are not guaranteed to run in any particular order. To enforce an order, declare dependencies using the :requires option.
class WalrusRunner < UltraMarathon::AbstractRunner
  run :bubbles, requires: [:don_scuba_gear] do
    obtain_bubbles
  end
  run :don_scuba_gear do
    acquire_snorkel
    wear_flippers_on_flippers
  end
end
In this example, :bubbles will not run until :don_scuba_gear finishes successfully. If :don_scuba_gear explicitly fails, such as by raising an error, :bubbles will never run.
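Here is a rough sketch of how :requires ordering and failure propagation could work. It assumes each block is described by a hypothetical { requires:, work: } hash. run_with_requirements is illustrative and not part of the gem.

```ruby
# Hypothetical sketch of :requires semantics (not UltraMarathon's code).
# blocks maps a name to { requires: [names], work: callable }.
def run_with_requirements(blocks)
  status  = {}          # name => :success or :failed
  pending = blocks.dup
  until pending.empty?
    progressed = false
    pending.each do |name, spec|
      requires = spec.fetch(:requires, [])
      if requires.any? { |req| status[req] == :failed }
        status[name] = :failed      # a failed requirement means this block never runs
      elsif requires.all? { |req| status[req] == :success }
        begin
          spec[:work].call
          status[name] = :success
        rescue StandardError
          status[name] = :failed
        end
      else
        next                        # still waiting on a requirement
      end
      progressed = true
    end
    pending.reject! { |name, _spec| status.key?(name) }
    raise 'unsatisfiable or circular :requires' unless progressed
  end
  status
end

log = []
walrus = {
  bubbles:        { requires: [:don_scuba_gear], work: -> { log << :bubbles } },
  don_scuba_gear: { work: -> { raise 'lost the snorkel' } }
}
result = run_with_requirements(walrus)
result[:don_scuba_gear] # => :failed
result[:bubbles]        # => :failed
log                     # => [] (bubbles never ran)
```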
Sometimes you want to run a run block once for each item in a collection. Just pass the :collection option and all of your dreams will come true. Each iteration is passed one item along with its index.
class RangeRunner < UltraMarathon::AbstractRunner
  run :counting!, collection: (1..100) do |number, index|
    if index == 0
      puts "We start with #{number}"
    else
      puts "And then comes #{number}"
    end
  end
end
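The per-item behaviour can be sketched in a few lines of plain Ruby. run_for_each is a hypothetical helper. It relies only on #each and keeps its own zero-based index counter.

```ruby
# Hypothetical sketch of the :collection option: call the block once per item,
# passing the item and a zero-based index. Only #each is required.
def run_for_each(collection, &block)
  index = 0
  collection.each do |item|
    block.call(item, index)
    index += 1
  end
end

lines = []
run_for_each(1..3) do |number, index|
  lines << (index.zero? ? "We start with #{number}" : "And then comes #{number}")
end
lines # => ["We start with 1", "And then comes 2", "And then comes 3"]
```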
The only requirement is that the :collection object responds to #each, or to whichever method you name with the :iterator option. This option was added specifically for Rails ActiveRecord relations and associations, which can fetch records in batches using find_each.
# Crow inherits from ActiveRecord::Base
class MurderRunner < UltraMarathon::AbstractRunner
  run :coming_of_age, collection: :crows_to_bless, iterator: :find_each do |youngster_crow|
    youngster_crow.update_attribute(:blessed, true)
  end
  def crows_to_bless
    Crow.unblessed.where(age: 10)
  end
end
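The sketch below makes two assumptions, based on the MurderRunner example. First, :collection may be a symbol naming an instance method. Second, :iterator names the method used in place of #each. FakeRelation is a hypothetical stand-in for an ActiveRecord relation, so the code runs without Rails. IteratingRunner#each_collection_item is illustrative only.

```ruby
# Hypothetical stand-in for an ActiveRecord relation (NOT Rails): it only mimics
# find_each by walking the records in fixed-size slices.
class FakeRelation
  def initialize(records, batch_size: 2)
    @records = records
    @batch_size = batch_size
  end

  def find_each
    @records.each_slice(@batch_size) do |batch|
      batch.each { |record| yield record }
    end
  end
end

class IteratingRunner
  # collection may be the object itself or a Symbol naming an instance method;
  # iterator names the method called instead of #each.
  def each_collection_item(collection, iterator: :each, &block)
    source = collection.is_a?(Symbol) ? send(collection) : collection
    index = 0
    source.public_send(iterator) do |item|
      block.call(item, index)
      index += 1
    end
  end

  def crows_to_bless
    FakeRelation.new(%w[ash birch cedar])
  end
end

blessed = []
IteratingRunner.new.each_collection_item(:crows_to_bless, iterator: :find_each) do |crow, i|
  blessed << [i, crow]
end
blessed # => [[0, "ash"], [1, "birch"], [2, "cedar"]]
```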
Passing threaded: true runs that run block in its own thread. This is particularly useful for collections or run blocks that make external API calls, hit a database, or are otherwise good candidates for concurrency.
class NapRunner < UltraMarathon::AbstractRunner
  run :mass_nap, collection: (1..100), threaded: true do
    sleep(0.01)
  end
end
As the benchmark below shows, threading is a muy bueno idea for longer-running work that Ruby can run concurrently.
require 'benchmark'
require 'ultra_marathon'
class ThreadedNapRunner < UltraMarathon::AbstractRunner
  run :mass_nap, collection: (1..100), threaded: true do |n|
    sleep(1)
  end
end
class UnthreadedNapRunner < UltraMarathon::AbstractRunner
  run :mass_nap, collection: (1..100), threaded: false do |n|
    sleep(1)
  end
end
Benchmark.bmbm do |reporter|
  reporter.report(:threaded) { ThreadedNapRunner.new.run! }
  reporter.report(:unthreaded) { UnthreadedNapRunner.new.run! }
end
# Rehearsal ----------------------------------------------
# threaded 1.270000 0.080000 1.350000 ( 1.346384)
# unthreaded 0.060000 0.010000 0.070000 (100.141031)
# ------------------------------------- total: 1.420000sec
#
# user system total real
# threaded 1.320000 0.060000 1.380000 ( 1.377940)
# unthreaded 0.060000 0.000000 0.060000 (100.128980)
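Note, however, that threading is not free. In the final benchmark we would expect a runtime of about 1 second, but the threaded run took 1.38 seconds, over a third of a second slower. If we run the same test with a run block of (n * 10_000).times { 'derp' }, for example, the unthreaded version is about 10% faster. That work is CPU-bound, and CRuby's global VM lock prevents Ruby code from running in parallel across threads. The threads add overhead without adding throughput.
tl;dr: don't thread because it sounds cool. Use it when you need it.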
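Here is a minimal sketch of what threaded: true might amount to for a collection, using only core Ruby threads. run_threaded is a hypothetical helper, and the gem may schedule its threads differently.

```ruby
# Hypothetical sketch of threaded: true for a collection: start one thread per
# item, then join them all. Thread#join re-raises any error from its thread.
def run_threaded(collection, &block)
  threads = []
  collection.each do |item|
    index = threads.size
    threads << Thread.new { block.call(item, index) }
  end
  threads.each(&:join)
end

results = Queue.new   # thread-safe container for results
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
run_threaded(1..20) do |number, _index|
  sleep(0.1)          # sleep releases CRuby's global VM lock, so these overlap
  results << number
end
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
# elapsed should be well under the ~2 s that twenty sequential sleeps would take
```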
UltraMarathon::AbstractRunner includes numerous life-cycle callbacks for tangential code execution. Callbacks may be either callable objects (procs/lambdas) or symbols of methods defined on the runner.
The basic flow of execution is as follows:
- before_run
- (run!)
- on_error
- after_run
- (reset)
- on_reset
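Here is a hypothetical sketch of the run-time part of that ordering. Its callbacks may be callables or method-name symbols, as described above. LifecycleSketch is illustrative only, and reset/on_reset are left out.

```ruby
# Hypothetical sketch of the callback flow before_run -> run -> on_error -> after_run.
# Each callback is either a callable or a Symbol naming an instance method.
class LifecycleSketch
  attr_reader :events

  def initialize(callbacks)
    @callbacks = callbacks   # e.g. { before_run: [...], on_error: [...], after_run: [...] }
    @events = []
  end

  def run!(&work)
    fire(:before_run)
    begin
      work.call
    rescue StandardError
      fire(:on_error)        # only reached when the work raised
    end
    fire(:after_run)
    self
  end

  private

  # Callables are evaluated on this instance; symbols are sent as method calls.
  def fire(hook)
    Array(@callbacks[hook]).each do |callback|
      callback.respond_to?(:call) ? instance_exec(&callback) : send(callback)
    end
  end

  def note_before
    @events << :before_run
  end
end

sketch = LifecycleSketch.new(
  before_run: [:note_before],
  on_error:   [-> { @events << :on_error }],
  after_run:  [-> { @events << :after_run }]
)
sketch.run! { raise 'boom' }
sketch.events # => [:before_run, :on_error, :after_run]
```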
If an error is raised in any run block, any on_error callbacks will be invoked, with the error passed in if the callback takes arguments.
class NewsRunner < UltraMarathon::AbstractRunner
  before_run :fetch_new_yorker_stories
  after_run :get_learning_on
  on_error :contemplate_existence
  run do
    NewYorker.fetch_rss_feed!
  end
  private
  def contemplate_existence(error)
    if error.is_a? HighBrowError
      puts 'Not cultured enough to understand :('
    else
      puts "Error: #{error.message}"
    end
  end
end
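One way to implement "pass the error only if the callback takes arguments" is to check the callable's arity. invoke_error_callback and Contemplative are hypothetical names, and this is a sketch rather than the gem's code.

```ruby
# Hypothetical sketch: pass the error only to callbacks that accept an argument.
def invoke_error_callback(target, callback, error)
  if callback.respond_to?(:call)
    args = callback.arity.zero? ? [] : [error]
    target.instance_exec(*args, &callback)   # run the callable on the runner
  else
    method = target.method(callback)         # Object#method also finds private methods
    method.arity.zero? ? method.call : method.call(error)
  end
end

class Contemplative
  attr_reader :seen

  private

  def contemplate_existence(error)
    @seen = "Error: #{error.message}"
  end
end

thinker = Contemplative.new
invoke_error_callback(thinker, :contemplate_existence, RuntimeError.new('feed down'))
thinker.seen # => "Error: feed down"
invoke_error_callback(thinker, -> { @seen = 'no args needed' }, RuntimeError.new('x'))
thinker.seen # => "no args needed"
```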
Callbacks can additionally take a hash of options. Currently :if and :unless are supported. Their values, too, can be callable objects or symbols.
class MercurialRunner < UltraMarathon::AbstractRunner
  after_run :celebrate, if: :success?
  after_run :cry, unless: ->{ success? }
  run do
    raise 'hell' if rand(2).zero?
  end
  # Omitted for brevity
  def celebrate; end
  def cry; end
end
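Here is a sketch of how :if and :unless guards might be evaluated. It assumes each guard is either a method-name symbol or a callable run against the runner instance. guard_passes? and Mercurial are hypothetical.

```ruby
# Hypothetical sketch of :if / :unless guards on a callback. Each guard may be a
# Symbol (method name) or a callable evaluated on the target instance.
def guard_passes?(target, options)
  evaluate = lambda do |guard|
    guard.respond_to?(:call) ? target.instance_exec(&guard) : target.send(guard)
  end
  return false if options.key?(:if) && !evaluate.call(options[:if])
  return false if options.key?(:unless) && evaluate.call(options[:unless])
  true
end

Mercurial = Struct.new(:success) do
  def success?
    success
  end
end

happy = Mercurial.new(true)
guard_passes?(happy, if: :success?)            # => true
guard_passes?(happy, unless: -> { success? })  # => false
```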
The entire run! is instrumented and logged automatically. Additionally, individual run blocks are instrumented and logged by default.
You can also opt out of instrumenting an individual block:
run :embarrassingly_slowly, instrument: false do
  sleep(rand(10))
end
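Here is a minimal sketch of per-block instrumentation with an opt-out flag. run_block and the log-line format are hypothetical. The gem's actual log output is not shown here.

```ruby
# Hypothetical sketch of per-block instrumentation: time the block with a
# monotonic clock and record a log line, unless instrument: false is given.
def run_block(name, log, instrument: true)
  return yield unless instrument

  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result  = yield
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  log << format('%s finished in %.3fs', name, elapsed)
  result
end

log = []
run_block(:quick, log) { 1 + 1 }                                          # => 2
run_block(:embarrassingly_slowly, log, instrument: false) { sleep(0.01) }
log.size # => 1 (only :quick was instrumented)
```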
If any part of a runner fails, either by raising an error or by explicitly setting self.success = false, the entire run is considered a failure. Any run blocks that depend on an unsuccessful run block are also considered failed.
A failed runner can be reset. Resetting marks its failed run blocks as unrun and sets the success flag back to true. Run blocks that have already succeeded will not be rerun, though any failed run blocks, including failed dependencies, will be. The runner then executes any on_reset callbacks before returning itself.
class WatRunner < UltraMarathon::AbstractRunner
  on_reset ->{ $global_variable = 42 }
  after_run ->{ puts 'all is well in the universe' }
  run do
    unless $global_variable == 42
      puts 'wrong!'
      raise 'boom'
    end
  end
end
WatRunner.run!.reset.run!
#=> wrong!
#=> all is well in the universe
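The reset behaviour described above can be sketched in plain Ruby as follows. ResettableSketch is hypothetical. It marks failed blocks as unrun, restores the success flag, and fires on_reset callbacks. It also returns self, so run!.reset.run! chains.

```ruby
# Hypothetical sketch of reset semantics; not UltraMarathon's implementation.
class ResettableSketch
  attr_accessor :success
  attr_reader :calls

  def initialize(blocks, on_reset: [])
    @blocks = blocks          # name => callable
    @on_reset = on_reset
    @state = {}               # name => :success or :failed (absent means unrun)
    @calls = Hash.new(0)      # how many times each block was executed
    @success = true
  end

  # Run every block that has not already succeeded.
  def run!
    @blocks.each do |name, work|
      next if @state[name] == :success
      @calls[name] += 1
      begin
        instance_exec(&work)
        @state[name] = :success
      rescue StandardError
        @state[name] = :failed
        self.success = false
      end
    end
    self
  end

  # Failed blocks become unrun again, success is restored, on_reset fires.
  def reset
    @state.delete_if { |_name, state| state == :failed }
    self.success = true
    @on_reset.each { |callback| instance_exec(&callback) }
    self
  end
end

flaky = true
runner = ResettableSketch.new(
  { steady: -> {}, wat: -> { raise 'boom' if flaky } },
  on_reset: [-> { flaky = false }]
)
runner.run!.success        # => false
runner.reset.run!.success  # => true
runner.calls[:steady]      # => 1 (not rerun after succeeding)
runner.calls[:wat]         # => 2
```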