Add test for the new event pool
vadimalekseev committed Oct 19, 2024
1 parent 7cab7cf commit f2ad3ff
Showing 3 changed files with 133 additions and 97 deletions.
10 changes: 8 additions & 2 deletions pipeline/event.go
@@ -391,11 +391,13 @@ again:
})

// Wait until we fit in the capacity.
p.slowWaiters.Inc()
p.getCond.L.Lock()
if int(p.inUseEvents.Load()) > p.capacity {
if !p.eventsAvailable() {
p.getCond.Wait()
}
p.getCond.L.Unlock()
p.slowWaiters.Dec()
goto again
}

@@ -430,10 +432,14 @@ func (p *eventSyncPool) wakeupWaiters() {

time.Sleep(5 * time.Second)
waiters := p.slowWaiters.Load()
eventsAvailable := p.inUseEvents.Load() <= int64(p.capacity)
eventsAvailable := p.eventsAvailable()
if waiters > 0 && eventsAvailable {
// There are events in the pool, wake up waiting goroutines.
p.getCond.Broadcast()
}
}
}

func (p *eventSyncPool) eventsAvailable() bool {
return int(p.inUseEvents.Load()) <= p.capacity
}
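
The slow path above is a bounded-capacity gate: a getter that overruns the capacity parks on getCond, and wakeupWaiters periodically broadcasts once events have been returned. Below is a minimal, self-contained sketch of that pattern; it broadcasts directly on release instead of using file.d's slowWaiters counter and 5-second wakeup loop, and all names (capacityGate, acquire, release) are illustrative rather than taken from pipeline/event.go.

package main

import (
	"fmt"
	"sync"
)

// capacityGate is an illustrative stand-in for the pool's slow path:
// acquire blocks while `capacity` events are checked out, release wakes waiters.
type capacityGate struct {
	capacity int
	inUse    int
	waiters  int
	cond     *sync.Cond
}

func newCapacityGate(capacity int) *capacityGate {
	return &capacityGate{capacity: capacity, cond: sync.NewCond(&sync.Mutex{})}
}

func (g *capacityGate) acquire() {
	g.cond.L.Lock()
	for g.inUse >= g.capacity {
		g.waiters++
		g.cond.Wait() // unlocks while parked, re-locks before returning
		g.waiters--
	}
	g.inUse++
	g.cond.L.Unlock()
}

func (g *capacityGate) release() {
	g.cond.L.Lock()
	g.inUse--
	if g.waiters > 0 {
		g.cond.Broadcast() // direct wakeup instead of a periodic goroutine
	}
	g.cond.L.Unlock()
}

func main() {
	g := newCapacityGate(2)
	wg := &sync.WaitGroup{}
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.acquire()
			defer g.release()
			// "Use" the event slot here.
		}()
	}
	wg.Wait()
	fmt.Println("all goroutines fit within the capacity")
}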
164 changes: 125 additions & 39 deletions pipeline/event_test.go
@@ -4,6 +4,7 @@ import (
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)
@@ -19,69 +20,154 @@ func TestEventPoolDump(t *testing.T) {
}

func BenchmarkEventPoolOneGoroutine(b *testing.B) {
const capacity = 32

p := newEventPool(capacity, DefaultAvgInputEventSize)

for i := 0; i < b.N; i++ {
p.back(p.get())
bench := func(b *testing.B, p pool) {
for i := 0; i < b.N; i++ {
p.back(p.get())
}
}
const capacity = 32
b.Run("eventPool", func(b *testing.B) {
p := newEventPool(capacity, DefaultAvgInputEventSize)
bench(b, p)
})
b.Run("syncPool", func(b *testing.B) {
p := newSyncPool(capacity)
bench(b, p)
})
}

func BenchmarkEventPoolManyGoroutines(b *testing.B) {
bench := func(b *testing.B, p pool) {
workers := runtime.GOMAXPROCS(0)
for i := 0; i < b.N; i++ {
wg := &sync.WaitGroup{}
wg.Add(workers)
for j := 0; j < workers; j++ {
go func() {
defer wg.Done()

for k := 0; k < 1000; k++ {
p.back(p.get())
}
}()
}
wg.Wait()
}
}
const capacity = 32
b.Run("eventPool", func(b *testing.B) {
p := newEventPool(capacity, DefaultAvgInputEventSize)
bench(b, p)
})
b.Run("syncPool", func(b *testing.B) {
p := newSyncPool(capacity)
bench(b, p)
})
}

p := newEventPool(capacity, DefaultAvgInputEventSize)

for i := 0; i < b.N; i++ {
func BenchmarkEventPoolSlowestPath(b *testing.B) {
bench := func(b *testing.B, p pool) {
wg := &sync.WaitGroup{}
for j := 0; j < runtime.GOMAXPROCS(0); j++ {
wg.Add(1)
go func() {
for k := 0; k < 1000; k++ {
p.back(p.get())
}
wg.Done()
}()
for i := 0; i < b.N; i++ {
const concurrency = 1_000
wg.Add(concurrency)
for j := 0; j < concurrency; j++ {
go func() {
defer wg.Done()
e := p.get()
p.back(e)
}()
}
wg.Wait()
}
wg.Wait()
}
}

func BenchmarkEventPoolSlowestPath(b *testing.B) {
const capacity = 32
b.Run("eventPool", func(b *testing.B) {
p := newEventPool(capacity, DefaultAvgInputEventSize)
bench(b, p)
})
b.Run("syncPool", func(b *testing.B) {
p := newSyncPool(capacity)
bench(b, p)
})
}

p := newEventPool(capacity, DefaultAvgInputEventSize)

for i := 0; i < b.N; i++ {
wg := &sync.WaitGroup{}
for j := 0; j < 100_000; j++ {
wg.Add(1)
go func() {
e := p.get()
p.back(e)
wg.Done()
}()
func TestLowMemPool(t *testing.T) {
t.Parallel()
r := require.New(t)
test := func(capacity, batchSize int) {
p := newSyncPool(capacity)
for i := 0; i < batchSize; i++ {
batch := make([]*Event, batchSize)
for j := 0; j < batchSize; j++ {
batch[j] = p.get()
}
r.Equal(int64(batchSize), p.inUse())
r.Equal(int64(0), p.waiters())

for j := 0; j < batchSize; j++ {
p.back(batch[j])
}
}
wg.Wait()
}

test(1, 1)
test(8, 7)
test(64, 63)
test(64, 64)
test(1024, 128)
}

func BenchmarkEventSyncPoolSlowestPath(b *testing.B) {
const capacity = 32
func TestLowMemPoolSlowWait(t *testing.T) {
t.Parallel()
r := require.New(t)

p := newEventPool(capacity, DefaultAvgInputEventSize)
test := func(p pool) {
event := p.get() // Empty the pool.
r.Equal(int64(1), p.inUse())

for i := 0; i < b.N; i++ {
wg := &sync.WaitGroup{}
for j := 0; j < 100_000; j++ {
eventReleased := false

const waiters = 16
// Create 16 goroutines to wait on new events.
wg := new(sync.WaitGroup)
for i := 0; i < waiters; i++ {
wg.Add(1)
go func() {
defer wg.Done()
e := p.get()
if !eventReleased {
r.FailNowf("pool must be empty", "event: %v", e)
}
p.back(e)
wg.Done()
}()
}

// Wait for all goroutines to be waiting.
for i := 0; i < 50; i++ {
if p.waiters() == waiters {
break
}
time.Sleep(time.Millisecond * 100)
}

r.Equal(p.waiters(), int64(waiters))
r.Equal(int64(1), p.inUse())

// Release the event to wake up the waiters.
eventReleased = true
p.back(event)

wg.Wait()

r.Equal(int64(0), p.waiters())
r.Equal(int64(0), p.inUse())
}

t.Run("syncPool", func(t *testing.T) {
pool := newSyncPool(1)
test(pool)
})
// TODO: add test for eventPool after #685.
}
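
The benchmarks and tests above are written against a pool value shared by newEventPool and newSyncPool. The interface declaration itself is not part of this diff; reconstructed from the calls made here, it would look roughly like the sketch below (the real declaration in pipeline/event.go may differ).

// Assumed shape of the pool interface, inferred from its usage in event_test.go.
type pool interface {
	get() *Event       // take an event, blocking once capacity is exhausted
	back(event *Event) // return an event to the pool
	inUse() int64      // events currently checked out
	waiters() int64    // goroutines blocked in get waiting for capacity
}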
56 changes: 0 additions & 56 deletions pipeline/pipeline_test.go
@@ -1,7 +1,6 @@
package pipeline_test

import (
"reflect"
"testing"

"github.com/ozontech/file.d/pipeline"
@@ -25,61 +24,6 @@ func getFakeInputInfo() *pipeline.InputPluginInfo {
}
}

func TestInUnparsableMessages(t *testing.T) {
name := "invalid_json"
message := []byte("{wHo Is Json: YoU MeAn SoN oF JoHn???")
pipelineSettings := &pipeline.Settings{
Capacity: 5,
Decoder: "json",
MetricHoldDuration: pipeline.DefaultMetricHoldDuration,
}
offset := int64(666)
sourceID := pipeline.SourceID(3<<16 + int(10))

t.Run(name, func(t *testing.T) {
pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry())

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(sourceID, "kafka", offset, message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)

refPipe := reflect.ValueOf(pipe)
eventPool := reflect.Indirect(refPipe).FieldByName("eventPool")

free1slice := reflect.
Indirect(
reflect.
Indirect(eventPool).
FieldByName("free1")).
Slice(0, pipelineSettings.Capacity)
free2slice := reflect.
Indirect(
reflect.
Indirect(eventPool).
FieldByName("free2")).
Slice(0, pipelineSettings.Capacity)

for i := 0; i < pipelineSettings.Capacity; i++ {
// free1, free2 are []atomic.Bool which underlying v is really uint32
// so if v val == uint32(1) event was released.
free1idxUint := reflect.
Indirect(free1slice.Index(i)).
FieldByName("v").
FieldByName("v").
Uint()
require.EqualValues(t, uint32(1), free1idxUint)

free2idxUint := reflect.
Indirect(free2slice.Index(i)).
FieldByName("v").
FieldByName("v").
Uint()
require.EqualValues(t, uint32(1), free2idxUint)
}
})
}

func TestInInvalidMessages(t *testing.T) {
cases := []struct {
name string
