From 5dc0c312e5f5c6b6cf06441eefb0dc687d42b632 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Thu, 4 Jan 2024 21:58:01 -0800 Subject: [PATCH 01/26] Add pagination method to dynamodB --- common/aws/dynamodb/client.go | 33 ++++ common/aws/dynamodb/client_test.go | 184 +++++++++++++++++- .../common/blobstore/blob_metadata_store.go | 22 +++ 3 files changed, 235 insertions(+), 4 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 7b8be05a5..91468c4fb 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -37,6 +37,11 @@ type Item = map[string]types.AttributeValue type Key = map[string]types.AttributeValue type ExpresseionValues = map[string]types.AttributeValue +type QueryResult struct { + Items []map[string]types.AttributeValue + LastEvaluatedKey map[string]types.AttributeValue +} + type Client struct { dynamoClient *dynamodb.Client logger common.Logger @@ -161,6 +166,34 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str return response.Items, nil } +// QueryIndexWithPagination returns all items in the index that match the given key +// Results are limited to the given limit and the pagination token is returned +func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) { + queryInput := &dynamodb.QueryInput{ + TableName: aws.String(tableName), + IndexName: aws.String(indexName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + Limit: &limit, + } + + // If a pagination token was provided, set it as the ExclusiveStartKey + if exclusiveStartKey != nil { + queryInput.ExclusiveStartKey = exclusiveStartKey + } + + response, err := c.dynamoClient.Query(ctx, queryInput) + if err != nil { + return QueryResult{}, err + } + + // Return the 
items and the pagination token + return QueryResult{ + Items: response.Items, + LastEvaluatedKey: response.LastEvaluatedKey, + }, nil +} + func (c *Client) DeleteItem(ctx context.Context, tableName string, key Key) error { _, err := c.dynamoClient.DeleteItem(ctx, &dynamodb.DeleteItemInput{Key: key, TableName: aws.String(tableName)}) if err != nil { diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index f9e86a203..ebf05bddd 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "testing" + "time" commonaws "github.com/Layr-Labs/eigenda/common/aws" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" @@ -79,14 +80,47 @@ func teardown() { func createTable(t *testing.T, tableName string) { ctx := context.Background() tableDescription, err := test_utils.CreateTable(ctx, clientConfig, tableName, &dynamodb.CreateTableInput{ - AttributeDefinitions: []types.AttributeDefinition{{ - AttributeName: aws.String("MetadataKey"), - AttributeType: types.ScalarAttributeTypeS, - }}, + AttributeDefinitions: []types.AttributeDefinition{ + { + AttributeName: aws.String("MetadataKey"), + AttributeType: types.ScalarAttributeTypeS, + }, + { + AttributeName: aws.String("BlobStatus"), + AttributeType: types.ScalarAttributeTypeN, // Assuming BlobStatus is a numeric value + }, + { + AttributeName: aws.String("CreatedAt"), + AttributeType: types.ScalarAttributeTypeS, // Assuming CreatedAt is a string representing a timestamp + }, + }, KeySchema: []types.KeySchemaElement{{ AttributeName: aws.String("MetadataKey"), KeyType: types.KeyTypeHash, }}, + // Add a global secondary index and CreatedAt for sorting + GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ + { + IndexName: aws.String("StatusIndex"), + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("BlobStatus"), + KeyType: types.KeyTypeHash, + }, + { + AttributeName: aws.String("CreatedAt"), + KeyType: 
types.KeyTypeRange, // Using CreatedAt as sort key + }, + }, + Projection: &types.Projection{ + ProjectionType: types.ProjectionTypeAll, // You can choose ALL, KEYS_ONLY, or INCLUDE + }, + ProvisionedThroughput: &types.ProvisionedThroughput{ + ReadCapacityUnits: aws.Int64(10), + WriteCapacityUnits: aws.Int64(10), + }, + }, + }, TableName: aws.String(tableName), ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), @@ -234,3 +268,145 @@ func TestBatchOperations(t *testing.T) { assert.NoError(t, err) assert.Len(t, fetchedItem, 0) } + +func TestQueryIndex(t *testing.T) { + tableName := "ProcessingQueryIndex" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + numItems := 30 + items := make([]commondynamodb.Item, numItems) + for i := 0; i < numItems; i += 1 { + items[i] = commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)}, + "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, + "CreatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + } + } + unprocessed, err := dynamoClient.PutItems(ctx, tableName, items) + assert.NoError(t, err) + assert.Len(t, unprocessed, 0) + + queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}) + assert.NoError(t, err) + assert.Equal(t, len(queryResult), 30) +} + +func TestQueryIndexPagination(t *testing.T) { + tableName := "ProcessingWithPagination" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + numItems := 30 + for i := 0; i < numItems; i += 1 { + createdAt := time.Now().Add(-time.Duration(i) * time.Second).Format(time.RFC3339) + // Create new item + item := commondynamodb.Item{ + 
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)}, + "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, + "CreatedAt": &types.AttributeValueMemberS{Value: createdAt}, + } + dynamoClient.PutItem(ctx, tableName, item) + } + + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, nil) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 10) + assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.NotNil(t, queryResult.LastEvaluatedKey) + assert.Equal(t, "key20", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value) + + // Get the next 10 items + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, queryResult.LastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 10) + assert.Equal(t, "key10", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value) + + // Get the last 10 items + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, queryResult.LastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 10) + assert.Equal(t, "key0", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value) + + // Empty result Since all items are processed + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", 
commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, queryResult.LastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 0) + assert.Nil(t, queryResult.LastEvaluatedKey) +} + +func TestQueryIndexWithPaginationForBatch(t *testing.T) { + tableName := "ProcessingWithPaginationForBatch" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + numItems := 30 + items := make([]commondynamodb.Item, numItems) + for i := 0; i < numItems; i += 1 { + items[i] = commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)}, + "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, + "CreatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + } + } + unprocessed, err := dynamoClient.PutItems(ctx, tableName, items) + assert.NoError(t, err) + assert.Len(t, unprocessed, 0) + + // Get First 10 items + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, nil) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 10) + + // Get the next 10 items + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, queryResult.LastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 10) + + // Get the last 10 items + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, queryResult.LastEvaluatedKey) + assert.NoError(t, 
err) + assert.Len(t, queryResult.Items, 10) + + // Empty result Since all items are processed + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 10, queryResult.LastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 0) + assert.Nil(t, queryResult.LastEvaluatedKey) +} diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 2a8b3a760..1f578db8d 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -96,6 +96,28 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status return metadata, nil } +// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit +// along with items, also returns a pagination token that can be used to fetch the next set of items +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, error) { + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: strconv.Itoa(int(status)), + }}, limit, exclusiveStartKey) + if err != nil { + return nil, err + } + + metadata := make([]*disperser.BlobMetadata, len(queryResult.Items)) + for i, item := range queryResult.Items { + metadata[i], err = UnmarshalBlobMetadata(item) + if err != nil { + return nil, err + } + } + + return metadata, nil +} + func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) { items, err := s.dynamoDBClient.QueryIndex(ctx, 
s.tableName, batchIndexName, "BatchHeaderHash = :batch_header_hash", commondynamodb.ExpresseionValues{ ":batch_header_hash": &types.AttributeValueMemberB{ From 9b8dada091c933fff18606caed6f69b642b17f44 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Thu, 4 Jan 2024 22:09:15 -0800 Subject: [PATCH 02/26] Fix lint error --- common/aws/dynamodb/client_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index ebf05bddd..fb37cc89d 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -315,7 +315,8 @@ func TestQueryIndexPagination(t *testing.T) { "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, "CreatedAt": &types.AttributeValueMemberS{Value: createdAt}, } - dynamoClient.PutItem(ctx, tableName, item) + err := dynamoClient.PutItem(ctx, tableName, item) + assert.NoError(t, err) } queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ From 604e29db421505a552c376e4f47813b081b88126 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Thu, 4 Jan 2024 22:37:14 -0800 Subject: [PATCH 03/26] return lastEvaluatedKey --- disperser/common/blobstore/blob_metadata_store.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 1f578db8d..7b154e842 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -98,24 +98,28 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status // GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit // along with items, also returns a pagination token that can be used to fetch the next set of items -func (s *BlobMetadataStore) 
GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, error) { +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, map[string]types.AttributeValue, error) { queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), }}, limit, exclusiveStartKey) if err != nil { - return nil, err + return nil, nil, err } metadata := make([]*disperser.BlobMetadata, len(queryResult.Items)) for i, item := range queryResult.Items { metadata[i], err = UnmarshalBlobMetadata(item) if err != nil { - return nil, err + return nil, nil, err } } - return metadata, nil + lastEvaluatedKey := queryResult.LastEvaluatedKey + if lastEvaluatedKey == nil { + return metadata, nil, nil + } + return metadata, lastEvaluatedKey, nil } func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) { From 47de551c81af202533113bc1c80f9e8723372967 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 06:36:49 -0800 Subject: [PATCH 04/26] Update Table to use RequestedAt instead of CreatedAt key --- common/aws/dynamodb/client_test.go | 32 +++++++++++++++++------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index fb37cc89d..1bd75d4b6 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strconv" "testing" "time" @@ -90,15 +91,16 @@ func createTable(t *testing.T, tableName string) { AttributeType: 
types.ScalarAttributeTypeN, // Assuming BlobStatus is a numeric value }, { - AttributeName: aws.String("CreatedAt"), - AttributeType: types.ScalarAttributeTypeS, // Assuming CreatedAt is a string representing a timestamp + AttributeName: aws.String("RequestedAt"), + AttributeType: types.ScalarAttributeTypeN, // Assuming RequestedAt is a string representing a timestamp + }, + }, + KeySchema: []types.KeySchemaElement{ + { + AttributeName: aws.String("MetadataKey"), + KeyType: types.KeyTypeHash, }, }, - KeySchema: []types.KeySchemaElement{{ - AttributeName: aws.String("MetadataKey"), - KeyType: types.KeyTypeHash, - }}, - // Add a global secondary index and CreatedAt for sorting GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{ { IndexName: aws.String("StatusIndex"), @@ -108,12 +110,12 @@ func createTable(t *testing.T, tableName string) { KeyType: types.KeyTypeHash, }, { - AttributeName: aws.String("CreatedAt"), - KeyType: types.KeyTypeRange, // Using CreatedAt as sort key + AttributeName: aws.String("RequestedAt"), + KeyType: types.KeyTypeRange, // Using RequestedAt as sort key }, }, Projection: &types.Projection{ - ProjectionType: types.ProjectionTypeAll, // You can choose ALL, KEYS_ONLY, or INCLUDE + ProjectionType: types.ProjectionTypeAll, // ProjectionTypeAll means all attributes are projected into the index }, ProvisionedThroughput: &types.ProvisionedThroughput{ ReadCapacityUnits: aws.Int64(10), @@ -283,7 +285,7 @@ func TestQueryIndex(t *testing.T) { "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, "BlobSize": &types.AttributeValueMemberN{Value: "123"}, "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, - "CreatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)}, } } unprocessed, err := dynamoClient.PutItems(ctx, tableName, items) @@ -306,14 +308,15 @@ func TestQueryIndexPagination(t *testing.T) { ctx := 
context.Background() numItems := 30 for i := 0; i < numItems; i += 1 { - createdAt := time.Now().Add(-time.Duration(i) * time.Second).Format(time.RFC3339) + requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix() + // Create new item item := commondynamodb.Item{ "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)}, "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, "BlobSize": &types.AttributeValueMemberN{Value: "123"}, "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, - "CreatedAt": &types.AttributeValueMemberS{Value: createdAt}, + "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)}, } err := dynamoClient.PutItem(ctx, tableName, item) assert.NoError(t, err) @@ -328,6 +331,7 @@ func TestQueryIndexPagination(t *testing.T) { assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) assert.NotNil(t, queryResult.LastEvaluatedKey) assert.Equal(t, "key20", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "0", queryResult.LastEvaluatedKey["BlobStatus"].(*types.AttributeValueMemberN).Value) // Get the next 10 items queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ @@ -371,7 +375,7 @@ func TestQueryIndexWithPaginationForBatch(t *testing.T) { "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, "BlobSize": &types.AttributeValueMemberN{Value: "123"}, "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, - "CreatedAt": &types.AttributeValueMemberS{Value: time.Now().Format(time.RFC3339)}, + "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)}, } } unprocessed, err := dynamoClient.PutItems(ctx, tableName, items) From 5198c14ab858c1c336777586eea204253d08a9ba Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 08:10:06 
-0800 Subject: [PATCH 05/26] Encoding Request With Pagination --- disperser/batcher/batcher.go | 2 +- disperser/batcher/encoding_streamer.go | 17 +++++- .../common/blobstore/blob_metadata_store.go | 59 ++++++++++++++++++- disperser/common/blobstore/shared_storage.go | 4 ++ disperser/disperser.go | 13 ++++ 5 files changed, 88 insertions(+), 7 deletions(-) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 3fc15ab6d..e6cc1da0e 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -80,7 +80,7 @@ type Batcher struct { func NewBatcher( config Config, timeoutConfig TimeoutConfig, - queue disperser.BlobStore, + queue disperser.ExtendedBlobStore, dispatcher disperser.Dispatcher, confirmer disperser.BatchConfirmer, chainState core.IndexedChainState, diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 8fe9e6ba3..0b5a97a50 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -53,7 +53,7 @@ type EncodingStreamer struct { Pool common.WorkerPool EncodedSizeNotifier *EncodedSizeNotifier - blobStore disperser.BlobStore + blobStore disperser.ExtendedBlobStore chainState core.IndexedChainState encoderClient disperser.EncoderClient assignmentCoordinator core.AssignmentCoordinator @@ -62,6 +62,10 @@ type EncodingStreamer struct { metrics *EncodingStreamerMetrics logger common.Logger + + // Used to keep track of the last evaluated key for fetching metadatas + exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey + exclusiveStartKeyLock sync.Mutex } type batch struct { @@ -83,7 +87,7 @@ func NewEncodedSizeNotifier(notify chan struct{}, threshold uint64) *EncodedSize func NewEncodingStreamer( config StreamerConfig, - blobStore disperser.BlobStore, + blobStore disperser.ExtendedBlobStore, chainState core.IndexedChainState, encoderClient disperser.EncoderClient, assignmentCoordinator core.AssignmentCoordinator, @@ -107,6 +111,8 @@ func 
NewEncodingStreamer( encodingCtxCancelFuncs: make([]context.CancelFunc, 0), metrics: metrics, logger: logger, + exclusiveStartKey: nil, + exclusiveStartKeyLock: sync.Mutex{}, }, nil } @@ -175,7 +181,12 @@ func (e *EncodingStreamer) dedupRequests(metadatas []*disperser.BlobMetadata, re func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan EncodingResultOrStatus) error { stageTimer := time.Now() // pull new blobs and send to encoder - metadatas, err := e.blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing) + e.exclusiveStartKeyLock.Lock() + // TODO: Get Limit from Config + metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 10, e.exclusiveStartKey) + e.exclusiveStartKey = newExclusiveStartKey + e.exclusiveStartKeyLock.Unlock() + if err != nil { return fmt.Errorf("error getting blob metadatas: %w", err) } diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 7b154e842..8b9d44b0a 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -98,11 +98,23 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status // GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit // along with items, also returns a pagination token that can be used to fetch the next set of items -func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, map[string]types.AttributeValue, error) { +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, 
error) { + + var attributeMap map[string]types.AttributeValue = nil + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertExclusiveBlobStoreStartKeyToAttributeValueMap(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), - }}, limit, exclusiveStartKey) + }}, limit, attributeMap) if err != nil { return nil, nil, err } @@ -119,7 +131,13 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co if lastEvaluatedKey == nil { return metadata, nil, nil } - return metadata, lastEvaluatedKey, nil + + // Convert the last evaluated key to a disperser.ExclusiveBlobStoreStartKey + exclusiveStartKey, err = converTypeAttributeValuetToExclusiveBlobStoreStartKey(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil } func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) { @@ -377,3 +395,38 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*disperser.BlobMetadata, e return &metadata, nil } + +func converTypeAttributeValuetToExclusiveBlobStoreStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.ExclusiveBlobStoreStartKey, error) { + key := disperser.ExclusiveBlobStoreStartKey{} + + if bs, ok := exclusiveStartKeyMap["BlobStatus"].(*types.AttributeValueMemberN); ok { + blobStatus, err := strconv.ParseInt(bs.Value, 10, 32) + if err != nil { + return nil, fmt.Errorf("error parsing BlobStatus: %v", err) + } + key.BlobStatus = int32(blobStatus) + } + + if ra, ok := exclusiveStartKeyMap["RequestedAt"].(*types.AttributeValueMemberN); ok { + requestedAt, err := 
strconv.ParseInt(ra.Value, 10, 64) + if err != nil { + return nil, fmt.Errorf("error parsing RequestedAt: %v", err) + } + key.RequestedAt = requestedAt + } + + return &key, nil +} + +func convertExclusiveBlobStoreStartKeyToAttributeValueMap(s *disperser.ExclusiveBlobStoreStartKey) (map[string]types.AttributeValue, error) { + if s == nil { + // Return an empty map or nil, depending on your application logic + return nil, nil + } + + av, err := attributevalue.MarshalMap(s) + if err != nil { + return nil, err + } + return av, nil +} diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index d64cf8923..2ca5a6dd9 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -203,6 +203,10 @@ func (s *SharedBlobStore) GetBlobMetadataByStatus(ctx context.Context, blobStatu return s.blobMetadataStore.GetBlobMetadataByStatus(ctx, blobStatus) } +func (s *SharedBlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { + return s.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, blobStatus, limit, exclusiveStartKey) +} + func (s *SharedBlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { return s.blobMetadataStore.GetBlobMetadataInBatch(ctx, batchHeaderHash, blobIndex) } diff --git a/disperser/disperser.go b/disperser/disperser.go index 116b0910d..015df2977 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -124,6 +124,11 @@ type ConfirmationInfo struct { BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` } +type ExclusiveBlobStoreStartKey struct { + BlobStatus int32 // BlobStatus is an integer + RequestedAt int64 // RequestedAt is epoch time in seconds +} + type 
BlobStore interface { // StoreBlob adds a blob to the queue and returns a key that can be used to retrieve the blob later StoreBlob(ctx context.Context, blob *core.Blob, requestedAt uint64) (BlobKey, error) @@ -157,6 +162,14 @@ type BlobStore interface { HandleBlobFailure(ctx context.Context, metadata *BlobMetadata, maxRetry uint) error } +// ExtendedBlobStore implements additional methods on top of BlobStore +type ExtendedBlobStore interface { + BlobStore + // GetBlobMetadataByStatusWithPagination returns a list of blob metadata for blobs with the given status + // Results are limited to the given limit and the pagination token is returned + GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *ExclusiveBlobStoreStartKey) ([]*BlobMetadata, *ExclusiveBlobStoreStartKey, error) +} + type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SignerMessage } From 1946d6fe4fd51e93c273703147e18d529e2d8149 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 10:29:52 -0800 Subject: [PATCH 06/26] Update to use Extended BlobStore --- disperser/apiserver/server_test.go | 2 +- disperser/batcher/batcher.go | 2 +- disperser/batcher/batcher_test.go | 4 +- disperser/batcher/encoding_streamer_test.go | 2 +- disperser/common/blobstore/shared_storage.go | 2 +- disperser/common/inmem/store.go | 66 +++++++++++++++++++- disperser/dataapi/server_test.go | 4 +- test/integration_test.go | 2 +- 8 files changed, 73 insertions(+), 11 deletions(-) diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 21e5bd53d..b53124203 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -32,7 +32,7 @@ import ( ) var ( - queue disperser.BlobStore + queue disperser.ExtendedBlobStore dispersalServer *apiserver.DispersalServer dockertestPool *dockertest.Pool dockertestResource 
*dockertest.Resource diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index e6cc1da0e..e40dec263 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -61,7 +61,7 @@ type Batcher struct { Config TimeoutConfig - Queue disperser.BlobStore + Queue disperser.ExtendedBlobStore Dispatcher disperser.Dispatcher Confirmer disperser.BatchConfirmer EncoderClient disperser.EncoderClient diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index 3c1518670..b14e5f131 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -33,7 +33,7 @@ var ( type batcherComponents struct { confirmer *dmock.MockBatchConfirmer - blobStore disperser.BlobStore + blobStore disperser.ExtendedBlobStore encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer ethClient *cmock.MockEthClient @@ -118,7 +118,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) { }, b } -func queueBlob(t *testing.T, ctx context.Context, blob *core.Blob, blobStore disperser.BlobStore) (uint64, disperser.BlobKey) { +func queueBlob(t *testing.T, ctx context.Context, blob *core.Blob, blobStore disperser.ExtendedBlobStore) (uint64, disperser.BlobKey) { requestedAt := uint64(time.Now().UnixNano()) blobKey, err := blobStore.StoreBlob(ctx, blob, requestedAt) assert.NoError(t, err) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 31340bc32..fa97bc1f9 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -30,7 +30,7 @@ var ( const numOperators = 10 type components struct { - blobStore disperser.BlobStore + blobStore disperser.ExtendedBlobStore chainDataMock *coremock.ChainDataMock encoderClient *disperser.LocalEncoderClient } diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 2ca5a6dd9..85ae29ac6 100644 
--- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -61,7 +61,7 @@ type blobResultOrError struct { blobRequestHeader core.BlobRequestHeader } -var _ disperser.BlobStore = (*SharedBlobStore)(nil) +var _ disperser.ExtendedBlobStore = (*SharedBlobStore)(nil) func NewSharedStorage(bucketName string, s3Client s3.Client, blobMetadataStore *BlobMetadataStore, logger common.Logger) *SharedBlobStore { return &SharedBlobStore{ diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 006cbc4ce..1c0af4e75 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -21,10 +21,10 @@ type BlobHolder struct { Data []byte } -var _ disperser.BlobStore = (*BlobStore)(nil) +var _ disperser.ExtendedBlobStore = (*BlobStore)(nil) // NewBlobStore creates an empty BlobStore -func NewBlobStore() disperser.BlobStore { +func NewBlobStore() disperser.ExtendedBlobStore { return &BlobStore{ Blobs: make(map[disperser.BlobHash]*BlobHolder), Metadata: make(map[disperser.BlobKey]*disperser.BlobMetadata), @@ -154,6 +154,68 @@ func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperse return metas, nil } +// func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { +// metas := make([]*disperser.BlobMetadata, 0) +// startKey := exclusiveStartKey +// for _, meta := range q.Metadata { +// if meta.BlobStatus == status { +// if startKey == nil { +// metas = append(metas, meta) +// if len(metas) == int(limit) { +// return metas, &disperser.ExclusiveBlobStoreStartKey{ +// BlobStatus: int32(meta.BlobStatus), +// RequestedAt: int64(meta.RequestMetadata.RequestedAt), +// }, nil +// } +// } else { +// if meta.BlobStatus != disperser.BlobStatus(startKey.BlobStatus) && 
meta.RequestMetadata.RequestedAt != uint64(startKey.RequestedAt) { +// continue +// } +// metas = append(metas, meta) +// if len(metas) == int(limit) { +// return metas, &disperser.ExclusiveBlobStoreStartKey{ +// BlobStatus: int32(meta.BlobStatus), +// RequestedAt: int64(meta.RequestMetadata.RequestedAt), +// }, nil +// } +// } +// } +// } +// return metas, nil, nil +// } + +func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { + metas := make([]*disperser.BlobMetadata, 0) + foundStart := exclusiveStartKey == nil + + for _, meta := range q.Metadata { + if meta.BlobStatus == status { + if foundStart { + metas = append(metas, meta) + if len(metas) == int(limit) { + nextKey := &disperser.ExclusiveBlobStoreStartKey{ + BlobStatus: int32(meta.BlobStatus), + RequestedAt: int64(meta.RequestMetadata.RequestedAt), + } + return metas, nextKey, nil + } + } else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) { + foundStart = true // Start appending metas after this item + metas = append(metas, meta) + if len(metas) == int(limit) { + return metas, &disperser.ExclusiveBlobStoreStartKey{ + BlobStatus: int32(meta.BlobStatus), + RequestedAt: int64(meta.RequestMetadata.RequestedAt), + }, nil + } + } + } + } + + // Return all the metas if limit is not reached + return metas, nil, nil +} + func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { for _, meta := range q.Metadata { if meta.ConfirmationInfo != nil && meta.ConfirmationInfo.BatchHeaderHash == batchHeaderHash && meta.ConfirmationInfo.BlobIndex == blobIndex { diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index 
f2b0ce9fc..ef2f07b78 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -235,13 +235,13 @@ func setUpRouter() *gin.Engine { return gin.Default() } -func queueBlob(t *testing.T, blob *core.Blob, queue disperser.BlobStore) disperser.BlobKey { +func queueBlob(t *testing.T, blob *core.Blob, queue disperser.ExtendedBlobStore) disperser.BlobKey { key, err := queue.StoreBlob(context.Background(), blob, expectedRequestedAt) assert.NoError(t, err) return key } -func markBlobConfirmed(t *testing.T, blob *core.Blob, key disperser.BlobKey, batchHeaderHash [32]byte, queue disperser.BlobStore) { +func markBlobConfirmed(t *testing.T, blob *core.Blob, key disperser.BlobKey, batchHeaderHash [32]byte, queue disperser.ExtendedBlobStore) { // simulate blob confirmation var commitX, commitY fp.Element _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") diff --git a/test/integration_test.go b/test/integration_test.go index 7d2ebdf34..2400290ea 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -117,7 +117,7 @@ type TestDisperser struct { EncoderServer *encoder.Server } -func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger common.Logger) TestDisperser { +func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.ExtendedBlobStore, logger common.Logger) TestDisperser { dispatcherConfig := &dispatcher.Config{ Timeout: time.Second, } From 595f74d472b6ba46b6092a2ff5346e4c003ad9a4 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 11:22:24 -0800 Subject: [PATCH 07/26] Fix PR comment --- disperser/apiserver/server_test.go | 2 +- disperser/batcher/batcher.go | 4 ++-- disperser/batcher/batcher_test.go | 4 ++-- disperser/batcher/encoding_streamer.go | 4 ++-- disperser/batcher/encoding_streamer_test.go | 2 +- disperser/common/blobstore/shared_storage.go | 2 +- disperser/common/inmem/store.go | 4 ++-- 
disperser/dataapi/server_test.go | 4 ++-- disperser/disperser.go | 11 +++-------- test/integration_test.go | 2 +- 10 files changed, 17 insertions(+), 22 deletions(-) diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index b53124203..21e5bd53d 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -32,7 +32,7 @@ import ( ) var ( - queue disperser.ExtendedBlobStore + queue disperser.BlobStore dispersalServer *apiserver.DispersalServer dockertestPool *dockertest.Pool dockertestResource *dockertest.Resource diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index e40dec263..3fc15ab6d 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -61,7 +61,7 @@ type Batcher struct { Config TimeoutConfig - Queue disperser.ExtendedBlobStore + Queue disperser.BlobStore Dispatcher disperser.Dispatcher Confirmer disperser.BatchConfirmer EncoderClient disperser.EncoderClient @@ -80,7 +80,7 @@ type Batcher struct { func NewBatcher( config Config, timeoutConfig TimeoutConfig, - queue disperser.ExtendedBlobStore, + queue disperser.BlobStore, dispatcher disperser.Dispatcher, confirmer disperser.BatchConfirmer, chainState core.IndexedChainState, diff --git a/disperser/batcher/batcher_test.go b/disperser/batcher/batcher_test.go index b14e5f131..3c1518670 100644 --- a/disperser/batcher/batcher_test.go +++ b/disperser/batcher/batcher_test.go @@ -33,7 +33,7 @@ var ( type batcherComponents struct { confirmer *dmock.MockBatchConfirmer - blobStore disperser.ExtendedBlobStore + blobStore disperser.BlobStore encoderClient *disperser.LocalEncoderClient encodingStreamer *bat.EncodingStreamer ethClient *cmock.MockEthClient @@ -118,7 +118,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) { }, b } -func queueBlob(t *testing.T, ctx context.Context, blob *core.Blob, blobStore disperser.ExtendedBlobStore) (uint64, disperser.BlobKey) { +func queueBlob(t *testing.T, ctx 
context.Context, blob *core.Blob, blobStore disperser.BlobStore) (uint64, disperser.BlobKey) { requestedAt := uint64(time.Now().UnixNano()) blobKey, err := blobStore.StoreBlob(ctx, blob, requestedAt) assert.NoError(t, err) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 0b5a97a50..283d7b639 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -53,7 +53,7 @@ type EncodingStreamer struct { Pool common.WorkerPool EncodedSizeNotifier *EncodedSizeNotifier - blobStore disperser.ExtendedBlobStore + blobStore disperser.BlobStore chainState core.IndexedChainState encoderClient disperser.EncoderClient assignmentCoordinator core.AssignmentCoordinator @@ -87,7 +87,7 @@ func NewEncodedSizeNotifier(notify chan struct{}, threshold uint64) *EncodedSize func NewEncodingStreamer( config StreamerConfig, - blobStore disperser.ExtendedBlobStore, + blobStore disperser.BlobStore, chainState core.IndexedChainState, encoderClient disperser.EncoderClient, assignmentCoordinator core.AssignmentCoordinator, diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index fa97bc1f9..31340bc32 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -30,7 +30,7 @@ var ( const numOperators = 10 type components struct { - blobStore disperser.ExtendedBlobStore + blobStore disperser.BlobStore chainDataMock *coremock.ChainDataMock encoderClient *disperser.LocalEncoderClient } diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 85ae29ac6..2ca5a6dd9 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -61,7 +61,7 @@ type blobResultOrError struct { blobRequestHeader core.BlobRequestHeader } -var _ disperser.ExtendedBlobStore = (*SharedBlobStore)(nil) +var _ disperser.BlobStore = 
(*SharedBlobStore)(nil) func NewSharedStorage(bucketName string, s3Client s3.Client, blobMetadataStore *BlobMetadataStore, logger common.Logger) *SharedBlobStore { return &SharedBlobStore{ diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 1c0af4e75..b31d54624 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -21,10 +21,10 @@ type BlobHolder struct { Data []byte } -var _ disperser.ExtendedBlobStore = (*BlobStore)(nil) +var _ disperser.BlobStore = (*BlobStore)(nil) // NewBlobStore creates an empty BlobStore -func NewBlobStore() disperser.ExtendedBlobStore { +func NewBlobStore() disperser.BlobStore { return &BlobStore{ Blobs: make(map[disperser.BlobHash]*BlobHolder), Metadata: make(map[disperser.BlobKey]*disperser.BlobMetadata), diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go index ef2f07b78..f2b0ce9fc 100644 --- a/disperser/dataapi/server_test.go +++ b/disperser/dataapi/server_test.go @@ -235,13 +235,13 @@ func setUpRouter() *gin.Engine { return gin.Default() } -func queueBlob(t *testing.T, blob *core.Blob, queue disperser.ExtendedBlobStore) disperser.BlobKey { +func queueBlob(t *testing.T, blob *core.Blob, queue disperser.BlobStore) disperser.BlobKey { key, err := queue.StoreBlob(context.Background(), blob, expectedRequestedAt) assert.NoError(t, err) return key } -func markBlobConfirmed(t *testing.T, blob *core.Blob, key disperser.BlobKey, batchHeaderHash [32]byte, queue disperser.ExtendedBlobStore) { +func markBlobConfirmed(t *testing.T, blob *core.Blob, key disperser.BlobKey, batchHeaderHash [32]byte, queue disperser.BlobStore) { // simulate blob confirmation var commitX, commitY fp.Element _, err := commitX.SetString("21661178944771197726808973281966770251114553549453983978976194544185382599016") diff --git a/disperser/disperser.go b/disperser/disperser.go index 015df2977..8d6a9fdad 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -154,6 
+154,9 @@ type BlobStore interface { GetBlobMetadataByStatus(ctx context.Context, blobStatus BlobStatus) ([]*BlobMetadata, error) // GetMetadataInBatch returns the metadata in a given batch at given index. GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*BlobMetadata, error) + // GetBlobMetadataByStatusWithPagination returns a list of blob metadata for blobs with the given status + // Results are limited to the given limit and the pagination token is returned + GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *ExclusiveBlobStoreStartKey) ([]*BlobMetadata, *ExclusiveBlobStoreStartKey, error) // GetAllBlobMetadataByBatch returns the metadata of all the blobs in the batch. GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*BlobMetadata, error) // GetBlobMetadata returns a blob metadata given a metadata key @@ -162,14 +165,6 @@ type BlobStore interface { HandleBlobFailure(ctx context.Context, metadata *BlobMetadata, maxRetry uint) error } -// ExtendedBlobStore implements additional methods on top of BlobStore -type ExtendedBlobStore interface { - BlobStore - // GetBlobMetadataByStatusWithPagination returns a list of blob metadata for blobs with the given status - // Results are limited to the given limit and the pagination token is returned - GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *ExclusiveBlobStoreStartKey) ([]*BlobMetadata, *ExclusiveBlobStoreStartKey, error) -} - type Dispatcher interface { DisperseBatch(context.Context, *core.IndexedOperatorState, []core.EncodedBlob, *core.BatchHeader) chan core.SignerMessage } diff --git a/test/integration_test.go b/test/integration_test.go index 2400290ea..7d2ebdf34 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -117,7 +117,7 @@ type TestDisperser struct { EncoderServer *encoder.Server } -func 
mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.ExtendedBlobStore, logger common.Logger) TestDisperser { +func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser.BlobStore, logger common.Logger) TestDisperser { dispatcherConfig := &dispatcher.Config{ Timeout: time.Second, } From 74856fc0499b485fbeb6864dd21f097cdbf05496 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 11:43:00 -0800 Subject: [PATCH 08/26] make blobstofetch a configuration --- disperser/batcher/batcher.go | 12 +++++++----- disperser/batcher/encoding_streamer.go | 13 +++++++------ disperser/batcher/encoding_streamer_test.go | 14 ++++++++------ disperser/cmd/batcher/config.go | 1 + disperser/cmd/batcher/flags/flags.go | 8 ++++++++ 5 files changed, 31 insertions(+), 17 deletions(-) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 3fc15ab6d..6103a5bcb 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -54,7 +54,8 @@ type Config struct { BatchSizeMBLimit uint MaxNumRetriesPerBlob uint - TargetNumChunks uint + TargetNumChunks uint + MaxBlobsToFetchFromStore int } type Batcher struct { @@ -97,10 +98,11 @@ func NewBatcher( uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes ) streamerConfig := StreamerConfig{ - SRSOrder: config.SRSOrder, - EncodingRequestTimeout: config.PullInterval, - EncodingQueueLimit: config.EncodingRequestQueueSize, - TargetNumChunks: config.TargetNumChunks, + SRSOrder: config.SRSOrder, + EncodingRequestTimeout: config.PullInterval, + EncodingQueueLimit: config.EncodingRequestQueueSize, + TargetNumChunks: config.TargetNumChunks, + MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore, } encodingWorkerPool := workerpool.New(config.NumConnections) encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) diff 
--git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 283d7b639..0ca0595ca 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -41,6 +41,9 @@ type StreamerConfig struct { // TargetNumChunks is the target number of chunks per encoded blob TargetNumChunks uint + + // Maximum number of Blobs to fetch from store + MaxBlobsToFetchFromStore int } type EncodingStreamer struct { @@ -64,8 +67,7 @@ type EncodingStreamer struct { logger common.Logger // Used to keep track of the last evaluated key for fetching metadatas - exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey - exclusiveStartKeyLock sync.Mutex + exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey } type batch struct { @@ -112,7 +114,6 @@ func NewEncodingStreamer( metrics: metrics, logger: logger, exclusiveStartKey: nil, - exclusiveStartKeyLock: sync.Mutex{}, }, nil } @@ -181,11 +182,11 @@ func (e *EncodingStreamer) dedupRequests(metadatas []*disperser.BlobMetadata, re func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan EncodingResultOrStatus) error { stageTimer := time.Now() // pull new blobs and send to encoder - e.exclusiveStartKeyLock.Lock() + e.mu.Lock() // TODO: Get Limit from Config - metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 10, e.exclusiveStartKey) + metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey) e.exclusiveStartKey = newExclusiveStartKey - e.exclusiveStartKeyLock.Unlock() + e.mu.Lock() if err != nil { return fmt.Errorf("error getting blob metadatas: %w", err) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 31340bc32..065c4c750 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ 
b/disperser/batcher/encoding_streamer_test.go @@ -21,9 +21,10 @@ import ( var ( streamerConfig = batcher.StreamerConfig{ - SRSOrder: 300000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 300000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } ) @@ -294,9 +295,10 @@ func TestEncodingFailure(t *testing.T) { sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 1e12) workerpool := workerpool.New(5) streamerConfig := batcher.StreamerConfig{ - SRSOrder: 300000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 300000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } metrics := batcher.NewMetrics("9100", logger) encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger) diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 03b0b8917..bbe8527fc 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -51,6 +51,7 @@ func NewConfig(ctx *cli.Context) Config { SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name), MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), + MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 9afea62fc..485e6b5c3 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -164,6 +164,14 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "TARGET_NUM_CHUNKS"), Value: 0, } + + MaxBlobsToFetchFromStoreFlag = cli.IntFlag{ + Name: 
common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"), + Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"), + Value: 10, + } ) var requiredFlags = []cli.Flag{ From f688e0ecc010e443fe232ec32440648dd82fc7fc Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 12:01:47 -0800 Subject: [PATCH 09/26] Fix PR Comment --- common/aws/dynamodb/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 91468c4fb..76464cfd8 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -38,8 +38,8 @@ type Key = map[string]types.AttributeValue type ExpresseionValues = map[string]types.AttributeValue type QueryResult struct { - Items []map[string]types.AttributeValue - LastEvaluatedKey map[string]types.AttributeValue + Items []Item + LastEvaluatedKey Key } type Client struct { From 1f2a6adea340130ebb4658ecbaed47c6015d7a5e Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Fri, 5 Jan 2024 14:54:47 -0800 Subject: [PATCH 10/26] Add Test for Invalid RequestEncoding when no max blob fetch is set --- disperser/batcher/encoding_streamer.go | 7 +++- disperser/batcher/encoding_streamer_test.go | 44 +++++++++++++++++++-- 2 files changed, 46 insertions(+), 5 deletions(-) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 0ca0595ca..cf082cb3b 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -183,10 +183,9 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan stageTimer := time.Now() // pull new blobs and send to encoder e.mu.Lock() - // TODO: Get Limit from Config metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 
int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey) e.exclusiveStartKey = newExclusiveStartKey - e.mu.Lock() + e.mu.Unlock() if err != nil { return fmt.Errorf("error getting blob metadatas: %w", err) @@ -196,6 +195,10 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan return nil } + if len(metadatas) > e.StreamerConfig.MaxBlobsToFetchFromStore { + return fmt.Errorf("number of metadatas fetched from store is %d greater than configured max number of blobs to fetch from store: %d", len(metadatas), e.StreamerConfig.MaxBlobsToFetchFromStore) + } + // read lock to access e.ReferenceBlockNumber e.mu.RLock() referenceBlockNumber := e.ReferenceBlockNumber diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 065c4c750..146f5741b 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -464,14 +464,52 @@ func TestPartialBlob(t *testing.T) { assert.Contains(t, batch.BlobMetadata, metadata1) } +func TestIncorrectRequestEncoding(t *testing.T) { + streamerConfig := batcher.StreamerConfig{ + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + } + + encodingStreamer, c := createEncodingStreamer(t, 10, 200_000, streamerConfig) + + securityParams := []*core.SecurityParam{{ + QuorumID: 0, + AdversaryThreshold: 80, + QuorumThreshold: 100, + }} + blobData := []byte{1, 2, 3, 4, 5} + + numItems := 30 + for i := 0; i < numItems; i += 1 { + blob := core.Blob{ + RequestHeader: core.BlobRequestHeader{ + SecurityParams: securityParams, + }, + Data: blobData, + } + ctx := context.Background() + _, err := c.blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano())) + assert.Nil(t, err) + } + + out := make(chan batcher.EncodingResultOrStatus) + // Request encoding + err := encodingStreamer.RequestEncoding(context.Background(), out) + assert.NotNil(t, err) + expectedErrMsg := "number of metadatas 
fetched from store is 30 greater than configured max number of blobs to fetch from store: 0" + assert.Equal(t, expectedErrMsg, err.Error()) +} + func TestIncorrectParameters(t *testing.T) { ctx := context.Background() streamerConfig := batcher.StreamerConfig{ - SRSOrder: 3000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } encodingStreamer, c := createEncodingStreamer(t, 0, 1e12, streamerConfig) From e8046288a52d21bf9d32cdead11547ca7fe11e4c Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Mon, 8 Jan 2024 08:29:05 -0800 Subject: [PATCH 11/26] Add Test for GetBlobMetadataByStatusWithPagination --- .../blobstore/blob_metadata_store_test.go | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index 5207fc650..c26e3d309 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -99,6 +99,80 @@ func TestBlobMetadataStoreOperations(t *testing.T) { }) } +func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { + ctx := context.Background() + blobKey1 := disperser.BlobKey{ + BlobHash: blobHash, + MetadataHash: "hash", + } + metadata1 := &disperser.BlobMetadata{ + MetadataHash: blobKey1.MetadataHash, + BlobHash: blobHash, + BlobStatus: disperser.Processing, + Expiry: 0, + NumRetries: 0, + RequestMetadata: &disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + } + blobKey2 := disperser.BlobKey{ + BlobHash: "blob2", + MetadataHash: "hash2", + } + metadata2 := &disperser.BlobMetadata{ + MetadataHash: blobKey2.MetadataHash, + BlobHash: blobKey2.BlobHash, + BlobStatus: disperser.Finalized, + Expiry: 0, + NumRetries: 0, + RequestMetadata: 
&disperser.RequestMetadata{ + BlobRequestHeader: blob.RequestHeader, + BlobSize: blobSize, + RequestedAt: 123, + }, + ConfirmationInfo: &disperser.ConfirmationInfo{}, + } + err := blobMetadataStore.QueueNewBlobMetadata(ctx, metadata1) + assert.NoError(t, err) + err = blobMetadataStore.QueueNewBlobMetadata(ctx, metadata2) + assert.NoError(t, err) + + fetchedMetadata, err := blobMetadataStore.GetBlobMetadata(ctx, blobKey1) + assert.NoError(t, err) + assert.Equal(t, metadata1, fetchedMetadata) + fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey2) + assert.NoError(t, err) + assert.Equal(t, metadata2, fetchedMetadata) + + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.NotNil(t, lastEvaluatedKey) + + processing, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, processing, 1) + assert.Nil(t, lastEvaluatedKey) + + finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 2, nil) + assert.NoError(t, err) + assert.Len(t, finalized, 2) + assert.Nil(t, lastEvaluatedKey) + + deleteItems(t, []commondynamodb.Key{ + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey1.BlobHash}, + }, + { + "MetadataHash": &types.AttributeValueMemberS{Value: blobKey2.MetadataHash}, + "BlobHash": &types.AttributeValueMemberS{Value: blobKey2.BlobHash}, + }, + }) +} + func deleteItems(t *testing.T, keys []commondynamodb.Key) { _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) assert.NoError(t, err) From 77b36fd05b4ad19f7d28323e54387434acfee056 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Mon, 8 Jan 2024 08:38:55 -0800 Subject: [PATCH 
12/26] Fix Test --- .../common/blobstore/blob_metadata_store_test.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index c26e3d309..6a3cc6587 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -149,16 +149,13 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) assert.NoError(t, err) assert.Len(t, processing, 1) - assert.NotNil(t, lastEvaluatedKey) - - processing, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, lastEvaluatedKey) - assert.NoError(t, err) - assert.Len(t, processing, 1) + assert.Equal(t, metadata1, processing[0]) assert.Nil(t, lastEvaluatedKey) - finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 2, nil) + finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) assert.NoError(t, err) - assert.Len(t, finalized, 2) + assert.Len(t, finalized, 1) + assert.Equal(t, metadata2, finalized[0]) assert.Nil(t, lastEvaluatedKey) deleteItems(t, []commondynamodb.Key{ From ca05b435e1cfa66233fdeebe2d7bb3cb9b6e93b3 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Mon, 8 Jan 2024 08:57:27 -0800 Subject: [PATCH 13/26] Add more test --- common/aws/dynamodb/client_test.go | 41 +++++++++++++++++++ .../blobstore/blob_metadata_store_test.go | 7 +++- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index 1bd75d4b6..995e6889a 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -300,6 +300,47 @@ 
func TestQueryIndex(t *testing.T) { assert.Equal(t, len(queryResult), 30) } +func TestQueryIndexPaginationSingleItem(t *testing.T) { + tableName := "ProcessingWithPaginationSingleItem" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + requestedAt := time.Now().Unix() + item := commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", 0)}, + "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", 0)}, + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, + "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)}, + } + err := dynamoClient.PutItem(ctx, tableName, item) + assert.NoError(t, err) + + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 1, nil) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 1) + assert.Equal(t, "key0", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.NotNil(t, queryResult.LastEvaluatedKey) + assert.Equal(t, "key0", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.Equal(t, "0", queryResult.LastEvaluatedKey["BlobStatus"].(*types.AttributeValueMemberN).Value) + + // Save Last Evaluated Key + lastEvaluatedKey := queryResult.LastEvaluatedKey + + // Get the next item using LastEvaluatedKey expect to be nil + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 0) + assert.Nil(t, queryResult.LastEvaluatedKey) +} + func TestQueryIndexPagination(t *testing.T) { tableName := "ProcessingWithPagination" 
createTable(t, tableName) diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index 6a3cc6587..3b393ac78 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -150,12 +150,17 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { assert.NoError(t, err) assert.Len(t, processing, 1) assert.Equal(t, metadata1, processing[0]) - assert.Nil(t, lastEvaluatedKey) + assert.NotNil(t, lastEvaluatedKey) finalized, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, nil) assert.NoError(t, err) assert.Len(t, finalized, 1) assert.Equal(t, metadata2, finalized[0]) + assert.NotNil(t, lastEvaluatedKey) + + finalized, lastEvaluatedKey, err = blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Finalized, 1, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, finalized, 0) assert.Nil(t, lastEvaluatedKey) deleteItems(t, []commondynamodb.Key{ From 29315669f6f8f1035add6873664c3de3258056eb Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Mon, 8 Jan 2024 11:04:19 -0800 Subject: [PATCH 14/26] remove commented code --- disperser/common/inmem/store.go | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index b31d54624..85c924500 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -154,36 +154,6 @@ func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperse return metas, nil } -// func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { -// metas := make([]*disperser.BlobMetadata, 0) -// startKey := 
exclusiveStartKey -// for _, meta := range q.Metadata { -// if meta.BlobStatus == status { -// if startKey == nil { -// metas = append(metas, meta) -// if len(metas) == int(limit) { -// return metas, &disperser.ExclusiveBlobStoreStartKey{ -// BlobStatus: int32(meta.BlobStatus), -// RequestedAt: int64(meta.RequestMetadata.RequestedAt), -// }, nil -// } -// } else { -// if meta.BlobStatus != disperser.BlobStatus(startKey.BlobStatus) && meta.RequestMetadata.RequestedAt != uint64(startKey.RequestedAt) { -// continue -// } -// metas = append(metas, meta) -// if len(metas) == int(limit) { -// return metas, &disperser.ExclusiveBlobStoreStartKey{ -// BlobStatus: int32(meta.BlobStatus), -// RequestedAt: int64(meta.RequestMetadata.RequestedAt), -// }, nil -// } -// } -// } -// } -// return metas, nil, nil -// } - func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { metas := make([]*disperser.BlobMetadata, 0) foundStart := exclusiveStartKey == nil From ba22d7b2acc2c44f3f1b7cf65b2e519f4ec7936a Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Mon, 8 Jan 2024 13:19:38 -0800 Subject: [PATCH 15/26] Set Default Value for BlobsToFetch from store --- disperser/cmd/batcher/config.go | 8 ++++- disperser/cmd/batcher/flags/flags.go | 4 +-- inabox/deploy/config.go | 49 ++++++++++++++-------------- inabox/deploy/env_vars.go | 2 ++ 4 files changed, 36 insertions(+), 27 deletions(-) diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index bbe8527fc..997655458 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -32,6 +32,12 @@ type Config struct { } func NewConfig(ctx *cli.Context) Config { + + maxBlobsToFetchFromStore := ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name) + // Set Minimum Number if no value is set + if 
maxBlobsToFetchFromStore == 0 { + maxBlobsToFetchFromStore = 1 + } config := Config{ BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), @@ -51,7 +57,7 @@ func NewConfig(ctx *cli.Context) Config { SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name), MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), - MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), + MaxBlobsToFetchFromStore: maxBlobsToFetchFromStore, }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 485e6b5c3..b5a3fa026 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -168,9 +168,9 @@ var ( MaxBlobsToFetchFromStoreFlag = cli.IntFlag{ Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"), Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination", - Required: false, + Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"), - Value: 10, + Value: 1, } ) diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 569ee4c22..0f5d2a1af 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -197,30 +197,31 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath, // Generates batcher .env func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) BatcherVars { v := BatcherVars{ - BATCHER_S3_BUCKET_NAME: "test-eigenda-blobstore", - BATCHER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", - BATCHER_ENABLE_METRICS: "true", - BATCHER_METRICS_HTTP_PORT: "9094", - BATCHER_PULL_INTERVAL: "5s", - BATCHER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver, - BATCHER_EIGENDA_SERVICE_MANAGER: 
env.EigenDA.ServiceManager, - BATCHER_SRS_ORDER: "300000", - BATCHER_CHAIN_RPC: "", - BATCHER_PRIVATE_KEY: key[2:], - BATCHER_STD_LOG_LEVEL: "debug", - BATCHER_FILE_LOG_LEVEL: "trace", - BATCHER_LOG_PATH: logPath, - BATCHER_GRAPH_URL: graphUrl, - BATCHER_USE_GRAPH: "true", - BATCHER_BATCH_SIZE_LIMIT: "10240", // 10 GiB - BATCHER_INDEXER_PULL_INTERVAL: "1s", - BATCHER_AWS_REGION: "", - BATCHER_AWS_ACCESS_KEY_ID: "", - BATCHER_AWS_SECRET_ACCESS_KEY: "", - BATCHER_AWS_ENDPOINT_URL: "", - BATCHER_FINALIZER_INTERVAL: "6m", - BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", - BATCHER_NUM_CONFIRMATIONS: "0", + BATCHER_S3_BUCKET_NAME: "test-eigenda-blobstore", + BATCHER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", + BATCHER_ENABLE_METRICS: "true", + BATCHER_METRICS_HTTP_PORT: "9094", + BATCHER_PULL_INTERVAL: "5s", + BATCHER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver, + BATCHER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager, + BATCHER_SRS_ORDER: "300000", + BATCHER_CHAIN_RPC: "", + BATCHER_PRIVATE_KEY: key[2:], + BATCHER_STD_LOG_LEVEL: "debug", + BATCHER_FILE_LOG_LEVEL: "trace", + BATCHER_LOG_PATH: logPath, + BATCHER_GRAPH_URL: graphUrl, + BATCHER_USE_GRAPH: "true", + BATCHER_BATCH_SIZE_LIMIT: "10240", // 10 GiB + BATCHER_INDEXER_PULL_INTERVAL: "1s", + BATCHER_AWS_REGION: "", + BATCHER_AWS_ACCESS_KEY_ID: "", + BATCHER_AWS_SECRET_ACCESS_KEY: "", + BATCHER_AWS_ENDPOINT_URL: "", + BATCHER_FINALIZER_INTERVAL: "6m", + BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", + BATCHER_NUM_CONFIRMATIONS: "0", + BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE: "1", } env.applyDefaults(&v, "BATCHER", "batcher", ind) diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index 939fc1794..98813ee00 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -139,6 +139,8 @@ type BatcherVars struct { BATCHER_AWS_SECRET_ACCESS_KEY string BATCHER_AWS_ENDPOINT_URL string + + BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE string } func (vars BatcherVars) getEnvMap() 
map[string]string { From ba13aa220c4ec705326d6dd910b10ad08916e16e Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Mon, 8 Jan 2024 23:20:21 -0800 Subject: [PATCH 16/26] Add MetadataHash and BlobHash to ExclusiveStartKey --- disperser/common/blobstore/blob_metadata_store.go | 8 ++++++++ disperser/disperser.go | 6 ++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 8b9d44b0a..4cd1616ee 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -415,6 +415,14 @@ func converTypeAttributeValuetToExclusiveBlobStoreStartKey(exclusiveStartKeyMap key.RequestedAt = requestedAt } + if bh, ok := exclusiveStartKeyMap["BlobHash"].(*types.AttributeValueMemberS); ok { + key.BlobHash = bh.Value + } + + if mh, ok := exclusiveStartKeyMap["MetadataHash"].(*types.AttributeValueMemberS); ok { + key.MetadataHash = mh.Value + } + return &key, nil } diff --git a/disperser/disperser.go b/disperser/disperser.go index 8d6a9fdad..a86b86515 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -125,8 +125,10 @@ type ConfirmationInfo struct { } type ExclusiveBlobStoreStartKey struct { - BlobStatus int32 // BlobStatus is an integer - RequestedAt int64 // RequestedAt is epoch time in seconds + BlobHash BlobHash + MetadataHash MetadataHash + BlobStatus int32 // BlobStatus is an integer + RequestedAt int64 // RequestedAt is epoch time in seconds } type BlobStore interface { From ee2666822e6be23a07b92f2801005c10799c8a4b Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Tue, 9 Jan 2024 10:36:44 -0800 Subject: [PATCH 17/26] return nil when there are no results --- common/aws/dynamodb/client.go | 4 ++++ common/aws/dynamodb/client_test.go | 17 ++++++++++++++++++- .../common/blobstore/blob_metadata_store.go | 5 +++++ .../blobstore/blob_metadata_store_test.go | 10 ++++++++++ 4 files changed, 35
insertions(+), 1 deletion(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 76464cfd8..010633ed7 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -187,6 +187,10 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, return QueryResult{}, err } + if len(response.Items) == 0 { + return QueryResult{Items: nil, LastEvaluatedKey: nil}, nil + } + // Return the items and the pagination token return QueryResult{ Items: response.Items, diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index 995e6889a..7219f8363 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -337,7 +337,22 @@ func TestQueryIndexPaginationSingleItem(t *testing.T) { Value: "0", }}, 1, lastEvaluatedKey) assert.NoError(t, err) - assert.Len(t, queryResult.Items, 0) + assert.Nil(t, queryResult.Items) + assert.Nil(t, queryResult.LastEvaluatedKey) +} + +func TestQueryIndexPaginationNoStoredItems(t *testing.T) { + tableName := "ProcessingWithPaginationNoItem" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 1, nil) + assert.NoError(t, err) + assert.Nil(t, queryResult.Items) assert.Nil(t, queryResult.LastEvaluatedKey) } diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 4cd1616ee..79da3da42 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -119,6 +119,11 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co return nil, nil, err } + // When their are no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil 
&& queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + metadata := make([]*disperser.BlobMetadata, len(queryResult.Items)) for i, item := range queryResult.Items { metadata[i], err = UnmarshalBlobMetadata(item) diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index 3b393ac78..b31e10de2 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -175,6 +175,16 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { }) } +func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { + ctx := context.Background() + // Query BlobMetadataStore for a blob that does not exist + // This should return nil for both the blob and lastEvaluatedKey + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Nil(t, processing) + assert.Nil(t, lastEvaluatedKey) +} + func deleteItems(t *testing.T, keys []commondynamodb.Key) { _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) assert.NoError(t, err) From 65c0078bb85af8f773236bb0240fe961af8b92fd Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Tue, 9 Jan 2024 12:48:51 -0800 Subject: [PATCH 18/26] Fetch all items when limit is set to 0 --- common/aws/dynamodb/client.go | 25 +++++++++++++---- common/aws/dynamodb/client_test.go | 45 ++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 010633ed7..fe68b5147 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -169,12 +169,23 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str // QueryIndexWithPagination returns all items in the index that match the given key // Results are limited to the given 
limit and the pagination token is returned func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) { - queryInput := &dynamodb.QueryInput{ - TableName: aws.String(tableName), - IndexName: aws.String(indexName), - KeyConditionExpression: aws.String(keyCondition), - ExpressionAttributeValues: expAttributeValues, - Limit: &limit, + var queryInput *dynamodb.QueryInput + + if limit != 0 { + queryInput = &dynamodb.QueryInput{ + TableName: aws.String(tableName), + IndexName: aws.String(indexName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + Limit: &limit, + } + } else { + queryInput = &dynamodb.QueryInput{ + TableName: aws.String(tableName), + IndexName: aws.String(indexName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + } } // If a pagination token was provided, set it as the ExclusiveStartKey @@ -191,6 +202,8 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, return QueryResult{Items: nil, LastEvaluatedKey: nil}, nil } + fmt.Printf("response: %v\n", response.LastEvaluatedKey) + // Return the items and the pagination token return QueryResult{ Items: response.Items, diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index 7219f8363..f0804fa47 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -341,6 +341,51 @@ func TestQueryIndexPaginationSingleItem(t *testing.T) { assert.Nil(t, queryResult.LastEvaluatedKey) } +func TestQueryIndexPaginationItemNoLimit(t *testing.T) { + tableName := "ProcessingWithNoPaginationLimit" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + numItems := 30 + for i := 0; i < numItems; i += 1 { + 
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix() + + // Create new item + item := commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)}, + "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, + "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)}, + } + err := dynamoClient.PutItem(ctx, tableName, item) + assert.NoError(t, err) + } + + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 0, nil) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 30) + assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.Nil(t, queryResult.LastEvaluatedKey) + + // Save Last Evaluated Key + lastEvaluatedKey := queryResult.LastEvaluatedKey + + // Get the next item using LastEvaluatedKey expect to be nil + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 2, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 2) + assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.NotNil(t, queryResult.LastEvaluatedKey) +} + func TestQueryIndexPaginationNoStoredItems(t *testing.T) { tableName := "ProcessingWithPaginationNoItem" createTable(t, tableName) From 346bcdeee2c08bfca7a1380442ba4b3da8790f63 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Tue, 9 Jan 2024 12:49:47 -0800 Subject: [PATCH 19/26] Update comment --- common/aws/dynamodb/client.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index fe68b5147..9d3b34f4f 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -171,6 +171,7 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) { var queryInput *dynamodb.QueryInput + // Fetch all items if limit is 0 if limit != 0 { queryInput = &dynamodb.QueryInput{ TableName: aws.String(tableName), @@ -202,8 +203,6 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, return QueryResult{Items: nil, LastEvaluatedKey: nil}, nil } - fmt.Printf("response: %v\n", response.LastEvaluatedKey) - // Return the items and the pagination token return QueryResult{ Items: response.Items, From 87780da5b5e1e276674d1be06d15eaea829a25af Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Tue, 9 Jan 2024 13:31:27 -0800 Subject: [PATCH 20/26] changes --- disperser/batcher/batcher.go | 5 +++++ disperser/batcher/encoding_streamer.go | 21 +++++++++++++++++---- disperser/cmd/batcher/config.go | 12 ++++++------ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 947d398bd..b1091f158 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -285,6 +285,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { var blobHeader *core.BlobHeader var proof []byte if status == disperser.Confirmed { + fmt.Printf("Batcher Blob Confirmed\n") // generate inclusion proof if blobIndex >= len(batch.BlobHeaders) { return fmt.Errorf("HandleSingleBatch: error confirming blobs: blob header at index %d not found in batch", blobIndex) @@ -292,6 +293,7 @@ func (b *Batcher) HandleSingleBatch(ctx 
context.Context) error { blobHeader = batch.BlobHeaders[blobIndex] blobHeaderHash, err := blobHeader.GetBlobHeaderHash() + fmt.Printf("Batcher BlobHeaderHash: %v\n", blobHeaderHash) if err != nil { return fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err) } @@ -299,9 +301,12 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { if err != nil { return fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err) } + fmt.Printf("Batcher MerkleProof: %v\n", merkleProof) proof = serializeProof(merkleProof) } + fmt.Printf("Batcher InclusionProof: %v\n", proof) + confirmationInfo := &disperser.ConfirmationInfo{ BatchHeaderHash: headerHash, BlobIndex: uint32(blobIndex), diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index deb6a8061..877c1357b 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -68,6 +68,8 @@ type EncodingStreamer struct { // Used to keep track of the last evaluated key for fetching metadatas exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey + + count int } type batch struct { @@ -114,6 +116,7 @@ func NewEncodingStreamer( metrics: metrics, logger: logger, exclusiveStartKey: nil, + count: 0, }, nil } @@ -185,6 +188,7 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan e.mu.Lock() metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey) e.exclusiveStartKey = newExclusiveStartKey + e.count++ e.mu.Unlock() if err != nil { @@ -195,10 +199,6 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan return nil } - if len(metadatas) > e.StreamerConfig.MaxBlobsToFetchFromStore { - return fmt.Errorf("number of metadatas fetched from store is %d greater than configured max number of blobs to fetch from store: %d", 
len(metadatas), e.StreamerConfig.MaxBlobsToFetchFromStore) - } - // read lock to access e.ReferenceBlockNumber e.mu.RLock() referenceBlockNumber := e.ReferenceBlockNumber @@ -263,10 +263,15 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan for i := range metadatas { metadata := metadatas[i] + fmt.Printf("RequestEncoding Metadata: %v\n", metadata) + fmt.Printf("RequestEncoding Blobs: %v\n", blobs[metadata.GetBlobKey()]) e.RequestEncodingForBlob(ctx, metadata, blobs[metadata.GetBlobKey()], state, referenceBlockNumber, encoderChan) } + e.mu.Lock() + fmt.Printf("count: %d\n", e.count) + e.mu.Unlock() return nil } @@ -378,6 +383,7 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata Err: nil, } }) + fmt.Printf("RequestEncodingForBlob BlobKey: %v\n", blobKey) e.EncodedBlobstore.PutEncodingRequest(blobKey, res.BlobQuorumInfo.QuorumID) } @@ -389,6 +395,7 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod e.EncodedBlobstore.DeleteEncodingRequest(result.BlobMetadata.GetBlobKey(), result.BlobQuorumInfo.QuorumID) return fmt.Errorf("error encoding blob: %w", result.Err) } + fmt.Printf("ProcessEncodedBlobs Result: %v\n", result) err := e.EncodedBlobstore.PutEncodingResult(&result.EncodingResult) if err != nil { @@ -444,6 +451,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) { // Delete any encoded results that are not from the current batching iteration (i.e. 
that has different reference block number) // If any pending encoded results are discarded here, it will be re-requested in the next iteration encodedResults := e.EncodedBlobstore.GetNewAndDeleteStaleEncodingResults(e.ReferenceBlockNumber) + fmt.Printf("CreateBatch EncodedResults: %v\n", encodedResults) // Reset the notifier e.EncodedSizeNotifier.mu.Lock() @@ -464,7 +472,9 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) { // if the same blob has been dispersed multiple time with different security params, // there will be multiple encoded results for that (blob, quorum) result := encodedResults[i] + fmt.Printf("CreateBatch Result: %v\n", result) blobKey := result.BlobMetadata.GetBlobKey() + fmt.Printf("CreateBatch BlobKey: %v\n", blobKey) if _, ok := encodedBlobByKey[blobKey]; !ok { metadataByKey[blobKey] = result.BlobMetadata blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0) @@ -521,8 +531,11 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) { i := 0 for key := range metadataByKey { encodedBlobs[i] = encodedBlobByKey[key] + fmt.Printf("CreateBatch EncodedBlobs: %v\n", encodedBlobs[i]) blobHeaders[i] = blobHeaderByKey[key] + fmt.Printf("CreateBatch BlobHeaders: %v\n", blobHeaders[i]) metadatas[i] = metadataByKey[key] + fmt.Printf("CreateBatch Metadatas: %v\n", metadatas[i]) i++ } diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 997655458..f87287f0c 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -33,11 +33,11 @@ type Config struct { func NewConfig(ctx *cli.Context) Config { - maxBlobsToFetchFromStore := ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name) - // Set Minimum Number if no value is set - if maxBlobsToFetchFromStore == 0 { - maxBlobsToFetchFromStore = 1 - } + // maxBlobsToFetchFromStore := ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name) + // // Set Minimum Number if no value is set + // if maxBlobsToFetchFromStore == 0 { + // 
maxBlobsToFetchFromStore = 1 + // } config := Config{ BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), @@ -57,7 +57,7 @@ func NewConfig(ctx *cli.Context) Config { SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name), MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), - MaxBlobsToFetchFromStore: maxBlobsToFetchFromStore, + MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), From 034bb635e7c7777d3738c88ee0755c9da4f51d9d Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Tue, 9 Jan 2024 13:39:32 -0800 Subject: [PATCH 21/26] Handle case when no limit is provided --- disperser/batcher/batcher.go | 5 ----- disperser/batcher/encoding_streamer.go | 17 ----------------- disperser/cmd/batcher/config.go | 5 ----- 3 files changed, 27 deletions(-) diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index b1091f158..947d398bd 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -285,7 +285,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { var blobHeader *core.BlobHeader var proof []byte if status == disperser.Confirmed { - fmt.Printf("Batcher Blob Confirmed\n") // generate inclusion proof if blobIndex >= len(batch.BlobHeaders) { return fmt.Errorf("HandleSingleBatch: error confirming blobs: blob header at index %d not found in batch", blobIndex) @@ -293,7 +292,6 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error { blobHeader = batch.BlobHeaders[blobIndex] blobHeaderHash, err := blobHeader.GetBlobHeaderHash() - fmt.Printf("Batcher BlobHeaderHash: %v\n", blobHeaderHash) if err != nil { return fmt.Errorf("HandleSingleBatch: failed to get blob header hash: %w", err) } @@ -301,12 +299,9 @@ func (b *Batcher) HandleSingleBatch(ctx 
context.Context) error { if err != nil { return fmt.Errorf("HandleSingleBatch: failed to generate blob header inclusion proof: %w", err) } - fmt.Printf("Batcher MerkleProof: %v\n", merkleProof) proof = serializeProof(merkleProof) } - fmt.Printf("Batcher InclusionProof: %v\n", proof) - confirmationInfo := &disperser.ConfirmationInfo{ BatchHeaderHash: headerHash, BlobIndex: uint32(blobIndex), diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 877c1357b..49b67d6b8 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -68,8 +68,6 @@ type EncodingStreamer struct { // Used to keep track of the last evaluated key for fetching metadatas exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey - - count int } type batch struct { @@ -116,7 +114,6 @@ func NewEncodingStreamer( metrics: metrics, logger: logger, exclusiveStartKey: nil, - count: 0, }, nil } @@ -188,7 +185,6 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan e.mu.Lock() metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey) e.exclusiveStartKey = newExclusiveStartKey - e.count++ e.mu.Unlock() if err != nil { @@ -263,15 +259,10 @@ func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan for i := range metadatas { metadata := metadatas[i] - fmt.Printf("RequestEncoding Metadata: %v\n", metadata) - fmt.Printf("RequestEncoding Blobs: %v\n", blobs[metadata.GetBlobKey()]) e.RequestEncodingForBlob(ctx, metadata, blobs[metadata.GetBlobKey()], state, referenceBlockNumber, encoderChan) } - e.mu.Lock() - fmt.Printf("count: %d\n", e.count) - e.mu.Unlock() return nil } @@ -383,7 +374,6 @@ func (e *EncodingStreamer) RequestEncodingForBlob(ctx context.Context, metadata Err: nil, } }) - fmt.Printf("RequestEncodingForBlob BlobKey: %v\n", blobKey) 
e.EncodedBlobstore.PutEncodingRequest(blobKey, res.BlobQuorumInfo.QuorumID) } @@ -395,7 +385,6 @@ func (e *EncodingStreamer) ProcessEncodedBlobs(ctx context.Context, result Encod e.EncodedBlobstore.DeleteEncodingRequest(result.BlobMetadata.GetBlobKey(), result.BlobQuorumInfo.QuorumID) return fmt.Errorf("error encoding blob: %w", result.Err) } - fmt.Printf("ProcessEncodedBlobs Result: %v\n", result) err := e.EncodedBlobstore.PutEncodingResult(&result.EncodingResult) if err != nil { @@ -451,7 +440,6 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) { // Delete any encoded results that are not from the current batching iteration (i.e. that has different reference block number) // If any pending encoded results are discarded here, it will be re-requested in the next iteration encodedResults := e.EncodedBlobstore.GetNewAndDeleteStaleEncodingResults(e.ReferenceBlockNumber) - fmt.Printf("CreateBatch EncodedResults: %v\n", encodedResults) // Reset the notifier e.EncodedSizeNotifier.mu.Lock() @@ -472,9 +460,7 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) { // if the same blob has been dispersed multiple time with different security params, // there will be multiple encoded results for that (blob, quorum) result := encodedResults[i] - fmt.Printf("CreateBatch Result: %v\n", result) blobKey := result.BlobMetadata.GetBlobKey() - fmt.Printf("CreateBatch BlobKey: %v\n", blobKey) if _, ok := encodedBlobByKey[blobKey]; !ok { metadataByKey[blobKey] = result.BlobMetadata blobQuorums[blobKey] = make([]*core.BlobQuorumInfo, 0) @@ -531,11 +517,8 @@ func (e *EncodingStreamer) CreateBatch() (*batch, error) { i := 0 for key := range metadataByKey { encodedBlobs[i] = encodedBlobByKey[key] - fmt.Printf("CreateBatch EncodedBlobs: %v\n", encodedBlobs[i]) blobHeaders[i] = blobHeaderByKey[key] - fmt.Printf("CreateBatch BlobHeaders: %v\n", blobHeaders[i]) metadatas[i] = metadataByKey[key] - fmt.Printf("CreateBatch Metadatas: %v\n", metadatas[i]) i++ } diff --git 
a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index f87287f0c..49d918b5b 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -33,11 +33,6 @@ type Config struct { func NewConfig(ctx *cli.Context) Config { - // maxBlobsToFetchFromStore := ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name) - // // Set Minimum Number if no value is set - // if maxBlobsToFetchFromStore == 0 { - // maxBlobsToFetchFromStore = 1 - // } config := Config{ BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), From 350ecdc54e166ffc0bb580ae9356f7d26719e3a2 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Tue, 9 Jan 2024 13:52:30 -0800 Subject: [PATCH 22/26] remove test --- disperser/batcher/encoding_streamer_test.go | 37 --------------------- 1 file changed, 37 deletions(-) diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index d6a90a623..b7b807e86 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -464,43 +464,6 @@ func TestPartialBlob(t *testing.T) { assert.Contains(t, batch.BlobMetadata, metadata1) } -func TestIncorrectRequestEncoding(t *testing.T) { - streamerConfig := batcher.StreamerConfig{ - SRSOrder: 3000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, - } - - encodingStreamer, c := createEncodingStreamer(t, 10, 200_000, streamerConfig) - - securityParams := []*core.SecurityParam{{ - QuorumID: 0, - AdversaryThreshold: 80, - QuorumThreshold: 100, - }} - blobData := []byte{1, 2, 3, 4, 5} - - numItems := 30 - for i := 0; i < numItems; i += 1 { - blob := core.Blob{ - RequestHeader: core.BlobRequestHeader{ - SecurityParams: securityParams, - }, - Data: blobData, - } - ctx := context.Background() - _, err := c.blobStore.StoreBlob(ctx, &blob, uint64(time.Now().UnixNano())) - assert.Nil(t, err) - } - - out := make(chan batcher.EncodingResultOrStatus) - // 
Request encoding - err := encodingStreamer.RequestEncoding(context.Background(), out) - assert.NotNil(t, err) - expectedErrMsg := "number of metadatas fetched from store is 30 greater than configured max number of blobs to fetch from store: 0" - assert.Equal(t, expectedErrMsg, err.Error()) -} - func TestIncorrectParameters(t *testing.T) { ctx := context.Background() From d40716f1369ab7cac185a93a098d2d003d8f423d Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Wed, 10 Jan 2024 10:09:14 -0800 Subject: [PATCH 23/26] Fix PR Comments --- common/aws/dynamodb/client.go | 3 +- disperser/batcher/encoding_streamer.go | 2 +- disperser/cmd/batcher/flags/flags.go | 2 +- .../common/blobstore/blob_metadata_store.go | 55 ++++++------------- disperser/common/blobstore/shared_storage.go | 2 +- disperser/common/inmem/store.go | 36 ++++++------ disperser/disperser.go | 4 +- inabox/deploy/config.go | 2 +- 8 files changed, 46 insertions(+), 60 deletions(-) diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 9d3b34f4f..f1e2f730e 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -168,11 +168,12 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str // QueryIndexWithPagination returns all items in the index that match the given key // Results are limited to the given limit and the pagination token is returned +// When limit is is 0, all items are returned func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) { var queryInput *dynamodb.QueryInput // Fetch all items if limit is 0 - if limit != 0 { + if limit > 0 { queryInput = &dynamodb.QueryInput{ TableName: aws.String(tableName), IndexName: aws.String(indexName), diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 
49b67d6b8..572f7c72d 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -67,7 +67,7 @@ type EncodingStreamer struct { logger common.Logger // Used to keep track of the last evaluated key for fetching metadatas - exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey + exclusiveStartKey *disperser.BlobStoreExclusiveStartKey } type batch struct { diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index b5a3fa026..7c9485769 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -170,7 +170,7 @@ var ( Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination", Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"), - Value: 1, + Value: 100, } ) diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 79da3da42..8741d6590 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -98,14 +98,14 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status // GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit // along with items, also returns a pagination token that can be used to fetch the next set of items -func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { - var attributeMap 
map[string]types.AttributeValue = nil + var attributeMap map[string]types.AttributeValue var err error // Convert the exclusive start key to a map of AttributeValue if exclusiveStartKey != nil { - attributeMap, err = convertExclusiveBlobStoreStartKeyToAttributeValueMap(exclusiveStartKey) + attributeMap, err = convertBlobStoreExclusiveStartKeyToAttributeValueMap(exclusiveStartKey) if err != nil { return nil, nil, err } @@ -115,11 +115,12 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), }}, limit, attributeMap) + if err != nil { return nil, nil, err } - // When their are no more results to fetch, the LastEvaluatedKey is nil + // When no more results to fetch, the LastEvaluatedKey is nil if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { return nil, nil, nil } @@ -137,8 +138,8 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co return metadata, nil, nil } - // Convert the last evaluated key to a disperser.ExclusiveBlobStoreStartKey - exclusiveStartKey, err = converTypeAttributeValuetToExclusiveBlobStoreStartKey(lastEvaluatedKey) + // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey + exclusiveStartKey, err = converTypeAttributeValuetToBlobStoreExclusiveStartKey(lastEvaluatedKey) if err != nil { return nil, nil, err } @@ -401,45 +402,25 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*disperser.BlobMetadata, e return &metadata, nil } -func converTypeAttributeValuetToExclusiveBlobStoreStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.ExclusiveBlobStoreStartKey, error) { - key := disperser.ExclusiveBlobStoreStartKey{} - - if bs, ok := exclusiveStartKeyMap["BlobStatus"].(*types.AttributeValueMemberN); ok { - blobStatus, err := strconv.ParseInt(bs.Value, 10, 32) - if err != nil { - return nil, fmt.Errorf("error parsing BlobStatus: %v", err) - } - 
key.BlobStatus = int32(blobStatus) - } - - if ra, ok := exclusiveStartKeyMap["RequestedAt"].(*types.AttributeValueMemberN); ok { - requestedAt, err := strconv.ParseInt(ra.Value, 10, 64) - if err != nil { - return nil, fmt.Errorf("error parsing RequestedAt: %v", err) - } - key.RequestedAt = requestedAt - } - - if bh, ok := exclusiveStartKeyMap["BlobHash"].(*types.AttributeValueMemberS); ok { - key.BlobHash = bh.Value - } - - if mh, ok := exclusiveStartKeyMap["MetadataHash"].(*types.AttributeValueMemberS); ok { - key.MetadataHash = mh.Value +func converTypeAttributeValuetToBlobStoreExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err } - return &key, nil + return &blobStoreExclusiveStartKey, nil } -func convertExclusiveBlobStoreStartKeyToAttributeValueMap(s *disperser.ExclusiveBlobStoreStartKey) (map[string]types.AttributeValue, error) { - if s == nil { - // Return an empty map or nil, depending on your application logic +func convertBlobStoreExclusiveStartKeyToAttributeValueMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil return nil, nil } - av, err := attributevalue.MarshalMap(s) + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) if err != nil { return nil, err } - return av, nil + return avMap, nil } diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index 2ca5a6dd9..8995b583e 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -203,7 +203,7 @@ func (s *SharedBlobStore) GetBlobMetadataByStatus(ctx context.Context, blobStatu return 
s.blobMetadataStore.GetBlobMetadataByStatus(ctx, blobStatus) } -func (s *SharedBlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { +func (s *SharedBlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { return s.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, blobStatus, limit, exclusiveStartKey) } diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 85c924500..52a0c6205 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "sort" "strconv" "github.com/Layr-Labs/eigenda/core" @@ -154,7 +155,7 @@ func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperse return metas, nil } -func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey) ([]*disperser.BlobMetadata, *disperser.ExclusiveBlobStoreStartKey, error) { +func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { metas := make([]*disperser.BlobMetadata, 0) foundStart := exclusiveStartKey == nil @@ -163,27 +164,30 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s if foundStart { metas = append(metas, meta) if len(metas) == int(limit) { - nextKey := &disperser.ExclusiveBlobStoreStartKey{ - BlobStatus: int32(meta.BlobStatus), - 
RequestedAt: int64(meta.RequestMetadata.RequestedAt), - } - return metas, nextKey, nil + break } } else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) { - foundStart = true // Start appending metas after this item - metas = append(metas, meta) - if len(metas) == int(limit) { - return metas, &disperser.ExclusiveBlobStoreStartKey{ - BlobStatus: int32(meta.BlobStatus), - RequestedAt: int64(meta.RequestMetadata.RequestedAt), - }, nil - } + foundStart = true // Found the starting point, start appending metas from next item } } } - // Return all the metas if limit is not reached - return metas, nil, nil + // Sort the metas by RequestedAt + sort.SliceStable(metas, func(i, j int) bool { + return metas[i].RequestMetadata.RequestedAt < metas[j].RequestMetadata.RequestedAt + }) + + // Determine nextKey for pagination + var nextKey *disperser.BlobStoreExclusiveStartKey + if len(metas) > 0 { + lastMeta := metas[len(metas)-1] + nextKey = &disperser.BlobStoreExclusiveStartKey{ + BlobStatus: int32(lastMeta.BlobStatus), + RequestedAt: int64(lastMeta.RequestMetadata.RequestedAt), + } + } + + return metas, nextKey, nil } func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { diff --git a/disperser/disperser.go b/disperser/disperser.go index a86b86515..8f884e2b0 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -124,7 +124,7 @@ type ConfirmationInfo struct { BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` } -type ExclusiveBlobStoreStartKey struct { +type BlobStoreExclusiveStartKey struct { BlobHash BlobHash MetadataHash MetadataHash BlobStatus int32 // BlobStatus is an integer @@ -158,7 +158,7 @@ type BlobStore interface { GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*BlobMetadata, error) // 
GetBlobMetadataByStatusWithPagination returns a list of blob metadata for blobs with the given status // Results are limited to the given limit and the pagination token is returned - GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *ExclusiveBlobStoreStartKey) ([]*BlobMetadata, *ExclusiveBlobStoreStartKey, error) + GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *BlobStoreExclusiveStartKey) ([]*BlobMetadata, *BlobStoreExclusiveStartKey, error) // GetAllBlobMetadataByBatch returns the metadata of all the blobs in the batch. GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*BlobMetadata, error) // GetBlobMetadata returns a blob metadata given a metadata key diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 0f5d2a1af..3213b2668 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -221,7 +221,7 @@ func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) B BATCHER_FINALIZER_INTERVAL: "6m", BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", BATCHER_NUM_CONFIRMATIONS: "0", - BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE: "1", + BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE: "100", } env.applyDefaults(&v, "BATCHER", "batcher", ind) From 53f41e5952a0f15bf5994ce7cf892743d327f48f Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Wed, 10 Jan 2024 13:16:11 -0800 Subject: [PATCH 24/26] Fix UT failure --- disperser/common/inmem/store.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 52a0c6205..129d3733a 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -168,26 +168,23 @@ func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, s } } else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && 
meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) { foundStart = true // Found the starting point, start appending metas from next item + metas = append(metas, meta) + if len(metas) == int(limit) { + return metas, &disperser.BlobStoreExclusiveStartKey{ + BlobStatus: int32(meta.BlobStatus), + RequestedAt: int64(meta.RequestMetadata.RequestedAt), + }, nil + } } } } - // Sort the metas by RequestedAt sort.SliceStable(metas, func(i, j int) bool { return metas[i].RequestMetadata.RequestedAt < metas[j].RequestMetadata.RequestedAt }) - // Determine nextKey for pagination - var nextKey *disperser.BlobStoreExclusiveStartKey - if len(metas) > 0 { - lastMeta := metas[len(metas)-1] - nextKey = &disperser.BlobStoreExclusiveStartKey{ - BlobStatus: int32(lastMeta.BlobStatus), - RequestedAt: int64(lastMeta.RequestMetadata.RequestedAt), - } - } - - return metas, nextKey, nil + // Return all the metas if limit is not reached + return metas, nil, nil } func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { From 69398a313a92fcebcbf5ffac4c0f781a6763379a Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Thu, 11 Jan 2024 14:32:34 -0800 Subject: [PATCH 25/26] fix pr comment --- disperser/common/blobstore/blob_metadata_store.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 8741d6590..97bd3777d 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -105,7 +105,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co // Convert the exclusive start key to a map of AttributeValue if exclusiveStartKey != nil { - attributeMap, err = convertBlobStoreExclusiveStartKeyToAttributeValueMap(exclusiveStartKey) + attributeMap, err = 
convertToAttribMap(exclusiveStartKey) if err != nil { return nil, nil, err } @@ -139,7 +139,7 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co } // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey - exclusiveStartKey, err = converTypeAttributeValuetToBlobStoreExclusiveStartKey(lastEvaluatedKey) + exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey) if err != nil { return nil, nil, err } @@ -402,7 +402,7 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*disperser.BlobMetadata, e return &metadata, nil } -func converTypeAttributeValuetToBlobStoreExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { +func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) if err != nil { @@ -412,7 +412,7 @@ func converTypeAttributeValuetToBlobStoreExclusiveStartKey(exclusiveStartKeyMap return &blobStoreExclusiveStartKey, nil } -func convertBlobStoreExclusiveStartKeyToAttributeValueMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { +func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { if blobStoreExclusiveStartKey == nil { // Return an empty map or nil return nil, nil From e130739eeca30967867c38cff53a18e9114c4a55 Mon Sep 17 00:00:00 2001 From: Siddharth More Date: Thu, 11 Jan 2024 14:36:29 -0800 Subject: [PATCH 26/26] remove required flag --- disperser/cmd/batcher/flags/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 7c9485769..e14f4fa14 100644 --- 
a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -168,7 +168,7 @@ var ( MaxBlobsToFetchFromStoreFlag = cli.IntFlag{ Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"), Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination", - Required: true, + Required: false, EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"), Value: 100, }