Skip to content

Commit

Permalink
run Telemetry::Worker every metrics_aggregation_interval_seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
anmarchenko committed Jul 5, 2024
1 parent 49df7e4 commit 58cdd9c
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 11 deletions.
2 changes: 2 additions & 0 deletions lib/datadog/core/configuration/components.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def build_telemetry(settings, agent_settings, logger)

Telemetry::Component.new(
enabled: enabled,
metrics_enabled: enabled && settings.telemetry.metrics_enabled,
heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: settings.telemetry.metrics_aggregation_interval_seconds,
dependency_collection: settings.telemetry.dependency_collection
)
end
Expand Down
10 changes: 9 additions & 1 deletion lib/datadog/core/telemetry/component.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,23 @@ class Component
include Core::Utils::Forking

# @param enabled [Boolean] Determines whether telemetry events should be sent to the API
# @param metrics_enabled [Boolean] Determines whether telemetry metrics should be sent to the API
# @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds.
# @param metrics_aggregation_interval_seconds [Float] How frequently metrics will be aggregated, in seconds.
# @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event
def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true)
def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
dependency_collection:, enabled: true,
metrics_enabled: true
)
@enabled = enabled
@stopped = false

@worker = Telemetry::Worker.new(
enabled: @enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: Emitter.new,
dependency_collection: dependency_collection
)
Expand Down
14 changes: 11 additions & 3 deletions lib/datadog/core/telemetry/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Worker

def initialize(
heartbeat_interval_seconds:,
metrics_aggregation_interval_seconds:,
emitter:,
dependency_collection:,
enabled: true,
Expand All @@ -30,10 +31,13 @@ def initialize(
@emitter = emitter
@dependency_collection = dependency_collection

@ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i
@current_ticks = 0

# Workers::Polling settings
self.enabled = enabled
# Workers::IntervalLoop settings
self.loop_base_interval = heartbeat_interval_seconds
self.loop_base_interval = metrics_aggregation_interval_seconds
self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP

@shutdown_timeout = shutdown_timeout
Expand Down Expand Up @@ -75,10 +79,14 @@ def perform(*events)
return if !enabled? || forked?

started! unless sent_started_event?
# flush metrics here
flush_events(events)

heartbeat!
@current_ticks += 1
return if @current_ticks < @ticks_per_heartbeat

flush_events(events)
@current_ticks = 0
heartbeat!
end

def flush_events(events)
Expand Down
2 changes: 1 addition & 1 deletion sig/datadog/core/telemetry/component.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Datadog

include Core::Utils::Forking

def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void
def initialize: (heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, dependency_collection: bool, ?enabled: bool, ?metrics_enabled: bool) -> void

def disable!: () -> void

Expand Down
4 changes: 3 additions & 1 deletion sig/datadog/core/telemetry/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ module Datadog
@shutdown_timeout: Integer
@buffer_size: Integer
@dependency_collection: bool
@ticks_per_heartbeat: Integer
@current_ticks: Integer

def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void
def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void

def start: () -> void

Expand Down
8 changes: 6 additions & 2 deletions spec/datadog/core/configuration/components_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,14 @@
context 'given settings' do
let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) }
let(:expected_options) do
{ enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds,
{ enabled: enabled, metrics_enabled: metrics_enabled, heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
dependency_collection: dependency_collection }
end
let(:enabled) { true }
let(:metrics_enabled) { true }
let(:heartbeat_interval_seconds) { 60 }
let(:metrics_aggregation_interval_seconds) { 10 }
let(:dependency_collection) { true }

before do
Expand All @@ -246,7 +249,8 @@

context 'and :unix agent adapter' do
let(:expected_options) do
{ enabled: false, heartbeat_interval_seconds: heartbeat_interval_seconds,
{ enabled: false, metrics_enabled: false, heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
dependency_collection: dependency_collection }
end
let(:agent_settings) do
Expand Down
6 changes: 6 additions & 0 deletions spec/datadog/core/telemetry/component_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,25 @@
subject(:telemetry) do
described_class.new(
enabled: enabled,
metrics_enabled: metrics_enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
dependency_collection: dependency_collection
)
end

let(:enabled) { true }
let(:metrics_enabled) { true }
let(:heartbeat_interval_seconds) { 0 }
let(:metrics_aggregation_interval_seconds) { 1 }
let(:dependency_collection) { true }
let(:worker) { double(Datadog::Core::Telemetry::Worker) }
let(:not_found) { false }

before do
allow(Datadog::Core::Telemetry::Worker).to receive(:new).with(
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
dependency_collection: dependency_collection,
enabled: enabled,
emitter: an_instance_of(Datadog::Core::Telemetry::Emitter)
Expand All @@ -40,6 +45,7 @@
subject(:telemetry) do
described_class.new(
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
dependency_collection: dependency_collection
)
end
Expand Down
11 changes: 8 additions & 3 deletions spec/datadog/core/telemetry/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
described_class.new(
enabled: enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: emitter,
dependency_collection: dependency_collection
)
end

let(:enabled) { true }
let(:heartbeat_interval_seconds) { 0.5 }
let(:metrics_aggregation_interval_seconds) { 0.25 }
let(:emitter) { double(Datadog::Core::Telemetry::Emitter) }
let(:dependency_collection) { false }

Expand Down Expand Up @@ -58,7 +60,7 @@
it 'creates a new worker in stopped state' do
expect(worker).to have_attributes(
enabled?: true,
loop_base_interval: heartbeat_interval_seconds,
loop_base_interval: metrics_aggregation_interval_seconds,
run_async?: false,
running?: false,
started?: false
Expand Down Expand Up @@ -94,7 +96,7 @@

expect(worker).to have_attributes(
enabled?: true,
loop_base_interval: heartbeat_interval_seconds,
loop_base_interval: metrics_aggregation_interval_seconds,
run_async?: true,
running?: true,
started?: true
Expand Down Expand Up @@ -152,6 +154,7 @@

context 'when app-started event exhausted retries' do
let(:heartbeat_interval_seconds) { 0.1 }
let(:metrics_aggregation_interval_seconds) { 0.05 }

it 'stops retrying, never sends heartbeat, and disables worker' do
expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted))
Expand Down Expand Up @@ -239,6 +242,7 @@
described_class.new(
enabled: enabled,
heartbeat_interval_seconds: heartbeat_interval_seconds,
metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds,
emitter: emitter,
dependency_collection: dependency_collection
)
Expand Down Expand Up @@ -270,11 +274,12 @@

describe '#stop' do
let(:heartbeat_interval_seconds) { 60 }
let(:metrics_aggregation_interval_seconds) { 30 }

it 'flushes events and stops the worker' do
worker.start

try_wait_until { @received_heartbeat }
try_wait_until { @received_started }

events_received = 0
mutex = Mutex.new
Expand Down

0 comments on commit 58cdd9c

Please sign in to comment.