Skip to content

Commit

Permalink
topic,cache: simplify construction of new instances
Browse files Browse the repository at this point in the history
  • Loading branch information
micvbang committed May 24, 2024
1 parent 02715d7 commit 645a02d
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 79 deletions.
37 changes: 4 additions & 33 deletions cmd/seb-api/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
"github.com/micvbang/simple-event-broker/internal/sebhttp"
"github.com/micvbang/simple-event-broker/internal/storage"
"github.com/micvbang/simple-event-broker/internal/topic"
)

func Run() {
Expand All @@ -26,19 +25,14 @@ func Run() {
log := logger.NewWithLevel(ctx, logger.LogLevel(flags.logLevel))
log.Debugf("flags: %v", flags)

cacheStorage, err := cache.NewDiskStorage(log.Name("disk cache"), flags.cacheDir)
if err != nil {
log.Fatalf("creating cache storage: %w", err)
}

cache, err := cache.New(log, cacheStorage)
c, err := cache.NewDiskCache(log, flags.cacheDir)
if err != nil {
log.Fatalf("creating disk cache: %w", err)
}

go cacheEviction(log.Name("cache eviction"), cache, flags.cacheMaxBytes, flags.cacheEvictionInterval)
go cache.EvictionLoop(ctx, log.Name("cache eviction"), c, flags.cacheMaxBytes, flags.cacheEvictionInterval)

blockingS3Storage, err := makeBlockingS3Storage(log, cache, flags.recordBatchSoftMaxBytes, flags.recordBatchBlockTime, flags.s3BucketName)
blockingS3Storage, err := makeBlockingS3Storage(log, c, flags.recordBatchSoftMaxBytes, flags.recordBatchBlockTime, flags.s3BucketName)
if err != nil {
log.Fatalf("making blocking s3 storage: %s", err)
}
Expand All @@ -65,36 +59,13 @@ func Run() {
log.Errorf("main returned: %s", err)
}

// cacheEviction loops forever, checking the cache size every interval and
// evicting least-recently-used items whenever it exceeds cacheMaxBytes.
// Intended to be started in its own goroutine; eviction failures are logged,
// not returned.
func cacheEviction(log logger.Logger, cache *cache.Cache, cacheMaxBytes int64, interval time.Duration) {
	log = log.WithField("max bytes", cacheMaxBytes).WithField("interval", interval)

	for {
		size := cache.Size()
		if size > cacheMaxBytes {
			fillLevel := float32(size) / float32(cacheMaxBytes) * 100
			log.Infof("cache full (%.2f%%, %d/%d bytes), evicting items", fillLevel, size, cacheMaxBytes)

			if err := cache.EvictLeastRecentlyUsed(cacheMaxBytes); err != nil {
				log.Errorf("failed to evict cache: %s", err)
			}
		}

		log.Debugf("sleeping")
		time.Sleep(interval)
	}
}

func makeBlockingS3Storage(log logger.Logger, cache *cache.Cache, bytesSoftMax int, blockTime time.Duration, s3BucketName string) (*storage.Storage, error) {
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
return nil, fmt.Errorf("creating s3 session: %s", err)
}

s3TopicFactory := storage.NewS3TopicFactory(cfg, s3BucketName, cache, &topic.Gzip{})
s3TopicFactory := storage.NewS3TopicFactory(cfg, s3BucketName, cache)
blockingBatcherFactory := storage.NewBlockingBatcherFactory(blockTime, bytesSoftMax)

storage := storage.New(
Expand Down
5 changes: 2 additions & 3 deletions cmd/seb-dump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ func main() {
topicName := filepath.Base(absInputPath)
fmt.Printf("Dumping records [%d; %d] from topic '%s'\n", flags.startFromOffset, flags.startFromOffset+flags.numRecords-1, topicName)

cacheStorage := cache.NewMemoryStorage(log.Name("disk cache"))
cache, err := cache.New(log, cacheStorage)
cache, err := cache.NewMemoryCache(log)
if err != nil {
log.Fatalf("creating disk cache: %w", err)
}

diskTopicStorage := topic.NewDiskStorage(log, rootDir)

topic, err := topic.New(log, diskTopicStorage, topicName, cache, topic.Gzip{})
topic, err := topic.New(log, diskTopicStorage, topicName, cache)
if err != nil {
log.Fatalf("failed to initialized disk storage: %s", err)
}
Expand Down
17 changes: 17 additions & 0 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ type Cache struct {
cacheItems map[string]CacheItem
}

// NewDiskCache returns a new Cache whose items are persisted by a
// DiskStorage rooted at rootDir.
func NewDiskCache(log logger.Logger, rootDir string) (*Cache, error) {
	storage, err := NewDiskStorage(log.Name("disk storage"), rootDir)
	if err != nil {
		return nil, fmt.Errorf("creating disk storage: %w", err)
	}
	return New(log, storage)
}

// NewMemoryCache returns a new Cache with MemoryStorage.
func NewMemoryCache(log logger.Logger) (*Cache, error) {
	// Name the storage sub-logger "memory storage" for consistency with
	// NewDiskCache, which names its storage sub-logger "disk storage"
	// (the old name, "memory cache", described the wrong layer).
	memoryStorage := NewMemoryStorage(log.Name("memory storage"))

	return New(log, memoryStorage)
}

// New returns a new Cache backed by cacheStorage, using time.Now as its
// clock (see NewCacheWithNow for injecting a different clock, e.g. in tests).
func New(log logger.Logger, cacheStorage Storage) (*Cache, error) {
	return NewCacheWithNow(log, cacheStorage, time.Now)
}
Expand Down
37 changes: 37 additions & 0 deletions internal/cache/eviction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cache

import (
"context"
"fmt"
"time"

"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
)

// EvictionLoop periodically (every interval) checks the size of cache and
// evicts least-recently-used items whenever the size exceeds cacheMaxBytes.
// It blocks until ctx is canceled — returning ctx.Err() — or until an
// eviction fails, in which case the wrapped eviction error is returned.
func EvictionLoop(ctx context.Context, log logger.Logger, cache *Cache, cacheMaxBytes int64, interval time.Duration) error {
	log = log.
		WithField("max bytes", cacheMaxBytes).
		WithField("interval", interval)

	ticker := time.NewTicker(interval)
	// Stop the ticker on exit so its resources are released as soon as the
	// loop returns (the original leaked the ticker until GC).
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		cacheSize := cache.Size()
		if cacheSize <= cacheMaxBytes {
			continue
		}

		fillLevel := float32(cacheSize) / float32(cacheMaxBytes) * 100
		log.Infof("cache full (%.2f%%, %d/%d bytes), evicting items", fillLevel, cacheSize, cacheMaxBytes)

		err := cache.EvictLeastRecentlyUsed(cacheMaxBytes)
		if err != nil {
			return fmt.Errorf("evicting cache: %w", err)
		}
	}
}
2 changes: 1 addition & 1 deletion internal/infrastructure/tester/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func httpServer(t *testing.T, config httpServerConfig) *HTTPTestServer {

topicFactory := func(log logger.Logger, topicName string) (*topic.Topic, error) {
memoryTopicStorage := topic.NewMemoryStorage(log)
return topic.New(log, memoryTopicStorage, topicName, cache, nil)
return topic.New(log, memoryTopicStorage, topicName, cache, topic.WithCompress(nil))
}

storage := storage.New(
Expand Down
2 changes: 1 addition & 1 deletion internal/infrastructure/tester/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestStorage(t *testing.T, autoCreateTopic bool, f func(*testing.T, *storage
s := storage.New(log,
func(log logger.Logger, topicName string) (*topic.Topic, error) {
bs := backingStorageFactory(t)
return topic.New(log, bs, topicName, cache, &topic.Gzip{})
return topic.New(log, bs, topicName, cache)
},
storage.WithNullBatcher(),
storage.WithAutoCreateTopic(autoCreateTopic),
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/blockingbatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestBlockingBatcherSoftMax(t *testing.T) {
// and AddRecord() block and returns the correct offsets to all callers.
func TestBlockingBatcherConcurrency(t *testing.T) {
tester.TestTopicStorageAndCache(t, func(t *testing.T, s topic.Storage, c *cache.Cache) {
topic, err := topic.New(log, s, "topicName", c, nil)
topic, err := topic.New(log, s, "topicName", c, topic.WithCompress(nil))
require.NoError(t, err)

batcher := storage.NewBlockingBatcher(log, 5*time.Millisecond, 32*sizey.KB, topic.AddRecordBatch)
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@ import (

type TopicFactory func(_ logger.Logger, topicName string) (*topic.Topic, error)

func NewS3TopicFactory(cfg aws.Config, s3BucketName string, cache *cache.Cache, compress topic.Compress) TopicFactory {
func NewS3TopicFactory(cfg aws.Config, s3BucketName string, cache *cache.Cache) TopicFactory {
return func(log logger.Logger, topicName string) (*topic.Topic, error) {
storageLogger := log.Name("s3 storage").WithField("topic-name", topicName).WithField("bucket", s3BucketName)

s3Client := s3.NewFromConfig(cfg)
s3Storage := topic.NewS3Storage(storageLogger, s3Client, s3BucketName, "")
return topic.New(log, s3Storage, topicName, cache, compress)
return topic.New(log, s3Storage, topicName, cache)
}
}

func NewTopicFactory(ts topic.Storage, cache *cache.Cache) TopicFactory {
return func(log logger.Logger, topicName string) (*topic.Topic, error) {
return topic.New(log, ts, topicName, cache, &topic.Gzip{})
return topic.New(log, ts, topicName, cache)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/storage/nullbatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// AddRecord() block and returns the correct offsets to all callers.
func TestNullBatcherConcurrency(t *testing.T) {
tester.TestTopicStorageAndCache(t, func(t *testing.T, s topic.Storage, c *cache.Cache) {
topic, err := topic.New(log, s, "topicName", c, nil)
topic, err := topic.New(log, s, "topicName", c, topic.WithCompress(nil))
require.NoError(t, err)

batcher := storage.NewNullBatcher(topic.AddRecordBatch)
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func TestCreateTopicAlreadyExistsInStorage(t *testing.T) {
{
s1 := storage.New(log,
func(log logger.Logger, topicName string) (*topic.Topic, error) {
return topic.New(log, bs, topicName, cache, &topic.Gzip{})
return topic.New(log, bs, topicName, cache)
},
storage.WithNullBatcher(),
storage.WithAutoCreateTopic(false),
Expand All @@ -273,7 +273,7 @@ func TestCreateTopicAlreadyExistsInStorage(t *testing.T) {
{
s2 := storage.New(log,
func(log logger.Logger, topicName string) (*topic.Topic, error) {
return topic.New(log, bs, topicName, cache, &topic.Gzip{})
return topic.New(log, bs, topicName, cache)
},
storage.WithNullBatcher(),
storage.WithAutoCreateTopic(false),
Expand Down
35 changes: 24 additions & 11 deletions internal/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,20 @@ type Topic struct {

backingStorage Storage
cache *cache.Cache
compress Compress
compression Compress
OffsetCond *OffsetCond
}

func New(log logger.Logger, backingStorage Storage, topicName string, cache *cache.Cache, compress Compress) (*Topic, error) {
if cache == nil {
return nil, fmt.Errorf("cache required")
type Opts struct {
Compression Compress
}

func New(log logger.Logger, backingStorage Storage, topicName string, cache *cache.Cache, optFuncs ...func(*Opts)) (*Topic, error) {
opts := Opts{
Compression: Gzip{},
}
for _, optFunc := range optFuncs {
optFunc(&opts)
}

recordBatchOffsets, err := listRecordBatchOffsets(backingStorage, topicName)
Expand All @@ -64,7 +71,7 @@ func New(log logger.Logger, backingStorage Storage, topicName string, cache *cac
topicName: topicName,
recordBatchOffsets: recordBatchOffsets,
cache: cache,
compress: compress,
compression: opts.Compression,
OffsetCond: NewOffsetCond(0),
}

Expand Down Expand Up @@ -99,8 +106,8 @@ func (s *Topic) AddRecordBatch(recordBatch recordbatch.RecordBatch) ([]uint64, e
}

w := backingWriter
if s.compress != nil {
w, err = s.compress.NewWriter(backingWriter)
if s.compression != nil {
w, err = s.compression.NewWriter(backingWriter)
if err != nil {
return nil, fmt.Errorf("creating compression writer: %w", err)
}
Expand All @@ -112,7 +119,7 @@ func (s *Topic) AddRecordBatch(recordBatch recordbatch.RecordBatch) ([]uint64, e
return nil, fmt.Errorf("writing record batch: %w", err)
}

if s.compress != nil {
if s.compression != nil {
w.Close()
}
// once Close() returns, the data has been committed and can be retrieved by
Expand Down Expand Up @@ -343,8 +350,8 @@ func (s *Topic) parseRecordBatch(recordBatchID uint64) (*recordbatch.Parser, err
}

r := backingReader
if s.compress != nil {
r, err = s.compress.NewReader(backingReader)
if s.compression != nil {
r, err = s.compression.NewReader(backingReader)
if err != nil {
return nil, fmt.Errorf("creating compression reader: %w", err)
}
Expand All @@ -360,7 +367,7 @@ func (s *Topic) parseRecordBatch(recordBatchID uint64) (*recordbatch.Parser, err
return nil, fmt.Errorf("copying backing storage result to cache: %w", err)
}

if s.compress != nil {
if s.compression != nil {
r.Close()
}

Expand Down Expand Up @@ -437,3 +444,9 @@ func listRecordBatchOffsets(backingStorage Storage, topicName string) ([]uint64,
// RecordBatchKey returns the storage key for record batch recordBatchID of
// topic topicName: the topic name joined with a zero-padded 12-digit batch ID
// plus the record-batch file extension.
func RecordBatchKey(topicName string, recordBatchID uint64) string {
	fileName := fmt.Sprintf("%012d%s", recordBatchID, recordBatchExtension)
	return filepath.Join(topicName, fileName)
}

// WithCompress returns an option for New that sets the compression used when
// reading and writing record batches, overriding the default (Gzip).
// Passing nil disables compression: the topic read/write paths skip the
// compression wrapper when the configured Compress is nil.
func WithCompress(c Compress) func(*Opts) {
	return func(o *Opts) {
		o.Compression = c
	}
}
Loading

0 comments on commit 645a02d

Please sign in to comment.