diff --git a/internal/services/controllers/jobs/partition.go b/internal/services/controllers/jobs/partition.go index 8c93888bd..daca051ae 100644 --- a/internal/services/controllers/jobs/partition.go +++ b/internal/services/controllers/jobs/partition.go @@ -29,7 +29,7 @@ type Partition struct { a *hatcheterrors.Wrapped partitionId string - tenantOperations map[string]*operation + tenantOperations sync.Map } func NewPartition( @@ -47,14 +47,13 @@ func NewPartition( } return &Partition{ - mq: mq, - l: l, - repo: repo, - dv: dv, - s: s, - a: a, - partitionId: partitionId, - tenantOperations: make(map[string]*operation), + mq: mq, + l: l, + repo: repo, + dv: dv, + s: s, + a: a, + partitionId: partitionId, }, nil } @@ -222,9 +221,11 @@ func (p *Partition) handleCheckQueue(ctx context.Context, task *msgqueue.Message } // if this tenant is registered, then we should check the queue - if _, ok := p.tenantOperations[metadata.TenantId]; ok { - p.tenantOperations[metadata.TenantId].setContinue(true) - p.tenantOperations[metadata.TenantId].run(p.l, p.scheduleStepRuns) + if opInt, ok := p.tenantOperations.Load(metadata.TenantId); ok { + op := opInt.(*operation) + + op.setContinue(true) + op.run(p.l, p.scheduleStepRuns) } return nil @@ -245,13 +246,21 @@ func (p *Partition) runTenantQueues(ctx context.Context) func() { for i := range tenants { tenantId := sqlchelpers.UUIDToStr(tenants[i].ID) - if _, ok := p.tenantOperations[tenantId]; !ok { - p.tenantOperations[tenantId] = &operation{ + var op *operation + + opInt, ok := p.tenantOperations.Load(tenantId) + + if !ok { + op = &operation{ tenantId: tenantId, } + + p.tenantOperations.Store(tenantId, op) + } else { + op = opInt.(*operation) } - p.tenantOperations[tenantId].run(p.l, p.scheduleStepRuns) + op.run(p.l, p.scheduleStepRuns) } } }