diff --git a/cmd/seb-api/app/main.go b/cmd/seb-api/app/main.go index 42fdf39..1637202 100644 --- a/cmd/seb-api/app/main.go +++ b/cmd/seb-api/app/main.go @@ -34,7 +34,7 @@ func Run() { go cacheEviction(log.Name("cache eviction"), cache, flags.cacheMaxBytes, flags.cacheEvictionInterval) - blockingS3Storage, err := makeBlockingS3Storage(log, cache, flags.recordBatchBlockTime, flags.bucketName) + blockingS3Storage, err := makeBlockingS3Storage(log, cache, flags.recordBatchSoftMaxBytes, flags.recordBatchBlockTime, flags.bucketName) if err != nil { log.Fatalf("making blocking s3 storage: %s", err) } @@ -71,7 +71,7 @@ func cacheEviction(log logger.Logger, cache *storage.Cache, cacheMaxBytes int64, } } -func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, blockTime time.Duration, s3BucketName string) (*storage.Storage, error) { +func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, bytesSoftMax int, blockTime time.Duration, s3BucketName string) (*storage.Storage, error) { session, err := session.NewSession() if err != nil { return nil, fmt.Errorf("creating s3 session: %s", err) @@ -86,7 +86,7 @@ func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, blockTime ti blockingBatcher := func(log logger.Logger, ts *storage.TopicStorage) storage.RecordBatcher { batchLogger := log.Name("blocking batcher") - return recordbatch.NewBlockingBatcher(batchLogger, blockTime, func(b recordbatch.RecordBatch) error { + return recordbatch.NewBlockingBatcher(batchLogger, blockTime, bytesSoftMax, func(b recordbatch.RecordBatch) error { t0 := time.Now() err := ts.AddRecordBatch(b) batchLogger.Debugf("persisting to s3: %v", time.Since(t0)) @@ -98,9 +98,11 @@ func makeBlockingS3Storage(log logger.Logger, cache *storage.Cache, blockTime ti } type flags struct { - bucketName string - recordBatchBlockTime time.Duration - logLevel int + bucketName string + logLevel int + + recordBatchBlockTime time.Duration + recordBatchSoftMaxBytes int httpListenAddress string httpListenPort int @@ -117,9 +119,11 @@ func parseFlags() flags { f := flags{} fs.StringVar(&f.bucketName, "b", "simple-commit-log-delete-me", "Bucket name") - fs.DurationVar(&f.recordBatchBlockTime, "s", time.Second, "Amount of time to wait between receiving first message in batch and committing it") fs.IntVar(&f.logLevel, "log-level", int(logger.LevelInfo), "Log level, info=4, debug=5") + fs.DurationVar(&f.recordBatchBlockTime, "batch-wait-time", time.Second, "Amount of time to wait between receiving first record in batch and committing it") + fs.IntVar(&f.recordBatchSoftMaxBytes, "batch-bytes-max", 10*sizey.MB, "Soft maximum for the number of bytes to include in each record batch") + fs.StringVar(&f.httpListenAddress, "l", "127.0.0.1", "Address to listen for HTTP traffic") fs.IntVar(&f.httpListenPort, "p", 8080, "Port to listen for HTTP traffic") fs.StringVar(&f.httpAPIKey, "api-key", "api-key", "API key for authorizing HTTP requests (this is not safe and needs to be changed)") diff --git a/internal/recordbatch/blockingbatcher.go b/internal/recordbatch/blockingbatcher.go index 2b448e7..cf80406 100644 --- a/internal/recordbatch/blockingbatcher.go +++ b/internal/recordbatch/blockingbatcher.go @@ -17,6 +17,7 @@ type BlockingBatcher struct { log logger.Logger mu sync.Mutex collectingBatch bool + bytesSoftMax int contextFactory func() context.Context blockedAdds chan blockedAdd @@ -24,17 +25,18 @@ type BlockingBatcher struct { persistRecordBatch func(RecordBatch) error } -func NewBlockingBatcher(log logger.Logger, blockTime time.Duration, persistRecordBatch func(RecordBatch) error) *BlockingBatcher { - return NewBlockingBatcherWithContextFactory(log, persistRecordBatch, NewContextFactory(blockTime)) +func NewBlockingBatcher(log logger.Logger, blockTime time.Duration, bytesSoftMax int, persistRecordBatch func(RecordBatch) error) *BlockingBatcher { + return NewBlockingBatcherWithConfig(log, bytesSoftMax, persistRecordBatch, NewContextFactory(blockTime)) } -func NewBlockingBatcherWithContextFactory(log logger.Logger, persistRecordBatch func(RecordBatch) error, contextFactory func() context.Context) *BlockingBatcher { +func NewBlockingBatcherWithConfig(log logger.Logger, bytesSoftMax int, persistRecordBatch func(RecordBatch) error, contextFactory func() context.Context) *BlockingBatcher { return &BlockingBatcher{ log: log, mu: sync.Mutex{}, blockedAdds: make(chan blockedAdd, 32), contextFactory: contextFactory, persistRecordBatch: persistRecordBatch, + bytesSoftMax: bytesSoftMax, } } @@ -66,8 +68,11 @@ func (b *BlockingBatcher) AddRecord(r Record) error { } func (b *BlockingBatcher) collectBatch() { - ctx := b.contextFactory() + ctx, cancel := context.WithCancel(b.contextFactory()) + defer cancel() + handledAdds := make([]blockedAdd, 0, 64) + batchBytes := 0 t0 := time.Now() @@ -76,7 +81,17 @@ func (b *BlockingBatcher) collectBatch() { case blockedAdd := <-b.blockedAdds: handledAdds = append(handledAdds, blockedAdd) + batchBytes += len(blockedAdd.record) b.log.Debugf("added record to batch (%d)", len(handledAdds)) + if batchBytes >= b.bytesSoftMax { + b.log.Debugf("batch size exceeded soft max (%d/%d), collecting", batchBytes, b.bytesSoftMax) + + // NOTE: this will not necessarily cause the context done-branch + // of this select to be invoked; if there's more adds on + // handledAdds, it's likely that this branch will continue for + // _some_ amount of time. + cancel() + } case <-ctx.Done(): b.log.Debugf("batch collection time: %v", time.Since(t0)) diff --git a/internal/recordbatch/blockingbatcher_test.go b/internal/recordbatch/blockingbatcher_test.go index 430dc8d..c7464fe 100644 --- a/internal/recordbatch/blockingbatcher_test.go +++ b/internal/recordbatch/blockingbatcher_test.go @@ -18,7 +18,7 @@ var log = logger.NewDefault(context.Background()) // TestBlockingBatcherAddReturnValue verifies that the error returned by // persistRecordBatch() is returned all the way back up to callers of -// batcher.Add(). +// batcher.AddRecord(). func TestBlockingBatcherAddReturnValue(t *testing.T) { var ( ctx context.Context @@ -43,7 +43,7 @@ func TestBlockingBatcherAddReturnValue(t *testing.T) { "no error": {expected: nil}, } - batcher := recordbatch.NewBlockingBatcherWithContextFactory(log, persistRecordBatch, contextFactory) + batcher := recordbatch.NewBlockingBatcherWithConfig(log, 1024, persistRecordBatch, contextFactory) for name, test := range tests { ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond) @@ -61,7 +61,7 @@ func TestBlockingBatcherAddReturnValue(t *testing.T) { } } -// TestBlockingBatcherAddBlocks verifies that calls to Add() block until +// TestBlockingBatcherAddBlocks verifies that calls to AddRecord() block until // persistRecordBatch has returned. This ensures that data has been persisted // before giving control back to the caller. func TestBlockingBatcherAddBlocks(t *testing.T) { @@ -78,9 +78,9 @@ func TestBlockingBatcherAddBlocks(t *testing.T) { return returnedErr } - batcher := recordbatch.NewBlockingBatcherWithContextFactory(log, persistRecordBatch, contextFactory) + batcher := recordbatch.NewBlockingBatcherWithConfig(log, 1024, persistRecordBatch, contextFactory) - const numRecordBatches = 25 + const numRecordBatches = 5 wg := sync.WaitGroup{} wg.Add(numRecordBatches) @@ -101,24 +101,77 @@ func TestBlockingBatcherAddBlocks(t *testing.T) { }() } - // wait for all above go-routines to be scheduled and block on Add() - time.Sleep(1 * time.Millisecond) + // wait for all above go-routines to be scheduled and block on AddRecord() + time.Sleep(5 * time.Millisecond) // expire ctx to make Batcher persist data (call persistRecordBatch()) cancel() - // wait a long time before verifying that none of the Add() callers have returned + // wait a long time before verifying that none of the AddRecord() callers have returned time.Sleep(10 * time.Millisecond) require.False(t, addReturned.Load()) // allow persistRecordBatch to return close(blockPersistRecordBatch) - // wait for persistRecordBatch() return value to propagate to Add() callers + // wait for persistRecordBatch() return value to propagate to AddRecord() callers + time.Sleep(1 * time.Millisecond) + + require.True(t, addReturned.Load()) + + // ensure that all AddRecord()ers return + wg.Wait() +} + +// TestBlockingBatcherSoftMax verifies that calls to AddRecord() will block +// until the configured soft max bytes limit is hit, after which it unblocks and +// persists all waiting records. +func TestBlockingBatcherSoftMax(t *testing.T) { + ctx := context.Background() + + contextFactory := func() context.Context { + return ctx + } + + persistRecordBatch := func(rb recordbatch.RecordBatch) error { + return nil + } + + const bytesSoftMax = 10 + + batcher := recordbatch.NewBlockingBatcherWithConfig(log, bytesSoftMax, persistRecordBatch, contextFactory) + addReturned := atomic.Bool{} + + wg := &sync.WaitGroup{} + wg.Add(bytesSoftMax - 1) + + // add too few bytes to trigger soft max + for range bytesSoftMax - 1 { + go func() { + defer wg.Done() + + err := batcher.AddRecord([]byte("1")) + require.NoError(t, err) + + addReturned.Store(true) + }() + } + + // wait for all above go-routines to be scheduled and block on AddRecord() + // and ensure that none of the AddRecord() callers have returned + time.Sleep(5 * time.Millisecond) + + require.False(t, addReturned.Load()) + + // add a record hitting the soft max, expecting it to be persisted + err := batcher.AddRecord([]byte("1")) + require.NoError(t, err) + + // wait for persistRecordBatch() return value to propagate to AddRecord() callers time.Sleep(1 * time.Millisecond) require.True(t, addReturned.Load()) - // ensure that all Add()ers return + // ensure that all AddRecord()ers return wg.Wait() } diff --git a/internal/storage/topicstorage.go b/internal/storage/topicstorage.go index df616db..0f37169 100644 --- a/internal/storage/topicstorage.go +++ b/internal/storage/topicstorage.go @@ -210,12 +210,6 @@ func (s *TopicStorage) recordBatchPath(recordBatchID uint64) string { return RecordBatchPath(s.topicPath, recordBatchID) } -// func readRecordBatchHeader(backingStorage BackingStorage, topicPath string, recordBatchID uint64) (recordbatch.Header, error) { -// rbPath := RecordBatchPath(topicPath, recordBatchID) - -// return rb.Header, nil -// } - const recordBatchExtension = ".record_batch" func listRecordBatchIDs(backingStorage BackingStorage, topicPath string) ([]uint64, error) {