[Encoding] Request with pagination (#157)
Co-authored-by: Siddharth More <Siddhi More>
siddimore authored Jan 12, 2024
1 parent 4586a6d commit 1ffc11e
Showing 14 changed files with 259 additions and 49 deletions.
29 changes: 23 additions & 6 deletions common/aws/dynamodb/client.go
@@ -168,13 +168,26 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str

// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
// When limit is 0, all items are returned
func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) {
queryInput := &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
Limit: &limit,
var queryInput *dynamodb.QueryInput

// When limit is 0, omit Limit so the query is not capped
if limit > 0 {
queryInput = &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
Limit: &limit,
}
} else {
queryInput = &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
}
}

// If a pagination token was provided, set it as the ExclusiveStartKey
@@ -187,6 +200,10 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string,
return QueryResult{}, err
}

if len(response.Items) == 0 {
return QueryResult{Items: nil, LastEvaluatedKey: nil}, nil
}

// Return the items and the pagination token
return QueryResult{
Items: response.Items,
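For orientation, here is a hedged caller-side sketch of the contract this change establishes: feed each returned LastEvaluatedKey back in as the next exclusiveStartKey until it comes back nil. The wrapper function, page size, and process handler below are illustrative, not part of the commit; note that a limit of 0 omits Limit entirely, so each call returns as many items as DynamoDB allows per query (up to its 1 MB response cap).

// Sketch: page through a status index with QueryIndexWithPagination.
// process is a hypothetical handler; client is assumed to be a *commondynamodb.Client.
func drainStatusIndex(ctx context.Context, client *commondynamodb.Client, tableName, indexName string) error {
	var startKey map[string]types.AttributeValue // nil starts from the beginning
	for {
		result, err := client.QueryIndexWithPagination(ctx, tableName, indexName,
			"BlobStatus = :status",
			commondynamodb.ExpresseionValues{":status": &types.AttributeValueMemberN{Value: "0"}},
			10, startKey) // illustrative page size
		if err != nil {
			return err
		}
		process(result.Items)
		if result.LastEvaluatedKey == nil {
			return nil // no more pages
		}
		startKey = result.LastEvaluatedKey
	}
}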
62 changes: 61 additions & 1 deletion common/aws/dynamodb/client_test.go
@@ -337,7 +337,67 @@ func TestQueryIndexPaginationSingleItem(t *testing.T) {
Value: "0",
}}, 1, lastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 0)
assert.Nil(t, queryResult.Items)
assert.Nil(t, queryResult.LastEvaluatedKey)
}

func TestQueryIndexPaginationItemNoLimit(t *testing.T) {
tableName := "ProcessingWithNoPaginationLimit"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)},
}
err := dynamoClient.PutItem(ctx, tableName, item)
assert.NoError(t, err)
}

queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 0, nil)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 30)
assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Nil(t, queryResult.LastEvaluatedKey)

// Save Last Evaluated Key
lastEvaluatedKey := queryResult.LastEvaluatedKey

// Query again from the saved (nil) key with a limit of 2; expect two items and a non-nil LastEvaluatedKey
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 2, lastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 2)
assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.NotNil(t, queryResult.LastEvaluatedKey)
}

func TestQueryIndexPaginationNoStoredItems(t *testing.T) {
tableName := "ProcessingWithPaginationNoItem"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 1, nil)
assert.NoError(t, err)
assert.Nil(t, queryResult.Items)
assert.Nil(t, queryResult.LastEvaluatedKey)
}

12 changes: 7 additions & 5 deletions disperser/batcher/batcher.go
@@ -55,7 +55,8 @@ type Config struct {
BatchSizeMBLimit uint
MaxNumRetriesPerBlob uint

TargetNumChunks uint
TargetNumChunks uint
MaxBlobsToFetchFromStore int
}

type Batcher struct {
@@ -100,10 +101,11 @@ func NewBatcher(
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
streamerConfig := StreamerConfig{
SRSOrder: config.SRSOrder,
EncodingRequestTimeout: config.PullInterval,
EncodingQueueLimit: config.EncodingRequestQueueSize,
TargetNumChunks: config.TargetNumChunks,
SRSOrder: config.SRSOrder,
EncodingRequestTimeout: config.PullInterval,
EncodingQueueLimit: config.EncodingRequestQueueSize,
TargetNumChunks: config.TargetNumChunks,
MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
}
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
13 changes: 12 additions & 1 deletion disperser/batcher/encoding_streamer.go
@@ -41,6 +41,9 @@ type StreamerConfig struct {

// TargetNumChunks is the target number of chunks per encoded blob
TargetNumChunks uint

// Maximum number of Blobs to fetch from store
MaxBlobsToFetchFromStore int
}

type EncodingStreamer struct {
@@ -62,6 +65,9 @@ type EncodingStreamer struct {

metrics *EncodingStreamerMetrics
logger common.Logger

// Used to keep track of the last evaluated key for fetching metadatas
exclusiveStartKey *disperser.BlobStoreExclusiveStartKey
}

type batch struct {
Expand Down Expand Up @@ -107,6 +113,7 @@ func NewEncodingStreamer(
encodingCtxCancelFuncs: make([]context.CancelFunc, 0),
metrics: metrics,
logger: logger,
exclusiveStartKey: nil,
}, nil
}

@@ -175,7 +182,11 @@ func (e *EncodingStreamer) dedupRequests(metadatas []*disperser.BlobMetadata, re
func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan EncodingResultOrStatus) error {
stageTimer := time.Now()
// pull new blobs and send to encoder
metadatas, err := e.blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
e.mu.Lock()
metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey)
e.exclusiveStartKey = newExclusiveStartKey
e.mu.Unlock()

if err != nil {
return fmt.Errorf("error getting blob metadatas: %w", err)
}
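Taken together, these changes make each RequestEncoding pull one page of a resumable scan: the streamer holds the last evaluated key under its mutex, passes it back to the store on the next pull, and starts over from the top of the Processing index once the store returns a nil key. Below is a standalone sketch of that cursor pattern, with fetchPage as a hypothetical stand-in for blobStore.GetBlobMetadataByStatusWithPagination; unlike the committed code, which stores the new key before checking the error, this sketch keeps the old key on failure so the same page is retried.

// Sketch of the resumable-cursor pattern used by the encoding streamer.
type pagingCursor struct {
	mu  sync.Mutex
	key *disperser.BlobStoreExclusiveStartKey // nil means "start from the top"
}

func (c *pagingCursor) nextPage(ctx context.Context, limit int32) ([]*disperser.BlobMetadata, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	metadatas, next, err := fetchPage(ctx, disperser.Processing, limit, c.key)
	if err != nil {
		return nil, err
	}
	c.key = next // nil once the index is exhausted, so the next call wraps around
	return metadatas, nil
}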
21 changes: 12 additions & 9 deletions disperser/batcher/encoding_streamer_test.go
@@ -21,9 +21,10 @@ import (

var (
streamerConfig = batcher.StreamerConfig{
SRSOrder: 300000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
SRSOrder: 300000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
MaxBlobsToFetchFromStore: 10,
}
)

@@ -294,9 +295,10 @@ func TestEncodingFailure(t *testing.T) {
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 1e12)
workerpool := workerpool.New(5)
streamerConfig := batcher.StreamerConfig{
SRSOrder: 300000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
SRSOrder: 300000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
MaxBlobsToFetchFromStore: 10,
}
metrics := batcher.NewMetrics("9100", logger)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger)
@@ -467,9 +469,10 @@ func TestIncorrectParameters(t *testing.T) {
ctx := context.Background()

streamerConfig := batcher.StreamerConfig{
SRSOrder: 3000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
SRSOrder: 3000,
EncodingRequestTimeout: 5 * time.Second,
EncodingQueueLimit: 100,
MaxBlobsToFetchFromStore: 10,
}

encodingStreamer, c := createEncodingStreamer(t, 0, 1e12, streamerConfig)
2 changes: 2 additions & 0 deletions disperser/cmd/batcher/config.go
@@ -32,6 +32,7 @@ type Config struct {
}

func NewConfig(ctx *cli.Context) Config {

config := Config{
BlobstoreConfig: blobstore.Config{
BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name),
@@ -51,6 +52,7 @@ func NewConfig(ctx *cli.Context) Config {
SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
},
TimeoutConfig: batcher.TimeoutConfig{
EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
8 changes: 8 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
@@ -164,6 +164,14 @@
EnvVar: common.PrefixEnvVar(envVarPrefix, "TARGET_NUM_CHUNKS"),
Value: 0,
}

MaxBlobsToFetchFromStoreFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"),
Usage: "Limit on how many blobs to fetch from the store at a time when using DynamoDB pagination",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"),
Value: 100,
}
)

var requiredFlags = []cli.Flag{
53 changes: 50 additions & 3 deletions disperser/common/blobstore/blob_metadata_store.go
@@ -98,15 +98,33 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status

// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status, up to the specified limit.
// Along with the items, it returns a pagination token that can be used to fetch the next set of items.
func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, map[string]types.AttributeValue, error) {
func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {

var attributeMap map[string]types.AttributeValue
var err error

// Convert the exclusive start key to a map of AttributeValue
if exclusiveStartKey != nil {
attributeMap, err = convertToAttribMap(exclusiveStartKey)
if err != nil {
return nil, nil, err
}
}

queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
}}, limit, exclusiveStartKey)
}}, limit, attributeMap)

if err != nil {
return nil, nil, err
}

// When no more results to fetch, the LastEvaluatedKey is nil
if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil {
return nil, nil, nil
}

metadata := make([]*disperser.BlobMetadata, len(queryResult.Items))
for i, item := range queryResult.Items {
metadata[i], err = UnmarshalBlobMetadata(item)
@@ -119,7 +137,13 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co
if lastEvaluatedKey == nil {
return metadata, nil, nil
}
return metadata, lastEvaluatedKey, nil

// Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey
exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey)
if err != nil {
return nil, nil, err
}
return metadata, exclusiveStartKey, nil
}

func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) {
@@ -377,3 +401,26 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*disperser.BlobMetadata, e

return &metadata, nil
}

func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) {
blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{}
err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey)
if err != nil {
return nil, err
}

return &blobStoreExclusiveStartKey, nil
}

func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) {
if blobStoreExclusiveStartKey == nil {
// Nothing to convert; return nil
return nil, nil
}

avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey)
if err != nil {
return nil, err
}
return avMap, nil
}
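
Since callers now receive a typed *disperser.BlobStoreExclusiveStartKey instead of a raw attribute map, the key can be treated as an opaque resume token. A minimal sketch of draining the index through the typed API follows; drainProcessing and handleBlob are illustrative names, not part of the commit.

// Illustrative: page through all Processing blob metadata.
func drainProcessing(ctx context.Context, store *BlobMetadataStore) error {
	var token *disperser.BlobStoreExclusiveStartKey // nil starts from the beginning
	for {
		metadatas, next, err := store.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 100, token)
		if err != nil {
			return err
		}
		for _, m := range metadatas {
			handleBlob(m) // hypothetical per-blob work
		}
		if next == nil {
			return nil // index exhausted
		}
		token = next
	}
}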
10 changes: 10 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
@@ -175,6 +175,16 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) {
})
}

func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) {
ctx := context.Background()
// Query BlobMetadataStore for a blob that does not exist
// This should return nil for both the blob and lastEvaluatedKey
processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil)
assert.NoError(t, err)
assert.Nil(t, processing)
assert.Nil(t, lastEvaluatedKey)
}

func deleteItems(t *testing.T, keys []commondynamodb.Key) {
_, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys)
assert.NoError(t, err)
4 changes: 4 additions & 0 deletions disperser/common/blobstore/shared_storage.go
@@ -203,6 +203,10 @@ func (s *SharedBlobStore) GetBlobMetadataByStatus(ctx context.Context, blobStatu
return s.blobMetadataStore.GetBlobMetadataByStatus(ctx, blobStatus)
}

func (s *SharedBlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {
return s.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, blobStatus, limit, exclusiveStartKey)
}

func (s *SharedBlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) {
return s.blobMetadataStore.GetBlobMetadataInBatch(ctx, batchHeaderHash, blobIndex)
}
(4 more changed files not shown)
