Skip to content

Commit

Permalink
fix: concurrent map writes on partitioner (#792)
Browse files Browse the repository at this point in the history
  • Loading branch information
abelanger5 authored Aug 19, 2024
1 parent 208836f commit fce101a
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions internal/services/controllers/jobs/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Partition struct {
a *hatcheterrors.Wrapped
partitionId string

tenantOperations map[string]*operation
tenantOperations sync.Map
}

func NewPartition(
Expand All @@ -47,14 +47,13 @@ func NewPartition(
}

return &Partition{
mq: mq,
l: l,
repo: repo,
dv: dv,
s: s,
a: a,
partitionId: partitionId,
tenantOperations: make(map[string]*operation),
mq: mq,
l: l,
repo: repo,
dv: dv,
s: s,
a: a,
partitionId: partitionId,
}, nil
}

Expand Down Expand Up @@ -222,9 +221,11 @@ func (p *Partition) handleCheckQueue(ctx context.Context, task *msgqueue.Message
}

// if this tenant is registered, then we should check the queue
if _, ok := p.tenantOperations[metadata.TenantId]; ok {
p.tenantOperations[metadata.TenantId].setContinue(true)
p.tenantOperations[metadata.TenantId].run(p.l, p.scheduleStepRuns)
if opInt, ok := p.tenantOperations.Load(metadata.TenantId); ok {
op := opInt.(*operation)

op.setContinue(true)
op.run(p.l, p.scheduleStepRuns)
}

return nil
Expand All @@ -245,13 +246,21 @@ func (p *Partition) runTenantQueues(ctx context.Context) func() {
for i := range tenants {
tenantId := sqlchelpers.UUIDToStr(tenants[i].ID)

if _, ok := p.tenantOperations[tenantId]; !ok {
p.tenantOperations[tenantId] = &operation{
var op *operation

opInt, ok := p.tenantOperations.Load(tenantId)

if !ok {
op = &operation{
tenantId: tenantId,
}

p.tenantOperations.Store(tenantId, op)
} else {
op = opInt.(*operation)
}

p.tenantOperations[tenantId].run(p.l, p.scheduleStepRuns)
op.run(p.l, p.scheduleStepRuns)
}
}
}
Expand Down

0 comments on commit fce101a

Please sign in to comment.