From f9309fbf93d49392109aa3220564b5097c6ddb8a Mon Sep 17 00:00:00 2001 From: Anton Kuraev Date: Wed, 10 Nov 2021 18:20:22 -0800 Subject: [PATCH] Multiple improvements --- Gemfile | 5 -- config.ru | 18 ++--- optica.rb | 13 +++- store.rb | 203 +++++++++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 198 insertions(+), 41 deletions(-) diff --git a/Gemfile b/Gemfile index 274adaf..0a5a4e5 100644 --- a/Gemfile +++ b/Gemfile @@ -11,11 +11,6 @@ gem 'dogstatsd-ruby', '= 3.3.0' gem 'aws-sdk-sqs', '= 1.3.0' gem 'get_process_mem', '= 0.2.1' -group :newrelic do - gem 'newrelic_rpm', '~> 3.18.1' - gem 'newrelic-zookeeper', '~> 1.0.0' -end - group :ddtrace do gem 'ddtrace', '~> 0.45.0' end diff --git a/config.ru b/config.ru index d7c90dd..fc414df 100644 --- a/config.ru +++ b/config.ru @@ -9,6 +9,11 @@ log.level = Logger::INFO unless opts['debug'] opts['log'] = log +# Override store mode from ENV +if ENV['OPTICA_SPLIT_MODE'] + opts['split_mode'] = ENV['OPTICA_SPLIT_MODE'].downcase +end + # Enable GC stats if opts['gc_stats'] if defined? GC::Profiler && GC::Profiler.respond_to?(:enable) @@ -28,13 +33,6 @@ end require 'datadog/statsd' STATSD = Datadog::Statsd.new(opts['statsd_host'], opts['statsd_port']) -begin - require 'newrelic_rpm' - require 'newrelic-zookeeper' -rescue LoadError - log.info "Newrelic not found, skipping..." -end - # prepare to exit cleanly $EXIT = false @@ -92,7 +90,7 @@ end # set a signal handler ['INT', 'TERM', 'QUIT'].each do |signal| trap(signal) do - log.warn "Got signal #{signal} -- exit currently #{$EXIT}" + $stderr.puts "Got signal #{signal} -- exit currently #{$EXIT}" exit! if $EXIT $EXIT = true @@ -133,7 +131,8 @@ def datadog_config(log) # add correlation IDs to logger log.formatter = proc do |severity, datetime, progname, msg| - "[#{datetime}][#{progname}][#{severity}][#{Datadog.tracer.active_correlation}] #{msg}\n" + correlation = Datadog.tracer.active_correlation rescue 'FAILED' + "[#{datetime}][#{progname}][#{severity}][#{correlation}] #{msg}\n" end end @@ -148,6 +147,7 @@ Optica.set :logger, log Optica.set :store, store Optica.set :events, events Optica.set :ip_check, ip_check +Optica.set :split_mode, opts['split_mode'] # start the app log.info "Starting sinatra server..." diff --git a/optica.rb b/optica.rb index 4f5be44..8185ddc 100644 --- a/optica.rb +++ b/optica.rb @@ -30,14 +30,25 @@ class Optica < Sinatra::Base return get_nodes(request, fields_to_include) end + get '/store' do + content_type 'application/octet-stream' + return settings.store.nodes_serialized + end + def get_nodes(request, fields_to_include=nil) params = CGI::parse(request.query_string).reject { |p| p[0] == '_' } + # Optimization for some of the most expensive requests + if fields_to_include.nil? && params.empty? && settings.split_mode == 'server' + content_type 'application/json', :charset => 'utf-8' + return settings.store.nodes_serialized + end + # include only those nodes that match passed-in parameters examined = 0 to_return = {} begin - nodes = settings.store.nodes + nodes = settings.store.lookup(params) rescue halt(503) end diff --git a/store.rb b/store.rb index fc0d413..354ff44 100644 --- a/store.rb +++ b/store.rb @@ -1,15 +1,28 @@ require 'zk' require 'oj' require 'hash_deep_merge' +require 'open-uri' class Store attr_reader :ips - DEFAULT_CACHE_STALE_AGE = 0 + DEFAULT_CACHE_STALE_AGE = 0 + DEFAULT_SPLIT_MODE = "disabled" + DEFAULT_STORE_PORT = 8001 + DEFAULT_HTTP_TIMEOUT = 30 + DEFAULT_HTTP_RETRY_DELAY = 5 def initialize(opts) @log = opts['log'] + @index_fields = opts['index_fields'].to_s.split(/,\s*/) + + @opts = { + 'split_mode' => DEFAULT_SPLIT_MODE, + 'split_mode_store_port' => DEFAULT_STORE_PORT, + 'split_mode_retry_delay' => DEFAULT_HTTP_RETRY_DELAY, + 'split_mode_http_timeout' => DEFAULT_HTTP_TIMEOUT, + }.merge(opts) unless opts['zk_path'] raise ArgumentError, "missing required argument 'zk_path'" @@ -36,18 +49,21 @@ def setup_cache(opts) @cache_root_watcher = nil # mutex for atomically updating cached results - @cache_mutex = Mutex.new + @cache_results_serialized = nil @cache_results = {} + @cache_indices = {} + @cache_mutex = Mutex.new # daemon that'll fetch from zk periodically @cache_fetch_thread = nil # flag that controls if fetch daemon should run @cache_fetch_thread_should_run = false # how long we serve cached data - @cache_fetch_interval = (opts['cache_fetch_interval'] || 20).to_i + @cache_fetch_base_interval = (opts['cache_fetch_interval'] || 20).to_i + @cache_fetch_interval = @cache_fetch_base_interval # timestamp that prevents setting cache result with stale data - @cache_results_last_fetched_time = Time.now + @cache_results_last_fetched_time = 0 end def start() @@ -101,20 +117,83 @@ def stop() # get instances for a given service def nodes() STATSD.time('optica.store.get_nodes') do - return load_instances_from_zk unless @cache_enabled + unless @cache_enabled + inst, idx = load_instances_from_zk + return inst + end check_cache_age @cache_results end end + def nodes_serialized + @cache_results_serialized + end + + def lookup(params) + if @opts['split_mode'] != 'server' || !@cache_enabled + return nodes + end + + STATSD.time('optica.store.lookup') do + + # Find all suitable indices and their cardinalities + cardinalities = params.reduce({}) do |res, (key, _)| + res[key] = @cache_indices[key].length if @cache_indices.key? key + res + end + + unless cardinalities.empty? + # Find best suitable index + best_key = cardinalities.sort_by {|k,v| v}.first.first + best_idx = @cache_indices.fetch(best_key, {}) + + # Check if index saves enough cycles, otherwise fall back to full cache + if @cache_results.length > 0 && best_idx.length.to_f / @cache_results.length.to_f > 0.5 + return nodes + end + + return nodes_from_index(best_idx, params[best_key]) + end + + return nodes + end + end + + def load_instances + STATSD.time('optica.store.load_instances') do + @opts['split_mode'] == 'server' ? + load_instances_from_leader : + load_instances_from_zk + end + end + + def load_instances_from_leader + begin + uri = "http://localhost:%d/store" % @opts['split_mode_store_port'] + res = open(uri, :read_timeout => @opts['split_mode_http_timeout']) + + remote_store = Oj.load res.read + [ remote_store['inst'], remote_store['idx'] ] + rescue OpenURI::HTTPError, Errno::ECONNREFUSED, Net::ReadTimeout => e + @log.error "Error loading store from #{uri}: #{e.inspect}; will retry after #{@opts['split_mode_retry_delay']}" + + sleep @opts['split_mode_retry_delay'] + retry + end + end + def load_instances_from_zk() @log.info "Reading instances from zk:" - from_server = {} + + inst = {} + idx = {} begin @zk.children('/', :watch => true).each do |child| - from_server[child] = get_node("/#{child}") + node = get_node("/#{child}") + update_nodes child, node, inst, idx end rescue Exception => e # ZK client library caches DNS names of ZK nodes and it resets the @@ -128,7 +207,7 @@ def load_instances_from_zk() raise e end - from_server + [inst, idx] end def add(node, data) @@ -190,6 +269,53 @@ def healthy?() end private + + def nodes_from_index(idx, values) + matching_keys = [] + + # To preserve original optica behavior we have to validate all keys + # against standard rules + values.each do |val| + keys = idx.keys.select do |key| + matched = true + if key.is_a? String + matched = false unless key.match val + elsif key.is_a? Array + matched = false unless key.include? val + elsif key.class == TrueClass + matched = false unless ['true', 'True', '1'].include? val + elsif key.class == FalseClass + matched = false unless ['false', 'False', '0'].include? val + end + matched + end + matching_keys << keys + end + + if matching_keys.length == 1 + matching_keys = matching_keys.first + elsif matching_keys.length > 1 + matching_keys = matching_keys.inject(:&) + end + + matching_keys.reduce({}) do |res, key| + res.merge idx.fetch(key, {}) + end + end + + def update_nodes(node_name, node, inst, idx) + inst[node_name] = node + + @index_fields.each do |key| + if node.key?(key) && !node[key].nil? + val = node[key] + idx[key] ||= {} + idx[key][val] ||= {} + idx[key][val][node_name] = node + end + end + end + def get_node(node) begin data, stat = STATSD.time('optica.zookeeper.get') do @@ -215,7 +341,7 @@ def get_node(node) # immediately update cache if node joins/leaves def setup_watchers - return if @zk.nil? + return if @zk.nil? || @opts['split_mode'] == 'server' @cache_root_watcher = @zk.register("/", :only => :child) do |event| @log.info "Children added/deleted" @@ -238,32 +364,57 @@ def check_cache_age end def reload_instances() - # Here we use local time to preven race condition - # Basically cache fetch thread or zookeeper watch callback - # both will call this to refresh cache. Depending on which - # finishes first our cache will get set by the slower one. - # So in order to prevent setting cache to an older result, - # we set both cache and the timestamp of that version fetched - # Since timestamp will be monotonically increasing, we are - # sure that cache set will always have newer versions - - fetch_start_time = Time.now - instances = load_instances_from_zk.freeze - @cache_mutex.synchronize do - if fetch_start_time > @cache_results_last_fetched_time then - @cache_results_last_fetched_time = fetch_start_time - @cache_results = instances - end + + return unless @cache_mutex.try_lock + + begin + now = Time.now.to_i + + if now > @cache_results_last_fetched_time + @cache_fetch_interval + inst, idx = load_instances + + @cache_results = inst.freeze + @cache_indices = idx.freeze + + case @opts['split_mode'] + when 'store' + new_store = { + 'inst' => @cache_results, + 'idx' => @cache_indices, + } + @cache_results_serialized = Oj.dump new_store + when 'server' + new_store = { + 'examined' => 0, + 'returned' => @cache_results.length, + 'nodes' => @cache_results, + } + @cache_results_serialized = Oj.dump new_store + end + + + @cache_results_last_fetched_time = now + update_cache_fetch_interval + + @log.info "reloaded cache. new reload interval = #{@cache_fetch_interval}" + end + ensure + @cache_mutex.unlock end end + def update_cache_fetch_interval + @cache_fetch_interval = @cache_fetch_base_interval + rand(0..20) + end + def start_fetch_thread() @cache_fetch_thread_should_run = true @cache_fetch_thread = Thread.new do while @cache_fetch_thread_should_run do begin sleep(@cache_fetch_interval) rescue nil - @log.info "Cache fetch thread now fetches from zk..." + source = @opts['split_mode'] == 'server' ? 'remote store' : 'zookeeper' + @log.info "Cache fetch thread now fetches from #{source}..." reload_instances rescue nil check_cache_age rescue => ex