Skip to content

Commit

Permalink
feat: [#45] Configure concurrency of Harmoniser process
Browse files Browse the repository at this point in the history
* Update Bunny to its latest to get notified when a channel is going to
  be closed because any of the consumer did not ACK is the timeout
  defined at the server
* Introduce a concurrency example using Bunny
* Add concurrency option for the CLI. It defaults to unbounded
  concurrency
* Introduce Launcher#stop to manage cancelling subscribers and closing
  connection instead of using at_exit hook.
* Introduce concurrency at Launcher#start
* Move recovery_attempt_started and recovery_completed callbacks to
  Connectable so that do not depend on Harmoniser but on connection
  method instead
* Create/Push tag anytime a merge to master happens.
  • Loading branch information
jollopre committed Aug 22, 2024
1 parent 88499a3 commit 17b326d
Show file tree
Hide file tree
Showing 24 changed files with 453 additions and 92 deletions.
15 changes: 14 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
name: Publish
runs-on: ubuntu-latest
permissions:
contents: read
contents: write
packages: write

steps:
Expand All @@ -29,3 +29,16 @@ jobs:
gem push *.gem
env:
GEM_HOST_API_KEY: "${{secrets.RUBYGEMS_AUTH_TOKEN}}"

- name: Get version from gemspec
id: get_version
run: echo "VERSION=$(ruby -e 'puts Gem::Specification.load("harmoniser.gemspec").version')" >> $GITHUB_ENV

- name: Create and Push Git Tag
run: |
git config --global user.name "github-actions"
git config --global user.email "[email protected]"
git tag v$VERSION
git push origin v$VERSION
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
command: ["tail", "-f", "/dev/null"]
rabbitmq:
image: rabbitmq:3.9.29-management
volumes:
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
ports:
- "15672:15672"
payments:
Expand Down
99 changes: 99 additions & 0 deletions examples/bunny/concurrency.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
require "bunny"
require "logger"
require "json"

CONCURRENCY = ARGV[0].to_i
PREFETCH_COUNT = ARGV[1].nil? ? nil : ARGV[1].to_i
MANUAL_ACK = ARGV[2].match?(/^true$/)

logger = Logger.new($stdout, level: :DEBUG)
connection = Bunny.new({
host: "rabbitmq",
port: 5672,
username: "guest",
password: "guest",
vhost: "/",
logger: logger
}).start

# Define Topology
ch1 = connection.create_channel
Bunny::Exchange.new(ch1, :fanout, "a_exchange")
Bunny::Queue.new(ch1, "a_queue1", durable: true, auto_delete: false).purge
Bunny::Queue.new(ch1, "a_queue2", durable: true, auto_delete: false).purge
ch1.queue_bind("a_queue1", "a_exchange")
ch1.queue_bind("a_queue2", "a_exchange")
ch1.close

# Create Publisher
ch2 = connection.create_channel
publisher = Bunny::Exchange.new(ch2, nil, "a_exchange", { passive: true })

# Create Channel
subscribers_channel = lambda do |concurrency, prefetch_count|
ch = connection.create_channel(nil, concurrency)
ch.basic_qos(prefetch_count) unless prefetch_count.nil?
ch.on_error do |ch, amq_method|
logger.error("Error produced in the channel: amp_method = `#{amq_method}`, reply_code = `#{amq_method.reply_code}`, reply_text = `#{amq_method.reply_text}`")
end
ch
end.call(CONCURRENCY, PREFETCH_COUNT)

create_consumer = lambda do |queue_name, tag, no_ack, waiting_time|
consumer = Bunny::Consumer.new(subscribers_channel, queue_name, tag, no_ack)
consumer.on_delivery do |delivery_info, properties, payload|
sleep(waiting_time)

id, _ = JSON.parse(payload, symbolize_names: true).values_at(:id)
raise "Unexpected error" if id == 5

subscribers_channel.basic_ack(delivery_info.delivery_tag) unless consumer.no_ack
logger.info("Message processed by a consumer: consumer_tag = `#{consumer.consumer_tag}`, `payload = `#{payload}`, queue = `#{consumer.queue}`")
end
consumer.on_cancellation do |basic_consume|
logger.info("Default on_cancellation handler executed for consumer")
end
end

c1 = create_consumer.call("a_queue1", "c1", !MANUAL_ACK, 60)
c2 = create_consumer.call("a_queue2", "c2", !MANUAL_ACK, 60)
subscribers_channel.basic_consume_with(c1)
subscribers_channel.basic_consume_with(c2)

# Publish on a separate thread
Thread.abort_on_exception = true
Thread.new do
id = 0
loop do
payload = { hello: "World", id: id+=1 }
publisher.publish(payload.to_json)
logger.info("Message published: payload = `#{payload}`")
id % 10 == 0 ? sleep(60) : sleep(1)
end
end

# Stats for subscribers_channel
Thread.new do
loop do
work_pool = subscribers_channel.work_pool
logger.info("Stats: work_pool = `#{work_pool.backlog}`, number_of_threads = `#{work_pool.threads.size}`, prefetch_count = `#{subscribers_channel.prefetch_count}`, prefetch_global = `#{subscribers_channel.prefetch_global}`")
sleep 1
end
end

begin
sleep
rescue SignalException
puts "Cancelling all the subscribers"
logger.info("Subscribers are going to be cancelled")
work_pool = subscribers_channel.work_pool
c1.cancel
c2.cancel
puts "Messages might be still in progress since work pool size is `#{work_pool.backlog}`" if work_pool.busy?
connection.close
logger.info("Bye!")
end

###
# ruby examples/concurrency.rb CONCURRENCY PREFETCH_COUNT MANUAL_ACK
# ruby examples/concurrency.rb 2 4 true
20 changes: 15 additions & 5 deletions lib/harmoniser/channelable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,25 @@ def harmoniser_channel
end
end

def create_channel
channel = Harmoniser.connection.create_channel
channel.on_error(&method(:on_error).to_proc)
channel.on_uncaught_exception(&method(:on_uncaught_exception).to_proc)
channel
def create_channel(consumer_pool_size: 1)
connection
.create_channel(nil, consumer_pool_size)
.tap do |channel|
attach_callbacks(channel)
end
end

private

def connection
Harmoniser.connection
end

def attach_callbacks(channel)
channel.on_error(&method(:on_error).to_proc)
channel.on_uncaught_exception(&method(:on_uncaught_exception).to_proc)
end

def on_error(channel, amq_method)
Harmoniser.logger.error("Default on_error handler executed for channel: method = `#{amq_method}`, exchanges = `#{channel.exchanges.keys}`, queues = `#{channel.queues.keys}`")
end
Expand Down
6 changes: 3 additions & 3 deletions lib/harmoniser/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CLI
def initialize
@configuration = Harmoniser.default_configuration
@logger = Harmoniser.logger
@launcher = Launcher.new(configuration: @configuration, logger: @logger)
end

def call
Expand Down Expand Up @@ -45,9 +46,7 @@ def define_signals
end

def run
Launcher
.new(configuration: configuration, logger: logger)
.start
@launcher.start

define_signals

Expand All @@ -58,6 +57,7 @@ def run
rescue Interrupt
@write_io.close
@read_io.close
@launcher.stop
exit(0)
end

Expand Down
3 changes: 2 additions & 1 deletion lib/harmoniser/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Configuration
include Connectable

attr_reader :logger, :options
def_delegators :options, :environment, :require, :verbose
def_delegators :options, :concurrency, :environment, :require, :verbose

def initialize
@logger = Harmoniser.logger
Expand All @@ -33,6 +33,7 @@ def options_with(**)

def default_options
{
concurrency: Float::INFINITY,
environment: ENV.fetch("RAILS_ENV", ENV.fetch("RACK_ENV", "production")),
require: ".",
verbose: false
Expand Down
34 changes: 13 additions & 21 deletions lib/harmoniser/connectable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,18 @@ module Connectable
MUTEX = Mutex.new

def connection_opts
@connection_opts ||= Connection::DEFAULT_CONNECTION_OPTS.merge({logger: Harmoniser.logger})
@connection_opts ||= Connection::DEFAULT_CONNECTION_OPTS
.merge({
logger: Harmoniser.logger,
recovery_attempt_started: proc {
stringified_connection = connection.to_s
Harmoniser.logger.info("Recovery attempt started: connection = `#{stringified_connection}`")
},
recovery_completed: proc {
stringified_connection = connection.to_s
Harmoniser.logger.info("Recovery completed: connection = `#{stringified_connection}`")
}
})
end

def connection_opts=(opts)
Expand All @@ -16,7 +27,7 @@ def connection_opts=(opts)

def connection
MUTEX.synchronize do
@connection ||= create_connection
@connection ||= Connection.new(connection_opts)
@connection.start unless @connection.open? || @connection.recovering_from_network_failure?
@connection
end
Expand All @@ -25,24 +36,5 @@ def connection
def connection?
!!defined?(@connection)
end

private

def create_connection
at_exit(&method(:at_exit_handler).to_proc)
Connection.new(connection_opts)
end

def at_exit_handler
logger = Harmoniser.logger

logger.info("Shutting down!")
if connection? && @connection.open?
logger.info("Connection will be closed: connection = `#{@connection}`")
@connection.close
logger.info("Connection closed: connection = `#{@connection}`")
end
logger.info("Bye!")
end
end
end
10 changes: 1 addition & 9 deletions lib/harmoniser/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,11 @@ class Connection

DEFAULT_CONNECTION_OPTS = {
connection_name: "harmoniser@#{VERSION}",
connection_timout: 5,
connection_timeout: 5,
host: "127.0.0.1",
password: "guest",
port: 5672,
read_timeout: 5,
recovery_attempt_started: proc {
stringified_connection = Harmoniser.connection.to_s
Harmoniser.logger.info("Recovery attempt started: connection = `#{stringified_connection}`")
},
recovery_completed: proc {
stringified_connection = Harmoniser.connection.to_s
Harmoniser.logger.info("Recovery completed: connection = `#{stringified_connection}`")
},
tls_silence_warnings: true,
username: "guest",
verify_peer: false,
Expand Down
47 changes: 42 additions & 5 deletions lib/harmoniser/launcher.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
require "harmoniser/channelable"

module Harmoniser
class Launcher
include Channelable
attr_reader :subscribers

def initialize(configuration:, logger:)
@configuration = configuration
@logger = logger
@subscribers = []
end

def start
boot_app
start_subscribers
@subscribers = start_subscribers
end

def stop
@logger.info("Shutting down!")
maybe_close
@logger.info("Bye!")
end

private
Expand All @@ -22,15 +34,18 @@ def boot_app

# TODO - Frameworks like Rails which have autoload for development/test will not start any subscriber unless the files where subscribers are located are required explicitly. Since we premier production and the eager load ensures that every file is loaded, this approach works
def start_subscribers
# TODO - Move this logic to Subscriber, does not make sense to be that generic
klasses = Subscriber.harmoniser_included
klasses.each do |klass|
klass.harmoniser_subscriber_start
if @configuration.options.unbounded_concurrency?
klasses.each(&:harmoniser_subscriber_start)
else
klasses.each { |klass| klass.harmoniser_subscriber_start(channel: channel) }
end

@logger.info("Subscribers registered to consume messages from queues: klasses = `#{klasses}`")
klasses
end

private

def load_rails
filepath = File.expand_path("#{@configuration.require}/config/environment.rb")
require filepath
Expand All @@ -43,5 +58,27 @@ def load_file
rescue LoadError => e
@logger.warn("Error while requiring file. No subscribers will run for this process: require = `#{@configuration.require}`, error_class = `#{e.class}`, error_message = `#{e.message}`, error_backtrace = `#{e.backtrace&.first(5)}`")
end

def maybe_close
return unless @configuration.connection?
return unless @configuration.connection.open?

maybe_cancel_subscribers

connection = @configuration.connection
@logger.info("Connection will be closed: connection = `#{connection}`")
connection.close
@logger.info("Connection closed: connection = `#{connection}`")
end

def maybe_cancel_subscribers
@logger.info("Subscribers will be cancelled from queues: klasses = `#{@subscribers}`")
@subscribers.each(&:harmoniser_subscriber_stop)
@logger.info("Subscribers cancelled: klasses = `#{@subscribers}`")
end

def channel
@channel ||= self.class.create_channel(consumer_pool_size: @configuration.concurrency)
end
end
end
6 changes: 5 additions & 1 deletion lib/harmoniser/options.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
module Harmoniser
Options = Data.define(:environment, :require, :verbose) do
Options = Data.define(:concurrency, :environment, :require, :verbose) do
def production?
environment == "production"
end

def unbounded_concurrency?
concurrency == Float::INFINITY
end

def verbose?
!!verbose
end
Expand Down
3 changes: 3 additions & 0 deletions lib/harmoniser/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ def initialize(logger:)
@options = {}
@option_parser = OptionParser.new do |opts|
opts.banner = "harmoniser [options]"
opts.on "-c", "--concurrency INT", "Set the number of threads to use" do |arg|
@options[:concurrency] = Integer(arg)
end
opts.on "-e", "--environment ENV", "Set the application environment (defaults to inferred environment or 'production')" do |arg|
@options[:environment] = arg
end
Expand Down
1 change: 1 addition & 0 deletions lib/harmoniser/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def handle_return(exchange)

class << self
def included(base)
# TODO make these constants private
base.const_set(:HARMONISER_PUBLISHER_MUTEX, Mutex.new)
base.const_set(:HARMONISER_PUBLISHER_CLASS, base)
base.extend(ClassMethods)
Expand Down
Loading

0 comments on commit 17b326d

Please sign in to comment.