Merge branch 'master' of https://github.com/Layr-Labs/eigenda into batchmetrics
jianoaix committed Jan 22, 2024
2 parents 6ba09fb + 630fe33 commit 24f4e55
Showing 19 changed files with 164 additions and 27 deletions.
2 changes: 1 addition & 1 deletion clients/tests/retrieval_client_test.go
@@ -194,7 +194,7 @@ func setup(t *testing.T) {
}

func mustMakeOpertatorPubKeysPair(t *testing.T) *coreindexer.OperatorPubKeys {
-     operators := make(map[[32]byte]coreindexer.OperatorPubKeysPair, len(operatorState.Operators))
+     operators := make(map[core.OperatorID]coreindexer.OperatorPubKeysPair, len(operatorState.Operators))
for operatorId := range operatorState.Operators[0] {
keyPair, err := core.GenRandomBlsKeys()
if err != nil {
16 changes: 16 additions & 0 deletions common/aws/dynamodb/client.go
@@ -166,6 +166,22 @@ func (c *Client) QueryIndex(ctx context.Context, tableName string, indexName str
return response.Items, nil
}

+ // QueryIndexCount returns the count of the items in the index that match the given key
+ func (c *Client) QueryIndexCount(ctx context.Context, tableName string, indexName string, keyCondition string, expAttributeValues ExpresseionValues) (int32, error) {
+     response, err := c.dynamoClient.Query(ctx, &dynamodb.QueryInput{
+         TableName:                 aws.String(tableName),
+         IndexName:                 aws.String(indexName),
+         KeyConditionExpression:    aws.String(keyCondition),
+         ExpressionAttributeValues: expAttributeValues,
+         Select:                    types.SelectCount,
+     })
+     if err != nil {
+         return 0, err
+     }
+
+     return response.Count, nil
+ }
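
For orientation, a hedged sketch of how a caller outside this package might use the new method; the table, index, and attribute names are illustrative (they mirror the test below), and the logger is hypothetical. One caveat: DynamoDB's Query evaluates at most 1 MB of index data per call even with Select: COUNT, so on a very large index this returns the count of the first page only unless LastEvaluatedKey pagination is layered on top.

// Illustrative only: count the items whose BlobStatus equals 0.
count, err := client.QueryIndexCount(ctx, "BlobMetadata", "StatusIndex",
    "BlobStatus = :status", commondynamodb.ExpresseionValues{
        ":status": &types.AttributeValueMemberN{Value: "0"},
    })
if err != nil {
    return err
}
logger.Info("blobs in status 0", "count", count) // hypothetical logger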

// QueryIndexWithPagination returns all items in the index that match the given key
// Results are limited to the given limit and the pagination token is returned
// When limit is 0, all items are returned
53 changes: 53 additions & 0 deletions common/aws/dynamodb/client_test.go
@@ -300,6 +300,59 @@ func TestQueryIndex(t *testing.T) {
assert.Equal(t, len(queryResult), 30)
}

+ func TestQueryIndexCount(t *testing.T) {
+     tableName := "ProcessingQueryIndexCount"
+     createTable(t, tableName)
+     indexName := "StatusIndex"
+
+     ctx := context.Background()
+     numItemsProcessing := 10
+     items1 := make([]commondynamodb.Item, numItemsProcessing)
+     for i := 0; i < numItemsProcessing; i += 1 {
+         items1[i] = commondynamodb.Item{
+             "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i)},
+             "BlobKey":     &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i)},
+             "BlobSize":    &types.AttributeValueMemberN{Value: "123"},
+             "BlobStatus":  &types.AttributeValueMemberN{Value: "0"},
+             "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
+         }
+     }
+
+     numItemsConfirmed := 20
+     items2 := make([]commondynamodb.Item, numItemsConfirmed)
+     for i := 0; i < numItemsConfirmed; i += 1 {
+         items2[i] = commondynamodb.Item{
+             "MetadataKey": &types.AttributeValueMemberS{Value: fmt.Sprintf("key%d", i+numItemsProcessing)},
+             "BlobKey":     &types.AttributeValueMemberS{Value: fmt.Sprintf("blob%d", i+numItemsProcessing)},
+             "BlobSize":    &types.AttributeValueMemberN{Value: "123"},
+             "BlobStatus":  &types.AttributeValueMemberN{Value: "1"},
+             "RequestedAt": &types.AttributeValueMemberN{Value: strconv.FormatInt(time.Now().Unix(), 10)},
+         }
+     }
+
+     unprocessed, err := dynamoClient.PutItems(ctx, tableName, items1)
+     assert.NoError(t, err)
+     assert.Len(t, unprocessed, 0)
+
+     unprocessed, err = dynamoClient.PutItems(ctx, tableName, items2)
+     assert.NoError(t, err)
+     assert.Len(t, unprocessed, 0)
+
+     count, err := dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+         ":status": &types.AttributeValueMemberN{
+             Value: "0",
+         }})
+     assert.NoError(t, err)
+     assert.Equal(t, int(count), 10)
+
+     count, err = dynamoClient.QueryIndexCount(ctx, tableName, indexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+         ":status": &types.AttributeValueMemberN{
+             Value: "1",
+         }})
+     assert.NoError(t, err)
+     assert.Equal(t, int(count), 20)
+ }

func TestQueryIndexPaginationSingleItem(t *testing.T) {
tableName := "ProcessingWithPaginationSingleItem"
createTable(t, tableName)
2 changes: 1 addition & 1 deletion core/aggregation.go
@@ -102,7 +102,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
for numReply := 0; numReply < numOperators; numReply++ {
var err error
r := <-messageChan
-         operatorIDHex := hexutil.Encode(r.Operator[:])
+         operatorIDHex := r.Operator.Hex()
operatorAddr, ok := a.OperatorAddresses.Get(r.Operator)
if !ok && a.Transactor != nil {
operatorAddr, err = a.Transactor.OperatorIDToAddress(ctx, r.Operator)
11 changes: 8 additions & 3 deletions core/assignment.go
@@ -1,6 +1,7 @@
package core

import (
"encoding/hex"
"errors"
"fmt"
"math"
@@ -28,7 +29,11 @@ var (

// Assignment

- type OperatorID = [32]byte
+ type OperatorID [32]byte
+
+ func (id OperatorID) Hex() string {
+     return hex.EncodeToString(id[:])
+ }
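
The switch from a type alias to a defined type is what makes the Hex() method legal: Go does not allow methods on [32]byte or on an alias of it. The trade-off is that explicit conversions are now required at [32]byte boundaries, which is exactly what the attestation.go and operator_sockets_filterer.go changes below do. A minimal sketch of the resulting behavior (values hypothetical):

id := OperatorID(crypto.Keccak256Hash([]byte("operator"))) // explicit conversion in
fmt.Println(id.Hex())                                      // 64 lowercase hex chars, no "0x" prefix
raw := [32]byte(id)                                        // explicit conversion back out
_ = raw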

type OperatorIndex = uint

@@ -156,7 +161,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo

// Check that the chunk length meets the minimum requirement
if info.ChunkLength < MinChunkLength {
-         return false, ErrChunkLengthTooSmall
+         return false, fmt.Errorf("%w: chunk length: %d, min chunk length: %d", ErrChunkLengthTooSmall, info.ChunkLength, MinChunkLength)
}

// Get minimum stake amount
@@ -183,7 +188,7 @@ func (c *StdAssignmentCoordinator) ValidateChunkLength(state *OperatorState, blo
maxChunkLength = uint(nextPowerOf2(uint64(maxChunkLength)))

if info.ChunkLength > maxChunkLength {
-         return false, ErrChunkLengthTooLarge
+         return false, fmt.Errorf("%w: chunk length: %d, max chunk length: %d", ErrChunkLengthTooLarge, info.ChunkLength, maxChunkLength)
}

}
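
Wrapping the sentinels with %w preserves errors.Is matching while the message now carries the offending values. A hedged caller-side sketch; ValidateChunkLength's full parameter list is truncated above, so the arguments here are assumed:

ok, err := coordinator.ValidateChunkLength(state, blobLength, info) // arguments assumed
if err != nil && errors.Is(err, ErrChunkLengthTooLarge) {
    // still matches after wrapping; err.Error() now reports both lengths
}
_ = ok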
2 changes: 1 addition & 1 deletion core/attestation.go
@@ -84,7 +84,7 @@ func (s *Signature) Verify(pubkey *G2Point, message [32]byte) bool {
func (p *G1Point) GetOperatorID() OperatorID {
x := p.X.BigInt(new(big.Int))
y := p.Y.BigInt(new(big.Int))
-     return crypto.Keccak256Hash(append(math.U256Bytes(x), math.U256Bytes(y)...))
+     return OperatorID(crypto.Keccak256Hash(append(math.U256Bytes(x), math.U256Bytes(y)...)))
}

type PrivateKey = fr.Element
2 changes: 1 addition & 1 deletion core/indexer/operator_sockets_filterer.go
@@ -114,7 +114,7 @@ func (f *operatorSocketsFilterer) WatchOperatorSocketUpdate(ctx context.Context,
}

sink := make(chan *blsregcoord.ContractBLSRegistryCoordinatorWithIndicesOperatorSocketUpdate)
-     operatorID := []core.OperatorID{operatorId}
+     operatorID := [][32]byte{operatorId}
_, err = filterer.WatchOperatorSocketUpdate(&bind.WatchOpts{Context: ctx}, sink, operatorID)
if err != nil {
return nil, err
1 change: 0 additions & 1 deletion core/mock/validator.go
@@ -10,7 +10,6 @@ import (

var (
ErrChunkLengthMismatch = errors.New("chunk length mismatch")
-     ErrInvalidHeader = errors.New("invalid header")
)

// MockChunkValidator is a mock implementation of ChunkValidator
4 changes: 2 additions & 2 deletions core/thegraph/state.go
@@ -144,7 +144,7 @@ func (ics *indexedChainState) GetIndexedOperatorInfoByOperatorId(ctx context.Con
var (
query QueryOperatorByIdGql
variables = map[string]any{
"id": graphql.String(fmt.Sprintf("0x%s", hex.EncodeToString(operatorId[:]))),
"id": graphql.String(fmt.Sprintf("0x%s", operatorId.Hex())),
}
)
err := ics.querier.Query(context.Background(), &query, variables)
@@ -235,7 +235,7 @@ func (ics *indexedChainState) getRegisteredIndexedOperatorInfo(ctx context.Conte
return nil, err
}

-     operators := make(map[[32]byte]*core.IndexedOperatorInfo, len(operatorsGql))
+     operators := make(map[core.OperatorID]*core.IndexedOperatorInfo, len(operatorsGql))
for i := range operatorsGql {
operator := operatorsGql[i]
operatorIndexedInfo, err := convertIndexedOperatorInfoGqlToIndexedOperatorInfo(&operator)
2 changes: 1 addition & 1 deletion core/thegraph/state_test.go
@@ -146,7 +146,7 @@ func TestIndexedChainState_GetIndexedOperatorInfoByOperatorId(t *testing.T) {
assert.NoError(t, err)

opID := ethcomm.HexToHash("0x3eb7d5df61c48ec2718d8c8ad52304effc970ae92f19138e032dae07b7c0d629")
-     info, err := cs.GetIndexedOperatorInfoByOperatorId(context.Background(), opID, uint32(headerNum))
+     info, err := cs.GetIndexedOperatorInfoByOperatorId(context.Background(), core.OperatorID(opID.Bytes()), uint32(headerNum))
assert.NoError(t, err)
assert.Equal(t, "3336192159512049190945679273141887248666932624338963482128432381981287252980", info.PubkeyG1.X.String())
assert.Equal(t, "15195175002875833468883745675063986308012687914999552116603423331534089122704", info.PubkeyG1.Y.String())
17 changes: 8 additions & 9 deletions core/validator.go
@@ -9,7 +9,6 @@ import (

var (
ErrChunkLengthMismatch = errors.New("chunk length mismatch")
-     ErrInvalidHeader = errors.New("invalid header")
ErrBlobQuorumSkip = errors.New("blob skipped for a quorum before verification")
)

@@ -38,12 +37,12 @@ func NewChunkValidator(enc Encoder, asgn AssignmentCoordinator, cst ChainState,

func (v *chunkValidator) validateBlobQuorum(quorumHeader *BlobQuorumInfo, blob *BlobMessage, operatorState *OperatorState) ([]*Chunk, *Assignment, *EncodingParams, error) {
if quorumHeader.AdversaryThreshold >= quorumHeader.QuorumThreshold {
return nil, nil, nil, errors.New("invalid header: quorum threshold does not exceed adversary threshold")
return nil, nil, nil, fmt.Errorf("invalid header: quorum threshold (%d) does not exceed adversary threshold (%d)", quorumHeader.QuorumThreshold, quorumHeader.AdversaryThreshold)
}

// Check if the operator is a member of the quorum
if _, ok := operatorState.Operators[quorumHeader.QuorumID]; !ok {
-         return nil, nil, nil, ErrBlobQuorumSkip
+         return nil, nil, nil, fmt.Errorf("%w: operator %s is not a member of quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorumHeader.QuorumID)
}

// Get the assignments for the quorum
@@ -54,10 +53,10 @@

// Validate the number of chunks
if assignment.NumChunks == 0 {
-         return nil, nil, nil, ErrBlobQuorumSkip
+         return nil, nil, nil, fmt.Errorf("%w: operator %s has no chunks in quorum %d", ErrBlobQuorumSkip, v.operatorID.Hex(), quorumHeader.QuorumID)
}
if assignment.NumChunks != uint(len(blob.Bundles[quorumHeader.QuorumID])) {
return nil, nil, nil, errors.New("number of chunks does not match assignment")
return nil, nil, nil, fmt.Errorf("number of chunks (%d) does not match assignment (%d)", len(blob.Bundles[quorumHeader.QuorumID]), assignment.NumChunks)
}

// Validate the chunkLength against the quorum and adversary threshold parameters
@@ -70,7 +69,7 @@ func (v *chunkValidator) validateBlobQuorum(quorumHeader *BlobQuorumInfo, blob *
chunks := blob.Bundles[quorumHeader.QuorumID]
for _, chunk := range chunks {
if uint(chunk.Length()) != quorumHeader.ChunkLength {
-             return nil, nil, nil, ErrChunkLengthMismatch
+             return nil, nil, nil, fmt.Errorf("%w: chunk length (%d) does not match quorum header (%d)", ErrChunkLengthMismatch, chunk.Length(), quorumHeader.ChunkLength)
}
}

@@ -81,15 +80,15 @@ func (v *chunkValidator) validateBlobQuorum(quorumHeader *BlobQuorumInfo, blob *
}

if params.ChunkLength != quorumHeader.ChunkLength {
return nil, nil, nil, errors.New("invalid chunk length")
return nil, nil, nil, fmt.Errorf("%w: chunk length from encoding parameters (%d) does not match quorum header (%d)", ErrChunkLengthMismatch, params.ChunkLength, quorumHeader.ChunkLength)
}

return chunks, &assignment, &params, nil
}
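
Because ErrBlobQuorumSkip is now wrapped rather than returned bare, any caller comparing with == would silently break; errors.Is keeps working. A hedged sketch of the caller-side pattern (a hypothetical per-quorum loop):

chunks, assignment, params, err := v.validateBlobQuorum(quorumHeader, blob, operatorState)
if errors.Is(err, ErrBlobQuorumSkip) {
    // skip this quorum, as before; the message now names the operator and quorum
} else if err != nil {
    return err
}
_, _, _ = chunks, assignment, params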

func (v *chunkValidator) ValidateBlob(blob *BlobMessage, operatorState *OperatorState) error {
if len(blob.Bundles) != len(blob.BlobHeader.QuorumInfos) {
return errors.New("number of bundles does not match number of quorums")
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumInfos))
}

// Validate the blob length
@@ -127,7 +126,7 @@ func (v *chunkValidator) ValidateBatch(blobs []*BlobMessage, operatorState *Oper

for k, blob := range blobs {
if len(blob.Bundles) != len(blob.BlobHeader.QuorumInfos) {
return errors.New("number of bundles does not match number of quorums")
return fmt.Errorf("number of bundles (%d) does not match number of quorums (%d)", len(blob.Bundles), len(blob.BlobHeader.QuorumInfos))
}

// Saved for the blob length validation
1 change: 0 additions & 1 deletion disperser/batcher/metrics.go
@@ -64,7 +64,6 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())
-
encodingStreamerMetrics := EncodingStreamerMetrics{
EncodedBlobs: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
2 changes: 1 addition & 1 deletion disperser/cmd/dataapi/main.go
@@ -87,7 +87,7 @@ func RunDataApi(ctx *cli.Context) error {
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
chainState = coreeth.NewChainState(tx, client)
-     metrics = dataapi.NewMetrics(config.MetricsConfig.HTTPPort, logger)
+     metrics = dataapi.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)
server = dataapi.NewServer(
dataapi.Config{
ServerMode: config.ServerMode,
15 changes: 15 additions & 0 deletions disperser/common/blobstore/blob_metadata_store.go
@@ -96,6 +96,21 @@ func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status
return metadata, nil
}

+ // GetBlobMetadataByStatusCount returns the count of all the metadata with the given status.
+ // Because this function scans the entire index, it should only be used for statuses with a limited number of items.
+ // It is intended only for filtering the "Processing" status; to support other statuses, a streaming version should be implemented.
+ func (s *BlobMetadataStore) GetBlobMetadataByStatusCount(ctx context.Context, status disperser.BlobStatus) (int32, error) {
+     count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, statusIndexName, "BlobStatus = :status", commondynamodb.ExpresseionValues{
+         ":status": &types.AttributeValueMemberN{
+             Value: strconv.Itoa(int(status)),
+         }})
+     if err != nil {
+         return 0, err
+     }
+
+     return count, nil
+ }
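
This count is what the dataapi metrics (now handed the blobMetadataStore in main.go above) can poll. A minimal sketch of such a collector; the gauge and logger names are hypothetical:

count, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
if err != nil {
    logger.Error("failed to count processing blobs", "err", err) // hypothetical logger
    return
}
blobsProcessingGauge.Set(float64(count)) // hypothetical prometheus gauge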

// GetBlobMetadataByStatusWithPagination returns all the metadata with the given status up to the specified limit
// along with items, also returns a pagination token that can be used to fetch the next set of items
func (s *BlobMetadataStore) GetBlobMetadataByStatusWithPagination(ctx context.Context, status disperser.BlobStatus, limit int32, exclusiveStartKey *disperser.BlobStoreExclusiveStartKey) ([]*disperser.BlobMetadata, *disperser.BlobStoreExclusiveStartKey, error) {
12 changes: 12 additions & 0 deletions disperser/common/blobstore/blob_metadata_store_test.go
@@ -67,6 +67,10 @@ func TestBlobMetadataStoreOperations(t *testing.T) {
assert.Len(t, processing, 1)
assert.Equal(t, metadata1, processing[0])

+     processingCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Processing)
+     assert.NoError(t, err)
+     assert.Equal(t, int32(1), processingCount)

err = blobMetadataStore.IncrementNumRetries(ctx, metadata1)
assert.NoError(t, err)
fetchedMetadata, err = blobMetadataStore.GetBlobMetadata(ctx, blobKey1)
@@ -79,6 +83,10 @@
assert.Len(t, finalized, 1)
assert.Equal(t, metadata2, finalized[0])

+     finalizedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Finalized)
+     assert.NoError(t, err)
+     assert.Equal(t, int32(1), finalizedCount)

confirmedMetadata := getConfirmedMetadata(t, blobKey1)
err = blobMetadataStore.UpdateBlobMetadata(ctx, blobKey1, confirmedMetadata)
assert.NoError(t, err)
@@ -87,6 +95,10 @@
assert.NoError(t, err)
assert.Equal(t, metadata, confirmedMetadata)

+     confirmedCount, err := blobMetadataStore.GetBlobMetadataByStatusCount(ctx, disperser.Confirmed)
+     assert.NoError(t, err)
+     assert.Equal(t, int32(1), confirmedCount)

deleteItems(t, []commondynamodb.Key{
{
"MetadataHash": &types.AttributeValueMemberS{Value: blobKey1.MetadataHash},
(Diff truncated: 4 of the 19 changed files are not shown.)