From 9446a30e56d16dff04c98593a251cecbe872e147 Mon Sep 17 00:00:00 2001 From: Frederik Spang Date: Tue, 29 Oct 2024 14:05:22 +0100 Subject: [PATCH] Add child_spans for Sidekiq Queue instrumentation (#2403) * Add childspan for Sidekiq Queue instrumentation * Add basic specs * Remove unneeded exception alloc * Add CHANGELOG entry * Set root op instead of nested spans * Adjust feedback * Remove timecop_delay from options hash * Fix 1 day * Fix 1.day * Dont use .day helper --- CHANGELOG.md | 3 +- sentry-sidekiq/Gemfile | 2 + .../sidekiq/sentry_context_middleware.rb | 35 ++++++++++++++--- .../sidekiq/sentry_context_middleware_spec.rb | 39 +++++++++++++++++++ sentry-sidekiq/spec/sentry/sidekiq_spec.rb | 2 +- sentry-sidekiq/spec/spec_helper.rb | 9 ++++- 6 files changed, 81 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a11809b02..7d2360ffd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,7 +3,8 @@ ### Features - Add `include_sentry_event` matcher for RSpec [#2424](https://github.com/getsentry/sentry-ruby/pull/2424) -- Add support for Sentry Cache instrumentation, when using Rails.cache [#2380](https://github.com/getsentry/sentry-ruby/pull/2380) +- Add support for Sentry Cache instrumentation, when using Rails.cache ([#2380](https://github.com/getsentry/sentry-ruby/pull/2380)) +- Add support for Queue Instrumentation for Sidekiq. [#2403](https://github.com/getsentry/sentry-ruby/pull/2403) Note: MemoryStore and FileStore require Rails 8.0+ diff --git a/sentry-sidekiq/Gemfile b/sentry-sidekiq/Gemfile index c860cb206..8448e3b77 100644 --- a/sentry-sidekiq/Gemfile +++ b/sentry-sidekiq/Gemfile @@ -25,4 +25,6 @@ end gem "rails", "> 5.0.0" +gem "timecop" + eval_gemfile File.expand_path("../Gemfile", __dir__) diff --git a/sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb b/sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb index e43acb653..690f26ade 100644 --- a/sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb +++ b/sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb @@ -4,11 +4,24 @@ module Sentry module Sidekiq + module Helpers + def set_span_data(span, id:, queue:, latency: nil, retry_count: nil) + if span + span.set_data("messaging.message.id", id) + span.set_data("messaging.destination.name", queue) + span.set_data("messaging.message.receive.latency", latency) if latency + span.set_data("messaging.message.retry.count", retry_count) if retry_count + end + end + end + class SentryContextServerMiddleware - OP_NAME = "queue.sidekiq" + include Sentry::Sidekiq::Helpers + + OP_NAME = "queue.process" SPAN_ORIGIN = "auto.queue.sidekiq" - def call(_worker, job, queue) + def call(worker, job, queue) return yield unless Sentry.initialized? context_filter = Sentry::Sidekiq::ContextFilter.new(job) @@ -23,7 +36,12 @@ def call(_worker, job, queue) scope.set_contexts(sidekiq: job.merge("queue" => queue)) scope.set_transaction_name(context_filter.transaction_name, source: :task) transaction = start_transaction(scope, job["trace_propagation_headers"]) - scope.set_span(transaction) if transaction + + if transaction + scope.set_span(transaction) + + set_span_data(transaction, id: job["jid"], queue: queue, latency: ((Time.now.to_f - job["enqueued_at"]) * 1000).to_i, retry_count: job["retry_count"] || 0) + end begin yield @@ -63,13 +81,20 @@ def finish_transaction(transaction, status) end class SentryContextClientMiddleware - def call(_worker_class, job, _queue, _redis_pool) + include Sentry::Sidekiq::Helpers + + def call(worker_class, job, queue, _redis_pool) return yield unless Sentry.initialized? user = Sentry.get_current_scope.user job["sentry_user"] = user unless user.empty? job["trace_propagation_headers"] ||= Sentry.get_trace_propagation_headers - yield + + Sentry.with_child_span(op: "queue.publish", description: worker_class.to_s) do |span| + set_span_data(span, id: job["jid"], queue: queue) + + yield + end end end end diff --git a/sentry-sidekiq/spec/sentry/sidekiq/sentry_context_middleware_spec.rb b/sentry-sidekiq/spec/sentry/sidekiq/sentry_context_middleware_spec.rb index de5e9416f..ea930ee4e 100644 --- a/sentry-sidekiq/spec/sentry/sidekiq/sentry_context_middleware_spec.rb +++ b/sentry-sidekiq/spec/sentry/sidekiq/sentry_context_middleware_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "spec_helper" +require "timecop" RSpec.shared_context "sidekiq", shared_context: :metadata do let(:user) { { "id" => rand(10_000) } } @@ -65,6 +66,30 @@ expect(transaction.contexts.dig(:trace, :origin)).to eq('auto.queue.sidekiq') end + it "adds a queue.process spans" do + Timecop.freeze do + execute_worker(processor, HappyWorker) + execute_worker(processor, HappyWorker, jid: '123456', timecop_delay: Time.now + 86400) + + expect(transport.events.count).to eq(2) + + transaction = transport.events[0] + expect(transaction).not_to be_nil + expect(transaction.spans.count).to eq(0) + expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123123') # Default defined in #execute_worker + expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default') + expect(transaction.contexts[:trace][:data]['messaging.message.retry.count']).to eq(0) + expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(0) + + transaction = transport.events[1] + expect(transaction).not_to be_nil + expect(transaction.spans.count).to eq(0) + expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123456') # Explicitly set above. + expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default') + expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(86400000) + end + end + context "with trace_propagation_headers" do let(:parent_transaction) { Sentry.start_transaction(op: "sidekiq") } @@ -73,6 +98,7 @@ execute_worker(processor, HappyWorker, trace_propagation_headers: trace_propagation_headers) expect(transport.events.count).to eq(1) + transaction = transport.events[0] expect(transaction).not_to be_nil expect(transaction.contexts.dig(:trace, :trace_id)).to eq(parent_transaction.trace_id) @@ -156,5 +182,18 @@ expect(second_headers["sentry-trace"]).to eq(transaction.to_sentry_trace) expect(second_headers["baggage"]).to eq(transaction.to_baggage) end + + it "has a queue.publish span" do + message_id = client.push('queue' => 'default', 'class' => HappyWorker, 'args' => []) + + transaction.finish + + expect(transport.events.count).to eq(1) + event = transport.events.last + expect(event.spans.count).to eq(1) + expect(event.spans[0][:op]).to eq("queue.publish") + expect(event.spans[0][:data]['messaging.message.id']).to eq(message_id) + expect(event.spans[0][:data]['messaging.destination.name']).to eq('default') + end end end diff --git a/sentry-sidekiq/spec/sentry/sidekiq_spec.rb b/sentry-sidekiq/spec/sentry/sidekiq_spec.rb index 64fdec5c1..4c788c9ce 100644 --- a/sentry-sidekiq/spec/sentry/sidekiq_spec.rb +++ b/sentry-sidekiq/spec/sentry/sidekiq_spec.rb @@ -223,7 +223,7 @@ def retry_last_failed_job expect(transaction.contexts.dig(:trace, :trace_id)).to be_a(String) expect(transaction.contexts.dig(:trace, :span_id)).to be_a(String) expect(transaction.contexts.dig(:trace, :status)).to eq("ok") - expect(transaction.contexts.dig(:trace, :op)).to eq("queue.sidekiq") + expect(transaction.contexts.dig(:trace, :op)).to eq("queue.process") end it "records transaction with exception" do diff --git a/sentry-sidekiq/spec/spec_helper.rb b/sentry-sidekiq/spec/spec_helper.rb index 5e1e6a6e9..e712a6a0f 100644 --- a/sentry-sidekiq/spec/spec_helper.rb +++ b/sentry-sidekiq/spec/spec_helper.rb @@ -229,15 +229,20 @@ def sidekiq_config(opts) def execute_worker(processor, klass, **options) klass_options = klass.sidekiq_options_hash || {} - # for Ruby < 2.6 klass_options.each do |k, v| options[k.to_sym] = v end - msg = Sidekiq.dump_json(jid: "123123", class: klass, args: [], **options) + jid = options.delete(:jid) || "123123" + timecop_delay = options.delete(:timecop_delay) + + msg = Sidekiq.dump_json(created_at: Time.now.to_f, enqueued_at: Time.now.to_f, jid: jid, class: klass, args: [], **options) + Timecop.freeze(timecop_delay) if timecop_delay work = Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg) process_work(processor, work) +ensure + Timecop.return if timecop_delay end def process_work(processor, work)