Commit

Multiple improvements

Anton Kuraev committed Nov 15, 2021
1 parent 253ad79 commit f9309fb
Showing 4 changed files with 198 additions and 41 deletions.
5 changes: 0 additions & 5 deletions Gemfile
@@ -11,11 +11,6 @@ gem 'dogstatsd-ruby', '= 3.3.0'
gem 'aws-sdk-sqs', '= 1.3.0'
gem 'get_process_mem', '= 0.2.1'

group :newrelic do
gem 'newrelic_rpm', '~> 3.18.1'
gem 'newrelic-zookeeper', '~> 1.0.0'
end

group :ddtrace do
gem 'ddtrace', '~> 0.45.0'
end
18 changes: 9 additions & 9 deletions config.ru
@@ -9,6 +9,11 @@ log.level = Logger::INFO unless opts['debug']

opts['log'] = log

# Override store mode from ENV
if ENV['OPTICA_SPLIT_MODE']
opts['split_mode'] = ENV['OPTICA_SPLIT_MODE'].downcase
end
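This override is read before the store is constructed. A minimal sketch of how one might guard the value, assuming the three modes this commit references in store.rb ('disabled' is the default; 'server' and 'store' enable split mode); VALID_SPLIT_MODES is a hypothetical helper, not part of this commit:

# Sketch only: VALID_SPLIT_MODES is an assumed helper, not part of this commit.
VALID_SPLIT_MODES = %w[disabled server store].freeze

mode = (ENV['OPTICA_SPLIT_MODE'] || 'disabled').downcase
unless VALID_SPLIT_MODES.include?(mode)
  raise ArgumentError, "unknown split mode: #{mode}"
end
opts['split_mode'] = mode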

# Enable GC stats
if opts['gc_stats']
if defined? GC::Profiler && GC::Profiler.respond_to?(:enable)
@@ -28,13 +33,6 @@ end
require 'datadog/statsd'
STATSD = Datadog::Statsd.new(opts['statsd_host'], opts['statsd_port'])

begin
require 'newrelic_rpm'
require 'newrelic-zookeeper'
rescue LoadError
log.info "Newrelic not found, skipping..."
end

# prepare to exit cleanly
$EXIT = false

@@ -92,7 +90,7 @@ end
# set a signal handler
['INT', 'TERM', 'QUIT'].each do |signal|
trap(signal) do
log.warn "Got signal #{signal} -- exit currently #{$EXIT}"
$stderr.puts "Got signal #{signal} -- exit currently #{$EXIT}"

exit! if $EXIT
$EXIT = true
@@ -133,7 +131,8 @@ def datadog_config(log)

# add correlation IDs to logger
log.formatter = proc do |severity, datetime, progname, msg|
"[#{datetime}][#{progname}][#{severity}][#{Datadog.tracer.active_correlation}] #{msg}\n"
correlation = Datadog.tracer.active_correlation rescue 'FAILED'
"[#{datetime}][#{progname}][#{severity}][#{correlation}] #{msg}\n"
end
end
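With this formatter a log line comes out roughly as below; the exact correlation text depends on the ddtrace version, and the values here are made up:

[2021-11-15 12:00:00 +0000][Optica][INFO][trace_id=4711 span_id=1138] Starting sinatra server...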

@@ -148,6 +147,7 @@ Optica.set :logger, log
Optica.set :store, store
Optica.set :events, events
Optica.set :ip_check, ip_check
Optica.set :split_mode, opts['split_mode']

# start the app
log.info "Starting sinatra server..."
13 changes: 12 additions & 1 deletion optica.rb
@@ -30,14 +30,25 @@ class Optica < Sinatra::Base
return get_nodes(request, fields_to_include)
end

get '/store' do
content_type 'application/octet-stream'
return settings.store.nodes_serialized
end
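A sketch of a consumer for this endpoint, patterned on load_instances_from_leader in store.rb below (localhost, port 8001, and the 30-second timeout are the commit's defaults; the rest is illustrative). When the serving process runs in 'store' mode, the payload carries instances under 'inst' and indices under 'idx':

# Illustrative client for GET /store, mirroring load_instances_from_leader.
require 'open-uri'
require 'oj'

res = open('http://localhost:8001/store', :read_timeout => 30)
remote_store = Oj.load(res.read)
instances = remote_store['inst']  # node name => node data
indices   = remote_store['idx']   # index field => value => matching nodes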

def get_nodes(request, fields_to_include=nil)
params = CGI::parse(request.query_string).reject { |p| p[0] == '_' }

# Optimization for some of the most expensive requests
if fields_to_include.nil? && params.empty? && settings.split_mode == 'server'
content_type 'application/json', :charset => 'utf-8'
return settings.store.nodes_serialized
end

# include only those nodes that match passed-in parameters
examined = 0
to_return = {}
begin
nodes = settings.store.nodes
nodes = settings.store.lookup(params)
rescue
halt(503)
end
203 changes: 177 additions & 26 deletions store.rb
@@ -1,15 +1,28 @@
require 'zk'
require 'oj'
require 'hash_deep_merge'
require 'open-uri'

class Store

attr_reader :ips

DEFAULT_CACHE_STALE_AGE = 0
DEFAULT_SPLIT_MODE = "disabled"
DEFAULT_STORE_PORT = 8001
DEFAULT_HTTP_TIMEOUT = 30
DEFAULT_HTTP_RETRY_DELAY = 5

def initialize(opts)
@log = opts['log']
@index_fields = opts['index_fields'].to_s.split(/,\s*/)

@opts = {
'split_mode' => DEFAULT_SPLIT_MODE,
'split_mode_store_port' => DEFAULT_STORE_PORT,
'split_mode_retry_delay' => DEFAULT_HTTP_RETRY_DELAY,
'split_mode_http_timeout' => DEFAULT_HTTP_TIMEOUT,
}.merge(opts)

unless opts['zk_path']
raise ArgumentError, "missing required argument 'zk_path'"
@@ -36,18 +49,21 @@ def setup_cache(opts)
@cache_root_watcher = nil

# mutex for atomically updating cached results
@cache_mutex = Mutex.new
@cache_results_serialized = nil
@cache_results = {}
@cache_indices = {}
@cache_mutex = Mutex.new

# daemon that'll fetch from zk periodically
@cache_fetch_thread = nil
# flag that controls if fetch daemon should run
@cache_fetch_thread_should_run = false
# how long we serve cached data
@cache_fetch_interval = (opts['cache_fetch_interval'] || 20).to_i
@cache_fetch_base_interval = (opts['cache_fetch_interval'] || 20).to_i
@cache_fetch_interval = @cache_fetch_base_interval

# timestamp that prevents setting cache result with stale data
@cache_results_last_fetched_time = Time.now
@cache_results_last_fetched_time = 0
end

def start()
@@ -101,20 +117,83 @@ def stop()
# get instances for a given service
def nodes()
STATSD.time('optica.store.get_nodes') do
return load_instances_from_zk unless @cache_enabled
unless @cache_enabled
inst, idx = load_instances_from_zk
return inst
end

check_cache_age
@cache_results
end
end

def nodes_serialized
@cache_results_serialized
end

def lookup(params)
if @opts['split_mode'] != 'server' || !@cache_enabled
return nodes
end

STATSD.time('optica.store.lookup') do

# Find all suitable indices and their cardinalities
cardinalities = params.reduce({}) do |res, (key, _)|
res[key] = @cache_indices[key].length if @cache_indices.key? key
res
end

unless cardinalities.empty?
# Find best suitable index
best_key = cardinalities.sort_by {|k,v| v}.first.first
best_idx = @cache_indices.fetch(best_key, {})

# Check if index saves enough cycles, otherwise fall back to full cache
if @cache_results.length > 0 && best_idx.length.to_f / @cache_results.length.to_f > 0.5
return nodes
end

return nodes_from_index(best_idx, params[best_key])
end

return nodes
end
end
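Since nodes_from_index (further down) matches a query value against every distinct key in an index, the cost of using an index grows with its cardinality; lookup therefore picks the index with the fewest distinct values, and falls back to the full cache when even the best index holds more than half as many distinct values as there are nodes. A made-up example of the selection:

# Made-up data illustrating the index choice in lookup.
# 'role' has 2 distinct values and 'az' has 4, so 'role' is the cheaper index.
@cache_indices = {
  'role' => { 'web' => {}, 'db' => {} },
  'az'   => { 'a' => {}, 'b' => {}, 'c' => {}, 'd' => {} },
}
params = { 'role' => ['web'], 'az' => ['a'] }
# cardinalities == { 'role' => 2, 'az' => 4 }, so best_key == 'role'.
# The caller (get_nodes in optica.rb) still filters candidates against all
# params, so the index only narrows the set; it need not match exactly.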

def load_instances
STATSD.time('optica.store.load_instances') do
@opts['split_mode'] == 'server' ?
load_instances_from_leader :
load_instances_from_zk
end
end

def load_instances_from_leader
begin
uri = "http://localhost:%d/store" % @opts['split_mode_store_port']
res = open(uri, :read_timeout => @opts['split_mode_http_timeout'])

remote_store = Oj.load res.read
[ remote_store['inst'], remote_store['idx'] ]
rescue OpenURI::HTTPError, Errno::ECONNREFUSED, Net::ReadTimeout => e
@log.error "Error loading store from #{uri}: #{e.inspect}; will retry after #{@opts['split_mode_retry_delay']}"

sleep @opts['split_mode_retry_delay']
retry
end
end

def load_instances_from_zk()
@log.info "Reading instances from zk:"
from_server = {}

inst = {}
idx = {}

begin
@zk.children('/', :watch => true).each do |child|
from_server[child] = get_node("/#{child}")
node = get_node("/#{child}")
update_nodes child, node, inst, idx
end
rescue Exception => e
# ZK client library caches DNS names of ZK nodes and it resets the
@@ -128,7 +207,7 @@ def load_instances_from_zk()
raise e
end

from_server
[inst, idx]
end

def add(node, data)
Expand Down Expand Up @@ -190,6 +269,53 @@ def healthy?()
end

private

def nodes_from_index(idx, values)
matching_keys = []

# To preserve original optica behavior we have to validate all keys
# against standard rules
values.each do |val|
keys = idx.keys.select do |key|
matched = true
if key.is_a? String
matched = false unless key.match val
elsif key.is_a? Array
matched = false unless key.include? val
elsif key.class == TrueClass
matched = false unless ['true', 'True', '1'].include? val
elsif key.class == FalseClass
matched = false unless ['false', 'False', '0'].include? val
end
matched
end
matching_keys << keys
end

if matching_keys.length == 1
matching_keys = matching_keys.first
elsif matching_keys.length > 1
matching_keys = matching_keys.inject(:&)
end

matching_keys.reduce({}) do |res, key|
res.merge idx.fetch(key, {})
end
end
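To make the matching rules concrete: index keys keep whatever types the nodes reported, so a query string is compared differently against string, array, and boolean keys. A made-up example:

# Made-up example of the matching semantics in nodes_from_index.
idx = {
  true  => { 'host-1' => {} },  # TrueClass key: matched by 'true', 'True', or '1'
  'web' => { 'host-2' => {} },  # String key: val is applied as a regexp via key.match(val)
}
nodes_from_index(idx, ['true'])  # => { 'host-1' => {} }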

def update_nodes(node_name, node, inst, idx)
inst[node_name] = node

@index_fields.each do |key|
if node.key?(key) && !node[key].nil?
val = node[key]
idx[key] ||= {}
idx[key][val] ||= {}
idx[key][val][node_name] = node
end
end
end
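For a node that reports, say, {'role' => 'web', ...} with 'role' listed in index_fields, update_nodes fills the two structures like this (names and values made up):

# Illustrative result of update_nodes for a single node.
inst  # => { 'i-abc123' => { 'role' => 'web', ... } }
idx   # => { 'role' => { 'web' => { 'i-abc123' => { 'role' => 'web', ... } } } }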

def get_node(node)
begin
data, stat = STATSD.time('optica.zookeeper.get') do
@@ -215,7 +341,7 @@ def get_node(node)

# immediately update cache if node joins/leaves
def setup_watchers
return if @zk.nil?
return if @zk.nil? || @opts['split_mode'] == 'server'

@cache_root_watcher = @zk.register("/", :only => :child) do |event|
@log.info "Children added/deleted"
@@ -238,32 +364,57 @@ def check_cache_age
end

def reload_instances()
# Here we use local time to prevent a race condition:
# both the cache fetch thread and the zookeeper watch callback
# call this method to refresh the cache, so depending on which
# finishes first, the cache would be set by the slower one.
# To avoid overwriting the cache with an older result, we set
# both the cache and the timestamp of the version we fetched.
# Since the timestamp is monotonically increasing, we can be
# sure the cache always keeps the newer version.

fetch_start_time = Time.now
instances = load_instances_from_zk.freeze
@cache_mutex.synchronize do
if fetch_start_time > @cache_results_last_fetched_time then
@cache_results_last_fetched_time = fetch_start_time
@cache_results = instances
end

return unless @cache_mutex.try_lock

begin
now = Time.now.to_i

if now > @cache_results_last_fetched_time + @cache_fetch_interval
inst, idx = load_instances

@cache_results = inst.freeze
@cache_indices = idx.freeze

case @opts['split_mode']
when 'store'
new_store = {
'inst' => @cache_results,
'idx' => @cache_indices,
}
@cache_results_serialized = Oj.dump new_store
when 'server'
new_store = {
'examined' => 0,
'returned' => @cache_results.length,
'nodes' => @cache_results,
}
@cache_results_serialized = Oj.dump new_store
end


@cache_results_last_fetched_time = now
update_cache_fetch_interval

@log.info "reloaded cache. new reload interval = #{@cache_fetch_interval}"
end
ensure
@cache_mutex.unlock
end
end
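Two guards replace the old synchronize-and-compare-timestamps scheme: try_lock lets a concurrent caller skip the reload entirely instead of queuing behind the lock, and the integer timestamp check turns any call arriving within @cache_fetch_interval of the last fetch into a no-op, which also rate-limits the zookeeper watcher callbacks that trigger reloads.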

def update_cache_fetch_interval
@cache_fetch_interval = @cache_fetch_base_interval + rand(0..20)
end
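The added jitter spreads reloads out so that, in split mode, many server processes polling the same store do not refetch in lockstep; with the default 20-second base, each cycle lands somewhere in the 20-40 second range. A quick sketch:

# Each reload schedules the next fetch 0..20 seconds after the base interval,
# desynchronizing multiple servers that poll the same store.
base = 20
3.times.map { base + rand(0..20) }  # => e.g. [27, 33, 21]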

def start_fetch_thread()
@cache_fetch_thread_should_run = true
@cache_fetch_thread = Thread.new do
while @cache_fetch_thread_should_run do
begin
sleep(@cache_fetch_interval) rescue nil
@log.info "Cache fetch thread now fetches from zk..."
source = @opts['split_mode'] == 'server' ? 'remote store' : 'zookeeper'
@log.info "Cache fetch thread now fetches from #{source}..."
reload_instances rescue nil
check_cache_age
rescue => ex
