Skip to content

Commit

Permalink
storage/factories: simplify NewBlockingBatcherFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed May 24, 2024
1 parent e05d8a7 commit bd6127f
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions internal/storage/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ func NewTopicFactory(ts topic.Storage, cache *cache.Cache) TopicFactory {
type BatcherFactory func(logger.Logger, *topic.Topic) RecordBatcher

func NewBlockingBatcherFactory(blockTime time.Duration, batchBytesMax int) BatcherFactory {
return func(log logger.Logger, ts *topic.Topic) RecordBatcher {
batchLogger := log.Name("blocking batcher")
return func(log logger.Logger, t *topic.Topic) RecordBatcher {
log = log.Name("blocking batcher")

return NewBlockingBatcher(batchLogger, blockTime, batchBytesMax, func(b recordbatch.RecordBatch) ([]uint64, error) {
persist := func(b recordbatch.RecordBatch) ([]uint64, error) {
t0 := time.Now()
offsets, err := ts.AddRecordBatch(b)
batchLogger.Infof("persisting to storage: %v", time.Since(t0))
offsets, err := t.AddRecordBatch(b)
log.Infof("persisting to storage: %v", time.Since(t0))
return offsets, err
})
}

return NewBlockingBatcher(log, blockTime, batchBytesMax, persist)
}
}

Expand Down

0 comments on commit bd6127f

Please sign in to comment.