diff --git a/pipeline/event.go b/pipeline/event.go
index 661b95cc..045e9f2c 100644
--- a/pipeline/event.go
+++ b/pipeline/event.go
@@ -391,11 +391,13 @@ again:
 	})
 
 	// Wait until we fit in the capacity.
+	p.slowWaiters.Inc()
 	p.getCond.L.Lock()
-	if int(p.inUseEvents.Load()) > p.capacity {
+	if !p.eventsAvailable() {
 		p.getCond.Wait()
 	}
 	p.getCond.L.Unlock()
+	p.slowWaiters.Dec()
 
 	goto again
 }
@@ -430,10 +432,16 @@ func (p *eventSyncPool) wakeupWaiters() {
 		time.Sleep(5 * time.Second)
 
 		waiters := p.slowWaiters.Load()
-		eventsAvailable := p.inUseEvents.Load() <= int64(p.capacity)
+		eventsAvailable := p.eventsAvailable()
 		if waiters > 0 && eventsAvailable {
 			// There are events in the pool, wake up waiting goroutines.
 			p.getCond.Broadcast()
 		}
 	}
 }
+
+// eventsAvailable reports whether the pool still has spare capacity,
+// i.e. fewer events are in use than the pool can hold.
+func (p *eventSyncPool) eventsAvailable() bool {
+	return int(p.inUseEvents.Load()) < p.capacity
+}
diff --git a/pipeline/event_test.go b/pipeline/event_test.go
index 01c85c16..4eddb7f1 100644
--- a/pipeline/event_test.go
+++ b/pipeline/event_test.go
@@ -4,6 +4,7 @@ import (
 	"runtime"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 )
@@ -19,69 +20,154 @@ func TestEventPoolDump(t *testing.T) {
 }
 
 func BenchmarkEventPoolOneGoroutine(b *testing.B) {
-	const capacity = 32
-
-	p := newEventPool(capacity, DefaultAvgInputEventSize)
-
-	for i := 0; i < b.N; i++ {
-		p.back(p.get())
+	bench := func(b *testing.B, p pool) {
+		for i := 0; i < b.N; i++ {
+			p.back(p.get())
+		}
 	}
+	const capacity = 32
+	b.Run("eventPool", func(b *testing.B) {
+		p := newEventPool(capacity, DefaultAvgInputEventSize)
+		bench(b, p)
+	})
+	b.Run("syncPool", func(b *testing.B) {
+		p := newSyncPool(capacity)
+		bench(b, p)
+	})
 }
 
 func BenchmarkEventPoolManyGoroutines(b *testing.B) {
+	bench := func(b *testing.B, p pool) {
+		workers := runtime.GOMAXPROCS(0)
+		for i := 0; i < b.N; i++ {
+			wg := &sync.WaitGroup{}
+			wg.Add(workers)
+			for j := 0; j < workers; j++ {
+				go func() {
+					defer wg.Done()
+
+					for k := 0; k < 1000; k++ {
+						p.back(p.get())
+					}
+				}()
+			}
+			wg.Wait()
+		}
+	}
 	const capacity = 32
+	b.Run("eventPool", func(b *testing.B) {
+		p := newEventPool(capacity, DefaultAvgInputEventSize)
+		bench(b, p)
+	})
+	b.Run("syncPool", func(b *testing.B) {
+		p := newSyncPool(capacity)
+		bench(b, p)
+	})
+}
 
-	p := newEventPool(capacity, DefaultAvgInputEventSize)
-
-	for i := 0; i < b.N; i++ {
+func BenchmarkEventPoolSlowestPath(b *testing.B) {
+	bench := func(b *testing.B, p pool) {
 		wg := &sync.WaitGroup{}
-		for j := 0; j < runtime.GOMAXPROCS(0); j++ {
-			wg.Add(1)
-			go func() {
-				for k := 0; k < 1000; k++ {
-					p.back(p.get())
-				}
-				wg.Done()
-			}()
+		for i := 0; i < b.N; i++ {
+			const concurrency = 1_000
+			wg.Add(concurrency)
+			for j := 0; j < concurrency; j++ {
+				go func() {
+					defer wg.Done()
+					e := p.get()
+					p.back(e)
+				}()
+			}
+			wg.Wait()
 		}
-		wg.Wait()
 	}
-}
 
-func BenchmarkEventPoolSlowestPath(b *testing.B) {
 	const capacity = 32
+	b.Run("eventPool", func(b *testing.B) {
+		p := newEventPool(capacity, DefaultAvgInputEventSize)
+		bench(b, p)
+	})
+	b.Run("syncPool", func(b *testing.B) {
+		p := newSyncPool(capacity)
+		bench(b, p)
+	})
+}
 
-	p := newEventPool(capacity, DefaultAvgInputEventSize)
-
-	for i := 0; i < b.N; i++ {
-		wg := &sync.WaitGroup{}
-		for j := 0; j < 100_000; j++ {
-			wg.Add(1)
-			go func() {
-				e := p.get()
-				p.back(e)
-				wg.Done()
-			}()
+func TestLowMemPool(t *testing.T) {
+	t.Parallel()
+	r := require.New(t)
+	test := func(capacity, batchSize int) {
+		p := newSyncPool(capacity)
+		for i := 0; i < batchSize; i++ {
+			batch := make([]*Event, batchSize)
+			for j := 0; j < batchSize; j++ {
+				batch[j] = p.get()
+			}
+			r.Equal(int64(batchSize), p.inUse())
+			r.Equal(int64(0), p.waiters())
+
+			for j := 0; j < batchSize; j++ {
+				p.back(batch[j])
+			}
 		}
-		wg.Wait()
 	}
+
+	test(1, 1)
+	test(8, 7)
+	test(64, 63)
+	test(64, 64)
+	test(1024, 128)
 }
 
-func BenchmarkEventSyncPoolSlowestPath(b *testing.B) {
-	const capacity = 32
+func TestLowMemPoolSlowWait(t *testing.T) {
+	t.Parallel()
+	r := require.New(t)
 
-	p := newEventPool(capacity, DefaultAvgInputEventSize)
+	test := func(p pool) {
+		event := p.get() // Empty the pool.
+		r.Equal(int64(1), p.inUse())
 
-	for i := 0; i < b.N; i++ {
-		wg := &sync.WaitGroup{}
-		for j := 0; j < 100_000; j++ {
+		eventReleased := false
+
+		const waiters = 16
+		// Create 16 goroutines to wait on new events.
+		wg := new(sync.WaitGroup)
+		for i := 0; i < waiters; i++ {
 			wg.Add(1)
 			go func() {
+				defer wg.Done()
 				e := p.get()
+				if !eventReleased {
+					r.FailNowf("pool have to be empty", "event: %v", e)
+				}
 				p.back(e)
-				wg.Done()
 			}()
 		}
-		wg.Wait()
+
+		// Wait for all goroutines to be waiting.
+		for i := 0; i < 50; i++ {
+			if p.waiters() == waiters {
+				break
+			}
+			time.Sleep(time.Millisecond * 100)
+		}
+
+		r.Equal(p.waiters(), int64(waiters))
+		r.Equal(int64(1), p.inUse())
+
+		// Release events to wakeup waiters.
+		eventReleased = true
+		p.back(event)
+		wg.Wait()
+
+		r.Equal(int64(0), p.waiters())
+		r.Equal(int64(0), p.inUse())
 	}
+
+	t.Run("syncPool", func(t *testing.T) {
+		pool := newSyncPool(1)
+		test(pool)
+	})
+	// TODO: add test for eventPool after #685.
 }
diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go
index 675f99a1..7bafa6b3 100644
--- a/pipeline/pipeline_test.go
+++ b/pipeline/pipeline_test.go
@@ -1,7 +1,6 @@
 package pipeline_test
 
 import (
-	"reflect"
 	"testing"
 
 	"github.com/ozontech/file.d/pipeline"
@@ -25,61 +24,6 @@ func getFakeInputInfo() *pipeline.InputPluginInfo {
 	}
 }
 
-func TestInUnparsableMessages(t *testing.T) {
-	name := "invalid_json"
-	message := []byte("{wHo Is Json: YoU MeAn SoN oF JoHn???")
-	pipelineSettings := &pipeline.Settings{
-		Capacity:           5,
-		Decoder:            "json",
-		MetricHoldDuration: pipeline.DefaultMetricHoldDuration,
-	}
-	offset := int64(666)
-	sourceID := pipeline.SourceID(3<<16 + int(10))
-
-	t.Run(name, func(t *testing.T) {
-		pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry())
-
-		pipe.SetInput(getFakeInputInfo())
-
-		seqID := pipe.In(sourceID, "kafka", offset, message, false, nil)
-		require.Equal(t, pipeline.EventSeqIDError, seqID)
-
-		refPipe := reflect.ValueOf(pipe)
-		eventPool := reflect.Indirect(refPipe).FieldByName("eventPool")
-
-		free1slice := reflect.
-			Indirect(
-				reflect.
-					Indirect(eventPool).
-					FieldByName("free1")).
-			Slice(0, pipelineSettings.Capacity)
-		free2slice := reflect.
-			Indirect(
-				reflect.
-					Indirect(eventPool).
-					FieldByName("free2")).
-			Slice(0, pipelineSettings.Capacity)
-
-		for i := 0; i < pipelineSettings.Capacity; i++ {
-			// free1, free2 are []atomic.Bool which underlying v is really uint32
-			// so if v val == uint32(1) event was released.
-			free1idxUint := reflect.
-				Indirect(free1slice.Index(i)).
-				FieldByName("v").
-				FieldByName("v").
-				Uint()
-			require.EqualValues(t, uint32(1), free1idxUint)
-
-			free2idxUint := reflect.
-				Indirect(free2slice.Index(i)).
-				FieldByName("v").
-				FieldByName("v").
-				Uint()
-			require.EqualValues(t, uint32(1), free2idxUint)
-		}
-	})
-}
-
 func TestInInvalidMessages(t *testing.T) {
 	cases := []struct {
 		name string