Skip to content

Commit

Permalink
fix: circuit breaker for dispatcher reassignment (#1144)
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 authored Dec 20, 2024
1 parent 5975be6 commit a237f90
Show file tree
Hide file tree
Showing 6 changed files with 364 additions and 130 deletions.
186 changes: 56 additions & 130 deletions internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/queueutils"
"github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
"github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils"
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
Expand Down Expand Up @@ -358,8 +359,6 @@ func (d *DispatcherImpl) handleTask(ctx context.Context, task *msgqueue.Message)
switch task.ID {
case "group-key-action-assigned":
err = d.a.WrapErr(d.handleGroupKeyActionAssignedTask(ctx, task), map[string]interface{}{})
case "step-run-assigned":
err = d.a.WrapErr(d.handleStepRunAssignedTask(ctx, task), map[string]interface{}{})
case "step-run-assigned-bulk":
err = d.a.WrapErr(d.handleStepRunBulkAssignedTask(ctx, task), map[string]interface{}{})
case "step-run-cancelled":
Expand Down Expand Up @@ -438,118 +437,6 @@ func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, t
return multiErr
}

// handleStepRunAssignedTask processes a "step-run-assigned" message by
// delivering the referenced step run to the worker it was assigned to.
//
// Flow:
//  1. Decode and validate the task payload and metadata.
//  2. Load the step run; if its job run or the step run itself is already in
//     a final status, release the step run semaphore and stop.
//  3. Load the step run data and try each connection registered for the
//     assigned worker until one accepts the step run.
//  4. On success, record a SENT_TO_WORKER event. Otherwise record a
//     REASSIGNED warning event and requeue the step run as an internal retry.
//
// It returns a decode/lookup error, the result of releasing the semaphore, or
// the accumulated send/requeue errors (nil when nothing failed).
func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *msgqueue.Message) error {
	ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned", task.OtelCarrier)
	defer span.End()

	var (
		payload  tasktypes.StepRunAssignedTaskPayload
		metadata tasktypes.StepRunAssignedTaskMetadata
	)

	if decodeErr := d.dv.DecodeAndValidate(task.Payload, &payload); decodeErr != nil {
		return fmt.Errorf("could not decode dispatcher task payload: %w", decodeErr)
	}

	if decodeErr := d.dv.DecodeAndValidate(task.Metadata, &metadata); decodeErr != nil {
		return fmt.Errorf("could not decode dispatcher task metadata: %w", decodeErr)
	}

	tenantId, stepRunId := metadata.TenantId, payload.StepRunId

	// Fetch the current state of the step run from the database.
	stepRun, err := d.repo.StepRun().GetStepRunForEngine(ctx, tenantId, stepRunId)
	if err != nil {
		return fmt.Errorf("could not get step run: %w", err)
	}

	// Nothing should be sent to a worker once either the parent job run or the
	// step run itself has finished; release the semaphore instead.
	switch {
	case repository.IsFinalJobRunStatus(stepRun.JobRunStatus):
		d.l.Debug().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus))

		return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, tenantId, stepRunId, false)
	case repository.IsFinalStepRunStatus(stepRun.SRStatus):
		d.l.Warn().Msgf("step run %s is in a final state %s, ignoring", stepRunId, string(stepRun.SRStatus))

		return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, tenantId, stepRunId, false)
	}

	data, err := d.repo.StepRun().GetStepRunDataForEngine(ctx, tenantId, stepRunId)
	if err != nil {
		return fmt.Errorf("could not get step run data: %w", err)
	}

	servertel.WithStepRunModel(span, stepRun)

	// A missing worker is not fatal here: the empty result falls through to
	// the requeue path below.
	workers, err := d.workers.Get(payload.WorkerId)
	if err != nil && !errors.Is(err, ErrWorkerNotFound) {
		return fmt.Errorf("could not get worker: %w", err)
	}

	var (
		collected error
		delivered bool
	)

	// Stop at the first worker entry that accepts the step run; remember the
	// failures of the ones that did not.
	for idx, worker := range workers {
		sendErr := worker.StartStepRun(ctx, tenantId, stepRun, data)
		if sendErr == nil {
			delivered = true
			break
		}

		collected = multierror.Append(collected, fmt.Errorf("could not send step action to worker (%d): %w", idx, sendErr))
	}

	now := time.Now().UTC()
	srID := sqlchelpers.UUIDToStr(stepRun.SRID)

	if delivered {
		defer d.repo.StepRun().DeferredStepRunEvent(
			tenantId,
			repository.CreateStepRunEventOpts{
				StepRunId:     srID,
				EventMessage:  repository.StringPtr("Sent step run to the assigned worker"),
				EventReason:   repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER),
				EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO),
				Timestamp:     &now,
				EventData:     map[string]interface{}{"worker_id": payload.WorkerId},
			},
		)

		return nil
	}

	// Deferred so the event is emitted only after the requeue attempt below
	// has run.
	defer d.repo.StepRun().DeferredStepRunEvent(
		tenantId,
		repository.CreateStepRunEventOpts{
			StepRunId:     srID,
			EventMessage:  repository.StringPtr("Could not send step run to assigned worker"),
			EventReason:   repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED),
			EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING),
			Timestamp:     &now,
			EventData:     map[string]interface{}{"worker_id": payload.WorkerId},
		},
	)

	// No worker took the step run: put it back on the queue as an internal
	// retry. An already-running step run is not treated as an error.
	_, requeueErr := d.repo.StepRun().QueueStepRun(ctx, tenantId, srID, &repository.QueueStepRunOpts{
		IsInternalRetry: true,
	})
	if requeueErr != nil && !errors.Is(requeueErr, repository.ErrAlreadyRunning) {
		collected = multierror.Append(collected, fmt.Errorf("💥 could not requeue step run in dispatcher: %w", requeueErr))
	}

	return collected
}

func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task *msgqueue.Message) error {
ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned-bulk", task.OtelCarrier)
defer span.End()
Expand Down Expand Up @@ -611,28 +498,25 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task

innerEg := errgroup.Group{}

toRetry := []string{}
toRetryMu := sync.Mutex{}

for _, stepRunId := range stepRunIds {
stepRunId := stepRunId

innerEg.Go(func() error {
stepRun := stepRunIdToData[stepRunId]

requeue := func() error {
// we were unable to send the step run to any worker, requeue the step run with an internal retry
_, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{
IsInternalRetry: true,
})

if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) {
return fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err)
}

return nil
requeue := func() {
toRetryMu.Lock()
toRetry = append(toRetry, stepRunId)
toRetryMu.Unlock()
}

// if we've reached the context deadline, this should be requeued
if ctx.Err() != nil {
return requeue()
requeue()
return nil
}

// if the step run has a job run in a non-running state, we should not send it to the worker
Expand All @@ -657,6 +541,7 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task
err := w.StartStepRunFromBulk(ctx, metadata.TenantId, stepRun)

if err != nil {
d.l.Err(err).Msgf("could not send step run to worker (%d)", i)
multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err))
} else {
success = true
Expand Down Expand Up @@ -694,15 +579,56 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task
},
)

if err := requeue(); err != nil {
multiErr = multierror.Append(multiErr, err)
}
requeue()

return multiErr
})
}

return innerEg.Wait()
innerErr := innerEg.Wait()

if len(toRetry) > 0 {
retryCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

_, stepRunsToFail, err := d.repo.StepRun().InternalRetryStepRuns(retryCtx, metadata.TenantId, toRetry)

if err != nil {
innerErr = multierror.Append(innerErr, fmt.Errorf("could not requeue step runs: %w", err))
}

if len(stepRunsToFail) > 0 {
now := time.Now()

batchErr := queueutils.BatchConcurrent(50, stepRunsToFail, func(stepRuns []*dbsqlc.GetStepRunForEngineRow) error {
var innerBatchErr error

for _, stepRun := range stepRuns {
err := d.mq.AddMessage(
retryCtx,
msgqueue.JOB_PROCESSING_QUEUE,
tasktypes.StepRunFailedToTask(
stepRun,
"Could not send step run to worker",
&now,
),
)

if err != nil {
innerBatchErr = multierror.Append(innerBatchErr, err)
}
}

return innerBatchErr
})

if batchErr != nil {
innerErr = multierror.Append(innerErr, fmt.Errorf("could not fail step runs: %w", batchErr))
}
}
}

return innerErr
})
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

"github.com/hatchet-dev/hatchet/pkg/client/loader"
Expand Down Expand Up @@ -255,8 +256,15 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
transportCreds = credentials.NewTLS(opts.tls)
}

keepAliveParams := keepalive.ClientParameters{
Time: 10 * time.Second, // grpc.keepalive_time_ms: 10 * 1000
Timeout: 60 * time.Second, // grpc.keepalive_timeout_ms: 60 * 1000
PermitWithoutStream: true, // grpc.keepalive_permit_without_calls: 1
}

grpcOpts := []grpc.DialOption{
grpc.WithTransportCredentials(transportCreds),
grpc.WithKeepaliveParams(keepAliveParams),
}

if !opts.noGrpcRetry {
Expand Down
105 changes: 105 additions & 0 deletions pkg/repository/prisma/dbsqlc/step_runs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,111 @@ SELECT
FROM
step_runs_to_fail srs2;

-- name: InternalRetryStepRuns :many
-- Requeues step runs for an internal retry, applying a cap on the number of
-- internal retries per step run.
--
-- Parameters:
--   tenantId              - tenant that owns the step runs
--   stepRunIds            - ids of the step runs to process
--   maxInternalRetryCount - step runs whose "internalRetryCount" has reached
--                           this value are reported as FAILED instead of
--                           being requeued
--
-- Returns one row per matched step run ("id", "retryCount", "operation"),
-- where "operation" is 'REASSIGNED' for step runs that were requeued and
-- 'FAILED' for step runs that hit the retry cap. This query does not change
-- the status of the 'FAILED' rows; it only reports them.
--
-- The side effects live in data-modifying CTEs that the final SELECT never
-- references; PostgreSQL still executes such CTEs.
WITH step_runs AS (
-- All requested step runs for the tenant, joined with their step definition
-- to pick up the action id and the timeout settings.
SELECT
sr."id",
sr."tenantId",
sr."scheduleTimeoutAt",
sr."retryCount",
sr."internalRetryCount",
s."actionId",
s."id" AS "stepId",
s."timeout" AS "stepTimeout",
s."scheduleTimeout" AS "scheduleTimeout"
FROM
"StepRun" sr
JOIN
"Step" s ON sr."stepId" = s."id"
WHERE
sr."tenantId" = @tenantId::uuid
AND sr."id" = ANY(@stepRunIds::uuid[])
),
-- Step runs still under the internal retry cap: these get requeued.
step_runs_to_reassign AS (
SELECT
*
FROM
step_runs
WHERE
"internalRetryCount" < @maxInternalRetryCount::int
),
-- Step runs that have reached the internal retry cap: reported as FAILED.
step_runs_to_fail AS (
SELECT
*
FROM
step_runs
WHERE
"internalRetryCount" >= @maxInternalRetryCount::int
),
-- Remove semaphore queue items for every matched step run (both the ones
-- being reassigned and the ones being failed).
deleted_sqis AS (
DELETE FROM
"SemaphoreQueueItem" sqi
USING
step_runs srs
WHERE
sqi."stepRunId" = srs."id"
),
-- Remove timeout queue items for every matched step run at its current
-- retry count.
deleted_tqis AS (
DELETE FROM
"TimeoutQueueItem" tqi
-- delete when step run id AND retry count tuples match
USING
step_runs srs
WHERE
tqi."stepRunId" = srs."id"
AND tqi."retryCount" = srs."retryCount"
),
-- Create a fresh queue item for each step run being reassigned. The schedule
-- timeout is restarted from now, falling back to 5 minutes when the step's
-- "scheduleTimeout" does not convert to an interval. The "queue" column is
-- populated with the step's action id.
inserted_queue_items AS (
INSERT INTO "QueueItem" (
"stepRunId",
"stepId",
"actionId",
"scheduleTimeoutAt",
"stepTimeout",
"priority",
"isQueued",
"tenantId",
"queue"
)
SELECT
srs."id",
srs."stepId",
srs."actionId",
CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'),
srs."stepTimeout",
-- Queue with priority 4 so that reassignment gets highest priority
4,
true,
srs."tenantId",
srs."actionId"
FROM
step_runs_to_reassign srs
),
-- Move the reassigned step runs back to PENDING_ASSIGNMENT, restart their
-- schedule timeout with the same expression used for the queue item, and
-- count this attempt against the internal retry cap.
updated_step_runs AS (
UPDATE "StepRun" sr
SET
"status" = 'PENDING_ASSIGNMENT',
"scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'),
"updatedAt" = CURRENT_TIMESTAMP,
"internalRetryCount" = sr."internalRetryCount" + 1
FROM step_runs_to_reassign srs
WHERE sr."id" = srs."id"
RETURNING sr."id"
)
-- Report the outcome for every matched step run.
SELECT
srs1."id",
srs1."retryCount",
'REASSIGNED' AS "operation"
FROM
step_runs_to_reassign srs1
UNION ALL
SELECT
srs2."id",
srs2."retryCount",
'FAILED' AS "operation"
FROM
step_runs_to_fail srs2;

-- name: ListStepRunsToTimeout :many
SELECT "id"
FROM "StepRun"
Expand Down
Loading

0 comments on commit a237f90

Please sign in to comment.