Skip to content

Commit

Permalink
feat: pass otel through msgqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 committed Aug 21, 2024
1 parent 47d2c4e commit 326eeb3
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 14 deletions.
3 changes: 3 additions & 0 deletions internal/msgqueue/msgqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ type Message struct {

// RetryDelay is the delay between retries.
RetryDelay int `json:"retry_delay"`

// OtelCarrier is the OpenTelemetry carrier for the task.
OtelCarrier map[string]string `json:"otel_carrier"`
}

func (t *Message) TenantID() string {
Expand Down
6 changes: 6 additions & 0 deletions internal/msgqueue/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/rs/zerolog"

"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/logger"
"github.com/hatchet-dev/hatchet/pkg/random"
)
Expand Down Expand Up @@ -185,6 +186,11 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {

// AddMessage adds a msg to the queue.
func (t *MessageQueueImpl) AddMessage(ctx context.Context, q msgqueue.Queue, msg *msgqueue.Message) error {
// inject otel carrier into the message
if msg.OtelCarrier == nil {
msg.OtelCarrier = telemetry.GetCarrier(ctx)
}

t.msgs <- &msgWithQueue{
Message: msg,
q: q,
Expand Down
20 changes: 10 additions & 10 deletions internal/services/controllers/jobs/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (ec *JobsControllerImpl) handleTask(ctx context.Context, task *msgqueue.Mes
}

func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-job-run-queued")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-queued", task.OtelCarrier)
defer span.End()

payload := tasktypes.JobRunQueuedTaskPayload{}
Expand Down Expand Up @@ -352,7 +352,7 @@ func (ec *JobsControllerImpl) handleJobRunQueued(ctx context.Context, task *msgq
}

func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-job-run-cancelled")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-job-run-cancelled", task.OtelCarrier)
defer span.End()

payload := tasktypes.JobRunCancelledTaskPayload{}
Expand Down Expand Up @@ -409,7 +409,7 @@ func (ec *JobsControllerImpl) handleJobRunCancelled(ctx context.Context, task *m
}

func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-retry")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-retry", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunRetryTaskPayload{}
Expand Down Expand Up @@ -481,7 +481,7 @@ func (ec *JobsControllerImpl) handleStepRunRetry(ctx context.Context, task *msgq
// handleStepRunReplay replays a step run from scratch - it resets the workflow run state, job run state, and
// all cancelled step runs which are children of the step run being replayed.
func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-replay")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-replay", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunReplayTaskPayload{}
Expand Down Expand Up @@ -605,7 +605,7 @@ func (ec *JobsControllerImpl) handleStepRunReplay(ctx context.Context, task *msg
}

func (ec *JobsControllerImpl) handleStepRunQueued(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-queued")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-queued", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunTaskPayload{}
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId
}

func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-started")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-started", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunStartedTaskPayload{}
Expand Down Expand Up @@ -1055,7 +1055,7 @@ func (ec *JobsControllerImpl) handleStepRunStarted(ctx context.Context, task *ms
}

func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-finished")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-finished", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunFinishedTaskPayload{}
Expand Down Expand Up @@ -1147,7 +1147,7 @@ func (ec *JobsControllerImpl) handleStepRunFinished(ctx context.Context, task *m
}

func (ec *JobsControllerImpl) handleStepRunFailed(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-failed")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-failed", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunFailedTaskPayload{}
Expand Down Expand Up @@ -1277,7 +1277,7 @@ func (ec *JobsControllerImpl) failStepRun(ctx context.Context, tenantId, stepRun
}

func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timed-out")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-timed-out", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunTimedOutTaskPayload{}
Expand All @@ -1299,7 +1299,7 @@ func (ec *JobsControllerImpl) handleStepRunTimedOut(ctx context.Context, task *m
}

func (ec *JobsControllerImpl) handleStepRunCancel(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "handle-step-run-cancel")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "handle-step-run-cancel", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunCancelTaskPayload{}
Expand Down
2 changes: 1 addition & 1 deletion internal/services/controllers/jobs/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (p *Partition) handleTask(ctx context.Context, task *msgqueue.Message) (err
}

func (p *Partition) handleCheckQueue(ctx context.Context, task *msgqueue.Message) error {
_, span := telemetry.NewSpan(ctx, "handle-check-queue")
_, span := telemetry.NewSpanWithCarrier(ctx, "handle-check-queue", task.OtelCarrier)
defer span.End()

metadata := tasktypes.CheckTenantQueueMetadata{}
Expand Down
6 changes: 3 additions & 3 deletions internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (d *DispatcherImpl) handleTask(ctx context.Context, task *msgqueue.Message)
}

func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "group-key-action-assigned")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "group-key-action-assigned", task.OtelCarrier)
defer span.End()

payload := tasktypes.GroupKeyActionAssignedTaskPayload{}
Expand Down Expand Up @@ -427,7 +427,7 @@ func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, t
}

func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "step-run-assigned")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunAssignedTaskPayload{}
Expand Down Expand Up @@ -528,7 +528,7 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms
}

func (d *DispatcherImpl) handleStepRunCancelled(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpan(ctx, "step-run-cancelled")
ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-cancelled", task.OtelCarrier)
defer span.End()

payload := tasktypes.StepRunCancelledTaskPayload{}
Expand Down
21 changes: 21 additions & 0 deletions internal/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -91,6 +92,26 @@ func NewSpan(ctx context.Context, name string) (context.Context, trace.Span) {
return ctx, span
}

func NewSpanWithCarrier(ctx context.Context, name string, carrier map[string]string) (context.Context, trace.Span) {
propagator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})

otelCarrier := propagation.MapCarrier(carrier)
parentCtx := propagator.Extract(ctx, otelCarrier)

ctx, span := otel.Tracer("").Start(parentCtx, prefixSpanKey(name))
return ctx, span
}

func GetCarrier(ctx context.Context) map[string]string {
propgator := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})

// Serialize the context into carrier
carrier := propagation.MapCarrier{}
propgator.Inject(ctx, carrier)

return carrier
}

type AttributeKey string

// AttributeKV is a wrapper for otel attributes KV
Expand Down

0 comments on commit 326eeb3

Please sign in to comment.