diff --git a/pipeline/event.go b/pipeline/event.go
index d0a11851..4d8981a4 100644
--- a/pipeline/event.go
+++ b/pipeline/event.go
@@ -226,14 +226,21 @@ type eventPool struct {
 
 	getMu   *sync.Mutex
 	getCond *sync.Cond
+
+	stopped          *atomic.Bool
+	runHeartbeatOnce *sync.Once
+	slowWaiters      *atomic.Int64
 }
 
 func newEventPool(capacity, avgEventSize int) *eventPool {
 	eventPool := &eventPool{
-		avgEventSize: avgEventSize,
-		capacity:     capacity,
-		getMu:        &sync.Mutex{},
-		backCounter:  *atomic.NewInt64(int64(capacity)),
+		avgEventSize:     avgEventSize,
+		capacity:         capacity,
+		getMu:            &sync.Mutex{},
+		backCounter:      *atomic.NewInt64(int64(capacity)),
+		runHeartbeatOnce: &sync.Once{},
+		stopped:          atomic.NewBool(false),
+		slowWaiters:      atomic.NewInt64(0),
 	}
 	eventPool.getCond = sync.NewCond(eventPool.getMu)
 
@@ -270,10 +277,17 @@ func (p *eventPool) get() *Event {
 
 			// slow path
 			runtime.Gosched()
 		} else {
+			p.runHeartbeatOnce.Do(func() {
+				// Run heartbeat to periodically wake up goroutines that are waiting.
+				go p.wakeupWaiters()
+			})
+			// slowest path
+			p.slowWaiters.Inc()
 			p.getMu.Lock()
 			p.getCond.Wait()
 			p.getMu.Unlock()
+			p.slowWaiters.Dec()
 			tries = 0
 		}
 	}
@@ -317,6 +331,26 @@ func (p *eventPool) back(event *Event) {
 	p.getCond.Broadcast()
 }
 
+func (p *eventPool) wakeupWaiters() {
+	for {
+		if p.stopped.Load() {
+			return
+		}
+
+		time.Sleep(5 * time.Second)
+		waiters := p.slowWaiters.Load()
+		eventsAvailable := p.inUseEvents.Load() < int64(p.capacity)
+		if waiters > 0 && eventsAvailable {
+			// There are events in the pool, wake up waiting goroutines.
+			p.getCond.Broadcast()
+		}
+	}
+}
+
+func (p *eventPool) stop() {
+	p.stopped.Store(true)
+}
+
 func (p *eventPool) dump() string {
 	out := logger.Cond(len(p.events) == 0, logger.Header("no events"), func() string {
 		o := logger.Header("events")
diff --git a/pipeline/event_test.go b/pipeline/event_test.go
index a217d696..fc91fd31 100644
--- a/pipeline/event_test.go
+++ b/pipeline/event_test.go
@@ -67,3 +67,25 @@ func BenchmarkEventPoolSlowestPath(b *testing.B) {
 		wg.Wait()
 	}
 }
+
+func TestSlowPath(t *testing.T) {
+	t.Parallel()
+	const (
+		poolCapacity = 256
+		concurrency  = 5_000
+	)
+	pool := newEventPool(poolCapacity, DefaultAvgInputEventSize)
+	for i := 0; i < 1_000; i++ {
+		wg := new(sync.WaitGroup)
+		wg.Add(concurrency)
+		for j := 0; j < concurrency; j++ {
+			go func() {
+				defer wg.Done()
+				e := pool.get()
+				runtime.Gosched()
+				pool.back(e)
+			}()
+		}
+		wg.Wait()
+	}
+}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index bf377c4b..2d865eed 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -333,6 +333,8 @@ func (p *Pipeline) Stop() {
 	p.output.Stop()
 
 	p.shouldStop.Store(true)
+
+	p.eventPool.stop()
 }
 
 func (p *Pipeline) SetInput(info *InputPluginInfo) {