diff --git a/internal/storage/factories.go b/internal/storage/factories.go index 84bbb59..1b0e9bb 100644 --- a/internal/storage/factories.go +++ b/internal/storage/factories.go @@ -32,15 +32,17 @@ func NewTopicFactory(ts topic.Storage, cache *cache.Cache) TopicFactory { type BatcherFactory func(logger.Logger, *topic.Topic) RecordBatcher func NewBlockingBatcherFactory(blockTime time.Duration, batchBytesMax int) BatcherFactory { - return func(log logger.Logger, ts *topic.Topic) RecordBatcher { - batchLogger := log.Name("blocking batcher") + return func(log logger.Logger, t *topic.Topic) RecordBatcher { + log = log.Name("blocking batcher") - return NewBlockingBatcher(batchLogger, blockTime, batchBytesMax, func(b recordbatch.RecordBatch) ([]uint64, error) { + persist := func(b recordbatch.RecordBatch) ([]uint64, error) { t0 := time.Now() - offsets, err := ts.AddRecordBatch(b) - batchLogger.Infof("persisting to storage: %v", time.Since(t0)) + offsets, err := t.AddRecordBatch(b) + log.Infof("persisting to storage: %v", time.Since(t0)) return offsets, err - }) + } + + return NewBlockingBatcher(log, blockTime, batchBytesMax, persist) } }