Skip to content

Commit

Permalink
fix: make race condition on reassignment more rare (#1052)
Browse files Browse the repository at this point in the history
* fix: make race condition on reassignment more rare

* fix: proper concurrency on bulk dispatch

* prevent concurrent err assignments
  • Loading branch information
abelanger5 authored Nov 15, 2024
1 parent 961e884 commit ae5df5b
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 92 deletions.
188 changes: 108 additions & 80 deletions internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/google/uuid"
"github.com/hashicorp/go-multierror"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/hatchet-dev/hatchet/internal/datautils"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
Expand Down Expand Up @@ -465,7 +466,7 @@ func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *ms

// if the step run has a job run in a non-running state, we should not send it to the worker
if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) {
d.l.Warn().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus))
d.l.Debug().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus))

// release the semaphore
return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, payload.StepRunId, false)
Expand Down Expand Up @@ -553,6 +554,11 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task
ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned-bulk", task.OtelCarrier)
defer span.End()

// we set a timeout of 25 seconds because we don't want to hold the semaphore for longer than the visibility timeout (30 seconds)
// on the worker
ctx, cancel := context.WithTimeout(ctx, 25*time.Second)
defer cancel()

payload := tasktypes.StepRunAssignedBulkTaskPayload{}
metadata := tasktypes.StepRunAssignedBulkTaskMetadata{}

Expand Down Expand Up @@ -588,97 +594,119 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task
stepRunIdToData[sqlchelpers.UUIDToStr(sr.SRID)] = sr
}

var outerErr error
outerEg := errgroup.Group{}

for workerId, stepRunIds := range payload.WorkerIdToStepRunIds {
workerId := workerId

d.l.Debug().Msgf("worker %s has %d step runs", workerId, len(stepRunIds))

// get the worker for this task
workers, err := d.workers.Get(workerId)

if err != nil && !errors.Is(err, ErrWorkerNotFound) {
outerErr = multierror.Append(outerErr, fmt.Errorf("could not get worker: %w", err))
continue
}

for _, stepRunId := range stepRunIds {
stepRunId := stepRunId

stepRun := stepRunIdToData[stepRunId]

// if the step run has a job run in a non-running state, we should not send it to the worker
if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) {
d.l.Warn().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus))

// release the semaphore
return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false)
}

// if the step run is in a final state, we should not send it to the worker
if repository.IsFinalStepRunStatus(stepRun.Status) {
d.l.Warn().Msgf("step run %s is in a final state %s, ignoring", stepRunId, string(stepRun.Status))

return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false)
}
outerEg.Go(func() error {
d.l.Debug().Msgf("worker %s has %d step runs", workerId, len(stepRunIds))

var multiErr error
var success bool
// get the worker for this task
workers, err := d.workers.Get(workerId)

for i, w := range workers {
err = w.StartStepRunFromBulk(ctx, metadata.TenantId, stepRun)

if err != nil {
multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err))
} else {
success = true
break
}
if err != nil && !errors.Is(err, ErrWorkerNotFound) {
return fmt.Errorf("could not get worker: %w", err)
}

now := time.Now().UTC()

if success {
defer d.repo.StepRun().DeferredStepRunEvent(
metadata.TenantId,
repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventMessage: repository.StringPtr("Sent step run to the assigned worker"),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO),
Timestamp: &now,
EventData: map[string]interface{}{"worker_id": workerId},
},
)

continue
innerEg := errgroup.Group{}

for _, stepRunId := range stepRunIds {
stepRunId := stepRunId

innerEg.Go(func() error {
stepRun := stepRunIdToData[stepRunId]

requeue := func() error {
// we were unable to send the step run to any worker, requeue the step run with an internal retry
_, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{
IsInternalRetry: true,
})

if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) {
return fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err)
}

return nil
}

// if we've reached the context deadline, this should be requeued
if ctx.Err() != nil {
return requeue()
}

// if the step run has a job run in a non-running state, we should not send it to the worker
if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) {
d.l.Debug().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus))

// release the semaphore
return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false)
}

// if the step run is in a final state, we should not send it to the worker
if repository.IsFinalStepRunStatus(stepRun.Status) {
d.l.Warn().Msgf("step run %s is in a final state %s, ignoring", stepRunId, string(stepRun.Status))

return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, stepRunId, false)
}

var multiErr error
var success bool

for i, w := range workers {
err := w.StartStepRunFromBulk(ctx, metadata.TenantId, stepRun)

if err != nil {
multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err))
} else {
success = true
break
}
}

now := time.Now().UTC()

if success {
defer d.repo.StepRun().DeferredStepRunEvent(
metadata.TenantId,
repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventMessage: repository.StringPtr("Sent step run to the assigned worker"),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO),
Timestamp: &now,
EventData: map[string]interface{}{"worker_id": workerId},
},
)

return nil
}

defer d.repo.StepRun().DeferredStepRunEvent(
metadata.TenantId,
repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventMessage: repository.StringPtr("Could not send step run to assigned worker"),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING),
Timestamp: &now,
EventData: map[string]interface{}{"worker_id": workerId},
},
)

if err := requeue(); err != nil {
multiErr = multierror.Append(multiErr, err)
}

return multiErr
})
}

defer d.repo.StepRun().DeferredStepRunEvent(
metadata.TenantId,
repository.CreateStepRunEventOpts{
StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID),
EventMessage: repository.StringPtr("Could not send step run to assigned worker"),
EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED),
EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING),
Timestamp: &now,
EventData: map[string]interface{}{"worker_id": workerId},
},
)

// we were unable to send the step run to any worker, requeue the step run with an internal retry
_, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{
IsInternalRetry: true,
})

if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) {
outerErr = multierror.Append(outerErr, fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err))
}
}
return innerEg.Wait()
})
}

return outerErr
return outerEg.Wait()
}

func (d *DispatcherImpl) handleStepRunCancelled(ctx context.Context, task *msgqueue.Message) error {
Expand Down
28 changes: 16 additions & 12 deletions pkg/repository/prisma/step_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3010,18 +3010,6 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st
}
}

if opts.IsRetry || opts.IsInternalRetry {
// if this is a retry, write a queue item to release the worker semaphore
err := s.releaseWorkerSemaphoreSlot(ctx, tenantId, stepRunId)

if err != nil {
return nil, fmt.Errorf("could not release worker semaphore queue items: %w", err)
}

// retries get highest priority to ensure that they're run immediately
priority = 4
}

if len(opts.ExpressionEvals) > 0 {
err := s.createExpressionEvals(ctx, s.pool, stepRunId, opts.ExpressionEvals)

Expand Down Expand Up @@ -3054,6 +3042,22 @@ func (s *stepRunEngineRepository) QueueStepRun(ctx context.Context, tenantId, st
return nil, repository.ErrAlreadyQueued
}

if opts.IsRetry || opts.IsInternalRetry {
// if this is a retry, write a queue item to release the worker semaphore
//
// FIXME: there is a race condition here where we can delete a worker semaphore slot that has already been reassigned,
// but the step run was not in a RUNNING state. The fix for this would be to track an total retry count on the step run
// and use this to identify semaphore slots, but this involves a big refactor of semaphore slots.
err := s.releaseWorkerSemaphoreSlot(ctx, tenantId, stepRunId)

if err != nil {
return nil, fmt.Errorf("could not release worker semaphore queue items: %w", err)
}

// retries get highest priority to ensure that they're run immediately
priority = 4
}

_, err = s.bulkQueuer.BuffItem(tenantId, buffer.BulkQueueStepRunOpts{
GetStepRunForEngineRow: innerStepRun,
Priority: priority,
Expand Down

0 comments on commit ae5df5b

Please sign in to comment.