diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go index 76464cfd8..f1e2f730e 100644 --- a/common/aws/dynamodb/client.go +++ b/common/aws/dynamodb/client.go @@ -168,13 +168,26 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str // QueryIndexWithPagination returns all items in the index that match the given key // Results are limited to the given limit and the pagination token is returned +// When limit is is 0, all items are returned func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues, limit int32, exclusiveStartKey map[string]types.AttributeValue) (QueryResult, error) { - queryInput := &dynamodb.QueryInput{ - TableName: aws.String(tableName), - IndexName: aws.String(indexName), - KeyConditionExpression: aws.String(keyCondition), - ExpressionAttributeValues: expAttributeValues, - Limit: &limit, + var queryInput *dynamodb.QueryInput + + // Fetch all items if limit is 0 + if limit > 0 { + queryInput = &dynamodb.QueryInput{ + TableName: aws.String(tableName), + IndexName: aws.String(indexName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + Limit: &limit, + } + } else { + queryInput = &dynamodb.QueryInput{ + TableName: aws.String(tableName), + IndexName: aws.String(indexName), + KeyConditionExpression: aws.String(keyCondition), + ExpressionAttributeValues: expAttributeValues, + } } // If a pagination token was provided, set it as the ExclusiveStartKey @@ -187,6 +200,10 @@ func (c *Client) QueryIndexWithPagination(ctx context.Context, tableName string, return QueryResult{}, err } + if len(response.Items) == 0 { + return QueryResult{Items: nil, LastEvaluatedKey: nil}, nil + } + // Return the items and the pagination token return QueryResult{ Items: response.Items, diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go index 995e6889a..f0804fa47 100644 --- a/common/aws/dynamodb/client_test.go +++ b/common/aws/dynamodb/client_test.go @@ -337,7 +337,67 @@ func TestQueryIndexPaginationSingleItem(t *testing.T) { Value: "0", }}, 1, lastEvaluatedKey) assert.NoError(t, err) - assert.Len(t, queryResult.Items, 0) + assert.Nil(t, queryResult.Items) + assert.Nil(t, queryResult.LastEvaluatedKey) +} + +func TestQueryIndexPaginationItemNoLimit(t *testing.T) { + tableName := "ProcessingWithNoPaginationLimit" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + numItems := 30 + for i := 0; i < numItems; i += 1 { + requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix() + + // Create new item + item := commondynamodb.Item{ + "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)}, + "BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)}, + "BlobSize": &types.AttributeValueMemberN{Value: "123"}, + "BlobStatus": &types.AttributeValueMemberN{Value: "0"}, + "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(requestedAt, 10)}, + } + err := dynamoClient.PutItem(ctx, tableName, item) + assert.NoError(t, err) + } + + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 0, nil) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 30) + assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.Nil(t, queryResult.LastEvaluatedKey) + + // Save Last Evaluated Key + lastEvaluatedKey := queryResult.LastEvaluatedKey + + // Get the next item using LastEvaluatedKey expect to be nil + queryResult, err = dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 2, lastEvaluatedKey) + assert.NoError(t, err) + assert.Len(t, queryResult.Items, 2) + assert.Equal(t, "key29", queryResult.Items[0]["MetadataKey"].(*types.AttributeValueMemberS).Value) + assert.NotNil(t, queryResult.LastEvaluatedKey) +} + +func TestQueryIndexPaginationNoStoredItems(t *testing.T) { + tableName := "ProcessingWithPaginationNoItem" + createTable(t, tableName) + indexName := "StatusIndex" + + ctx := context.Background() + queryResult, err := dynamoClient.QueryIndexWithPagination(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ + ":status": &types.AttributeValueMemberN{ + Value: "0", + }}, 1, nil) + assert.NoError(t, err) + assert.Nil(t, queryResult.Items) assert.Nil(t, queryResult.LastEvaluatedKey) } diff --git a/disperser/batcher/batcher.go b/disperser/batcher/batcher.go index 958973c46..947d398bd 100644 --- a/disperser/batcher/batcher.go +++ b/disperser/batcher/batcher.go @@ -54,7 +54,8 @@ type Config struct { BatchSizeMBLimit uint MaxNumRetriesPerBlob uint - TargetNumChunks uint + TargetNumChunks uint + MaxBlobsToFetchFromStore int } type Batcher struct { @@ -97,10 +98,11 @@ func NewBatcher( uint64(config.BatchSizeMBLimit)*1024*1024, // convert to bytes ) streamerConfig := StreamerConfig{ - SRSOrder: config.SRSOrder, - EncodingRequestTimeout: config.PullInterval, - EncodingQueueLimit: config.EncodingRequestQueueSize, - TargetNumChunks: config.TargetNumChunks, + SRSOrder: config.SRSOrder, + EncodingRequestTimeout: config.PullInterval, + EncodingQueueLimit: config.EncodingRequestQueueSize, + TargetNumChunks: config.TargetNumChunks, + MaxBlobsToFetchFromStore: config.MaxBlobsToFetchFromStore, } encodingWorkerPool := workerpool.New(config.NumConnections) encodingStreamer, err := NewEncodingStreamer(streamerConfig, queue, chainState, encoderClient, assignmentCoordinator, batchTrigger, encodingWorkerPool, metrics.EncodingStreamerMetrics, logger) diff --git a/disperser/batcher/encoding_streamer.go b/disperser/batcher/encoding_streamer.go index 4ff48bf76..572f7c72d 100644 --- a/disperser/batcher/encoding_streamer.go +++ b/disperser/batcher/encoding_streamer.go @@ -41,6 +41,9 @@ type StreamerConfig struct { // TargetNumChunks is the target number of chunks per encoded blob TargetNumChunks uint + + // Maximum number of Blobs to fetch from store + MaxBlobsToFetchFromStore int } type EncodingStreamer struct { @@ -62,6 +65,9 @@ type EncodingStreamer struct { metrics *EncodingStreamerMetrics logger common.Logger + + // Used to keep track of the last evaluated key for fetching metadatas + exclusiveStartKey *disperser.BlobStoreExclusiveStartKey } type batch struct { @@ -107,6 +113,7 @@ func NewEncodingStreamer( encodingCtxCancelFuncs: make([]context.CancelFunc, 0), metrics: metrics, logger: logger, + exclusiveStartKey: nil, }, nil } @@ -175,7 +182,11 @@ func (e *EncodingStreamer) dedupRequests(metadatas []*disperser.BlobMetadata, re func (e *EncodingStreamer) RequestEncoding(ctx context.Context, encoderChan chan EncodingResultOrStatus) error { stageTimer := time.Now() // pull new blobs and send to encoder - metadatas, err := e.blobStore.GetBlobMetadataByStatus(ctx, disperser.Processing) + e.mu.Lock() + metadatas, newExclusiveStartKey, err := e.blobStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, int32(e.StreamerConfig.MaxBlobsToFetchFromStore), e.exclusiveStartKey) + e.exclusiveStartKey = newExclusiveStartKey + e.mu.Unlock() + if err != nil { return fmt.Errorf("error getting blob metadatas: %w", err) } diff --git a/disperser/batcher/encoding_streamer_test.go b/disperser/batcher/encoding_streamer_test.go index 38e18e068..b7b807e86 100644 --- a/disperser/batcher/encoding_streamer_test.go +++ b/disperser/batcher/encoding_streamer_test.go @@ -21,9 +21,10 @@ import ( var ( streamerConfig = batcher.StreamerConfig{ - SRSOrder: 300000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 300000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } ) @@ -294,9 +295,10 @@ func TestEncodingFailure(t *testing.T) { sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 1e12) workerpool := workerpool.New(5) streamerConfig := batcher.StreamerConfig{ - SRSOrder: 300000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 300000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } metrics := batcher.NewMetrics("9100", logger) encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger) @@ -467,9 +469,10 @@ func TestIncorrectParameters(t *testing.T) { ctx := context.Background() streamerConfig := batcher.StreamerConfig{ - SRSOrder: 3000, - EncodingRequestTimeout: 5 * time.Second, - EncodingQueueLimit: 100, + SRSOrder: 3000, + EncodingRequestTimeout: 5 * time.Second, + EncodingQueueLimit: 100, + MaxBlobsToFetchFromStore: 10, } encodingStreamer, c := createEncodingStreamer(t, 0, 1e12, streamerConfig) diff --git a/disperser/cmd/batcher/config.go b/disperser/cmd/batcher/config.go index 03b0b8917..49d918b5b 100644 --- a/disperser/cmd/batcher/config.go +++ b/disperser/cmd/batcher/config.go @@ -32,6 +32,7 @@ type Config struct { } func NewConfig(ctx *cli.Context) Config { + config := Config{ BlobstoreConfig: blobstore.Config{ BucketName: ctx.GlobalString(flags.S3BucketNameFlag.Name), @@ -51,6 +52,7 @@ func NewConfig(ctx *cli.Context) Config { SRSOrder: ctx.GlobalInt(flags.SRSOrderFlag.Name), MaxNumRetriesPerBlob: ctx.GlobalUint(flags.MaxNumRetriesPerBlobFlag.Name), TargetNumChunks: ctx.GlobalUint(flags.TargetNumChunksFlag.Name), + MaxBlobsToFetchFromStore: ctx.GlobalInt(flags.MaxBlobsToFetchFromStoreFlag.Name), }, TimeoutConfig: batcher.TimeoutConfig{ EncodingTimeout: ctx.GlobalDuration(flags.EncodingTimeoutFlag.Name), diff --git a/disperser/cmd/batcher/flags/flags.go b/disperser/cmd/batcher/flags/flags.go index 9afea62fc..e14f4fa14 100644 --- a/disperser/cmd/batcher/flags/flags.go +++ b/disperser/cmd/batcher/flags/flags.go @@ -164,6 +164,14 @@ var ( EnvVar: common.PrefixEnvVar(envVarPrefix, "TARGET_NUM_CHUNKS"), Value: 0, } + + MaxBlobsToFetchFromStoreFlag = cli.IntFlag{ + Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"), + Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination", + Required: false, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MAX_BLOBS_TO_FETCH_FROM_STORE"), + Value: 100, + } ) var requiredFlags = []cli.Flag{ diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go index 7b154e842..97bd3777d 100644 --- a/disperser/common/blobstore/blob_metadata_store.go +++ b/disperser/common/blobstore/blob_metadata_store.go @@ -98,15 +98,33 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status // GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit // along with items, also returns a pagination token that can be used to fetch the next set of items -func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey map[string]types.AttributeValue) ([]*disperser.BlobMetadata, map[string]types.AttributeValue, error) { +func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { + + var attributeMap map[string]types.AttributeValue + var err error + + // Convert the exclusive start key to a map of AttributeValue + if exclusiveStartKey != nil { + attributeMap, err = convertToAttribMap(exclusiveStartKey) + if err != nil { + return nil, nil, err + } + } + queryResult, err := s.dynamoDBClient.QueryIndexWithPagination(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), - }}, limit, exclusiveStartKey) + }}, limit, attributeMap) + if err != nil { return nil, nil, err } + // When no more results to fetch, the LastEvaluatedKey is nil + if queryResult.Items == nil && queryResult.LastEvaluatedKey == nil { + return nil, nil, nil + } + metadata := make([]*disperser.BlobMetadata, len(queryResult.Items)) for i, item := range queryResult.Items { metadata[i], err = UnmarshalBlobMetadata(item) @@ -119,7 +137,13 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Co if lastEvaluatedKey == nil { return metadata, nil, nil } - return metadata, lastEvaluatedKey, nil + + // Convert the last evaluated key to a disperser.BlobStoreExclusiveStartKey + exclusiveStartKey, err = convertToExclusiveStartKey(lastEvaluatedKey) + if err != nil { + return nil, nil, err + } + return metadata, exclusiveStartKey, nil } func (s *BlobMetadataStore) GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*disperser.BlobMetadata, error) { @@ -377,3 +401,26 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*disperser.BlobMetadata, e return &metadata, nil } + +func convertToExclusiveStartKey(exclusiveStartKeyMap map[string]types.AttributeValue) (*disperser.BlobStoreExclusiveStartKey, error) { + blobStoreExclusiveStartKey := disperser.BlobStoreExclusiveStartKey{} + err := attributevalue.UnmarshalMap(exclusiveStartKeyMap, &blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + + return &blobStoreExclusiveStartKey, nil +} + +func convertToAttribMap(blobStoreExclusiveStartKey *disperser.BlobStoreExclusiveStartKey) (map[string]types.AttributeValue, error) { + if blobStoreExclusiveStartKey == nil { + // Return an empty map or nil + return nil, nil + } + + avMap, err := attributevalue.MarshalMap(blobStoreExclusiveStartKey) + if err != nil { + return nil, err + } + return avMap, nil +} diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go index 3b393ac78..b31e10de2 100644 --- a/disperser/common/blobstore/blob_metadata_store_test.go +++ b/disperser/common/blobstore/blob_metadata_store_test.go @@ -175,6 +175,16 @@ func TestBlobMetadataStoreOperationsWithPagination(t *testing.T) { }) } +func TestBlobMetadataStoreOperationsWithPaginationNoStoredBlob(t *testing.T) { + ctx := context.Background() + // Query BlobMetadataStore for a blob that does not exist + // This should return nil for both the blob and lastEvaluatedKey + processing, lastEvaluatedKey, err := blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, disperser.Processing, 1, nil) + assert.NoError(t, err) + assert.Nil(t, processing) + assert.Nil(t, lastEvaluatedKey) +} + func deleteItems(t *testing.T, keys []commondynamodb.Key) { _, err := dynamoClient.DeleteItems(context.Background(), metadataTableName, keys) assert.NoError(t, err) diff --git a/disperser/common/blobstore/shared_storage.go b/disperser/common/blobstore/shared_storage.go index d64cf8923..8995b583e 100644 --- a/disperser/common/blobstore/shared_storage.go +++ b/disperser/common/blobstore/shared_storage.go @@ -203,6 +203,10 @@ func (s *SharedBlobStore) GetBlobMetadataByStatus(ctx context.Context, blobStatu return s.blobMetadataStore.GetBlobMetadataByStatus(ctx, blobStatus) } +func (s *SharedBlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { + return s.blobMetadataStore.GetBlobMetadataByStatusWithPagination(ctx, blobStatus, limit, exclusiveStartKey) +} + func (s *SharedBlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { return s.blobMetadataStore.GetBlobMetadataInBatch(ctx, batchHeaderHash, blobIndex) } diff --git a/disperser/common/inmem/store.go b/disperser/common/inmem/store.go index 006cbc4ce..129d3733a 100644 --- a/disperser/common/inmem/store.go +++ b/disperser/common/inmem/store.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "sort" "strconv" "github.com/Layr-Labs/eigenda/core" @@ -154,6 +155,38 @@ func (q *BlobStore) GetBlobMetadataByStatus(ctx context.Context, status disperse return metas, nil } +func (q *BlobStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) { + metas := make([]*disperser.BlobMetadata, 0) + foundStart := exclusiveStartKey == nil + + for _, meta := range q.Metadata { + if meta.BlobStatus == status { + if foundStart { + metas = append(metas, meta) + if len(metas) == int(limit) { + break + } + } else if meta.BlobStatus == disperser.BlobStatus(exclusiveStartKey.BlobStatus) && meta.RequestMetadata.RequestedAt == uint64(exclusiveStartKey.RequestedAt) { + foundStart = true // Found the starting point, start appending metas from next item + metas = append(metas, meta) + if len(metas) == int(limit) { + return metas, &disperser.BlobStoreExclusiveStartKey{ + BlobStatus: int32(meta.BlobStatus), + RequestedAt: int64(meta.RequestMetadata.RequestedAt), + }, nil + } + } + } + } + + sort.SliceStable(metas, func(i, j int) bool { + return metas[i].RequestMetadata.RequestedAt < metas[j].RequestMetadata.RequestedAt + }) + + // Return all the metas if limit is not reached + return metas, nil, nil +} + func (q *BlobStore) GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*disperser.BlobMetadata, error) { for _, meta := range q.Metadata { if meta.ConfirmationInfo != nil && meta.ConfirmationInfo.BatchHeaderHash == batchHeaderHash && meta.ConfirmationInfo.BlobIndex == blobIndex { diff --git a/disperser/disperser.go b/disperser/disperser.go index 116b0910d..8f884e2b0 100644 --- a/disperser/disperser.go +++ b/disperser/disperser.go @@ -124,6 +124,13 @@ type ConfirmationInfo struct { BlobQuorumInfos []*core.BlobQuorumInfo `json:"blob_quorum_infos"` } +type BlobStoreExclusiveStartKey struct { + BlobHash BlobHash + MetadataHash MetadataHash + BlobStatus int32 // BlobStatus is an integer + RequestedAt int64 // RequestedAt is epoch time in seconds +} + type BlobStore interface { // StoreBlob adds a blob to the queue and returns a key that can be used to retrieve the blob later StoreBlob(ctx context.Context, blob *core.Blob, requestedAt uint64) (BlobKey, error) @@ -149,6 +156,9 @@ type BlobStore interface { GetBlobMetadataByStatus(ctx context.Context, blobStatus BlobStatus) ([]*BlobMetadata, error) // GetMetadataInBatch returns the metadata in a given batch at given index. GetMetadataInBatch(ctx context.Context, batchHeaderHash [32]byte, blobIndex uint32) (*BlobMetadata, error) + // GetBlobMetadataByStatusWithPagination returns a list of blob metadata for blobs with the given status + // Results are limited to the given limit and the pagination token is returned + GetBlobMetadataByStatusWithPagination(ctx context.Context, blobStatus BlobStatus, limit int32, exclusiveStartKey *BlobStoreExclusiveStartKey) ([]*BlobMetadata, *BlobStoreExclusiveStartKey, error) // GetAllBlobMetadataByBatch returns the metadata of all the blobs in the batch. GetAllBlobMetadataByBatch(ctx context.Context, batchHeaderHash [32]byte) ([]*BlobMetadata, error) // GetBlobMetadata returns a blob metadata given a metadata key diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 569ee4c22..3213b2668 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -197,30 +197,31 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath, // Generates batcher .env func (env *Config) generateBatcherVars(ind int, key, graphUrl, logPath string) BatcherVars { v := BatcherVars{ - BATCHER_S3_BUCKET_NAME: "test-eigenda-blobstore", - BATCHER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", - BATCHER_ENABLE_METRICS: "true", - BATCHER_METRICS_HTTP_PORT: "9094", - BATCHER_PULL_INTERVAL: "5s", - BATCHER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver, - BATCHER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager, - BATCHER_SRS_ORDER: "300000", - BATCHER_CHAIN_RPC: "", - BATCHER_PRIVATE_KEY: key[2:], - BATCHER_STD_LOG_LEVEL: "debug", - BATCHER_FILE_LOG_LEVEL: "trace", - BATCHER_LOG_PATH: logPath, - BATCHER_GRAPH_URL: graphUrl, - BATCHER_USE_GRAPH: "true", - BATCHER_BATCH_SIZE_LIMIT: "10240", // 10 GiB - BATCHER_INDEXER_PULL_INTERVAL: "1s", - BATCHER_AWS_REGION: "", - BATCHER_AWS_ACCESS_KEY_ID: "", - BATCHER_AWS_SECRET_ACCESS_KEY: "", - BATCHER_AWS_ENDPOINT_URL: "", - BATCHER_FINALIZER_INTERVAL: "6m", - BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", - BATCHER_NUM_CONFIRMATIONS: "0", + BATCHER_S3_BUCKET_NAME: "test-eigenda-blobstore", + BATCHER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", + BATCHER_ENABLE_METRICS: "true", + BATCHER_METRICS_HTTP_PORT: "9094", + BATCHER_PULL_INTERVAL: "5s", + BATCHER_BLS_OPERATOR_STATE_RETRIVER: env.EigenDA.OperatorStateRetreiver, + BATCHER_EIGENDA_SERVICE_MANAGER: env.EigenDA.ServiceManager, + BATCHER_SRS_ORDER: "300000", + BATCHER_CHAIN_RPC: "", + BATCHER_PRIVATE_KEY: key[2:], + BATCHER_STD_LOG_LEVEL: "debug", + BATCHER_FILE_LOG_LEVEL: "trace", + BATCHER_LOG_PATH: logPath, + BATCHER_GRAPH_URL: graphUrl, + BATCHER_USE_GRAPH: "true", + BATCHER_BATCH_SIZE_LIMIT: "10240", // 10 GiB + BATCHER_INDEXER_PULL_INTERVAL: "1s", + BATCHER_AWS_REGION: "", + BATCHER_AWS_ACCESS_KEY_ID: "", + BATCHER_AWS_SECRET_ACCESS_KEY: "", + BATCHER_AWS_ENDPOINT_URL: "", + BATCHER_FINALIZER_INTERVAL: "6m", + BATCHER_ENCODING_REQUEST_QUEUE_SIZE: "500", + BATCHER_NUM_CONFIRMATIONS: "0", + BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE: "100", } env.applyDefaults(&v, "BATCHER", "batcher", ind) diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index 939fc1794..98813ee00 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -139,6 +139,8 @@ type BatcherVars struct { BATCHER_AWS_SECRET_ACCESS_KEY string BATCHER_AWS_ENDPOINT_URL string + + BATCHER_MAX_BLOBS_TO_FETCH_FROM_STORE string } func (vars BatcherVars) getEnvMap() map[string]string {