From fbdb7fea7669faf6a088790a214965ec5df59c22 Mon Sep 17 00:00:00 2001 From: Esity Date: Thu, 23 Sep 2021 13:09:40 -0500 Subject: [PATCH] v0.2.0 --- .rubocop.yml | 2 +- CHANGELOG.md | 5 + config.ru | 3 +- exe/snmp_collector | 12 ++- lib/telemetry/snmp.rb | 1 + lib/telemetry/snmp/api.rb | 18 ++++ lib/telemetry/snmp/collector.rb | 101 ++++++++++++++++++ lib/telemetry/snmp/data.rb | 2 +- lib/telemetry/snmp/data/default_opts.rb | 20 ++-- .../snmp/data/migrations/011_device_locks.rb | 19 ++++ lib/telemetry/snmp/data/models/device.rb | 1 + lib/telemetry/snmp/data/models/device_lock.rb | 11 ++ lib/telemetry/snmp/publisher.rb | 22 ++-- lib/telemetry/snmp/version.rb | 2 +- spec/telemetry/publisher_spec.rb | 38 +++---- 15 files changed, 209 insertions(+), 48 deletions(-) create mode 100644 lib/telemetry/snmp/collector.rb create mode 100644 lib/telemetry/snmp/data/migrations/011_device_locks.rb create mode 100644 lib/telemetry/snmp/data/models/device_lock.rb diff --git a/.rubocop.yml b/.rubocop.yml index 57965f8..4b40f67 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -9,7 +9,7 @@ Metrics/ClassLength: Metrics/BlockLength: Max: 100 Metrics/CyclomaticComplexity: - Max: 10 + Max: 12 Metrics/PerceivedComplexity: Max: 13 Metrics/AbcSize: diff --git a/CHANGELOG.md b/CHANGELOG.md index a9e0386..395f5e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ # Telemetry::SNMP +## [0.2.0] +* Fixing ENV variables to not use . +* Adding new device lock table for future reference +* Adding new Collector class + ## [0.1.0] - Initial release diff --git a/config.ru b/config.ru index 19c1008..9a6a73e 100644 --- a/config.ru +++ b/config.ru @@ -10,6 +10,5 @@ require 'multi_json' require 'telemetry/snmp' require 'telemetry/snmp/api' - -Telemetry::Snmp::Data.start! +Telemetry::Snmp.bootstrap run Telemetry::Snmp::API diff --git a/exe/snmp_collector b/exe/snmp_collector index 381aa9f..94dd9dc 100755 --- a/exe/snmp_collector +++ b/exe/snmp_collector @@ -9,8 +9,10 @@ trap('SIGINT') { @quit = true } @quit = false until @quit + sleep(1) @lines = [] - Telemetry::Snmp::Data::Model::Device.each do |row| + Telemetry::Snmp::Data::Model::Device.where(:active).each do |row| + break if @quit next if row.values[:last_polled].to_i + row.values[:frequency] > Time.now.to_i fields = {} @@ -20,10 +22,13 @@ until @quit env: row.values[:environment], dc: row.values[:datacenter], zone: row.values[:zone], - influxdb_node_group: 'snmp' + influxdb_node_group: 'snmp', + influxdb_database: 'snmp' } Telemetry::Snmp::Data::Model::OID.each do |oid_row| + break if @quit + oid_value = Telemetry::Snmp::Client.oid_value(row[:hostname], oid_row.values[:oid]) next if oid_value.nil? next unless oid_value.is_a?(Integer) || oid_value.is_a?(Float) @@ -34,7 +39,7 @@ until @quit end @lines.push Telemetry::Metrics::Parser.to_line_protocol( - measurement: 'palo_alto_a', + measurement: 'palo_alto', fields: fields, tags: tags, timestamp: (DateTime.now.strftime('%Q').to_i * 1000 * 1000) @@ -48,6 +53,7 @@ until @quit row.save rescue StandardError => e Telemetry::Logger.error "#{e.class}: #{e.message}" + Telemetry::Logger.error e.backtrace[0..20] end Telemetry::Snmp::Publisher.push_lines(@lines) unless @lines.empty? diff --git a/lib/telemetry/snmp.rb b/lib/telemetry/snmp.rb index ddee540..96050db 100644 --- a/lib/telemetry/snmp.rb +++ b/lib/telemetry/snmp.rb @@ -6,6 +6,7 @@ require 'telemetry/snmp/data' require 'telemetry/snmp/client' require 'telemetry/snmp/publisher' +require 'telemetry/snmp/collector' module Telemetry module Snmp diff --git a/lib/telemetry/snmp/api.rb b/lib/telemetry/snmp/api.rb index b5026a7..8991767 100644 --- a/lib/telemetry/snmp/api.rb +++ b/lib/telemetry/snmp/api.rb @@ -37,6 +37,20 @@ class API < Sinatra::Base response.body = Oj.dump(response.body, mode: :compat) unless response.body.is_a? String end + get '/' do + { + version: Telemetry::Snmp::VERSION, + migration_version: Telemetry::Snmp::Data.migration_version + } + end + + get '' do + { + version: Telemetry::Snmp::VERSION, + migration_version: Telemetry::Snmp::Data.migration_version + } + end + get '/version' do { version: Telemetry::Snmp::VERSION, @@ -44,6 +58,10 @@ class API < Sinatra::Base } end + get '/loop_devices' do + Telemetry::Snmp::Collector.loop_devices + end + namespace('/users') { register Telemetry::Snmp::Controller::Users } namespace('/devices/creds') { register Telemetry::Snmp::Controller::DeviceCreds } namespace('/devices') { register Telemetry::Snmp::Controller::Devices } diff --git a/lib/telemetry/snmp/collector.rb b/lib/telemetry/snmp/collector.rb new file mode 100644 index 0000000..b193d8e --- /dev/null +++ b/lib/telemetry/snmp/collector.rb @@ -0,0 +1,101 @@ +module Telemetry + module Snmp + module Collector + class << self + def worker_name + "#{::Socket.gethostname.tr('.', '_')}.#{::Process.pid}.#{Thread.current.object_id}" + end + + def loop_devices + Telemetry::Snmp::Data::Model::Device.where(:active).each do |row| + next if row.values[:last_polled].to_i + row.values[:frequency] > Time.now.to_i + next if device_locked?(row.values[:id]) + + collect(row.values[:id]) + end + end + + def unlock_expired_devices + Telemetry::Snmp::Data::Model::DeviceLock.each do |row| + next if row.values[:expires] < Sequel::CURRENT_TIMESTAMP + + row.delete + end + end + + def device_locked?(device_id) + !Telemetry::Snmp::Data::Model::DeviceLock[device_id: device_id].nil? + end + + def lock_device(device_id) + return false unless Telemetry::Snmp::Data::Model::DeviceLock[device_id: device_id].nil? + + Telemetry::Snmp::Data::Model::DeviceLock.insert( + worker_name: worker_name, + device_id: device_id, + created: Sequel::CURRENT_TIMESTAMP, + expires: Sequel::CURRENT_TIMESTAMP + ) + true + end + + # noinspection RubyArgCount + def unlock_device(device_id) + device = Telemetry::Snmp::Data::Model::DeviceLock[device_id: device_id] + return true if device.nil? + + device.delete + end + + def collect(device_id) + lock_device(device_id) + row = Telemetry::Snmp::Data::Model::Device[device_id] + lines = [] + fields = {} + tags = { + hostname: row.values[:hostname], + ip_address: row.values[:ip_address], + env: row.values[:environment], + dc: row.values[:datacenter], + zone: row.values[:zone], + influxdb_node_group: 'snmp', + influxdb_database: 'snmp' + } + + Telemetry::Snmp::Data::Model::OID.each do |oid_row| + break if @quit + + oid_value = Telemetry::Snmp::Client.oid_value(row[:hostname], oid_row.values[:oid]) + next if oid_value.nil? + next unless oid_value.is_a?(Integer) || oid_value.is_a?(Float) + + fields[oid_row.values[:name]] = + "#{Telemetry::Snmp::Client.oid_value(row[:hostname], oid_row.values[:oid])}i" + rescue StandardError => e + Telemetry::Logger.error "#{e.class}: #{e.message}" + end + + lines.push Telemetry::Metrics::Parser.to_line_protocol( + measurement: 'palo_alto', + fields: fields, + tags: tags, + timestamp: (DateTime.now.strftime('%Q').to_i * 1000 * 1000) + ) + + walker = Telemetry::Snmp::Client.grab_oid_metrics(row.values[:hostname]) + Telemetry::Logger.info "Pushing #{walker.count} lines for #{row.values[:hostname]}" unless walker.empty? + Telemetry::Snmp::Publisher.push_lines(walker) unless walker.empty? + + row.update(last_polled: Sequel::CURRENT_TIMESTAMP) + row.save + + Telemetry::Snmp::Publisher.push_lines(lines) unless lines.empty? + unlock_device(device_id) + rescue StandardError => e + Telemetry::Logger.exception(e, level: 'error') + unlock_device(device_id) + end + end + end + end +end diff --git a/lib/telemetry/snmp/data.rb b/lib/telemetry/snmp/data.rb index acb124a..873edbd 100644 --- a/lib/telemetry/snmp/data.rb +++ b/lib/telemetry/snmp/data.rb @@ -38,7 +38,7 @@ def load_models(*models_array) end def models - %w[user device_cred device oid oid_group oid_oid_groups oid_walk user_audit_log] + %w[user device_cred device device_lock oid oid_group oid_oid_groups oid_walk user_audit_log] end def connection(**opts) diff --git a/lib/telemetry/snmp/data/default_opts.rb b/lib/telemetry/snmp/data/default_opts.rb index b988d15..87baff5 100644 --- a/lib/telemetry/snmp/data/default_opts.rb +++ b/lib/telemetry/snmp/data/default_opts.rb @@ -19,52 +19,52 @@ def default_credentials module_function :default_credentials def adapter - ENV["#{env_key}.adapter"] == 'postgres' ? 'postgres' : 'mysql2' + ENV["#{env_key}_adapter"] == 'postgres' ? 'postgres' : 'mysql2' end module_function :adapter def username - ENV["#{env_key}.username"] || 'root' + ENV["#{env_key}_username"] || 'root' end module_function :username def password - ENV["#{env_key}.password"] || nil + ENV["#{env_key}_password"] || nil end module_function :password def database - ENV["#{env_key}.database"] || 'telemetry_snmp' + ENV["#{env_key}_database"] || 'telemetry_snmp' end module_function :database def host - ENV["#{env_key}.host"] || '127.0.0.1' + ENV["#{env_key}_host"] || '127.0.0.1' end module_function :host def port - ENV.key?("#{env_key}.port") ? ENV["#{env_key}.port"].to_i : 3306 + ENV.key?("#{env_key}_port") ? ENV["#{env_key}_port"].to_i : 3306 end module_function :port def max_connections - ENV.key?("#{env_key}.max_connections") ? ENV["#{env_key}.max_connections"].to_i : 16 + ENV.key?("#{env_key}_max_connections") ? ENV["#{env_key}_max_connections"].to_i : 16 end module_function :max_connections def pool_timeout - ENV.key?("#{env_key}.pool_timeout") ? ENV["#{env_key}.pool_timeout"].to_i : 2 + ENV.key?("#{env_key}_pool_timeout") ? ENV["#{env_key}_pool_timeout"].to_i : 2 end module_function :pool_timeout def preconnect - ENV["#{env_key}.preconnect"] || 'concurrently' + ENV["#{env_key}_preconnect"] || 'concurrently' end module_function :preconnect def env_key - ENV['conflux.data.key'] || 'telemetry.snmp.data' + 'telemetry_snmp_data' end module_function :env_key end diff --git a/lib/telemetry/snmp/data/migrations/011_device_locks.rb b/lib/telemetry/snmp/data/migrations/011_device_locks.rb new file mode 100644 index 0000000..69cad34 --- /dev/null +++ b/lib/telemetry/snmp/data/migrations/011_device_locks.rb @@ -0,0 +1,19 @@ +Sequel.migration do + change do + create_table(:device_locks) do + primary_key :id + String :worker_name, null: false + foreign_key :device_id, :devices, null: false, unique: true + + DateTime :created + DateTime :updated + DateTime :expires + + index :device_id + index :worker_name + index :created + index :updated + index :expires + end + end +end diff --git a/lib/telemetry/snmp/data/models/device.rb b/lib/telemetry/snmp/data/models/device.rb index 4beedd5..ed53e7f 100644 --- a/lib/telemetry/snmp/data/models/device.rb +++ b/lib/telemetry/snmp/data/models/device.rb @@ -4,6 +4,7 @@ module Data module Model class Device < Sequel::Model many_to_one :device_cred + many_to_one :device_lock end end end diff --git a/lib/telemetry/snmp/data/models/device_lock.rb b/lib/telemetry/snmp/data/models/device_lock.rb new file mode 100644 index 0000000..bb60001 --- /dev/null +++ b/lib/telemetry/snmp/data/models/device_lock.rb @@ -0,0 +1,11 @@ +module Telemetry + module Snmp + module Data + module Model + class DeviceLock < Sequel::Model + one_to_one :device + end + end + end + end +end diff --git a/lib/telemetry/snmp/publisher.rb b/lib/telemetry/snmp/publisher.rb index 6c443d2..71aa6c0 100644 --- a/lib/telemetry/snmp/publisher.rb +++ b/lib/telemetry/snmp/publisher.rb @@ -21,8 +21,8 @@ def opts end def username - if ENV.key? 'telemetry.snmp.amqp.username' - ENV['telemetry.snmp.amqp.username'] + if ENV.key? 'telemetry_snmp_amqp_username' + ENV['telemetry_snmp_amqp_username'] elsif opts[:amqp].key? :username opts[:amqp][:username] else @@ -31,16 +31,16 @@ def username end def password - ENV['telemetry.snmp.amqp.password'] || opts[:amqp][:password] || 'guest' + ENV['telemetry_snmp_amqp_password'] || opts[:amqp][:password] || 'guest' end def vhost - ENV['telemetry.snmp.amqp.vhost'] || opts[:amqp][:vhost] || 'telemetry' + ENV['telemetry_snmp_amqp_vhost'] || opts[:amqp][:vhost] || 'telemetry' end def port - if ENV.key? 'telemetry.snmp.amqp.port' - ENV['telemetry.snmp.amqp.port'].to_i + if ENV.key? 'telemetry_snmp_amqp_port' + ENV['telemetry_snmp_amqp_port'].to_i elsif opts[:amqp].key? :port opts[:amqp][:port] elsif use_ssl? @@ -51,8 +51,8 @@ def port end def use_ssl? - if ENV.key? 'telemetry.snmp.amqp.use_ssl' - %w[1 true].include? ENV['telemetry.snmp.amqp.use_ssl'] + if ENV.key? 'telemetry_snmp_amqp_use_ssl' + %w[1 true].include? ENV['telemetry_snmp_amqp_use_ssl'] elsif opts[:amqp].key?(:use_ssl) opts[:amqp][:use_ssl] else @@ -61,8 +61,8 @@ def use_ssl? end def nodes - if ENV.key?('telemetry.snmp.amqp.nodes') - ENV['telemetry.snmp.amqp.nodes'].split(',') + if ENV.key?('telemetry_snmp_amqp_nodes') + ENV['telemetry_snmp_amqp_nodes'].split(',') elsif opts[:amqp].key?(:nodes) opts[:amqp][:nodes] else @@ -71,7 +71,7 @@ def nodes end def exchange_name - ENV['telemetry.snmp.amqp.exchange_name'] || opts[:amqp][:exchange_name] || 'telemetry.snmp' + ENV['telemetry_snmp_amqp_exchange_name'] || opts[:amqp][:exchange_name] || 'telemetry.snmp' end def session diff --git a/lib/telemetry/snmp/version.rb b/lib/telemetry/snmp/version.rb index 2734e2e..d943abf 100644 --- a/lib/telemetry/snmp/version.rb +++ b/lib/telemetry/snmp/version.rb @@ -2,6 +2,6 @@ module Telemetry module Snmp - VERSION = '0.1.0' + VERSION = '0.2.0' end end diff --git a/spec/telemetry/publisher_spec.rb b/spec/telemetry/publisher_spec.rb index b4f8634..d5f1187 100644 --- a/spec/telemetry/publisher_spec.rb +++ b/spec/telemetry/publisher_spec.rb @@ -12,48 +12,48 @@ it 'should have a username' do expect(described_class.username).to eq 'guest' - ENV['telemetry.snmp.amqp.username'] = 'foobar' + ENV['telemetry_snmp_amqp_username'] = 'foobar' expect(described_class.username).to eq 'foobar' - ENV['telemetry.snmp.amqp.username'] = nil + ENV['telemetry_snmp_amqp_username'] = nil end it 'should have a password' do expect(described_class.password).to eq 'guest' - ENV['telemetry.snmp.amqp.password'] = 'foobar' + ENV['telemetry_snmp_amqp_password'] = 'foobar' expect(described_class.password).to eq 'foobar' - ENV['telemetry.snmp.amqp.password'] = nil + ENV['telemetry_snmp_amqp_password'] = nil expect(described_class.password).to eq 'guest' end it 'should have a vhost' do expect(described_class.vhost).to eq 'telemetry' - ENV['telemetry.snmp.amqp.vhost'] = 'foobar' + ENV['telemetry_snmp_amqp_vhost'] = 'foobar' expect(described_class.vhost).to eq 'foobar' - ENV['telemetry.snmp.amqp.vhost'] = nil + ENV['telemetry_snmp_amqp_vhost'] = nil expect(described_class.vhost).to eq 'telemetry' end it 'should have a port' do expect(described_class.port).to eq 5672 - ENV['telemetry.snmp.amqp.port'] = '8811' + ENV['telemetry_snmp_amqp_port'] = '8811' expect(described_class.port).to eq 8811 - ENV['telemetry.snmp.amqp.port'] = nil + ENV['telemetry_snmp_amqp_port'] = nil expect(described_class.port).to eq 5672 end it 'should have nodes' do expect(described_class.nodes).to be_a Array expect(described_class.nodes).to eq ['localhost'] - ENV['telemetry.snmp.amqp.nodes'] = 'foo,bar' + ENV['telemetry_snmp_amqp_nodes'] = 'foo,bar' expect(described_class.nodes).to eq %w[foo bar] - ENV['telemetry.snmp.amqp.nodes'] = nil + ENV['telemetry_snmp_amqp_nodes'] = nil end it 'should have an exchange name' do expect(described_class.exchange_name).to eq 'telemetry.snmp' - ENV['telemetry.snmp.amqp.exchange_name'] = 'test_exchange' - expect(described_class.exchange_name).to eq 'test_exchange' - ENV['telemetry.snmp.amqp.exchange_name'] = nil + ENV['telemetry_snmp_amqp_exchange_name'] = 'test.exchange' + expect(described_class.exchange_name).to eq 'test.exchange' + ENV['telemetry_snmp_amqp_exchange_name'] = nil end it 'should have publlish_opts' do @@ -74,15 +74,15 @@ it 'should support use_ssl' do expect(described_class.use_ssl?).to eq false - ENV['telemetry.snmp.amqp.use_ssl'] = 'true' + ENV['telemetry_snmp_amqp_use_ssl'] = 'true' expect(described_class.use_ssl?).to eq true - ENV['telemetry.snmp.amqp.use_ssl'] = 'foobar' + ENV['telemetry_snmp_amqp_use_ssl'] = 'foobar' expect(described_class.use_ssl?).to eq false - ENV['telemetry.snmp.amqp.use_ssl'] = '1' + ENV['telemetry_snmp_amqp_use_ssl'] = '1' expect(described_class.use_ssl?).to eq true - ENV['telemetry.snmp.amqp.use_ssl'] = '0' + ENV['telemetry_snmp_amqp_use_ssl'] = '0' expect(described_class.use_ssl?).to eq false - ENV['telemetry.snmp.amqp.use_ssl'] = nil + ENV['telemetry_snmp_amqp_use_ssl'] = nil expect(described_class.use_ssl?).to eq false end @@ -93,7 +93,7 @@ it 'should be able to start! with options' do opts = { amqp: { - vhost: 'telegraf', + vhost: '/', nodes: ['localhost', '127.0.0.1'], exchange_name: 'telemetry.snmp', port: 5672,