Skip to content

Commit

Permalink
refactor: use shared tenant listener for messages (#911)
Browse files Browse the repository at this point in the history
* refactor: use shared tenant listener per tenant exchange

* fix: remove subscription properly
  • Loading branch information
abelanger5 authored Sep 26, 2024
1 parent bdcd9c2 commit 5f5e1e8
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 30 deletions.
105 changes: 105 additions & 0 deletions internal/msgqueue/shared_tenant_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package msgqueue

import (
"sync"

"github.com/hashicorp/go-multierror"
)

type sharedTenantSub struct {
fs *sync.Map
counter int
isRunning bool
mu sync.Mutex
cleanup func() error
}

type SharedTenantReader struct {
tenants *sync.Map
mq MessageQueue
}

func NewSharedTenantReader(mq MessageQueue) *SharedTenantReader {
return &SharedTenantReader{
tenants: &sync.Map{},
mq: mq,
}
}

func (s *SharedTenantReader) Subscribe(tenantId string, postAck AckHook) (func() error, error) {
tenant, _ := s.tenants.LoadOrStore(tenantId, &sharedTenantSub{
fs: &sync.Map{},
})

t := tenant.(*sharedTenantSub)

t.mu.Lock()
defer t.mu.Unlock()

t.counter++

subId := t.counter

t.fs.Store(subId, postAck)

if !t.isRunning {
t.isRunning = true

q, err := TenantEventConsumerQueue(tenantId)

if err != nil {
return nil, err
}

cleanupSingleSub, err := s.mq.Subscribe(q, NoOpHook, func(task *Message) error {
var innerErr error

t.fs.Range(func(key, value interface{}) bool {
f := value.(AckHook)

if err := f(task); err != nil {
innerErr = multierror.Append(innerErr, err)
}

return true
})

return innerErr
})

if err != nil {
return nil, err
}

t.cleanup = cleanupSingleSub
}

return func() error {
t.mu.Lock()
defer t.mu.Unlock()

t.fs.Delete(subId)

if lenSyncMap(t.fs) == 0 {
// shut down the subscription
if t.cleanup != nil {
if err := t.cleanup(); err != nil {
return err
}
}

t.isRunning = false
}

return nil
}, nil
}

func lenSyncMap(m *sync.Map) int {
var i int
m.Range(func(k, v interface{}) bool {
i++
return true
})
return i
}
18 changes: 9 additions & 9 deletions internal/services/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ type Dispatcher interface {
type DispatcherImpl struct {
contracts.UnimplementedDispatcherServer

s gocron.Scheduler
mq msgqueue.MessageQueue
heavyReadMQ msgqueue.MessageQueue
l *zerolog.Logger
dv datautils.DataDecoderValidator
v validator.Validator
repo repository.EngineRepository
cache cache.Cacheable
s gocron.Scheduler
mq msgqueue.MessageQueue
sharedReader *msgqueue.SharedTenantReader
l *zerolog.Logger
dv datautils.DataDecoderValidator
v validator.Validator
repo repository.EngineRepository
cache cache.Cacheable

entitlements repository.EntitlementsRepository

Expand Down Expand Up @@ -243,7 +243,7 @@ func (d *DispatcherImpl) Start() (func() error, error) {
mqCleanup, heavyReadMQ := d.mq.Clone()
heavyReadMQ.SetQOS(1000)

d.heavyReadMQ = heavyReadMQ
d.sharedReader = msgqueue.NewSharedTenantReader(heavyReadMQ)

// register the dispatcher by creating a new dispatcher in the database
dispatcher, err := d.repo.Dispatcher().CreateNewDispatcher(ctx, &repository.CreateDispatcherOpts{
Expand Down
24 changes: 3 additions & 21 deletions internal/services/dispatcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,11 +528,6 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByAdditionalMeta(key string, v
tenant := stream.Context().Value("tenant").(*dbsqlc.Tenant)
tenantId := sqlchelpers.UUIDToStr(tenant.ID)

q, err := msgqueue.TenantEventConsumerQueue(tenantId)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

Expand Down Expand Up @@ -602,7 +597,7 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByAdditionalMeta(key string, v
}

// subscribe to the task queue for the tenant
cleanupQueue, err := s.heavyReadMQ.Subscribe(q, msgqueue.NoOpHook, f)
cleanupQueue, err := s.sharedReader.Subscribe(tenantId, f)

if err != nil {
return err
Expand All @@ -625,12 +620,6 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByWorkflowRunId(workflowRunId

s.l.Debug().Msgf("Received subscribe request for workflow: %s", workflowRunId)

q, err := msgqueue.TenantEventConsumerQueue(tenantId)

if err != nil {
return err
}

ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

Expand Down Expand Up @@ -685,7 +674,7 @@ func (s *DispatcherImpl) subscribeToWorkflowEventsByWorkflowRunId(workflowRunId
}

// subscribe to the task queue for the tenant
cleanupQueue, err := s.heavyReadMQ.Subscribe(q, msgqueue.NoOpHook, f)
cleanupQueue, err := s.sharedReader.Subscribe(tenantId, f)

if err != nil {
return err
Expand Down Expand Up @@ -784,13 +773,6 @@ func (s *DispatcherImpl) SubscribeToWorkflowRuns(server contracts.Dispatcher_Sub
ctx, cancel := context.WithCancel(server.Context())
defer cancel()

// subscribe to the task queue for the tenant
q, err := msgqueue.TenantEventConsumerQueue(tenantId)

if err != nil {
return err
}

wg := sync.WaitGroup{}
sendMu := sync.Mutex{}

Expand Down Expand Up @@ -901,7 +883,7 @@ func (s *DispatcherImpl) SubscribeToWorkflowRuns(server contracts.Dispatcher_Sub
}

// subscribe to the task queue for the tenant
cleanupQueue, err := s.heavyReadMQ.Subscribe(q, msgqueue.NoOpHook, f)
cleanupQueue, err := s.sharedReader.Subscribe(tenantId, f)

if err != nil {
return err
Expand Down

0 comments on commit 5f5e1e8

Please sign in to comment.