Skip to content

Commit

Permalink
recordbatch/BlockingBatcher: add soft max batch byte size
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed Apr 19, 2024
1 parent 81e1bca commit 497f172
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 27 deletions.
18 changes: 11 additions & 7 deletions cmd/seb-api/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func Run() {

go cacheEviction(log.Name("cache eviction"), cache, flags.cacheMaxBytes, flags.cacheEvictionInterval)

blockingS3Storage, err := makeBlockingS3Storage(log, cache, flags.recordBatchBlockTime, flags.bucketName)
blockingS3Storage, err := makeBlockingS3Storage(log, cache, flags.recordBatchSoftMaxBytes, flags.recordBatchBlockTime, flags.bucketName)
if err != nil {
log.Fatalf("making blocking s3 storage: %s", err)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func cacheEviction(log logger.Logger, cache *storage.Cache, cacheMaxBytes int64,
}
}

func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, blockTime time.Duration, s3BucketName string) (*storage.Storage, error) {
func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, bytesSoftMax int, blockTime time.Duration, s3BucketName string) (*storage.Storage, error) {
session, err := session.NewSession()
if err != nil {
return nil, fmt.Errorf("creating s3 session: %s", err)
Expand All @@ -86,7 +86,7 @@ func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, blockTime ti

blockingBatcher := func(log logger.Logger, ts *storage.TopicStorage) storage.RecordBatcher {
batchLogger := log.Name("blocking batcher")
return recordbatch.NewBlockingBatcher(batchLogger, blockTime, func(b recordbatch.RecordBatch) error {
return recordbatch.NewBlockingBatcher(batchLogger, blockTime, bytesSoftMax, func(b recordbatch.RecordBatch) error {
t0 := time.Now()
err := ts.AddRecordBatch(b)
batchLogger.Debugf("persisting to s3: %v", time.Since(t0))
Expand All @@ -98,9 +98,11 @@ func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, blockTime ti
}

type flags struct {
bucketName string
recordBatchBlockTime time.Duration
logLevel int
bucketName string
logLevel int

recordBatchBlockTime time.Duration
recordBatchSoftMaxBytes int

httpListenAddress string
httpListenPort int
Expand All @@ -117,9 +119,11 @@ func parseFlags() flags {
f := flags{}

fs.StringVar(&f.bucketName, "b", "simple-commit-log-delete-me", "Bucket name")
fs.DurationVar(&f.recordBatchBlockTime, "s", time.Second, "Amount of time to wait between receiving first message in batch and committing it")
fs.IntVar(&f.logLevel, "log-level", int(logger.LevelInfo), "Log level, info=4, debug=5")

fs.DurationVar(&f.recordBatchBlockTime, "batch-wait-time", time.Second, "Amount of time to wait between receiving first record in batch and committing it")
fs.IntVar(&f.recordBatchSoftMaxBytes, "batch-bytes-max", 10*sizey.MB, "Soft maximum for the number of bytes to include in each record batch")

fs.StringVar(&f.httpListenAddress, "l", "127.0.0.1", "Address to listen for HTTP traffic")
fs.IntVar(&f.httpListenPort, "p", 8080, "Port to listen for HTTP traffic")
fs.StringVar(&f.httpAPIKey, "api-key", "api-key", "API key for authorizing HTTP requests (this is not safe and needs to be changed)")
Expand Down
23 changes: 19 additions & 4 deletions internal/recordbatch/blockingbatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@ type BlockingBatcher struct {
log logger.Logger
mu sync.Mutex
collectingBatch bool
bytesSoftMax int

contextFactory func() context.Context
blockedAdds chan blockedAdd

persistRecordBatch func(RecordBatch) error
}

func NewBlockingBatcher(log logger.Logger, blockTime time.Duration, persistRecordBatch func(RecordBatch) error) *BlockingBatcher {
return NewBlockingBatcherWithContextFactory(log, persistRecordBatch, NewContextFactory(blockTime))
func NewBlockingBatcher(log logger.Logger, blockTime time.Duration, bytesSoftMax int, persistRecordBatch func(RecordBatch) error) *BlockingBatcher {
return NewBlockingBatcherWithConfig(log, bytesSoftMax, persistRecordBatch, NewContextFactory(blockTime))
}

func NewBlockingBatcherWithContextFactory(log logger.Logger, persistRecordBatch func(RecordBatch) error, contextFactory func() context.Context) *BlockingBatcher {
func NewBlockingBatcherWithConfig(log logger.Logger, bytesSoftMax int, persistRecordBatch func(RecordBatch) error, contextFactory func() context.Context) *BlockingBatcher {
return &BlockingBatcher{
log: log,
mu: sync.Mutex{},
blockedAdds: make(chan blockedAdd, 32),
contextFactory: contextFactory,
persistRecordBatch: persistRecordBatch,
bytesSoftMax: bytesSoftMax,
}
}

Expand Down Expand Up @@ -66,8 +68,11 @@ func (b *BlockingBatcher) AddRecord(r Record) error {
}

func (b *BlockingBatcher) collectBatch() {
ctx := b.contextFactory()
ctx, cancel := context.WithCancel(b.contextFactory())
defer cancel()

handledAdds := make([]blockedAdd, 0, 64)
batchBytes := 0

t0 := time.Now()

Expand All @@ -76,7 +81,17 @@ func (b *BlockingBatcher) collectBatch() {

case blockedAdd := <-b.blockedAdds:
handledAdds = append(handledAdds, blockedAdd)
batchBytes += len(blockedAdd.record)
b.log.Debugf("added record to batch (%d)", len(handledAdds))
if batchBytes >= b.bytesSoftMax {
b.log.Debugf("batch size exceeded soft max (%d/%d), collecting", batchBytes, b.bytesSoftMax)

// NOTE: this will not necessarily cause the batch collection
// branch of this select to be invoked next; if there are more adds
// waiting on b.blockedAdds, select picks randomly among the ready
// cases, so this branch may continue to process one or more of those.
cancel()
}

case <-ctx.Done():
b.log.Debugf("batch collection time: %v", time.Since(t0))
Expand Down
73 changes: 63 additions & 10 deletions internal/recordbatch/blockingbatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var log = logger.NewDefault(context.Background())

// TestBlockingBatcherAddReturnValue verifies that the error returned by
// persistRecordBatch() is returned all the way back up to callers of
// batcher.Add().
// batcher.AddRecord().
func TestBlockingBatcherAddReturnValue(t *testing.T) {
var (
ctx context.Context
Expand All @@ -43,7 +43,7 @@ func TestBlockingBatcherAddReturnValue(t *testing.T) {
"no error": {expected: nil},
}

batcher := recordbatch.NewBlockingBatcherWithContextFactory(log, persistRecordBatch, contextFactory)
batcher := recordbatch.NewBlockingBatcherWithConfig(log, 1024, persistRecordBatch, contextFactory)

for name, test := range tests {
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
Expand All @@ -61,7 +61,7 @@ func TestBlockingBatcherAddReturnValue(t *testing.T) {
}
}

// TestBlockingBatcherAddBlocks verifies that calls to Add() block until
// TestBlockingBatcherAddBlocks verifies that calls to AddRecord() block until
// persistRecordBatch has returned. This ensures that data has been persisted
// before giving control back to the caller.
func TestBlockingBatcherAddBlocks(t *testing.T) {
Expand All @@ -78,9 +78,9 @@ func TestBlockingBatcherAddBlocks(t *testing.T) {
return returnedErr
}

batcher := recordbatch.NewBlockingBatcherWithContextFactory(log, persistRecordBatch, contextFactory)
batcher := recordbatch.NewBlockingBatcherWithConfig(log, 1024, persistRecordBatch, contextFactory)

const numRecordBatches = 25
const numRecordBatches = 5

wg := sync.WaitGroup{}
wg.Add(numRecordBatches)
Expand All @@ -101,24 +101,77 @@ func TestBlockingBatcherAddBlocks(t *testing.T) {
}()
}

// wait for all above go-routines to be scheduled and block on Add()
time.Sleep(1 * time.Millisecond)
// wait for all above go-routines to be scheduled and block on AddRecord()
time.Sleep(5 * time.Millisecond)

// expire ctx to make Batcher persist data (call persistRecordBatch())
cancel()

// wait a long time before verifying that none of the Add() callers have returned
// wait a long time before verifying that none of the AddRecord() callers have returned
time.Sleep(10 * time.Millisecond)
require.False(t, addReturned.Load())

// allow persistRecordBatch to return
close(blockPersistRecordBatch)

// wait for persistRecordBatch() return value to propagate to Add() callers
// wait for persistRecordBatch() return value to propagate to AddRecord() callers
time.Sleep(1 * time.Millisecond)

require.True(t, addReturned.Load())

// ensure that all AddRecord()ers return
wg.Wait()
}

// TestBlockingBatcherSoftMax verifies that calls to AddRecord() will block
// until the configured soft max bytes limit is hit, after which it unblocks and
// persists all waiting records.
func TestBlockingBatcherSoftMax(t *testing.T) {
	// The context never expires on its own, so hitting the soft max is the
	// only thing that can trigger the batch to be persisted.
	ctx := context.Background()

	contextFactory := func() context.Context {
		return ctx
	}

	persistRecordBatch := func(rb recordbatch.RecordBatch) error {
		return nil
	}

	const bytesSoftMax = 10

	batcher := recordbatch.NewBlockingBatcherWithConfig(log, bytesSoftMax, persistRecordBatch, contextFactory)
	addReturned := atomic.Bool{}

	wg := &sync.WaitGroup{}
	wg.Add(bytesSoftMax - 1)

	// add too few bytes to trigger soft max
	for range bytesSoftMax - 1 {
		go func() {
			defer wg.Done()

			// NOTE: require must not be used here; it calls t.FailNow(),
			// which is only valid on the goroutine running the test.
			if err := batcher.AddRecord([]byte("1")); err != nil {
				t.Errorf("AddRecord() returned unexpected error: %s", err)
				return
			}

			addReturned.Store(true)
		}()
	}

	// wait for all above go-routines to be scheduled and block on AddRecord()
	// and ensure that none of the AddRecord() callers have returned
	time.Sleep(5 * time.Millisecond)

	require.False(t, addReturned.Load())

	// add a record hitting the soft max, expecting it to be persisted
	err := batcher.AddRecord([]byte("1"))
	require.NoError(t, err)

	// ensure that all AddRecord()ers return; the wait is bounded so that a
	// regression fails the test instead of hanging it, and it does not rely
	// on a fixed sleep being long enough for the callers to be rescheduled.
	allReturned := make(chan struct{})
	go func() {
		wg.Wait()
		close(allReturned)
	}()

	select {
	case <-allReturned:
	case <-time.After(time.Second):
		t.Fatal("timed out waiting for AddRecord() callers to return")
	}

	require.True(t, addReturned.Load())
}
6 changes: 0 additions & 6 deletions internal/storage/topicstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,6 @@ func (s *TopicStorage) recordBatchPath(recordBatchID uint64) string {
return RecordBatchPath(s.topicPath, recordBatchID)
}

// func readRecordBatchHeader(backingStorage BackingStorage, topicPath string, recordBatchID uint64) (recordbatch.Header, error) {
// rbPath := RecordBatchPath(topicPath, recordBatchID)

// return rb.Header, nil
// }

const recordBatchExtension = ".record_batch"

func listRecordBatchIDs(backingStorage BackingStorage, topicPath string) ([]uint64, error) {
Expand Down

0 comments on commit 497f172

Please sign in to comment.