Encoding Request with pagination #157

Merged
merged 30 commits into from
Jan 12, 2024

Changes from 8 commits

30 commits:
5dc0c31  Add pagination method to dynamodB (Jan 5, 2024)
9b8dada  Fix lint error (Jan 5, 2024)
604e29d  return lastEvaluatedKey (Jan 5, 2024)
47de551  Update Table to use RequestedAt instead of CreatedAt key (Jan 5, 2024)
5198c14  Encoding Request With Pagination (Jan 5, 2024)
1946d6f  Update to use Extended BlobStore (Jan 5, 2024)
595f74d  Fix PR comment (Jan 5, 2024)
74856fc  make blobstofetch a configuration (Jan 5, 2024)
f688e0e  Fix PR Comment (Jan 5, 2024)
80f8152  Merge branch 'blob_metadata_with_pagination' into encoding_request_wi… (Jan 5, 2024)
1f2a6ad  Add Test for Invalid RequestEncoding when no max blob fetch is set (Jan 5, 2024)
e804628  Add Test for GetBlobMetadataByStatusWithPagination (Jan 8, 2024)
77b36fd  Fix Test (Jan 8, 2024)
ca05b43  Add more test (Jan 8, 2024)
55b4229  Merge branch 'blob_metadata_with_pagination' into encoding_request_wi… (Jan 8, 2024)
de4953c  fix merge conflict (Jan 8, 2024)
4365263  Merge branch 'master' into encoding_request_with_pagination (Jan 8, 2024)
2931566  remove commented code (Jan 8, 2024)
ba22d7b  Set Default Value for BlobsToFetch from store (Jan 8, 2024)
ba13aa2  Add MetadataHash and BlobHash to ExclusiveStartKEy (Jan 9, 2024)
ee26668  return nil when their are no results (Jan 9, 2024)
65c0078  Fetch all items when limit is set to 0 (Jan 9, 2024)
346bcde  Update comment (Jan 9, 2024)
87780da  changes (Jan 9, 2024)
034bb63  Handle case when no limit is provided (Jan 9, 2024)
350ecdc  remove test (Jan 9, 2024)
d40716f  Fix PR Comments (Jan 10, 2024)
53f41e5  Fix UT failure (Jan 10, 2024)
69398a3  fix pr comment (Jan 11, 2024)
e130739  remove required flag (Jan 11, 2024)
33 changes: 33 additions & 0 deletions common/aws/dynamodb/client.go
@@ -37,6 +37,11 @@ type Item = map[string]types.AttributeValue
type Key = map[string]types.AttributeValue
type ExpresseionValues = map[string]types.AttributeValue

// QueryResult holds a single page of a paginated query: the items returned and
// the LastEvaluatedKey, which is nil once there are no further pages and can
// otherwise be passed as the exclusiveStartKey of the next query.
type QueryResult struct {
	Items            []map[string]types.AttributeValue
	LastEvaluatedKey map[string]types.AttributeValue
}

type Client struct {
dynamoClient *dynamodb.Client
logger common.Logger
@@ -161,6 +166,34 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
return response.Items, nil
}

// QueryIndexWithPagination returns items in the index that match the given key,
// up to the given limit. The returned LastEvaluatedKey can be passed back as the
// exclusiveStartKey of the next call to fetch the following page; it is nil once
// no items remain.
func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) {
queryInput := &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
Limit: &limit,
}

// If a pagination token was provided, set it as the ExclusiveStartKey
if exclusiveStartKey != nil {
queryInput.ExclusiveStartKey = exclusiveStartKey
}

response, err := c.dynamoClient.Query(ctx, queryInput)
if err != nil {
return QueryResult{}, err
}

// Return the items and the pagination token
return QueryResult{
Items: response.Items,
LastEvaluatedKey: response.LastEvaluatedKey,
}, nil
}

func (c *Client) DeleteItem(ctx context.Context, tableName string, key Key) error {
_, err := c.dynamoClient.DeleteItem(ctx, &dynamodb.DeleteItemInput{Key: key, TableName: aws.String(tableName)})
if err != nil {
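To make the pagination contract concrete, here is a minimal caller-side sketch (not part of this PR; the table name, index name, and key condition are illustrative) that drains every matching item by calling QueryIndexWithPagination until no token comes back:

// drainIndex is a sketch showing how a caller pages through an index.
func drainIndex(ctx context.Context, c *Client) ([]Item, error) {
	var all []Item
	var startKey map[string]types.AttributeValue
	for {
		page, err := c.QueryIndexWithPagination(ctx, "BlobMetadata", "StatusIndex",
			"BlobStatus = :status",
			ExpresseionValues{":status": &types.AttributeValueMemberN{Value: "0"}},
			100, startKey)
		if err != nil {
			return nil, err
		}
		all = append(all, page.Items...)
		if page.LastEvaluatedKey == nil {
			return all, nil // nil token: the final page has been read
		}
		startKey = page.LastEvaluatedKey
	}
}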
197 changes: 189 additions & 8 deletions common/aws/dynamodb/client_test.go
@@ -4,7 +4,9 @@ import (
"context"
"fmt"
"os"
"strconv"
"testing"
"time"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb"
@@ -79,14 +81,48 @@ func teardown() {
func createTable(t *testing.T, tableName string) {
ctx := context.Background()
tableDescription, err := test_utils.CreateTable(ctx, clientConfig, tableName, &dynamodb.CreateTableInput{
-	AttributeDefinitions: []types.AttributeDefinition{{
-		AttributeName: aws.String("MetadataKey"),
-		AttributeType: types.ScalarAttributeTypeS,
-	}},
-	KeySchema: []types.KeySchemaElement{{
-		AttributeName: aws.String("MetadataKey"),
-		KeyType:       types.KeyTypeHash,
-	}},
+	AttributeDefinitions: []types.AttributeDefinition{
+		{
+			AttributeName: aws.String("MetadataKey"),
+			AttributeType: types.ScalarAttributeTypeS,
+		},
+		{
+			AttributeName: aws.String("BlobStatus"),
+			AttributeType: types.ScalarAttributeTypeN, // BlobStatus is stored as a number
+		},
+		{
+			AttributeName: aws.String("RequestedAt"),
+			AttributeType: types.ScalarAttributeTypeN, // RequestedAt is a Unix timestamp stored as a number
+		},
+	},
+	KeySchema: []types.KeySchemaElement{
+		{
+			AttributeName: aws.String("MetadataKey"),
+			KeyType:       types.KeyTypeHash,
+		},
+	},
+	GlobalSecondaryIndexes: []types.GlobalSecondaryIndex{
+		{
+			IndexName: aws.String("StatusIndex"),
+			KeySchema: []types.KeySchemaElement{
+				{
+					AttributeName: aws.String("BlobStatus"),
+					KeyType:       types.KeyTypeHash,
+				},
+				{
+					AttributeName: aws.String("RequestedAt"),
+					KeyType:       types.KeyTypeRange, // RequestedAt serves as the sort key
+				},
+			},
+			Projection: &types.Projection{
+				ProjectionType: types.ProjectionTypeAll, // project all attributes into the index
+			},
+			ProvisionedThroughput: &types.ProvisionedThroughput{
+				ReadCapacityUnits:  aws.Int64(10),
+				WriteCapacityUnits: aws.Int64(10),
+			},
+		},
+	},
TableName: aws.String(tableName),
ProvisionedThroughput: &types.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(10),
@@ -234,3 +270,148 @@ func TestBatchOperations(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, fetchedItem, 0)
}

func TestQueryIndex(t *testing.T) {
tableName := "ProcessingQueryIndex"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItems := 30
items := make([]commondynamodb.Item, numItems)
for i := 0; i < numItems; i += 1 {
items[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
}
}
unprocessed, err := dynamoClient.PutItems(ctx, tableName, items)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

queryResult, err := dynamoClient.QueryIndex(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}})
assert.NoError(t, err)
assert.Equal(t, 30, len(queryResult))
}

func TestQueryIndexPagination(t *testing.T) {
tableName := "ProcessingWithPagination"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
	// RequestedAt decreases as i grows, so key29 carries the oldest timestamp
	// and an ascending query on the sort key returns it first.
	requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)},
}
err := dynamoClient.PutItem(ctx, tableName, item)
assert.NoError(t, err)
}

queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, nil)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 10)
assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.NotNil(t, queryResult.LastEvaluatedKey)
assert.Equal(t, "key20", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value)
assert.Equal(t, "0", queryResult.LastEvaluatedKey["BlobStatus"].(*types.AttributeValueMemberN).Value)

// Get the next 10 items
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, queryResult.LastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 10)
assert.Equal(t, "key10", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value)

// Get the last 10 items
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, queryResult.LastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 10)
assert.Equal(t, "key0", queryResult.LastEvaluatedKey["MetadataKey"].(*types.AttributeValueMemberS).Value)

// Empty result since all items have already been fetched
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, queryResult.LastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 0)
assert.Nil(t, queryResult.LastEvaluatedKey)
}

func TestQueryIndexWithPaginationForBatch(t *testing.T) {
tableName := "ProcessingWithPaginationForBatch"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItems := 30
items := make([]commondynamodb.Item, numItems)
for i := 0; i < numItems; i += 1 {
items[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
}
}
unprocessed, err := dynamoClient.PutItems(ctx, tableName, items)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

// Get First 10 items
queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, nil)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 10)

// Get the next 10 items
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, queryResult.LastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 10)

// Get the last 10 items
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, queryResult.LastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 10)

// Empty result since all items have already been fetched
queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}}, 10, queryResult.LastEvaluatedKey)
assert.NoError(t, err)
assert.Len(t, queryResult.Items, 0)
assert.Nil(t, queryResult.LastEvaluatedKey)
}
12 changes: 7 additions & 5 deletions disperser/batcher/batcher.go
@@ -54,7 +54,8 @@ type Config struct {
BatchSizeMBLimit uint
MaxNumRetriesPerBlob uint

-	TargetNumChunks uint
+	TargetNumChunks          uint
+	MaxBlobsToFetchFromStore int
}

type Batcher struct {
@@ -97,10 +98,11 @@ func NewBatcher(
uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes
)
	streamerConfig := StreamerConfig{
-		SRSOrder:               config.SRSOrder,
-		EncodingRequestTimeout: config.PullInterval,
-		EncodingQueueLimit:     config.EncodingRequestQueueSize,
-		TargetNumChunks:        config.TargetNumChunks,
+		SRSOrder:                 config.SRSOrder,
+		EncodingRequestTimeout:   config.PullInterval,
+		EncodingQueueLimit:       config.EncodingRequestQueueSize,
+		TargetNumChunks:          config.TargetNumChunks,
+		MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore,
	}
encodingWorkerPool := workerpool.New(config.NumConnections)
encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger)
14 changes: 13 additions & 1 deletion disperser/batcher/encoding_streamer.go
@@ -41,6 +41,9 @@ type StreamerConfig struct {

// TargetNumChunks is the target number of chunks per encoded blob
TargetNumChunks uint

// MaxBlobsToFetchFromStore caps how many blobs are fetched from the store
// in a single paginated request
MaxBlobsToFetchFromStore int
}

type EncodingStreamer struct {
@@ -62,6 +65,9 @@ type EncodingStreamer struct {

metrics *EncodingStreamerMetrics
logger common.Logger

// exclusiveStartKey remembers the last evaluated key so successive metadata
// fetches resume where the previous page ended
exclusiveStartKey *disperser.ExclusiveBlobStoreStartKey
}

type batch struct {
@@ -107,6 +113,7 @@ func NewEncodingStreamer(
encodingCtxCancelFuncs: make([]context.CancelFunc, 0),
metrics: metrics,
logger: logger,
exclusiveStartKey: nil,
}, nil
}

@@ -175,7 +182,12 @@ func (e *EncodingStreamer) dedupRequests(metadatas []*disperser.BlobMetadata, re
func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan EncodingResultOrStatus) error {
stageTimer := time.Now()
// pull new blobs and send to encoder
-	metadatas, err := e.blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing)
+	// Hold the lock while reading and updating the pagination cursor.
+	e.mu.Lock()
+	metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey)
+	e.exclusiveStartKey = newExclusiveStartKey
+	e.mu.Unlock()

if err != nil {
return fmt.Errorf("error getting blob metadatas: %w", err)
}
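Stepping back from the diff, the pattern above is: a mutex-guarded cursor lets each RequestEncoding call resume the paginated scan where the previous call stopped, and a nil key from the store makes the next call wrap around to the first page. A self-contained sketch of that pattern (the pagedFetcher type is hypothetical; it assumes the store field is the disperser.BlobStore interface exposing the GetBlobMetadataByStatusWithPagination method used above):

// pagedFetcher is an illustration of the resume-and-wrap pagination pattern.
type pagedFetcher struct {
	mu       sync.Mutex
	startKey *disperser.ExclusiveBlobStoreStartKey // nil means start from the first page
	store    disperser.BlobStore
	limit    int32
}

func (p *pagedFetcher) nextPage(ctx context.Context) ([]*disperser.BlobMetadata, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	metadatas, next, err := p.store.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, p.limit, p.startKey)
	if err != nil {
		return nil, err
	}
	// A nil key means the scan reached the end of the index; storing it makes
	// the next call start over from the first page.
	p.startKey = next
	return metadatas, nil
}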
14 changes: 8 additions & 6 deletions disperser/batcher/encoding_streamer_test.go
@@ -21,9 +21,10 @@ import (

var (
	streamerConfig = batcher.StreamerConfig{
-		SRSOrder:               300000,
-		EncodingRequestTimeout: 5 * time.Second,
-		EncodingQueueLimit:     100,
+		SRSOrder:                 300000,
+		EncodingRequestTimeout:   5 * time.Second,
+		EncodingQueueLimit:       100,
+		MaxBlobsToFetchFromStore: 10,
	}
)

@@ -294,9 +295,10 @@ func TestEncodingFailure(t *testing.T) {
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 1e12)
workerpool := workerpool.New(5)
	streamerConfig := batcher.StreamerConfig{
-		SRSOrder:               300000,
-		EncodingRequestTimeout: 5 * time.Second,
-		EncodingQueueLimit:     100,
+		SRSOrder:                 300000,
+		EncodingRequestTimeout:   5 * time.Second,
+		EncodingQueueLimit:       100,
+		MaxBlobsToFetchFromStore: 10,
	}
metrics := batcher.NewMetrics("9100", logger)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger)
1 change: 1 addition & 0 deletions disperser/cmd/batcher/config.go
@@ -51,6 +51,7 @@ func NewConfig(ctx *cli.Context) Config {
SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name),
MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name),
TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name),
MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name),
},
TimeoutConfig: batcher.TimeoutConfig{
EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name),
8 changes: 8 additions & 0 deletions disperser/cmd/batcher/flags/flags.go
@@ -164,6 +164,14 @@
EnvVar: common.PrefixEnvVar(envVarPrefix, "TARGET_NUM_CHUNKS"),
Value: 0,
}

MaxBlobsToFetchFromStoreFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"),
Usage: "Maximum number of blobs to fetch from the store at a time when using DynamoDB pagination",
Required: false,
EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"),
Value: 10,
}
)
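Usage note: with the flag optional and defaulting to 10, the fetch limit can be raised on the command line (for example `--max-blobs-to-fetch-from-store 25`) or through the corresponding MAX_BLOBS_TO_FETCH_FROM_STORE environment variable; the variable's full name depends on envVarPrefix, which is defined elsewhere in the package.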

var requiredFlags = []cli.Flag{