Merge pull request #159 from DataDog/remeh/4.8.2-wo-karim-prs
Reverts #151 & #146 to release 4.8.2
remeh authored Nov 16, 2020
2 parents da23442 + 5b57253 commit e665cfb
Showing 27 changed files with 496 additions and 3,125 deletions.
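At a high level, the revert drops the Forwarder/Sender/MessageBuffer pipeline and restores the synchronous 4.8.x client, in which a metric call writes to the socket right away unless it is wrapped in a #batch block. A minimal usage sketch of the restored surface, with illustrative host, port, and metric names (the constructor options and the #batch block are the ones documented in the diffs below):

require 'datadog/statsd'

# Restored 4.8.x behaviour: every call below produces a socket write unless batched.
statsd = Datadog::Statsd.new('localhost', 8125, namespace: 'app', tags: ['env:prod'])

statsd.increment('page.views')     # sent immediately

# Several metrics buffered into as few UDP packets as possible,
# flushed when the block exits.
statsd.batch do |s|
  s.gauge('users.online', 156)
  s.increment('page.views')
end

statsd.close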
4 changes: 2 additions & 2 deletions dogstatsd-ruby.gemspec
@@ -5,10 +5,10 @@ Gem::Specification.new do |s|
s.name = "dogstatsd-ruby"
s.version = Datadog::Statsd::VERSION

s.authors = ["Rein Henrichs", "Karim Bogtob"]
s.authors = ["Rein Henrichs"]

s.summary = "A Ruby DogStatsd client"
s.description = "A Ruby DogStatsd client"
s.description = "A Ruby DogStastd client"
s.email = "[email protected]"

s.metadata = {
134 changes: 64 additions & 70 deletions lib/datadog/statsd.rb
@@ -5,10 +5,8 @@
require_relative 'statsd/telemetry'
require_relative 'statsd/udp_connection'
require_relative 'statsd/uds_connection'
require_relative 'statsd/message_buffer'
require_relative 'statsd/batch'
require_relative 'statsd/serialization'
require_relative 'statsd/sender'
require_relative 'statsd/forwarder'

# = Datadog::Statsd: A DogStatsd client (https://www.datadoghq.com)
#
@@ -28,17 +26,12 @@
# statsd = Datadog::Statsd.new 'localhost', 8125, tags: 'tag1:true'
module Datadog
class Statsd
class Error < StandardError
end

OK = 0
WARNING = 1
CRITICAL = 2
UNKNOWN = 3

UDP_DEFAULT_BUFFER_SIZE = 1_432
UDS_DEFAULT_BUFFER_SIZE = 8_192
DEFAULT_BUFFER_POOL_SIZE = Float::INFINITY
DEFAULT_BUFFER_SIZE = 8 * 1_024
MAX_EVENT_SIZE = 8 * 1_024
# minimum flush interval for the telemetry in seconds
DEFAULT_TELEMETRY_FLUSH_INTERVAL = 10
@@ -58,59 +51,67 @@ def tags
serializer.global_tags
end

# Buffer containing the statsd message before they are sent in batch
attr_reader :buffer

# Maximum buffer size in bytes before it is flushed
attr_reader :max_buffer_bytes

# Default sample rate
attr_reader :sample_rate

# Connection
attr_reader :connection

# @param [String] host your statsd host
# @param [Integer] port your statsd port
# @option [String] namespace set a namespace to be prepended to every metric name
# @option [Array<String>|Hash] tags tags to be added to every metric
# @option [Logger] logger for debugging
# @option [Integer] buffer_max_payload_size max bytes to buffer
# @option [Integer] buffer_max_pool_size max messages to buffer
# @option [Integer] max_buffer_bytes max bytes to buffer when using #batch
# @option [String] socket_path unix socket path
# @option [Float] default sample rate if not overridden
def initialize(
host = nil,
port = nil,
socket_path: nil,

namespace: nil,
tags: nil,
sample_rate: nil,

buffer_max_payload_size: nil,
buffer_max_pool_size: nil,
buffer_overflowing_stategy: :drop,

max_buffer_bytes: DEFAULT_BUFFER_SIZE,
socket_path: nil,
logger: nil,

telemetry_enable: true,
sample_rate: nil,
disable_telemetry: false,
telemetry_flush_interval: DEFAULT_TELEMETRY_FLUSH_INTERVAL
)
unless tags.nil? || tags.is_a?(Array) || tags.is_a?(Hash)
raise ArgumentError, 'tags must be an array of string tags or a Hash'
raise ArgumentError, 'tags must be a Array<String> or a Hash'
end

@namespace = namespace
@prefix = @namespace ? "#{@namespace}.".freeze : nil

@serializer = Serialization::Serializer.new(prefix: @prefix, global_tags: tags)
@sample_rate = sample_rate

@forwarder = Forwarder.new(
host: host,
port: port,
socket_path: socket_path,
transport_type = socket_path.nil? ? :udp : :uds

@telemetry = Telemetry.new(disable_telemetry, telemetry_flush_interval,
global_tags: tags,
logger: logger,
transport_type: transport_type
)

buffer_max_payload_size: buffer_max_payload_size,
buffer_max_pool_size: buffer_max_pool_size,
buffer_overflowing_stategy: buffer_overflowing_stategy,
@connection = case transport_type
when :udp
UDPConnection.new(host, port, logger, telemetry)
when :uds
UDSConnection.new(socket_path, logger, telemetry)
end

telemetry_flush_interval: telemetry_enable ? telemetry_flush_interval : nil,
)
@logger = logger

@sample_rate = sample_rate

# we reduce max_buffer_bytes by a the rough estimate of the telemetry payload
@batch = Batch.new(connection, (max_buffer_bytes - telemetry.estimate_max_size))
end

# yield a new instance to a block and close it when done
@@ -269,9 +270,9 @@ def set(stat, value, opts = EMPTY_OPTIONS)
# @example Report a critical service check status
# $statsd.service_check('my.service.check', Statsd::CRITICAL, :tags=>['urgent'])
def service_check(name, status, opts = EMPTY_OPTIONS)
telemetry.sent(service_checks: 1) if telemetry
telemetry.sent(service_checks: 1)

forwarder.send_message(serializer.to_service_check(name, status, opts))
send_stat(serializer.to_service_check(name, status, opts))
end

# This end point allows you to post events to the stream. You can tag them, set priority and even aggregate them with other events.
@@ -293,48 +294,33 @@ def service_check(name, status, opts = EMPTY_OPTIONS)
# @example Report an awful event:
# $statsd.event('Something terrible happened', 'The end is near if we do nothing', :alert_type=>'warning', :tags=>['end_of_times','urgent'])
def event(title, text, opts = EMPTY_OPTIONS)
telemetry.sent(events: 1) if telemetry
telemetry.sent(events: 1)

forwarder.send_message(serializer.to_event(title, text, opts))
send_stat(serializer.to_event(title, text, opts))
end

# Close the underlying socket
def close
forwarder.close
end

def sync_with_outbound_io
forwarder.sync_with_outbound_io
end

# Flush the buffer into the connection
def flush(flush_telemetry: false, sync: false)
forwarder.flush(flush_telemetry: flush_telemetry, sync: sync)
end

def telemetry
forwarder.telemetry
end

def host
forwarder.host
end

def port
forwarder.port
end

def socket_path
forwarder.socket_path
# Send several metrics in the same UDP Packet
# They will be buffered and flushed when the block finishes
#
# @example Send several metrics in one packet:
# $statsd.batch do |s|
# s.gauge('users.online',156)
# s.increment('page.views')
# end
def batch
@batch.open do
yield self
end
end

def transport_type
forwarder.transport_type
# Close the underlying socket
def close
connection.close
end

private
attr_reader :serializer
attr_reader :forwarder
attr_reader :telemetry

PROCESS_TIME_SUPPORTED = (RUBY_VERSION >= '2.1.0')
EMPTY_OPTIONS = {}.freeze
@@ -350,14 +336,22 @@ def now
end

def send_stats(stat, delta, type, opts = EMPTY_OPTIONS)
telemetry.sent(metrics: 1) if telemetry
telemetry.sent(metrics: 1)

sample_rate = opts[:sample_rate] || @sample_rate || 1

if sample_rate == 1 || rand <= sample_rate
full_stat = serializer.to_stat(stat, delta, type, tags: opts[:tags], sample_rate: sample_rate)

forwarder.send_message(full_stat)
send_stat(full_stat)
end
end

def send_stat(message)
if @batch.open?
@batch.add(message)
else
@connection.write(message)
end
end
end
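One behaviour worth noting in the restored send_stats above: sampling happens client-side before serialization, and the effective rate is handed to the serializer so it travels in the datagram and the server can scale counts back up. A standalone illustration of the rand guard, with made-up numbers:

sample_rate = 0.25
attempts    = 10_000

# Mirrors the guard in send_stats: a metric is emitted only when it wins the coin toss.
emitted = attempts.times.count { sample_rate == 1 || rand <= sample_rate }

puts "emitted ~#{emitted} of #{attempts} datagrams (expected ~#{(attempts * sample_rate).round})"

The metric that does get through is then routed by send_stat: appended to the open batch if one is active, written straight to the connection otherwise.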
56 changes: 56 additions & 0 deletions lib/datadog/statsd/batch.rb
@@ -0,0 +1,56 @@
# frozen_string_literal: true

module Datadog
class Statsd
class Batch
def initialize(connection, max_buffer_bytes)
@connection = connection
@max_buffer_bytes = max_buffer_bytes
@depth = 0
reset
end

def open
@depth += 1

yield
ensure
@depth -= 1
flush if !open?
end

def open?
@depth > 0
end

def add(message)
message_bytes = message.bytesize

unless @buffer_bytes == 0
if @buffer_bytes + 1 + message_bytes >= @max_buffer_bytes
flush
else
@buffer << "\n"
@buffer_bytes += 1
end
end

@buffer << message
@buffer_bytes += message_bytes
end

def flush
return if @buffer_bytes == 0
@connection.write(@buffer)
reset
end

private

def reset
@buffer = String.new
@buffer_bytes = 0
end
end
end
end
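A small, self-contained sketch of how this class behaves (Batch is internal; real callers go through Statsd#batch, and the connection here is a made-up stub that just records writes). The @depth counter is what lets open calls nest, so only the outermost block triggers a flush, and add inserts the "\n" separator only between messages:

require 'datadog/statsd'

# Stub standing in for UDPConnection/UDSConnection, for illustration only.
FakeConnection = Struct.new(:packets) do
  def write(payload)
    packets << payload.dup
  end
end

conn  = FakeConnection.new([])
batch = Datadog::Statsd::Batch.new(conn, 1_432)   # UDP-sized buffer, illustrative

batch.open do
  batch.add('page.views:1|c')
  batch.open do                    # nested open: depth 1 -> 2, no flush yet
    batch.add('users.online:156|g')
  end
end                                # depth back to 0 -> a single flush, one packet

conn.packets                       # => ["page.views:1|c\nusers.online:156|g"]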
13 changes: 8 additions & 5 deletions lib/datadog/statsd/connection.rb
@@ -3,9 +3,8 @@
module Datadog
class Statsd
class Connection
def initialize(telemetry: nil, logger: nil)
def initialize(telemetry)
@telemetry = telemetry
@logger = logger
end

# Close the underlying socket
@@ -21,11 +20,15 @@ def close
def write(payload)
logger.debug { "Statsd: #{payload}" } if logger

flush_telemetry = telemetry.flush?

payload += telemetry.flush if flush_telemetry

send_message(payload)

telemetry.sent(packets: 1, bytes: payload.length) if telemetry
telemetry.reset if flush_telemetry

true
telemetry.sent(packets: 1, bytes: payload.length)
rescue StandardError => boom
# Try once to reconnect if the socket has been closed
retries ||= 1
@@ -42,7 +45,7 @@ def write(payload)
end
end

telemetry.dropped(packets: 1, bytes: payload.length) if telemetry
telemetry.dropped(packets: 1, bytes: payload.length)
logger.error { "Statsd: #{boom.class} #{boom}" } if logger
nil
end
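The write method restored above piggybacks telemetry on ordinary traffic: once the telemetry flush interval has elapsed, the serialized counters are appended to the outgoing payload, shipped in the same packet, and then reset. A conceptual, self-contained sketch of that flow; TinyTelemetry is a made-up stand-in, not the gem's Telemetry class, and its payload format is assumed for illustration:

require 'socket'

class TinyTelemetry
  def initialize(interval)
    @interval   = interval
    @last_flush = Time.now
    @metrics    = 0
  end

  # Only the metrics counter is tracked here, for brevity.
  def sent(metrics: 0, packets: 0, bytes: 0)
    @metrics += metrics
  end

  def flush?
    Time.now - @last_flush >= @interval
  end

  def flush
    # Assumed counter name and format, purely illustrative.
    "\ndatadog.dogstatsd.client.metrics:#{@metrics}|c"
  end

  def reset
    @metrics    = 0
    @last_flush = Time.now
  end
end

telemetry = TinyTelemetry.new(0)     # interval 0 so the piggyback happens on the first write
socket    = UDPSocket.new
socket.connect('127.0.0.1', 8125)

payload = 'page.views:1|c'

flush_telemetry = telemetry.flush?                # interval elapsed?
payload += telemetry.flush if flush_telemetry     # counters ride in the same datagram

socket.send(payload, 0)

telemetry.reset if flush_telemetry
telemetry.sent(metrics: 1, packets: 1, bytes: payload.length)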
(The remaining 23 changed files in this commit are not rendered here.)
