Skip to content

Commit

Permalink
fix: async batch write mode to use server context vs caller context (#11
Browse files Browse the repository at this point in the history
)
  • Loading branch information
eaddingtonwhite authored Jul 2, 2024
1 parent 027861a commit 9278424
Showing 1 changed file with 65 additions and 12 deletions.
77 changes: 65 additions & 12 deletions caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ const (
)

type cachingMiddleware struct {
cacheName string
momentoClient momento.CacheClient
writebackType WritebackType
cacheName string
momentoClient momento.CacheClient
writebackType WritebackType
asyncWriteChan chan *momento.SetBatchRequest
}

type MiddlewareProps struct {
Expand Down Expand Up @@ -59,6 +60,9 @@ func AttachNewCachingMiddleware(props MiddlewareProps) {
}

func NewCachingMiddleware(mw *cachingMiddleware) middleware.InitializeMiddleware {
if mw.writebackType == ASYNCHRONOUS {
mw.startAsyncBatchWriter()
}
return middleware.InitializeMiddlewareFunc("CachingMiddleware", func(
ctx context.Context, in middleware.InitializeInput, next middleware.InitializeHandler,
) (out middleware.InitializeOutput, metadata middleware.Metadata, err error) {
Expand Down Expand Up @@ -90,6 +94,46 @@ func NewCachingMiddleware(mw *cachingMiddleware) middleware.InitializeMiddleware
})
}

func (d *cachingMiddleware) startAsyncBatchWriter() {
const maxBufferSize = 100 // Should we make configurable or larger/smaller?
batchWriteChan := make(chan *momento.SetBatchRequest, maxBufferSize)

d.asyncWriteChan = batchWriteChan

serverContext := context.Background()
go func() { // TODO we could spawn multiple writers but for now always just have single writer
// Loop to read from the channel until the server context is done
for {
select {
case batchSetRequest, ok := <-batchWriteChan:
if !ok {
d.momentoClient.Logger().Info("async batch write channel closed")
return
}
_, err := d.momentoClient.SetBatch(serverContext, batchSetRequest)
if err != nil {
d.momentoClient.Logger().Warn(
fmt.Sprintf("error storing item batch in cache err=%+v", err),
)
// TODO handle limit errors and back off reading items from write chan and possibly retry .
//var apiErr momento.MomentoError
//if errors.As(err, &apiErr) {
// switch apiErr.Code() {
// case momento.LimitExceededError:
// // Handle limit error back off.
// }
//}
}
d.momentoClient.Logger().Debug(fmt.Sprintf("stored dynamodb items in cache %s", d.cacheName))
case <-serverContext.Done():
d.momentoClient.Logger().Info("async batch write channel closed")
return
}
}
}()

}

func (d *cachingMiddleware) handleBatchGetItemCommand(ctx context.Context, input *dynamodb.BatchGetItemInput, in middleware.InitializeInput, next middleware.InitializeHandler) (middleware.InitializeOutput, error) {
if len(input.RequestItems) > 100 {
return middleware.InitializeOutput{}, errors.New("request items exceeded maximum of 100")
Expand Down Expand Up @@ -242,7 +286,7 @@ func (d *cachingMiddleware) handleBatchGetItemCommand(ctx context.Context, input
if d.writebackType == SYNCHRONOUS {
d.writeBatchResultsToCache(ctx, o, tableToDdbKeys)
} else if d.writebackType == ASYNCHRONOUS {
go d.writeBatchResultsToCache(ctx, o, tableToDdbKeys)
d.writeBatchResultsToAsyncChannel(o, tableToDdbKeys)
}

}
Expand Down Expand Up @@ -334,6 +378,22 @@ func (d *cachingMiddleware) writeResultToCache(ctx context.Context, ddbOutput *d
}

func (d *cachingMiddleware) writeBatchResultsToCache(ctx context.Context, ddbOutput *dynamodb.BatchGetItemOutput, tableToDdbKeys map[string][]string) {
d.prepareMomentoBatchGetRequest(ddbOutput, tableToDdbKeys)
// set item batch in Momento cache
_, err := d.momentoClient.SetBatch(ctx, d.prepareMomentoBatchGetRequest(ddbOutput, tableToDdbKeys))
if err != nil {
d.momentoClient.Logger().Warn(
fmt.Sprintf("error storing item batch in cache err=%+v", err),
)
}
d.momentoClient.Logger().Debug(fmt.Sprintf("stored dynamodb items in cache %s", d.cacheName))
}

func (d *cachingMiddleware) writeBatchResultsToAsyncChannel(ddbOutput *dynamodb.BatchGetItemOutput, tableToDdbKeys map[string][]string) {
d.asyncWriteChan <- d.prepareMomentoBatchGetRequest(ddbOutput, tableToDdbKeys)
}

func (d *cachingMiddleware) prepareMomentoBatchGetRequest(ddbOutput *dynamodb.BatchGetItemOutput, tableToDdbKeys map[string][]string) *momento.SetBatchRequest {
d.momentoClient.Logger().Debug("storing dynamodb items in cache")
var itemsToSet []momento.BatchSetItem
// compute and gather keys and JSON encoded items to store in Momento cache
Expand Down Expand Up @@ -363,17 +423,10 @@ func (d *cachingMiddleware) writeBatchResultsToCache(ctx context.Context, ddbOut
})
}
}
// set item batch in Momento cache
_, err := d.momentoClient.SetBatch(ctx, &momento.SetBatchRequest{
return &momento.SetBatchRequest{
CacheName: d.cacheName,
Items: itemsToSet,
})
if err != nil {
d.momentoClient.Logger().Warn(
fmt.Sprintf("error storing item batch in cache err=%+v", err),
)
}
d.momentoClient.Logger().Debug(fmt.Sprintf("stored dynamodb items in cache %s", d.cacheName))
}

func ComputeCacheKey(tableName string, keys map[string]types.AttributeValue) (string, error) {
Expand Down

0 comments on commit 9278424

Please sign in to comment.