From 630fe3326e9c79eabeed74fb686d106df0179948 Mon Sep 17 00:00:00 2001
From: Daniel Mancia <21249320+dmanc@users.noreply.github.com>
Date: Mon, 22 Jan 2024 13:36:52 -0800
Subject: [PATCH] Create a dynamodb collector (#190)

---
 common/aws/dynamodb/client.go                 | 27 ++++++++++
 common/aws/dynamodb/client_test.go            | 53 +++++++++++++++++++
 disperser/batcher/metrics.go                  |  1 -
 disperser/cmd/dataapi/main.go                 |  2 +-
 .../common/blobstore/blob_metadata_store.go   | 15 ++++++
 .../blobstore/blob_metadata_store_test.go     | 12 +++++
 disperser/dataapi/metrics.go                  | 48 ++++++++++++++++-
 disperser/dataapi/server_test.go              |  2 +-
 8 files changed, 155 insertions(+), 5 deletions(-)

diff --git a/common/aws/dynamodb/client.go b/common/aws/dynamodb/client.go
index f1e2f730e..1dcb329ed 100644
--- a/common/aws/dynamodb/client.go
+++ b/common/aws/dynamodb/client.go
@@ -166,6 +166,33 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
 	return response.Items, nil
 }
 
+// QueryIndexCount returns the count of the items in the index that match the given key.
+// A single Query call evaluates at most 1 MB of data, so the count is accumulated
+// across pages until DynamoDB stops returning a LastEvaluatedKey.
+func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) {
+	var count int32
+	var exclusiveStartKey map[string]types.AttributeValue
+	for {
+		response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
+			TableName:                 aws.String(tableName),
+			IndexName:                 aws.String(indexName),
+			KeyConditionExpression:    aws.String(keyCondition),
+			ExpressionAttributeValues: expAttributeValues,
+			Select:                    types.SelectCount,
+			ExclusiveStartKey:         exclusiveStartKey,
+		})
+		if err != nil {
+			return 0, err
+		}
+
+		count += response.Count
+		if len(response.LastEvaluatedKey) == 0 {
+			return count, nil
+		}
+		exclusiveStartKey = response.LastEvaluatedKey
+	}
+}
+
 // QueryIndexWithPagination returns all items in the index that match the given key
 // Results are limited to the given limit and the pagination token is returned
 // When limit is is 0, all items are returned
diff --git a/common/aws/dynamodb/client_test.go b/common/aws/dynamodb/client_test.go
index 5ab2fb681..16e7946ed 100644
--- a/common/aws/dynamodb/client_test.go
+++ b/common/aws/dynamodb/client_test.go
@@ -300,6 +300,59 @@ func TestQueryIndex(t *testing.T) {
 	assert.Equal(t, len(queryResult), 30)
 }
 
+func TestQueryIndexCount(t *testing.T) {
+	tableName := "ProcessingQueryIndexCount"
+	createTable(t, tableName)
+	indexName := "StatusIndex"
+
+	ctx := context.Background()
+	numItemsProcessing := 10
+	items1 := make([]commondynamodb.Item, numItemsProcessing)
+	for i := 0; i < numItemsProcessing; i += 1 {
+		items1[i] = commondynamodb.Item{
+			"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
+			"BlobKey":     &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
+			"BlobSize":    &types.AttributeValueMemberN{Value: "123"},
+			"BlobStatus":  &types.AttributeValueMemberN{Value: "0"},
+			"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
+		}
+	}
+
+	numItemsConfirmed := 20
+	items2 := make([]commondynamodb.Item, numItemsConfirmed)
+	for i := 0; i < numItemsConfirmed; i += 1 {
+		items2[i] = commondynamodb.Item{
+			"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i+numItemsProcessing)},
+			"BlobKey":     &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i+numItemsProcessing)},
+			"BlobSize":    &types.AttributeValueMemberN{Value: "123"},
+			"BlobStatus":  &types.AttributeValueMemberN{Value: "1"},
+			"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
+		}
+	}
+
+	unprocessed, err := dynamoClient.PutItems(ctx, tableName, items1)
+	assert.NoError(t, err)
+	assert.Len(t, unprocessed, 0)
+
+	unprocessed, err = dynamoClient.PutItems(ctx, tableName, items2)
+	assert.NoError(t, err)
+	assert.Len(t, unprocessed, 0)
+
+	count, err := dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+		":status": &types.AttributeValueMemberN{
+			Value: "0",
+		}})
+	assert.NoError(t, err)
+	assert.Equal(t, int(count), 10)
+
+	count, err = dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+		":status": &types.AttributeValueMemberN{
+			Value: "1",
+		}})
+	assert.NoError(t, err)
+	assert.Equal(t, int(count), 20)
+}
+
 func TestQueryIndexPaginationSingleItem(t *testing.T) {
 	tableName := "ProcessingWithPaginationSingleItem"
 	createTable(t, tableName)
diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go
index 0ad74ee97..c5dfce257 100644
--- a/disperser/batcher/metrics.go
+++ b/disperser/batcher/metrics.go
@@ -64,7 +64,6 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
 	reg := prometheus.NewRegistry()
 	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
 	reg.MustRegister(collectors.NewGoCollector())
-
 	encodingStreamerMetrics := EncodingStreamerMetrics{
 		EncodedBlobs: promauto.With(reg).NewGaugeVec(
 			prometheus.GaugeOpts{
diff --git a/disperser/cmd/dataapi/main.go b/disperser/cmd/dataapi/main.go
index 66a498c9e..3027ed9eb 100644
--- a/disperser/cmd/dataapi/main.go
+++ b/disperser/cmd/dataapi/main.go
@@ -87,7 +87,7 @@ func RunDataApi(ctx *cli.Context) error {
 		subgraphApi    = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
 		subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
 		chainState     = coreeth.NewChainState(tx, client)
-		metrics        = dataapi.NewMetrics(config.MetricsConfig.HTTPPort, logger)
+		metrics        = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
 		server         = dataapi.NewServer(
 			dataapi.Config{
 				ServerMode: config.ServerMode,
diff --git a/disperser/common/blobstore/blob_metadata_store.go b/disperser/common/blobstore/blob_metadata_store.go
index 97bd3777d..667f38408 100644
--- a/disperser/common/blobstore/blob_metadata_store.go
+++ b/disperser/common/blobstore/blob_metadata_store.go
@@ -96,6 +96,21 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
 	return metadata, nil
 }
 
+// GetBlobMetadataByStatusCount returns the count of all the metadata with the given status
+// Because this function scans the entire index, it should only be used for status with a limited number of items.
+// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
+func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, status disperser.BlobStatus) (int32, error) {
+	count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+		":status": &types.AttributeValueMemberN{
+			Value: strconv.Itoa(int(status)),
+		}})
+	if err != nil {
+		return 0, err
+	}
+
+	return count, nil
+}
+
 // GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit
 // along with items, also returns a pagination token that can be used to fetch the next set of items
 func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {
diff --git a/disperser/common/blobstore/blob_metadata_store_test.go b/disperser/common/blobstore/blob_metadata_store_test.go
index b31e10de2..564745941 100644
--- a/disperser/common/blobstore/blob_metadata_store_test.go
+++ b/disperser/common/blobstore/blob_metadata_store_test.go
@@ -67,6 +67,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
 	assert.Len(t, processing, 1)
 	assert.Equal(t, metadata1, processing[0])
 
+	processingCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
+	assert.NoError(t, err)
+	assert.Equal(t, int32(1), processingCount)
+
 	err = blobMetadataStore.IncrementNumRetries(ctx, metadata1)
 	assert.NoError(t, err)
 	fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
@@ -79,6 +83,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
 	assert.Len(t, finalized, 1)
 	assert.Equal(t, metadata2, finalized[0])
 
+	finalizedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Finalized)
+	assert.NoError(t, err)
+	assert.Equal(t, int32(1), finalizedCount)
+
 	confirmedMetadata := getConfirmedMetadata(t, blobKey1)
 	err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata)
 	assert.NoError(t, err)
@@ -87,6 +95,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, metadata, confirmedMetadata)
 
+	confirmedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Confirmed)
+	assert.NoError(t, err)
+	assert.Equal(t, int32(1), confirmedCount)
+
 	deleteItems(t, []commondynamodb.Key{
 		{
 			"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
diff --git a/disperser/dataapi/metrics.go b/disperser/dataapi/metrics.go
index 2c60c06cf..961887741 100644
--- a/disperser/dataapi/metrics.go
+++ b/disperser/dataapi/metrics.go
@@ -6,6 +6,8 @@ import (
 	"net/http"
 
 	"github.com/Layr-Labs/eigenda/common"
+	"github.com/Layr-Labs/eigenda/disperser"
+	"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/collectors"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -27,12 +29,12 @@ type Metrics struct {
 	logger   common.Logger
 }
 
-func NewMetrics(httpPort string, logger common.Logger) *Metrics {
+func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string, logger common.Logger) *Metrics {
 	namespace := "eigenda_dataapi"
 	reg := prometheus.NewRegistry()
 	reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
 	reg.MustRegister(collectors.NewGoCollector())
-
+	reg.MustRegister(NewDynamoDBCollector(blobMetadataStore, logger))
 	metrics := &Metrics{
 		NumRequests: promauto.With(reg).NewCounterVec(
 			prometheus.CounterOpts{
@@ -94,3 +96,45 @@ func (g *Metrics) Start(ctx context.Context) {
 		log.Error("Prometheus server failed", "err", err)
 	}()
 }
+
+// DynamoDBCollector is a prometheus.Collector that reports blob counts read from the blob metadata store.
+type DynamoDBCollector struct {
+	blobMetadataStore *blobstore.BlobMetadataStore
+	blobStatusMetric  *prometheus.Desc
+	logger            common.Logger
+}
+
+// NewDynamoDBCollector builds a collector backed by the given blob metadata store.
+func NewDynamoDBCollector(blobMetadataStore *blobstore.BlobMetadataStore, logger common.Logger) *DynamoDBCollector {
+	return &DynamoDBCollector{
+		blobMetadataStore: blobMetadataStore,
+		blobStatusMetric: prometheus.NewDesc("dynamodb_blob_metadata_status_count",
+			"Number of blobs with specific status in DynamoDB",
+			[]string{"status"},
+			nil,
+		),
+		logger: logger,
+	}
+}
+
+// Describe sends the blob status count descriptor to ch.
+func (collector *DynamoDBCollector) Describe(ch chan<- *prometheus.Desc) {
+	ch <- collector.blobStatusMetric
+}
+
+// Collect queries the number of blobs in Processing status and emits it as a gauge.
+// If the query fails, the error is logged and no metric is sent.
+func (collector *DynamoDBCollector) Collect(ch chan<- prometheus.Metric) {
+	count, err := collector.blobMetadataStore.GetBlobMetadataByStatusCount(context.Background(), disperser.Processing)
+	if err != nil {
+		collector.logger.Error("failed to get count of blob metadata by status", "err", err)
+		return
+	}
+
+	ch <- prometheus.MustNewConstMetric(
+		collector.blobStatusMetric,
+		prometheus.GaugeValue,
+		float64(count),
+		disperser.Processing.String(),
+	)
+}
diff --git a/disperser/dataapi/server_test.go b/disperser/dataapi/server_test.go
index 25d55bd30..568c7535c 100644
--- a/disperser/dataapi/server_test.go
+++ b/disperser/dataapi/server_test.go
@@ -47,7 +47,7 @@ var (
 
 	mockTx                  = &coremock.MockTransactor{}
 	mockChainState, _       = coremock.MakeChainDataMock(core.OperatorIndex(1))
-	testDataApiServer       = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics("9001", mockLogger))
+	testDataApiServer       = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger))
 	expectedBatchHeaderHash = [32]byte{1, 2, 3}
 	expectedBlobIndex       = uint32(1)
 	expectedRequestedAt     = uint64(5567830000000000000)