Skip to content

Commit

Permalink
Create a dynamodb collector (#190)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc authored Jan 22, 2024
1 parent 3a04329 commit 630fe33
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 5 deletions.
16 changes: 16 additions & 0 deletions common/aws/dynamodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
return response.Items, nil
}

// QueryIndexCount returns the count of the items in the index that match the given key
func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) {
response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(tableName),
IndexName: aws.String(indexName),
KeyConditionExpression: aws.String(keyCondition),
ExpressionAttributeValues: expAttributeValues,
Select: types.SelectCount,
})
if err != nil {
return 0, err
}

return response.Count, nil
}

// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
// When limit is is 0, all items are returned
Expand Down
53 changes: 53 additions & 0 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,59 @@ func TestQueryIndex(t *testing.T) {
assert.Equal(t, len(queryResult), 30)
}

func TestQueryIndexCount(t *testing.T) {
tableName := "ProcessingQueryIndexCount"
createTable(t, tableName)
indexName := "StatusIndex"

ctx := context.Background()
numItemsProcessing := 10
items1 := make([]commondynamodb.Item, numItemsProcessing)
for i := 0; i < numItemsProcessing; i += 1 {
items1[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "0"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
}
}

numItemsConfirmed := 20
items2 := make([]commondynamodb.Item, numItemsConfirmed)
for i := 0; i < numItemsConfirmed; i += 1 {
items2[i] = commondynamodb.Item{
"MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i+numItemsProcessing)},
"BlobKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i+numItemsProcessing)},
"BlobSize": &types.AttributeValueMemberN{Value: "123"},
"BlobStatus": &types.AttributeValueMemberN{Value: "1"},
"RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
}
}

unprocessed, err := dynamoClient.PutItems(ctx, tableName, items1)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

unprocessed, err = dynamoClient.PutItems(ctx, tableName, items2)
assert.NoError(t, err)
assert.Len(t, unprocessed, 0)

count, err := dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "0",
}})
assert.NoError(t, err)
assert.Equal(t, int(count), 10)

count, err = dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: "1",
}})
assert.NoError(t, err)
assert.Equal(t, int(count), 20)
}

func TestQueryIndexPaginationSingleItem(t *testing.T) {
tableName := "ProcessingWithPaginationSingleItem"
createTable(t, tableName)
Expand Down
1 change: 0 additions & 1 deletion disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

encodingStreamerMetrics := EncodingStreamerMetrics{
EncodedBlobs: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func RunDataApi(ctx *cli.Context) error {
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
chainState = coreeth.NewChainState(tx, client)
metrics = dataapi.NewMetrics(config.MetricsConfig.HTTPPort, logger)
metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
server = dataapi.NewServer(
dataapi.Config{
ServerMode: config.ServerMode,
Expand Down
15 changes: 15 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,21 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

// GetBlobMetadataByStatusCount returns the count of all the metadata with the given status
// Because this function scans the entire index, it should only be used for status with a limited number of items.
// It should only be used to filter "Processing" status. To support other status, a streaming version should be implemented.
func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, status disperser.BlobStatus) (int32, error) {
count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
":status": &types.AttributeValueMemberN{
Value: strconv.Itoa(int(status)),
}})
if err != nil {
return 0, err
}

return count, nil
}

// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status upto the specified limit
// along with items, also returns a pagination token that can be used to fetch the next set of items
func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {
Expand Down
12 changes: 12 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.Len(t, processing, 1)
assert.Equal(t, metadata1, processing[0])

processingCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
assert.NoError(t, err)
assert.Equal(t, int32(1), processingCount)

err = blobMetadataStore.IncrementNumRetries(ctx, metadata1)
assert.NoError(t, err)
fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
Expand All @@ -79,6 +83,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.Len(t, finalized, 1)
assert.Equal(t, metadata2, finalized[0])

finalizedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Finalized)
assert.NoError(t, err)
assert.Equal(t, int32(1), finalizedCount)

confirmedMetadata := getConfirmedMetadata(t, blobKey1)
err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata)
assert.NoError(t, err)
Expand All @@ -87,6 +95,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, metadata, confirmedMetadata)

confirmedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Confirmed)
assert.NoError(t, err)
assert.Equal(t, int32(1), confirmedCount)

deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
Expand Down
43 changes: 41 additions & 2 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand All @@ -27,12 +29,12 @@ type Metrics struct {
logger common.Logger
}

func NewMetrics(httpPort string, logger common.Logger) *Metrics {
func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string, logger common.Logger) *Metrics {
namespace := "eigenda_dataapi"
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

reg.MustRegister(NewDynamoDBCollector(blobMetadataStore, logger))
metrics := &Metrics{
NumRequests: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -94,3 +96,40 @@ func (g *Metrics) Start(ctx context.Context) {
log.Error("Prometheus server failed", "err", err)
}()
}

type DynamoDBCollector struct {
blobMetadataStore *blobstore.BlobMetadataStore
blobStatusMetric *prometheus.Desc
logger common.Logger
}

func NewDynamoDBCollector(blobMetadataStore *blobstore.BlobMetadataStore, logger common.Logger) *DynamoDBCollector {
return &DynamoDBCollector{
blobMetadataStore: blobMetadataStore,
blobStatusMetric: prometheus.NewDesc("dynamodb_blob_metadata_status_count",
"Number of blobs with specific status in DynamoDB",
[]string{"status"},
nil,
),
logger: logger,
}
}

func (collector *DynamoDBCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- collector.blobStatusMetric
}

func (collector *DynamoDBCollector) Collect(ch chan<- prometheus.Metric) {
count, err := collector.blobMetadataStore.GetBlobMetadataByStatusCount(context.Background(), disperser.Processing)
if err != nil {
collector.logger.Error("failed to get count of blob metadata by status", "err", err)
return
}

ch <- prometheus.MustNewConstMetric(
collector.blobStatusMetric,
prometheus.GaugeValue,
float64(count),
disperser.Processing.String(),
)
}
2 changes: 1 addition & 1 deletion disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (

mockTx = &coremock.MockTransactor{}
mockChainState, _ = coremock.MakeChainDataMock(core.OperatorIndex(1))
testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics("9001", mockLogger))
testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockLogger, dataapi.NewMetrics(nil, "9001", mockLogger))
expectedBatchHeaderHash = [32]byte{1, 2, 3}
expectedBlobIndex = uint32(1)
expectedRequestedAt = uint64(5567830000000000000)
Expand Down

0 comments on commit 630fe33

Please sign in to comment.