From 326eeb388d42c796fac21ed3ff564195b9bf9583 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Wed, 21 Aug 2024 13:21:55 -0400 Subject: [PATCH] feat: pass otel through msgqueue --- internal/msgqueue/msgqueue.go | 3 +++ internal/msgqueue/rabbitmq/rabbitmq.go | 6 ++++++ .../services/controllers/jobs/controller.go | 20 +++++++++--------- .../services/controllers/jobs/partition.go | 2 +- internal/services/dispatcher/dispatcher.go | 6 +++--- internal/telemetry/telemetry.go | 21 +++++++++++++++++++ 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/internal/msgqueue/msgqueue.go b/internal/msgqueue/msgqueue.go index e2bf90e8e..53b3bc2d0 100644 --- a/internal/msgqueue/msgqueue.go +++ b/internal/msgqueue/msgqueue.go @@ -130,6 +130,9 @@ type Message struct { // RetryDelay is the delay between retries. RetryDelay int `json:"retry_delay"` + + // OtelCarrier is the OpenTelemetry carrier for the task. + OtelCarrier map[string]string `json:"otel_carrier"` } func (t *Message) TenantID() string { diff --git a/internal/msgqueue/rabbitmq/rabbitmq.go b/internal/msgqueue/rabbitmq/rabbitmq.go index 2dad642bf..bafcb0b74 100644 --- a/internal/msgqueue/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/rabbitmq/rabbitmq.go @@ -15,6 +15,7 @@ import ( "github.com/rs/zerolog" "github.com/hatchet-dev/hatchet/internal/msgqueue" + "github.com/hatchet-dev/hatchet/internal/telemetry" "github.com/hatchet-dev/hatchet/pkg/logger" "github.com/hatchet-dev/hatchet/pkg/random" ) @@ -185,6 +186,11 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { // AddMessage adds a msg to the queue. func (t *MessageQueueImpl) AddMessage(ctx context.Context, q msgqueue.Queue, msg *msgqueue.Message) error { + // inject otel carrier into the message + if msg.OtelCarrier == nil { + msg.OtelCarrier = telemetry.GetCarrier(ctx) + } + t.msgs <- &msgWithQueue{ Message: msg, q: q, diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index a23f4b48a..a025aafbf 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -297,7 +297,7 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes } func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-job-run-queued") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-queued", task.OtelCarrier) defer span.End() payload := tasktypes.JobRunQueuedTaskPayload{} @@ -352,7 +352,7 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq } func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-job-run-cancelled") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-cancelled", task.OtelCarrier) defer span.End() payload := tasktypes.JobRunCancelledTaskPayload{} @@ -409,7 +409,7 @@ func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *m } func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-retry") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-retry", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunRetryTaskPayload{} @@ -481,7 +481,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq // handleStepRunReplay replays a step run from scratch - it resets the workflow run state, job run state, and // all cancelled step runs which are children of the step run being replayed. func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-replay") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-replay", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunReplayTaskPayload{} @@ -605,7 +605,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg } func (ec *JobsControllerImpl) handleStepRunQueued(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-queued") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-queued", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunTaskPayload{} @@ -1009,7 +1009,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId } func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-started") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-started", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunStartedTaskPayload{} @@ -1055,7 +1055,7 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ms } func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-finished") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-finished", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunFinishedTaskPayload{} @@ -1147,7 +1147,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m } func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-failed") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-failed", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunFailedTaskPayload{} @@ -1277,7 +1277,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun } func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timed-out") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-timed-out", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunTimedOutTaskPayload{} @@ -1299,7 +1299,7 @@ func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *m } func (ec *JobsControllerImpl) handleStepRunCancel(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "handle-step-run-cancel") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-cancel", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunCancelTaskPayload{} diff --git a/internal/services/controllers/jobs/partition.go b/internal/services/controllers/jobs/partition.go index daca051ae..8ba05ffea 100644 --- a/internal/services/controllers/jobs/partition.go +++ b/internal/services/controllers/jobs/partition.go @@ -209,7 +209,7 @@ func (p *Partition) handleTask(ctx context.Context, task *msgqueue.Message) (err } func (p *Partition) handleCheckQueue(ctx context.Context, task *msgqueue.Message) error { - _, span := telemetry.NewSpan(ctx, "handle-check-queue") + _, span := telemetry.NewSpanWithCarrier(ctx, "handle-check-queue", task.OtelCarrier) defer span.End() metadata := tasktypes.CheckTenantQueueMetadata{} diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index dd66cf599..93226d09b 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -360,7 +360,7 @@ func (d *DispatcherImpl) handleTask(ctx context.Context, task *msgqueue.Message) } func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "group-key-action-assigned") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "group-key-action-assigned", task.OtelCarrier) defer span.End() payload := tasktypes.GroupKeyActionAssignedTaskPayload{} @@ -427,7 +427,7 @@ func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, t } func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "step-run-assigned") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunAssignedTaskPayload{} @@ -528,7 +528,7 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms } func (d *DispatcherImpl) handleStepRunCancelled(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpan(ctx, "step-run-cancelled") + ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-cancelled", task.OtelCarrier) defer span.End() payload := tasktypes.StepRunCancelledTaskPayload{} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 793c20256..0953b4aa9 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlptrace" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" @@ -91,6 +92,26 @@ func NewSpan(ctx context.Context, name string) (context.Context, trace.Span) { return ctx, span } +func NewSpanWithCarrier(ctx context.Context, name string, carrier map[string]string) (context.Context, trace.Span) { + propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) + + otelCarrier := propagation.MapCarrier(carrier) + parentCtx := propagator.Extract(ctx, otelCarrier) + + ctx, span := otel.Tracer("").Start(parentCtx, prefixSpanKey(name)) + return ctx, span +} + +func GetCarrier(ctx context.Context) map[string]string { + propgator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}) + + // Serialize the context into carrier + carrier := propagation.MapCarrier{} + propgator.Inject(ctx, carrier) + + return carrier +} + type AttributeKey string // AttributeKV is a wrapper for otel attributes KV