From ae5df5b88d23f1018ab521cd21c11d6196b34e4f Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Fri, 15 Nov 2024 14:17:51 -0500 Subject: [PATCH] fix: make race condition on reassignment more rare (#1052) * fix: make race condition on reassignment more rare * fix: proper concurrency on bulk dispatch * prevent concurrent err assignments --- internal/services/dispatcher/dispatcher.go | 188 ++++++++++++--------- pkg/repository/prisma/step_run.go | 28 +-- 2 files changed, 124 insertions(+), 92 deletions(-) diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index 63030055b..29c0ece76 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -11,6 +11,7 @@ import ( "github.com/google/uuid" "github.com/hashicorp/go-multierror" "github.com/rs/zerolog" + "golang.org/x/sync/errgroup" "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/msgqueue" @@ -465,7 +466,7 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms // if the step run has a job run in a non-running state, we should not send it to the worker if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) { - d.l.Warn().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus)) + d.l.Debug().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus)) // release the semaphore return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, payload.StepRunId, false) @@ -553,6 +554,11 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned-bulk", task.OtelCarrier) defer span.End() + // we set a timeout of 25 seconds because we don't want to hold the semaphore for longer than the visibility timeout (30 seconds) + // on the worker + 
ctx, cancel := context.WithTimeout(ctx, 25*time.Second) + defer cancel() + payload := tasktypes.StepRunAssignedBulkTaskPayload{} metadata := tasktypes.StepRunAssignedBulkTaskMetadata{} @@ -588,97 +594,119 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task stepRunIdToData[sqlchelpers.UUIDToStr(sr.SRID)] = sr } - var outerErr error + outerEg := errgroup.Group{} for workerId, stepRunIds := range payload.WorkerIdToStepRunIds { workerId := workerId - d.l.Debug().Msgf("worker %s has %d step runs", workerId, len(stepRunIds)) - - // get the worker for this task - workers, err := d.workers.Get(workerId) - - if err != nil && !errors.Is(err, ErrWorkerNotFound) { - outerErr = multierror.Append(outerErr, fmt.Errorf("could not get worker: %w", err)) - continue - } - - for _, stepRunId := range stepRunIds { - stepRunId := stepRunId - - stepRun := stepRunIdToData[stepRunId] - - // if the step run has a job run in a non-running state, we should not send it to the worker - if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) { - d.l.Warn().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus)) - - // release the semaphore - return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false) - } - - // if the step run is in a final state, we should not send it to the worker - if repository.IsFinalStepRunStatus(stepRun.Status) { - d.l.Warn().Msgf("step run %s is in a final state %s, ignoring", stepRunId, string(stepRun.Status)) - - return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false) - } + outerEg.Go(func() error { + d.l.Debug().Msgf("worker %s has %d step runs", workerId, len(stepRunIds)) - var multiErr error - var success bool + // get the worker for this task + workers, err := d.workers.Get(workerId) - for i, w := range workers { - err = w.StartStepRunFromBulk(ctx, metadata.TenantId, stepRun) - - if err != nil { - 
multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err)) - } else { - success = true - break - } + if err != nil && !errors.Is(err, ErrWorkerNotFound) { + return fmt.Errorf("could not get worker: %w", err) } - now := time.Now().UTC() - - if success { - defer d.repo.StepRun().DeferredStepRunEvent( - metadata.TenantId, - repository.CreateStepRunEventOpts{ - StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), - EventMessage: repository.StringPtr("Sent step run to the assigned worker"), - EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER), - EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO), - Timestamp: &now, - EventData: map[string]interface{}{"worker_id": workerId}, - }, - ) - - continue + innerEg := errgroup.Group{} + + for _, stepRunId := range stepRunIds { + stepRunId := stepRunId + + innerEg.Go(func() error { + stepRun := stepRunIdToData[stepRunId] + + requeue := func() error { + // we were unable to send the step run to any worker, requeue the step run with an internal retry + _, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{ + IsInternalRetry: true, + }) + + if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) { + return fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err) + } + + return nil + } + + // if we've reached the context deadline, this should be requeued + if ctx.Err() != nil { + return requeue() + } + + // if the step run has a job run in a non-running state, we should not send it to the worker + if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) { + d.l.Debug().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus)) + + // release the semaphore + return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false) + } + + // if the step run is 
in a final state, we should not send it to the worker + if repository.IsFinalStepRunStatus(stepRun.Status) { + d.l.Warn().Msgf("step run %s is in a final state %s, ignoring", stepRunId, string(stepRun.Status)) + + return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false) + } + + var multiErr error + var success bool + + for i, w := range workers { + err := w.StartStepRunFromBulk(ctx, metadata.TenantId, stepRun) + + if err != nil { + multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err)) + } else { + success = true + break + } + } + + now := time.Now().UTC() + + if success { + defer d.repo.StepRun().DeferredStepRunEvent( + metadata.TenantId, + repository.CreateStepRunEventOpts{ + StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), + EventMessage: repository.StringPtr("Sent step run to the assigned worker"), + EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER), + EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO), + Timestamp: &now, + EventData: map[string]interface{}{"worker_id": workerId}, + }, + ) + + return nil + } + + defer d.repo.StepRun().DeferredStepRunEvent( + metadata.TenantId, + repository.CreateStepRunEventOpts{ + StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), + EventMessage: repository.StringPtr("Could not send step run to assigned worker"), + EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED), + EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING), + Timestamp: &now, + EventData: map[string]interface{}{"worker_id": workerId}, + }, + ) + + if err := requeue(); err != nil { + multiErr = multierror.Append(multiErr, err) + } + + return multiErr + }) } - defer d.repo.StepRun().DeferredStepRunEvent( - metadata.TenantId, - repository.CreateStepRunEventOpts{ - StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), - EventMessage: repository.StringPtr("Could 
not send step run to assigned worker"), - EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED), - EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING), - Timestamp: &now, - EventData: map[string]interface{}{"worker_id": workerId}, - }, - ) - - // we were unable to send the step run to any worker, requeue the step run with an internal retry - _, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{ - IsInternalRetry: true, - }) - - if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) { - outerErr = multierror.Append(outerErr, fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err)) - } - } + return innerEg.Wait() + }) } - return outerErr + return outerEg.Wait() } func (d *DispatcherImpl) handleStepRunCancelled(ctx context.Context, task *msgqueue.Message) error { diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index abad98d36..9f12721ca 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -3010,18 +3010,6 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st } } - if opts.IsRetry || opts.IsInternalRetry { - // if this is a retry, write a queue item to release the worker semaphore - err := s.releaseWorkerSemaphoreSlot(ctx, tenantId, stepRunId) - - if err != nil { - return nil, fmt.Errorf("could not release worker semaphore queue items: %w", err) - } - - // retries get highest priority to ensure that they're run immediately - priority = 4 - } - if len(opts.ExpressionEvals) > 0 { err := s.createExpressionEvals(ctx, s.pool, stepRunId, opts.ExpressionEvals) @@ -3054,6 +3042,22 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st return nil, repository.ErrAlreadyQueued } + if opts.IsRetry || opts.IsInternalRetry { + // if this is a retry, write a queue item to release the worker semaphore 
+ // + // FIXME: there is a race condition here where we can delete a worker semaphore slot that has already been reassigned, + // but the step run was not in a RUNNING state. The fix for this would be to track a total retry count on the step run + // and use this to identify semaphore slots, but this involves a big refactor of semaphore slots. + err := s.releaseWorkerSemaphoreSlot(ctx, tenantId, stepRunId) + + if err != nil { + return nil, fmt.Errorf("could not release worker semaphore queue items: %w", err) + } + + // retries get highest priority to ensure that they're run immediately + priority = 4 + } + _, err = s.bulkQueuer.BuffItem(tenantId, buffer.BulkQueueStepRunOpts{ GetStepRunForEngineRow: innerStepRun, Priority: priority,