Skip to content

Commit

Permalink
storage/DiskCache: add Size()
Browse files Browse the repository at this point in the history
Size() returns the number of bytes currently stored in the cache.
  • Loading branch information
micvbang committed Apr 9, 2024
1 parent 610d554 commit 567bb44
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 53 deletions.
7 changes: 6 additions & 1 deletion cmd/seb-api/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,18 @@ func makeBlockingS3Storage(log logger.Logger, sleepTime time.Duration, s3BucketN
return nil, fmt.Errorf("creating s3 session: %s", err)
}

diskCache, err := storage.NewDiskCache(log.Name("disk cache"), "/tmp/seb-cache")
if err != nil {
return nil, fmt.Errorf("creating disk cache: %w", err)
}

s3TopicStorage := func(log logger.Logger, topicName string) (*storage.TopicStorage, error) {
return storage.NewS3TopicStorage(log.Name("s3 storage"), storage.S3StorageInput{
S3: s3.New(session),
BucketName: s3BucketName,
RootDir: "/tmp/recordbatch",
TopicName: topicName,
Cache: storage.NewDiskCache(log.Name("disk cache"), "/tmp/seb-cache"),
Cache: diskCache,
})
}

Expand Down
101 changes: 92 additions & 9 deletions internal/storage/diskcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,119 @@ import (
"io"
"os"
"path"
"sync"
"time"

"github.com/micvbang/go-helpy/filepathy"
"github.com/micvbang/simple-event-broker/internal/infrastructure/logger"
)

type cacheItem struct {
size int64
createdAt time.Time
usedAt time.Time
}

// DiskCache is used to cache RecordBatches on the local disk.
type DiskCache struct {
log logger.Logger
rootdir string

mu sync.Mutex
contents map[string]cacheItem
}

func NewDiskCache(log logger.Logger, rootDir string) *DiskCache {
return &DiskCache{log: log, rootdir: rootDir}
func NewDiskCache(log logger.Logger, rootDir string) (*DiskCache, error) {
cacheContents := make(map[string]cacheItem, 64)

fileWalkConfig := filepathy.WalkConfig{
Files: true,
Recursive: true,
}
err := filepathy.Walk(rootDir, fileWalkConfig, func(path string, info os.FileInfo, err error) error {
cacheContents[path] = cacheItem{
size: info.Size(),
createdAt: info.ModTime(),
}
return nil
})
if err != nil {
return nil, fmt.Errorf("reading existing cache: %w", err)
}

return &DiskCache{log: log, rootdir: rootDir, contents: cacheContents}, nil
}

// TODO: somehow to track how much disk space is being used
// TODO: somehow to throw away "old" (no longer needed) data

func (c *DiskCache) Writer(recordBatchPath string) (io.WriteCloser, error) {
c.log.Debugf("adding '%s'", recordBatchPath)
log := c.log.WithField("recordBatchPath", recordBatchPath)

log.Debugf("adding '%s'", recordBatchPath)
tmpFile, err := os.CreateTemp("", "seb_*")
if err != nil {
return nil, fmt.Errorf("creating temp file: %w", err)
}

cachePath := c.cachePath(recordBatchPath)

return &cacheWriter{
tmpFile: tmpFile,
destPath: c.cachePath(recordBatchPath),
destPath: cachePath,
reportFileSize: func(size int64) {
log.Debugf("adding to contents")

c.mu.Lock()
defer c.mu.Unlock()

c.contents[cachePath] = cacheItem{
size: size,
createdAt: time.Now(),
}
},
}, nil
}

func (c *DiskCache) Size() int64 {
c.mu.Lock()
defer c.mu.Unlock()

size := int64(0)
for _, item := range c.contents {
size += item.size
}
return size
}

func (c *DiskCache) Reader(recordBatchPath string) (io.ReadSeekCloser, error) {
f, err := os.Open(c.cachePath(recordBatchPath))
log := c.log.WithField("recordBatchPath", recordBatchPath)

cachePath := c.cachePath(recordBatchPath)
f, err := os.Open(cachePath)
if err != nil {
return nil, errors.Join(ErrNotInCache, fmt.Errorf("opening record batch '%s': %w", recordBatchPath, err))
}

c.log.Debugf("hit for '%s'", recordBatchPath)
now := time.Now()
log.Debugf("setting used at: %v", now)

c.mu.Lock()
defer c.mu.Unlock()
item, ok := c.contents[recordBatchPath]
if !ok {
log.Debugf("not found in contents, adding")
fileInfo, err := os.Stat(cachePath)
if err == nil {
item = cacheItem{
size: fileInfo.Size(),
createdAt: fileInfo.ModTime(),
}
}
}
item.usedAt = now
c.contents[cachePath] = item

log.Debugf("hit for '%s'", recordBatchPath)
return f, nil
}

Expand All @@ -53,10 +129,15 @@ func (c *DiskCache) cachePath(recordBatchPath string) string {
type cacheWriter struct {
tmpFile *os.File
destPath string
size int64

reportFileSize func(int64)
}

func (cw cacheWriter) Write(bs []byte) (int, error) {
return cw.tmpFile.Write(bs)
func (cw *cacheWriter) Write(bs []byte) (int, error) {
n, err := cw.tmpFile.Write(bs)
cw.size += int64(n)
return n, err
}

func (cw cacheWriter) Close() error {
Expand All @@ -76,5 +157,7 @@ func (cw cacheWriter) Close() error {
return fmt.Errorf("moving %s to %s: %w", cw.tmpFile.Name(), cw.destPath, err)
}

cw.reportFileSize(cw.size)

return nil
}
131 changes: 108 additions & 23 deletions internal/storage/diskcache_test.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
package storage

import (
"crypto/rand"
"fmt"
"io"
"os"
"path"
"testing"

"github.com/micvbang/go-helpy/inty"
"github.com/micvbang/simple-event-broker/internal/tester"
"github.com/stretchr/testify/require"
)

// TestCacheWriterWritesToDisk verifies that the io.WriteCloser returned by
// Writer() writes the expected bytes to the cache file, and does not write the
// file to the cache destination until Close() is called.
func TestCacheWriterWritesToDisk(t *testing.T) {
tempDir, err := os.MkdirTemp("", "seb_*")
require.NoError(t, err)
tempDir := tester.TempDir(t)

const cachePath = "/some/topic/name/123"
expectedCachedFile := path.Join(tempDir, cachePath)

expectedBytes := make([]byte, 4096)
_, err = rand.Read(expectedBytes)
require.NoError(t, err)
expectedBytes := tester.RandomBytes(t, 4096)

c := NewDiskCache(log, tempDir)
c, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

// Act
f, err := c.Writer(cachePath)
Expand Down Expand Up @@ -55,26 +55,18 @@ func TestCacheWriterWritesToDisk(t *testing.T) {
// TestCacheReaderReadsFromDisk verifies that the io.ReadSeekCloser returned by
// Reader() reads the correct file and returns the expected bytes.
func TestCacheReaderReadsFromDisk(t *testing.T) {
tempDir, err := os.MkdirTemp("", "seb_*")
require.NoError(t, err)

tempDir := tester.TempDir(t)
const cachePath = "/some/topic/name/123"

expectedBytes := make([]byte, 4096)
_, err = rand.Read(expectedBytes)
require.NoError(t, err)
expectedBytes := tester.RandomBytes(t, 4096)

c := NewDiskCache(log, tempDir)

f, err := c.Writer(cachePath)
c, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

n, err := f.Write(expectedBytes)
f, err := c.Writer(cachePath)
require.NoError(t, err)
require.Equal(t, len(expectedBytes), n)

err = f.Close()
require.NoError(t, err)
tester.WriteAndClose(t, f, expectedBytes)

// Act
reader, err := c.Reader(cachePath)
Expand All @@ -89,12 +81,105 @@ func TestCacheReaderReadsFromDisk(t *testing.T) {
// TestCacheReaderFileNotCached verifies that Reader() returns ErrNotInCache
// when attempting to read a file from cache that does not exist.
func TestCacheReaderFileNotCached(t *testing.T) {
tempDir, err := os.MkdirTemp("", "seb_*")
require.NoError(t, err)
tempDir := tester.TempDir(t)

c := NewDiskCache(log, tempDir)
c, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

// Act, assert
_, err = c.Reader("non/existing/path")
require.ErrorIs(t, err, ErrNotInCache)
}

// TestCacheSize verifies that Size() returns the expected number of bytes.
func TestCacheSize(t *testing.T) {
tempDir := tester.TempDir(t)
c, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

expectedSize := 0
for i := 0; i < 10; i++ {
f, err := c.Writer(fmt.Sprintf("/some/name/%d", i))
require.NoError(t, err)

bs := tester.RandomBytes(t, 128+inty.RandomN(256))
tester.WriteAndClose(t, f, bs)

expectedSize += len(bs)
}

require.Equal(t, int64(expectedSize), c.Size())
}

// TestCacheSizeWithExistingFiles verifies that NewDiskCache initializes the
// cache with data already on disk, such that calls to Size() returns the
// correct number of bytes currently on disk for the given root dir.
// Additionally, it's also verified that Size() still reports the correct number
// of bytes after adding _more_ items to a pre-initialized cache.
func TestCacheSizeWithExistingFiles(t *testing.T) {
tempDir := tester.TempDir(t)

expectedSize := 0
{
c1, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

for i := 0; i < 10; i++ {
f, err := c1.Writer(fmt.Sprintf("/some/name/%d", i))
require.NoError(t, err)

bs := tester.RandomBytes(t, 128+inty.RandomN(256))
tester.WriteAndClose(t, f, bs)

expectedSize += len(bs)
}

require.Equal(t, int64(expectedSize), c1.Size())
}

// Act
c2, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

// Assert
require.Equal(t, int64(expectedSize), c2.Size())

// Act
// add more items to the cache
for i := 0; i < 10; i++ {
f, err := c2.Writer(fmt.Sprintf("/some/other/name/%d", i))
require.NoError(t, err)

bs := tester.RandomBytes(t, 128+inty.RandomN(256))
tester.WriteAndClose(t, f, bs)

expectedSize += len(bs)
}

// Assert
require.Equal(t, int64(expectedSize), c2.Size())
}

// TestCacheSizeOverwriteItem verifies that Size() returns the correct number of
// bytes when items in the cache are overwritten.
func TestCacheSizeOverwriteItem(t *testing.T) {
tempDir := tester.TempDir(t)
c, err := NewDiskCache(log, tempDir)
require.NoError(t, err)

itemsToCache := [][]byte{
tester.RandomBytes(t, 256), // put item in cache
tester.RandomBytes(t, 128), // smaller item
tester.RandomBytes(t, 512), // larger item
}

for _, item := range itemsToCache {
// Act
f, err := c.Writer("overwritten-item")
require.NoError(t, err)
tester.WriteAndClose(t, f, item)

// Assert
require.Equal(t, int64(len(item)), c.Size())
}
}
Loading

0 comments on commit 567bb44

Please sign in to comment.