From 24b5046f18dca4b8a5fe4207dc5bc4a65ec8db15 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Thu, 1 Aug 2024 15:38:16 -0400 Subject: [PATCH 1/2] fix: add mutexes --- .../services/controllers/jobs/controller.go | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 781c5c705..0cb647d04 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -41,7 +41,9 @@ type JobsControllerImpl struct { a *hatcheterrors.Wrapped partitionId string - requeueMutexes map[string]*sync.Mutex + requeueMutexes map[string]*sync.Mutex + reassignMutexes map[string]*sync.Mutex + timeoutMutexes map[string]*sync.Mutex } type JobsControllerOpt func(*JobsControllerOpts) @@ -795,6 +797,17 @@ func (jc *JobsControllerImpl) runStepRunReassign(ctx context.Context, startedAt // runStepRunReassignTenant looks for step runs that have been assigned to a worker but have not started, // or have been running but the worker has become inactive. func (ec *JobsControllerImpl) runStepRunReassignTenant(ctx context.Context, tenantId string) error { + // we want only one reassign running at a time for a tenant + if ec.reassignMutexes[tenantId] == nil { + ec.reassignMutexes[tenantId] = &sync.Mutex{} + } + + if !ec.reassignMutexes[tenantId].TryLock() { + return nil + } + + defer ec.reassignMutexes[tenantId].Unlock() + ctx, span := telemetry.NewSpan(ctx, "handle-step-run-reassign") defer span.End() @@ -888,6 +901,16 @@ func (jc *JobsControllerImpl) runStepRunTimeout(ctx context.Context) func() { // runStepRunTimeoutTenant looks for step runs that are timed out in the tenant. 
func (ec *JobsControllerImpl) runStepRunTimeoutTenant(ctx context.Context, tenantId string) error { + if ec.timeoutMutexes[tenantId] == nil { + ec.timeoutMutexes[tenantId] = &sync.Mutex{} + } + + if !ec.timeoutMutexes[tenantId].TryLock() { + return nil + } + + defer ec.timeoutMutexes[tenantId].Unlock() + ctx, span := telemetry.NewSpan(ctx, "handle-step-run-timeout") defer span.End() From a5e249e1164d50ead7518e2c2c7c65f329a9e450 Mon Sep 17 00:00:00 2001 From: gabriel ruttner Date: Thu, 1 Aug 2024 15:43:53 -0400 Subject: [PATCH 2/2] oops... --- .../services/controllers/jobs/controller.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/internal/services/controllers/jobs/controller.go b/internal/services/controllers/jobs/controller.go index 0cb647d04..a3cd5d4f5 100644 --- a/internal/services/controllers/jobs/controller.go +++ b/internal/services/controllers/jobs/controller.go @@ -136,14 +136,16 @@ func New(fs ...JobsControllerOpt) (*JobsControllerImpl, error) { a.WithData(map[string]interface{}{"service": "jobs-controller"}) return &JobsControllerImpl{ - mq: opts.mq, - l: opts.l, - repo: opts.repo, - dv: opts.dv, - s: s, - a: a, - partitionId: opts.partitionId, - requeueMutexes: make(map[string]*sync.Mutex), + mq: opts.mq, + l: opts.l, + repo: opts.repo, + dv: opts.dv, + s: s, + a: a, + partitionId: opts.partitionId, + requeueMutexes: make(map[string]*sync.Mutex), + reassignMutexes: make(map[string]*sync.Mutex), + timeoutMutexes: make(map[string]*sync.Mutex), }, nil }