Skip to content

Commit

Permalink
Merge pull request #41 from airbnb/darnaut/sqs-events
Browse files Browse the repository at this point in the history
Support for sending events to SQS
  • Loading branch information
darnaut authored May 24, 2018
2 parents 2edbe6e + 2a64f7c commit ab88181
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 12 deletions.
3 changes: 2 additions & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ gem 'unicorn-worker-killer', '~> 0.4.4'
gem 'hash-deep-merge', '~> 0.1.1'
gem 'oj', '= 3.3.2'
gem 'stomp', '~> 1.3.2'
gem 'statsd-ruby', '~> 1.2.1'
gem 'dogstatsd-ruby', '= 3.3.0'
gem 'aws-sdk-sqs', '= 1.3.0'

group :newrelic do
gem 'newrelic_rpm', '~> 3.18.1'
Expand Down
5 changes: 4 additions & 1 deletion config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
"debug":true,
"ip_check":true,
"rabbit_host":"127.0.0.1",
"rabbit_port":5672
"rabbit_port":5672,
"events":["rabbitmq", "sqs"],
"sqs_region":"us-west-2",
"sqs_queue":"optica-events"
}
32 changes: 27 additions & 5 deletions config.ru
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ if opts['gc_stats']
end

# prepare statsd
require 'statsd-ruby'
STATSD = Statsd.new(opts['statsd_host'], opts['statsd_port'])
require 'datadog/statsd'
STATSD = Datadog::Statsd.new(opts['statsd_host'], opts['statsd_port'])

begin
require 'newrelic_rpm'
Expand Down Expand Up @@ -56,10 +56,32 @@ require './store.rb'
store = Store.new(opts)
store.start

EVENTS_CLASSES = {
'rabbitmq' => {
'class_name' => 'EventsRabbitMQ',
'file_name' => './events_rmq.rb',
},
'sqs' => {
'class_name' => 'EventsSQS',
'file_name' => './events_sqs.rb'
},
}

events_classes = opts['events'] || ['rabbitmq']

# configure the event creator
require './events.rb'
events = Events.new(opts)
events.start
events = events_classes.map do |name|
class_opts = EVENTS_CLASSES[name]
raise "unknown value '#{name}' for events option" unless class_opts
class_name = class_opts['class_name']
file_name = class_opts['file_name']
log.info "loading #{class_name} from #{file_name}"
require file_name
class_const = Object.const_get(class_name)
class_const.new(opts).tap do |obj|
obj.start
end
end

# set a signal handler
['INT', 'TERM', 'QUIT'].each do |signal|
Expand Down
8 changes: 6 additions & 2 deletions events.rb → events_rmq.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require 'stomp'
require 'oj'

class Events
class EventsRabbitMQ
def initialize(opts)
@log = opts['log']

Expand All @@ -27,6 +27,10 @@ def initialize(opts)
@health_routing = opts['health_routing'] || 'checks.optica'
end

def name
'rabbitmq'
end

def start
@client = Stomp::Client.new(@connect_hash)
end
Expand All @@ -46,7 +50,7 @@ def healthy?
@log.error "events interface failed health check: #{e.inspect}"
false
else
@log.debug "events interface healthy"
@log.debug "events interface for RabbitMQ healthy"
true
end

Expand Down
53 changes: 53 additions & 0 deletions events_sqs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
require 'aws-sdk-sqs'
require 'oj'

class EventsSQS
def initialize(opts)
@log = opts['log']

%w{sqs_region sqs_queue}.each do |req|
raise ArgumentError, "missing required argument '#{req}'" unless opts[req]
end

@opts = {
:region => opts['sqs_region'],
:queue => opts['sqs_queue'],
:logger => @log,
}

@message_group_id = opts['routing'] || 'events.node.converged'
@health_message_group_id = opts['health_routing'] || 'checks.optica'
end

def name
'sqs'
end

def start
@sqs = Aws::SQS::Client.new(region: @opts[:region], logger: @opts[:logger])
resp = @sqs.get_queue_url(queue_name: @opts[:queue])
@opts[:queue_url] = resp.queue_url
end

def send(data)
@sqs.send_message(queue_url: @opts[:queue_url],
message_group_id: @message_group_id, message_body: Oj.dump(data))
@log.debug "published an event to #{@opts[:queue]}"
rescue StandardError => e
@log.error "unexpected error publishing to SQS: #{e.inspect}"
raise e
end

def healthy?
@sqs.send_message(queue_url: @opts[:queue_url],
message_group_id: @health_message_group_id, message_body: '{}')
@log.debug "events interface for SQS is healthy"
true
rescue StandardError => e
@log.error "events interface for SQS failed health check: #{e.inspect}"
false
end

def stop
end
end
13 changes: 10 additions & 3 deletions optica.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,17 @@ def get_nodes(request, fields_to_include=nil)
# publish update event
message = 'stored'
begin
settings.events.send(merged_data.merge("event" => data))
tags = []
event = merged_data.merge('event' => data)
settings.events.each do |events|
tags = ["events_queue:#{events.name}"]
events.send(event)
STATSD.increment('optica.events', :tags => tags + ['status:success'])
end
rescue => e
STATSD.increment('optica.events', :tags => tags + ['status:failed']) unless tags.empty?
# If event publishing failed, we treat it as a warning rather than an error.
message += " -- [warning] failed to publish to rabbitmq: #{e.to_s}"
message += " -- [warning] failed to publish event: #{e.to_s}"
end

content_type 'text/plain', :charset => 'utf-8'
Expand All @@ -132,7 +139,7 @@ def get_nodes(request, fields_to_include=nil)
end

get '/health' do
if settings.store.healthy? and settings.events.healthy?
if settings.store.healthy? and settings.events.all? { |events| events.healthy? }
content_type 'text/plain', :charset => 'utf-8'
return "OK"
else
Expand Down

0 comments on commit ab88181

Please sign in to comment.