diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 80221e86f..a1bd99f02 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -41,11 +41,8 @@ type JobsControllerImpl struct { a *hatcheterrors.Wrapped partitionId string - requeueMutexes map[string]*sync.Mutex - reassignMutexes map[string]*sync.Mutex - timeoutMutexes map[string]*sync.Mutex - - mutexMutex sync.Mutex + reassignMutexes sync.Map + timeoutMutexes sync.Map } type JobsControllerOpt func(*JobsControllerOpts) @@ -138,17 +135,13 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) { a.WithData(map[string]interface{}{"service": "jobs-controller"}) return &JobsControllerImpl{ - mq: opts.mq, - l: opts.l, - repo: opts.repo, - dv: opts.dv, - s: s, - a: a, - partitionId: opts.partitionId, - requeueMutexes: make(map[string]*sync.Mutex), - reassignMutexes: make(map[string]*sync.Mutex), - timeoutMutexes: make(map[string]*sync.Mutex), - mutexMutex: sync.Mutex{}, + mq: opts.mq, + l: opts.l, + repo: opts.repo, + dv: opts.dv, + s: s, + a: a, + partitionId: opts.partitionId, }, nil } @@ -739,17 +732,18 @@ func (jc *JobsControllerImpl) runStepRunReassign(ctx context.Context, startedAt // or have been running but the worker has become inactive. func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tenantId string) error { // we want only one requeue running at a time for a tenant - ec.mutexMutex.Lock() - if ec.reassignMutexes[tenantId] == nil { - ec.reassignMutexes[tenantId] = &sync.Mutex{} + if _, ok := ec.reassignMutexes.Load(tenantId); !ok { + ec.reassignMutexes.Store(tenantId, &sync.Mutex{}) } - ec.mutexMutex.Unlock() - if !ec.reassignMutexes[tenantId].TryLock() { + muInt, _ := ec.reassignMutexes.Load(tenantId) + mu := muInt.(*sync.Mutex) + + if !mu.TryLock() { return nil } - defer ec.reassignMutexes[tenantId].Unlock() + defer mu.Unlock() ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign") defer span.End() @@ -844,17 +838,19 @@ func (jc *JobsControllerImpl) runStepRunTimeout(ctx context.Context) func() { // runStepRunTimeoutTenant looks for step runs that are timed out in the tenant. func (ec *JobsControllerImpl) runStepRunTimeoutTenant(ctx context.Context, tenantId string) error { - ec.mutexMutex.Lock() - if ec.timeoutMutexes[tenantId] == nil { - ec.timeoutMutexes[tenantId] = &sync.Mutex{} + // we want only one requeue running at a time for a tenant + if _, ok := ec.timeoutMutexes.Load(tenantId); !ok { + ec.reassignMutexes.Store(tenantId, &sync.Mutex{}) } - ec.mutexMutex.Unlock() - if !ec.timeoutMutexes[tenantId].TryLock() { + muInt, _ := ec.timeoutMutexes.Load(tenantId) + mu := muInt.(*sync.Mutex) + + if !mu.TryLock() { return nil } - defer ec.timeoutMutexes[tenantId].Unlock() + defer mu.Unlock() ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout") defer span.End()