diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 781c5c705..a3cd5d4f5 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -41,7 +41,9 @@ type JobsControllerImpl struct { a *hatcheterrors.Wrapped partitionId string - requeueMutexes map[string]*sync.Mutex + requeueMutexes map[string]*sync.Mutex + reassignMutexes map[string]*sync.Mutex + timeoutMutexes map[string]*sync.Mutex } type JobsControllerOpt func(*JobsControllerOpts) @@ -134,14 +136,16 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) { a.WithData(map[string]interface{}{"service": "jobs-controller"}) return &JobsControllerImpl{ - mq: opts.mq, - l: opts.l, - repo: opts.repo, - dv: opts.dv, - s: s, - a: a, - partitionId: opts.partitionId, - requeueMutexes: make(map[string]*sync.Mutex), + mq: opts.mq, + l: opts.l, + repo: opts.repo, + dv: opts.dv, + s: s, + a: a, + partitionId: opts.partitionId, + requeueMutexes: make(map[string]*sync.Mutex), + reassignMutexes: make(map[string]*sync.Mutex), + timeoutMutexes: make(map[string]*sync.Mutex), }, nil } @@ -795,6 +799,17 @@ func (jc *JobsControllerImpl) runStepRunReassign(ctx context.Context, startedAt // runStepRunReassignTenant looks for step runs that have been assigned to a worker but have not started, // or have been running but the worker has become inactive. func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tenantId string) error { + // we want only one requeue running at a time for a tenant + if ec.reassignMutexes[tenantId] == nil { + ec.reassignMutexes[tenantId] = &sync.Mutex{} + } + + if !ec.reassignMutexes[tenantId].TryLock() { + return nil + } + + defer ec.reassignMutexes[tenantId].Unlock() + ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign") defer span.End() @@ -888,6 +903,16 @@ func (jc *JobsControllerImpl) runStepRunTimeout(ctx context.Context) func() { // runStepRunTimeoutTenant looks for step runs that are timed out in the tenant. func (ec *JobsControllerImpl) runStepRunTimeoutTenant(ctx context.Context, tenantId string) error { + if ec.timeoutMutexes[tenantId] == nil { + ec.timeoutMutexes[tenantId] = &sync.Mutex{} + } + + if !ec.timeoutMutexes[tenantId].TryLock() { + return nil + } + + defer ec.timeoutMutexes[tenantId].Unlock() + ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout") defer span.End()