Skip to content

Commit

Permalink
Add child_spans for Sidekiq Queue instrumentation (#2403)
Browse files Browse the repository at this point in the history
* Add childspan for Sidekiq Queue instrumentation

* Add basic specs

* Remove unneeded exception alloc

* Add CHANGELOG entry

* Set root op instead of nested spans

* Adjust feedback

* Remove timecop_delay from options hash

* Fix 1 day

* Fix 1.day

* Dont use .day helper
  • Loading branch information
frederikspang authored Oct 29, 2024
1 parent 27d7384 commit 9446a30
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 9 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
### Features

- Add `include_sentry_event` matcher for RSpec [#2424](https://github.com/getsentry/sentry-ruby/pull/2424)
- Add support for Sentry Cache instrumentation, when using Rails.cache [#2380](https://github.com/getsentry/sentry-ruby/pull/2380)
- Add support for Sentry Cache instrumentation, when using Rails.cache ([#2380](https://github.com/getsentry/sentry-ruby/pull/2380))
- Add support for Queue Instrumentation for Sidekiq. [#2403](https://github.com/getsentry/sentry-ruby/pull/2403)

Note: MemoryStore and FileStore require Rails 8.0+

Expand Down
2 changes: 2 additions & 0 deletions sentry-sidekiq/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ end

gem "rails", "> 5.0.0"

gem "timecop"

eval_gemfile File.expand_path("../Gemfile", __dir__)
35 changes: 30 additions & 5 deletions sentry-sidekiq/lib/sentry/sidekiq/sentry_context_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,24 @@

module Sentry
module Sidekiq
module Helpers
def set_span_data(span, id:, queue:, latency: nil, retry_count: nil)
if span
span.set_data("messaging.message.id", id)
span.set_data("messaging.destination.name", queue)
span.set_data("messaging.message.receive.latency", latency) if latency
span.set_data("messaging.message.retry.count", retry_count) if retry_count
end
end
end

class SentryContextServerMiddleware
OP_NAME = "queue.sidekiq"
include Sentry::Sidekiq::Helpers

OP_NAME = "queue.process"
SPAN_ORIGIN = "auto.queue.sidekiq"

def call(_worker, job, queue)
def call(worker, job, queue)
return yield unless Sentry.initialized?

context_filter = Sentry::Sidekiq::ContextFilter.new(job)
Expand All @@ -23,7 +36,12 @@ def call(_worker, job, queue)
scope.set_contexts(sidekiq: job.merge("queue" => queue))
scope.set_transaction_name(context_filter.transaction_name, source: :task)
transaction = start_transaction(scope, job["trace_propagation_headers"])
scope.set_span(transaction) if transaction

if transaction
scope.set_span(transaction)

set_span_data(transaction, id: job["jid"], queue: queue, latency: ((Time.now.to_f - job["enqueued_at"]) * 1000).to_i, retry_count: job["retry_count"] || 0)
end

begin
yield
Expand Down Expand Up @@ -63,13 +81,20 @@ def finish_transaction(transaction, status)
end

class SentryContextClientMiddleware
def call(_worker_class, job, _queue, _redis_pool)
include Sentry::Sidekiq::Helpers

def call(worker_class, job, queue, _redis_pool)
return yield unless Sentry.initialized?

user = Sentry.get_current_scope.user
job["sentry_user"] = user unless user.empty?
job["trace_propagation_headers"] ||= Sentry.get_trace_propagation_headers
yield

Sentry.with_child_span(op: "queue.publish", description: worker_class.to_s) do |span|
set_span_data(span, id: job["jid"], queue: queue)

yield
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "spec_helper"
require "timecop"

RSpec.shared_context "sidekiq", shared_context: :metadata do
let(:user) { { "id" => rand(10_000) } }
Expand Down Expand Up @@ -65,6 +66,30 @@
expect(transaction.contexts.dig(:trace, :origin)).to eq('auto.queue.sidekiq')
end

it "adds a queue.process spans" do
Timecop.freeze do
execute_worker(processor, HappyWorker)
execute_worker(processor, HappyWorker, jid: '123456', timecop_delay: Time.now + 86400)

expect(transport.events.count).to eq(2)

transaction = transport.events[0]
expect(transaction).not_to be_nil
expect(transaction.spans.count).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123123') # Default defined in #execute_worker
expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default')
expect(transaction.contexts[:trace][:data]['messaging.message.retry.count']).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(0)

transaction = transport.events[1]
expect(transaction).not_to be_nil
expect(transaction.spans.count).to eq(0)
expect(transaction.contexts[:trace][:data]['messaging.message.id']).to eq('123456') # Explicitly set above.
expect(transaction.contexts[:trace][:data]['messaging.destination.name']).to eq('default')
expect(transaction.contexts[:trace][:data]['messaging.message.receive.latency']).to eq(86400000)
end
end

context "with trace_propagation_headers" do
let(:parent_transaction) { Sentry.start_transaction(op: "sidekiq") }

Expand All @@ -73,6 +98,7 @@
execute_worker(processor, HappyWorker, trace_propagation_headers: trace_propagation_headers)

expect(transport.events.count).to eq(1)

transaction = transport.events[0]
expect(transaction).not_to be_nil
expect(transaction.contexts.dig(:trace, :trace_id)).to eq(parent_transaction.trace_id)
Expand Down Expand Up @@ -156,5 +182,18 @@
expect(second_headers["sentry-trace"]).to eq(transaction.to_sentry_trace)
expect(second_headers["baggage"]).to eq(transaction.to_baggage)
end

it "has a queue.publish span" do
message_id = client.push('queue' => 'default', 'class' => HappyWorker, 'args' => [])

transaction.finish

expect(transport.events.count).to eq(1)
event = transport.events.last
expect(event.spans.count).to eq(1)
expect(event.spans[0][:op]).to eq("queue.publish")
expect(event.spans[0][:data]['messaging.message.id']).to eq(message_id)
expect(event.spans[0][:data]['messaging.destination.name']).to eq('default')
end
end
end
2 changes: 1 addition & 1 deletion sentry-sidekiq/spec/sentry/sidekiq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def retry_last_failed_job
expect(transaction.contexts.dig(:trace, :trace_id)).to be_a(String)
expect(transaction.contexts.dig(:trace, :span_id)).to be_a(String)
expect(transaction.contexts.dig(:trace, :status)).to eq("ok")
expect(transaction.contexts.dig(:trace, :op)).to eq("queue.sidekiq")
expect(transaction.contexts.dig(:trace, :op)).to eq("queue.process")
end

it "records transaction with exception" do
Expand Down
9 changes: 7 additions & 2 deletions sentry-sidekiq/spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,20 @@ def sidekiq_config(opts)

def execute_worker(processor, klass, **options)
klass_options = klass.sidekiq_options_hash || {}

# for Ruby < 2.6
klass_options.each do |k, v|
options[k.to_sym] = v
end

msg = Sidekiq.dump_json(jid: "123123", class: klass, args: [], **options)
jid = options.delete(:jid) || "123123"
timecop_delay = options.delete(:timecop_delay)

msg = Sidekiq.dump_json(created_at: Time.now.to_f, enqueued_at: Time.now.to_f, jid: jid, class: klass, args: [], **options)
Timecop.freeze(timecop_delay) if timecop_delay
work = Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)
process_work(processor, work)
ensure
Timecop.return if timecop_delay
end

def process_work(processor, work)
Expand Down

0 comments on commit 9446a30

Please sign in to comment.