From 58cdd9c71971b6b2f92aa746dbe769f41e7ec95e Mon Sep 17 00:00:00 2001 From: Andrey Marchenko Date: Fri, 5 Jul 2024 16:06:00 +0200 Subject: [PATCH] run Telemetry::Worker every metrics_aggregation_interval_seconds --- lib/datadog/core/configuration/components.rb | 2 ++ lib/datadog/core/telemetry/component.rb | 10 +++++++++- lib/datadog/core/telemetry/worker.rb | 14 +++++++++++--- sig/datadog/core/telemetry/component.rbs | 2 +- sig/datadog/core/telemetry/worker.rbs | 4 +++- spec/datadog/core/configuration/components_spec.rb | 8 ++++++-- spec/datadog/core/telemetry/component_spec.rb | 6 ++++++ spec/datadog/core/telemetry/worker_spec.rb | 11 ++++++++--- 8 files changed, 46 insertions(+), 11 deletions(-) diff --git a/lib/datadog/core/configuration/components.rb b/lib/datadog/core/configuration/components.rb index 665485c6f0c..7aea57f5ee1 100644 --- a/lib/datadog/core/configuration/components.rb +++ b/lib/datadog/core/configuration/components.rb @@ -64,7 +64,9 @@ def build_telemetry(settings, agent_settings, logger) Telemetry::Component.new( enabled: enabled, + metrics_enabled: enabled && settings.telemetry.metrics_enabled, heartbeat_interval_seconds: settings.telemetry.heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: settings.telemetry.metrics_aggregation_interval_seconds, dependency_collection: settings.telemetry.dependency_collection ) end diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index 0d5046e4391..f3fe8c63ac5 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -15,15 +15,23 @@ class Component include Core::Utils::Forking # @param enabled [Boolean] Determines whether telemetry events should be sent to the API + # @param metrics_enabled [Boolean] Determines whether telemetry metrics should be sent to the API # @param heartbeat_interval_seconds [Float] How frequently heartbeats will be reported, in seconds. + # @param metrics_aggregation_interval_seconds [Float] How frequently metrics will be aggregated, in seconds. # @param [Boolean] dependency_collection Whether to send the `app-dependencies-loaded` event - def initialize(heartbeat_interval_seconds:, dependency_collection:, enabled: true) + def initialize( + heartbeat_interval_seconds:, + metrics_aggregation_interval_seconds:, + dependency_collection:, enabled: true, + metrics_enabled: true + ) @enabled = enabled @stopped = false @worker = Telemetry::Worker.new( enabled: @enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: Emitter.new, dependency_collection: dependency_collection ) diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 7706ac7ce30..4add8057cfb 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -21,6 +21,7 @@ class Worker def initialize( heartbeat_interval_seconds:, + metrics_aggregation_interval_seconds:, emitter:, dependency_collection:, enabled: true, @@ -30,10 +31,13 @@ def initialize( @emitter = emitter @dependency_collection = dependency_collection + @ticks_per_heartbeat = (heartbeat_interval_seconds / metrics_aggregation_interval_seconds).to_i + @current_ticks = 0 + # Workers::Polling settings self.enabled = enabled # Workers::IntervalLoop settings - self.loop_base_interval = heartbeat_interval_seconds + self.loop_base_interval = metrics_aggregation_interval_seconds self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP @shutdown_timeout = shutdown_timeout @@ -75,10 +79,14 @@ def perform(*events) return if !enabled? || forked? started! unless sent_started_event? + # flush metrics here + flush_events(events) - heartbeat! + @current_ticks += 1 + return if @current_ticks < @ticks_per_heartbeat - flush_events(events) + @current_ticks = 0 + heartbeat! end def flush_events(events) diff --git a/sig/datadog/core/telemetry/component.rbs b/sig/datadog/core/telemetry/component.rbs index 4d411d32578..dac3d41bba4 100644 --- a/sig/datadog/core/telemetry/component.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -10,7 +10,7 @@ module Datadog include Core::Utils::Forking - def initialize: (heartbeat_interval_seconds: Numeric, dependency_collection: bool, ?enabled: bool) -> void + def initialize: (heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, dependency_collection: bool, ?enabled: bool, ?metrics_enabled: bool) -> void def disable!: () -> void diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 822b9fece95..4e0f87a3249 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -17,8 +17,10 @@ module Datadog @shutdown_timeout: Integer @buffer_size: Integer @dependency_collection: bool + @ticks_per_heartbeat: Integer + @current_ticks: Integer - def initialize: (?enabled: bool, heartbeat_interval_seconds: Numeric, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void + def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, ?shutdown_timeout: Integer, ?buffer_size: Integer, dependency_collection: bool) -> void def start: () -> void diff --git a/spec/datadog/core/configuration/components_spec.rb b/spec/datadog/core/configuration/components_spec.rb index 79c65637f91..1ccea4861b7 100644 --- a/spec/datadog/core/configuration/components_spec.rb +++ b/spec/datadog/core/configuration/components_spec.rb @@ -225,11 +225,14 @@ context 'given settings' do let(:telemetry) { instance_double(Datadog::Core::Telemetry::Component) } let(:expected_options) do - { enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + { enabled: enabled, metrics_enabled: metrics_enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection } end let(:enabled) { true } + let(:metrics_enabled) { true } let(:heartbeat_interval_seconds) { 60 } + let(:metrics_aggregation_interval_seconds) { 10 } let(:dependency_collection) { true } before do @@ -246,7 +249,8 @@ context 'and :unix agent adapter' do let(:expected_options) do - { enabled: false, heartbeat_interval_seconds: heartbeat_interval_seconds, + { enabled: false, metrics_enabled: false, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection } end let(:agent_settings) do diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index ba17d37c2f4..b641e48bbda 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -6,13 +6,17 @@ subject(:telemetry) do described_class.new( enabled: enabled, + metrics_enabled: metrics_enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection ) end let(:enabled) { true } + let(:metrics_enabled) { true } let(:heartbeat_interval_seconds) { 0 } + let(:metrics_aggregation_interval_seconds) { 1 } let(:dependency_collection) { true } let(:worker) { double(Datadog::Core::Telemetry::Worker) } let(:not_found) { false } @@ -20,6 +24,7 @@ before do allow(Datadog::Core::Telemetry::Worker).to receive(:new).with( heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection, enabled: enabled, emitter: an_instance_of(Datadog::Core::Telemetry::Emitter) @@ -40,6 +45,7 @@ subject(:telemetry) do described_class.new( heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, dependency_collection: dependency_collection ) end diff --git a/spec/datadog/core/telemetry/worker_spec.rb b/spec/datadog/core/telemetry/worker_spec.rb index 54b23ca6e79..3f141a738ed 100644 --- a/spec/datadog/core/telemetry/worker_spec.rb +++ b/spec/datadog/core/telemetry/worker_spec.rb @@ -7,6 +7,7 @@ described_class.new( enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: emitter, dependency_collection: dependency_collection ) @@ -14,6 +15,7 @@ let(:enabled) { true } let(:heartbeat_interval_seconds) { 0.5 } + let(:metrics_aggregation_interval_seconds) { 0.25 } let(:emitter) { double(Datadog::Core::Telemetry::Emitter) } let(:dependency_collection) { false } @@ -58,7 +60,7 @@ it 'creates a new worker in stopped state' do expect(worker).to have_attributes( enabled?: true, - loop_base_interval: heartbeat_interval_seconds, + loop_base_interval: metrics_aggregation_interval_seconds, run_async?: false, running?: false, started?: false @@ -94,7 +96,7 @@ expect(worker).to have_attributes( enabled?: true, - loop_base_interval: heartbeat_interval_seconds, + loop_base_interval: metrics_aggregation_interval_seconds, run_async?: true, running?: true, started?: true @@ -152,6 +154,7 @@ context 'when app-started event exhausted retries' do let(:heartbeat_interval_seconds) { 0.1 } + let(:metrics_aggregation_interval_seconds) { 0.05 } it 'stops retrying, never sends heartbeat, and disables worker' do expect(emitter).to receive(:request).with(an_instance_of(Datadog::Core::Telemetry::Event::AppStarted)) @@ -239,6 +242,7 @@ described_class.new( enabled: enabled, heartbeat_interval_seconds: heartbeat_interval_seconds, + metrics_aggregation_interval_seconds: metrics_aggregation_interval_seconds, emitter: emitter, dependency_collection: dependency_collection ) @@ -270,11 +274,12 @@ describe '#stop' do let(:heartbeat_interval_seconds) { 60 } + let(:metrics_aggregation_interval_seconds) { 30 } it 'flushes events and stops the worker' do worker.start - try_wait_until { @received_heartbeat } + try_wait_until { @received_started } events_received = 0 mutex = Mutex.new