diff --git a/core/data.go b/core/data.go index 91fd9769c..7a064f581 100644 --- a/core/data.go +++ b/core/data.go @@ -2,15 +2,17 @@ package core import ( "encoding/binary" - "encoding/hex" "errors" "fmt" "math/big" + "strconv" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/encoding" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/consensys/gnark-crypto/ecc/bn254" - "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/accounts/abi" + "golang.org/x/crypto/sha3" ) type AccountID = string @@ -495,23 +497,69 @@ type PaymentMetadata struct { } // Hash returns the Keccak256 hash of the PaymentMetadata -func (pm *PaymentMetadata) Hash() []byte { - // Create a byte slice to hold the serialized data - data := make([]byte, 0, len(pm.AccountID)+4+pm.CumulativePayment.BitLen()/8+1) +func (pm *PaymentMetadata) Hash() ([32]byte, error) { + blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "accountID", + Type: "string", + }, + { + Name: "binIndex", + Type: "uint32", + }, + { + Name: "cumulativePayment", + Type: "uint256", + }, + }) + if err != nil { + return [32]byte{}, err + } - // Append AccountID - data = append(data, []byte(pm.AccountID)...) + arguments := abi.Arguments{ + { + Type: blobHeaderType, + }, + } - // Append BinIndex - binIndexBytes := make([]byte, 4) - binary.BigEndian.PutUint32(binIndexBytes, pm.BinIndex) - data = append(data, binIndexBytes...) + bytes, err := arguments.Pack(pm) + if err != nil { + return [32]byte{}, err + } - // Append CumulativePayment - paymentBytes := pm.CumulativePayment.Bytes() - data = append(data, paymentBytes...) + var hash [32]byte + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(hash[:], hasher.Sum(nil)[:32]) - return crypto.Keccak256(data) + return hash, nil +} + +func (pm *PaymentMetadata) MarshalDynamoDBAttributeValue() (types.AttributeValue, error) { + return &types.AttributeValueMemberM{ + Value: map[string]types.AttributeValue{ + "AccountID": &types.AttributeValueMemberS{Value: pm.AccountID}, + "BinIndex": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", pm.BinIndex)}, + "CumulativePayment": &types.AttributeValueMemberN{ + Value: pm.CumulativePayment.String(), + }, + }, + }, nil +} + +func (pm *PaymentMetadata) UnmarshalDynamoDBAttributeValue(av types.AttributeValue) error { + m, ok := av.(*types.AttributeValueMemberM) + if !ok { + return fmt.Errorf("expected *types.AttributeValueMemberM, got %T", av) + } + pm.AccountID = m.Value["AccountID"].(*types.AttributeValueMemberS).Value + binIndex, err := strconv.ParseUint(m.Value["BinIndex"].(*types.AttributeValueMemberN).Value, 10, 32) + if err != nil { + return fmt.Errorf("failed to parse BinIndex: %w", err) + } + pm.BinIndex = uint32(binIndex) + pm.CumulativePayment, _ = new(big.Int).SetString(m.Value["CumulativePayment"].(*types.AttributeValueMemberN).Value, 10) + return nil } // OperatorInfo contains information about an operator which is stored on the blockchain state, @@ -528,27 +576,3 @@ type ActiveReservation struct { type OnDemandPayment struct { CumulativePayment *big.Int // Total amount deposited by the user } - -type BlobVersion uint32 - -type BlobKey [32]byte - -func (b BlobKey) Hex() string { - return hex.EncodeToString(b[:]) -} - -func HexToBlobKey(h string) (BlobKey, error) { - b, err := hex.DecodeString(h) - if err != nil { - return BlobKey{}, err - } - return BlobKey(b), nil -} - -type BlobHeaderV2 struct { - BlobVersion BlobVersion `json:"version"` - QuorumIDs []QuorumID `json:"quorum_ids"` - BlobCommitment encoding.BlobCommitments `json:"commitments"` - - PaymentMetadata `json:"payment_metadata"` -} diff --git a/core/v2/assignment.go b/core/v2/assignment.go index 1e50cdbfb..53e6b96fb 100644 --- a/core/v2/assignment.go +++ b/core/v2/assignment.go @@ -8,7 +8,7 @@ import ( "github.com/Layr-Labs/eigenda/core" ) -func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) (map[core.OperatorID]Assignment, error) { +func GetAssignments(state *core.OperatorState, blobVersion BlobVersion, quorum uint8) (map[core.OperatorID]Assignment, error) { params, ok := ParametersMap[blobVersion] if !ok { @@ -81,7 +81,7 @@ func GetAssignments(state *core.OperatorState, blobVersion byte, quorum uint8) ( } -func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { +func GetAssignment(state *core.OperatorState, blobVersion BlobVersion, quorum core.QuorumID, id core.OperatorID) (Assignment, error) { assignments, err := GetAssignments(state, blobVersion, quorum) if err != nil { @@ -96,7 +96,7 @@ func GetAssignment(state *core.OperatorState, blobVersion byte, quorum core.Quor return assignment, nil } -func GetChunkLength(blobVersion byte, blobLength uint32) (uint32, error) { +func GetChunkLength(blobVersion BlobVersion, blobLength uint32) (uint32, error) { if blobLength == 0 { return 0, fmt.Errorf("blob length must be greater than 0") diff --git a/core/v2/assignment_test.go b/core/v2/assignment_test.go index d1245f6c5..d8b5d2da8 100644 --- a/core/v2/assignment_test.go +++ b/core/v2/assignment_test.go @@ -20,7 +20,7 @@ func TestOperatorAssignmentsV2(t *testing.T) { state := dat.GetTotalOperatorState(context.Background(), 0) operatorState := state.OperatorState - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) assignments, err := corev2.GetAssignments(operatorState, blobVersion, 0) assert.NoError(t, err) @@ -90,7 +90,7 @@ func TestAssignmentWithTooManyOperators(t *testing.T) { assert.Equal(t, len(state.Operators[0]), numOperators) - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) _, err = corev2.GetAssignments(state.OperatorState, blobVersion, 0) assert.Error(t, err) @@ -131,7 +131,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { state := dat.GetTotalOperatorState(context.Background(), 0) - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) assignments, err := corev2.GetAssignments(state.OperatorState, blobVersion, 0) assert.NoError(t, err) @@ -162,7 +162,7 @@ func FuzzOperatorAssignmentsV2(f *testing.F) { func TestChunkLength(t *testing.T) { - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) pairs := []struct { blobLength uint32 @@ -188,7 +188,7 @@ func TestChunkLength(t *testing.T) { func TestInvalidChunkLength(t *testing.T) { - blobVersion := byte(0) + blobVersion := corev2.BlobVersion(0) invalidLengths := []uint32{ 0, diff --git a/core/v2/core_test.go b/core/v2/core_test.go index f55303e06..c3d47fe41 100644 --- a/core/v2/core_test.go +++ b/core/v2/core_test.go @@ -84,7 +84,7 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) { return p, v, nil } -func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) { +func makeTestBlob(t *testing.T, p encoding.Prover, version corev2.BlobVersion, refBlockNumber uint64, length int, quorums []core.QuorumID) (corev2.BlobCertificate, []byte) { data := make([]byte, length*31) _, err := rand.Read(data) @@ -101,7 +101,7 @@ func makeTestBlob(t *testing.T, p encoding.Prover, version uint8, refBlockNumber header := corev2.BlobCertificate{ BlobHeader: corev2.BlobHeader{ - Version: version, + BlobVersion: version, QuorumNumbers: quorums, BlobCommitments: commitments, }, @@ -148,7 +148,7 @@ func prepareBlobs(t *testing.T, operatorCount uint, headers []corev2.BlobCertifi for _, quorum := range header.QuorumNumbers { - assignments, err := corev2.GetAssignments(state, header.Version, quorum) + assignments, err := corev2.GetAssignments(state, header.BlobVersion, quorum) if err != nil { t.Fatal(err) } @@ -238,7 +238,7 @@ func TestValidationSucceeds(t *testing.T) { bn := uint64(0) - version := uint8(0) + version := corev2.BlobVersion(0) pool := workerpool.New(1) diff --git a/core/v2/types.go b/core/v2/types.go index f3c9eda15..ac3b234ef 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -1,20 +1,27 @@ package v2 import ( + "encoding/hex" "math" + "math/big" + "strings" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" + "github.com/ethereum/go-ethereum/accounts/abi" + "golang.org/x/crypto/sha3" ) var ( // TODO(mooselumph): Put these parameters on chain and add on-chain checks to ensure that the number of operators does not // conflict with the existing on-chain limits - ParametersMap = map[uint8]BlobVersionParameters{ + ParametersMap = map[BlobVersion]BlobVersionParameters{ 0: {CodingRate: 8, ReconstructionThreshold: 0.22, NumChunks: 8192}, } ) +type BlobVersion uint8 + // Assignment contains information about the set of chunks that a specific node will receive type Assignment struct { StartIndex uint32 @@ -30,27 +37,42 @@ func (c *Assignment) GetIndices() []uint32 { return indices } +type BlobKey [32]byte + +func (b BlobKey) Hex() string { + return hex.EncodeToString(b[:]) +} + +func HexToBlobKey(h string) (BlobKey, error) { + s := strings.TrimPrefix(h, "0x") + s = strings.TrimPrefix(s, "0X") + b, err := hex.DecodeString(s) + if err != nil { + return BlobKey{}, err + } + return BlobKey(b), nil +} + // BlobHeader contains all metadata related to a blob including commitments and parameters for encoding type BlobHeader struct { - Version uint8 + BlobVersion BlobVersion - encoding.BlobCommitments + BlobCommitments encoding.BlobCommitments - // QuorumInfos contains the quorum specific parameters for the blob - QuorumNumbers []uint8 + // QuorumNumbers contains the quorums the blob is dispersed to + QuorumNumbers []core.QuorumID - // PaymentHeader contains the payment information for the blob - core.PaymentMetadata + // PaymentMetadata contains the payment information for the blob + PaymentMetadata core.PaymentMetadata - // AuthenticationData is the signature of the blob header by the account ID - AuthenticationData []byte `json:"authentication_data"` + // Signature is the signature of the blob header by the account ID + Signature []byte } func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { + params := ParametersMap[b.BlobVersion] - params := ParametersMap[b.Version] - - length, err := GetChunkLength(b.Version, uint32(b.Length)) + length, err := GetChunkLength(b.BlobVersion, uint32(b.BlobCommitments.Length)) if err != nil { return encoding.EncodingParams{}, err } @@ -59,7 +81,153 @@ func (b *BlobHeader) GetEncodingParams() (encoding.EncodingParams, error) { NumChunks: uint64(params.NumChunks), ChunkLength: uint64(length), }, nil +} + +func (b *BlobHeader) BlobKey() (BlobKey, error) { + blobHeaderType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{ + { + Name: "blobVersion", + Type: "uint8", + }, + { + Name: "blobCommitments", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "commitment", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256", + }, + { + Name: "Y", + Type: "uint256", + }, + }, + }, + { + Name: "lengthCommitment", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256[2]", + }, + { + Name: "Y", + Type: "uint256[2]", + }, + }, + }, + { + Name: "lengthProof", + Type: "tuple", + Components: []abi.ArgumentMarshaling{ + { + Name: "X", + Type: "uint256[2]", + }, + { + Name: "Y", + Type: "uint256[2]", + }, + }, + }, + { + Name: "length", + Type: "uint32", + }, + }, + }, + { + Name: "quorumNumbers", + Type: "bytes", + }, + { + Name: "paymentMetadataHash", + Type: "bytes32", + }, + }) + if err != nil { + return [32]byte{}, err + } + + arguments := abi.Arguments{ + { + Type: blobHeaderType, + }, + } + type g1Commit struct { + X *big.Int + Y *big.Int + } + type g2Commit struct { + X [2]*big.Int + Y [2]*big.Int + } + type blobCommitments struct { + Commitment g1Commit + LengthCommitment g2Commit + LengthProof g2Commit + Length uint32 + } + + paymentHash, err := b.PaymentMetadata.Hash() + if err != nil { + return [32]byte{}, err + } + s := struct { + BlobVersion uint8 + BlobCommitments blobCommitments + QuorumNumbers []byte + PaymentMetadataHash [32]byte + }{ + BlobVersion: uint8(b.BlobVersion), + BlobCommitments: blobCommitments{ + Commitment: g1Commit{ + X: b.BlobCommitments.Commitment.X.BigInt(new(big.Int)), + Y: b.BlobCommitments.Commitment.Y.BigInt(new(big.Int)), + }, + LengthCommitment: g2Commit{ + X: [2]*big.Int{ + b.BlobCommitments.LengthCommitment.X.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthCommitment.X.A1.BigInt(new(big.Int)), + }, + Y: [2]*big.Int{ + b.BlobCommitments.LengthCommitment.Y.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthCommitment.Y.A1.BigInt(new(big.Int)), + }, + }, + LengthProof: g2Commit{ + X: [2]*big.Int{ + b.BlobCommitments.LengthProof.X.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthProof.X.A1.BigInt(new(big.Int)), + }, + Y: [2]*big.Int{ + b.BlobCommitments.LengthProof.Y.A0.BigInt(new(big.Int)), + b.BlobCommitments.LengthProof.Y.A1.BigInt(new(big.Int)), + }, + }, + Length: uint32(b.BlobCommitments.Length), + }, + QuorumNumbers: b.QuorumNumbers, + PaymentMetadataHash: paymentHash, + } + + bytes, err := arguments.Pack(s) + if err != nil { + return [32]byte{}, err + } + + var headerHash [32]byte + hasher := sha3.NewLegacyKeccak256() + hasher.Write(bytes) + copy(headerHash[:], hasher.Sum(nil)[:32]) + + return headerHash, nil } type BlobCertificate struct { @@ -79,9 +247,7 @@ type BlobVersionParameters struct { } func (p BlobVersionParameters) MaxNumOperators() uint32 { - return uint32(math.Floor(float64(p.NumChunks) * (1 - 1/(p.ReconstructionThreshold*float64(p.CodingRate))))) - } const ( diff --git a/core/v2/types_test.go b/core/v2/types_test.go new file mode 100644 index 000000000..c2c9b908c --- /dev/null +++ b/core/v2/types_test.go @@ -0,0 +1,57 @@ +package v2_test + +import ( + "encoding/hex" + "math/big" + "testing" + + "github.com/Layr-Labs/eigenda/core" + v2 "github.com/Layr-Labs/eigenda/core/v2" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/stretchr/testify/assert" +) + +func TestBlobKey(t *testing.T) { + blobKey := v2.BlobKey([32]byte{1, 2, 3}) + + assert.Equal(t, "0102030000000000000000000000000000000000000000000000000000000000", blobKey.Hex()) + bk, err := v2.HexToBlobKey(blobKey.Hex()) + assert.NoError(t, err) + assert.Equal(t, blobKey, bk) +} + +func TestPaymentHash(t *testing.T) { + pm := core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + } + hash, err := pm.Hash() + assert.NoError(t, err) + // 0xf5894a8e9281b5687c0c7757d3d45fb76152bf659e6e61b1062f4c6bcb69c449 verified in solidity + assert.Equal(t, "f5894a8e9281b5687c0c7757d3d45fb76152bf659e6e61b1062f4c6bcb69c449", hex.EncodeToString(hash[:])) +} + +func TestBlobKeyFromHeader(t *testing.T) { + data := codec.ConvertByPaddingEmptyByte(GETTYSBURG_ADDRESS_BYTES) + commitments, err := p.GetCommitments(data) + if err != nil { + t.Fatal(err) + } + + bh := v2.BlobHeader{ + BlobVersion: 0, + BlobCommitments: commitments, + QuorumNumbers: []core.QuorumID{0, 1}, + PaymentMetadata: core.PaymentMetadata{ + AccountID: "0x123", + BinIndex: 5, + CumulativePayment: big.NewInt(100), + }, + Signature: []byte{1, 2, 3}, + } + blobKey, err := bh.BlobKey() + assert.NoError(t, err) + // 0xb19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc verified in solidity + assert.Equal(t, "b19d368345990c79744fe571fe99f427f35787b9383c55089fb5bd6a5c171bbc", blobKey.Hex()) +} diff --git a/core/v2/validator.go b/core/v2/validator.go index 0d5a2960e..db513c319 100644 --- a/core/v2/validator.go +++ b/core/v2/validator.go @@ -43,7 +43,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Get the assignments for the quorum - assignment, err := GetAssignment(operatorState, blob.Version, quorum, v.operatorID) + assignment, err := GetAssignment(operatorState, blob.BlobVersion, quorum, v.operatorID) if err != nil { return nil, nil, err } @@ -57,7 +57,7 @@ func (v *ShardValidator) validateBlobQuorum(quorum core.QuorumID, blob *BlobShar } // Validate the chunkLength against the confirmation and adversary threshold parameters - chunkLength, err := GetChunkLength(blob.Version, uint32(blob.BlobHeader.Length)) + chunkLength, err := GetChunkLength(blob.BlobVersion, uint32(blob.BlobHeader.BlobCommitments.Length)) if err != nil { return nil, nil, fmt.Errorf("invalid chunk length: %w", err) } diff --git a/disperser/common/v2/blob.go b/disperser/common/v2/blob.go index 32ef9e8ad..812a5b6fd 100644 --- a/disperser/common/v2/blob.go +++ b/disperser/common/v2/blob.go @@ -1,6 +1,8 @@ package v2 -import "github.com/Layr-Labs/eigenda/core" +import ( + core "github.com/Layr-Labs/eigenda/core/v2" +) type BlobStatus uint @@ -11,14 +13,20 @@ const ( Failed ) +// BlobMetadata is an internal representation of a blob's metadata. type BlobMetadata struct { - core.BlobHeaderV2 `json:"blob_header"` + BlobHeader core.BlobHeader - BlobKey core.BlobKey `json:"blob_key"` - BlobStatus BlobStatus `json:"blob_status"` + // BlobStatus indicates the current status of the blob + BlobStatus BlobStatus // Expiry is Unix timestamp of the blob expiry in seconds from epoch - Expiry uint64 `json:"expiry"` - NumRetries uint `json:"num_retries"` - BlobSize uint64 `json:"blob_size"` - RequestedAt uint64 `json:"requested_at"` + Expiry uint64 + // NumRetries is the number of times the blob has been retried + NumRetries uint + // BlobSize is the size of the blob in bytes + BlobSize uint64 + // RequestedAt is the Unix timestamp of when the blob was requested in seconds + RequestedAt uint64 + // UpdatedAt is the Unix timestamp of when the blob was last updated in _nanoseconds_ + UpdatedAt uint64 } diff --git a/disperser/common/v2/blobstore/blob_metadata_store.go b/disperser/common/v2/blobstore/blob_metadata_store.go deleted file mode 100644 index 8670d60bb..000000000 --- a/disperser/common/v2/blobstore/blob_metadata_store.go +++ /dev/null @@ -1,15 +0,0 @@ -package blobstore - -import ( - "context" - - "github.com/Layr-Labs/eigenda/core" - v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" -) - -type BlobMetadataStore interface { - PutBlobMetadata(ctx context.Context, metadata *v2.BlobMetadata) error - GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) - GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) - GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) -} diff --git a/disperser/common/v2/blobstore/blob_metadata_store_test.go b/disperser/common/v2/blobstore/blob_metadata_store_test.go index 7113bfb88..a0b63bdcf 100644 --- a/disperser/common/v2/blobstore/blob_metadata_store_test.go +++ b/disperser/common/v2/blobstore/blob_metadata_store_test.go @@ -12,13 +12,13 @@ import ( "github.com/Layr-Labs/eigenda/common/aws/dynamodb" test_utils "github.com/Layr-Labs/eigenda/common/aws/dynamodb/utils" "github.com/Layr-Labs/eigenda/encoding" + "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/consensys/gnark-crypto/ecc/bn254" "github.com/consensys/gnark-crypto/ecc/bn254/fp" "github.com/google/uuid" "github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore" - "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/ory/dockertest/v3" ) @@ -32,7 +32,7 @@ var ( localStackPort = "4571" dynamoClient *dynamodb.Client - blobMetadataStore blobstore.BlobMetadataStore + blobMetadataStore *blobstore.BlobMetadataStore UUID = uuid.New() metadataTableName = fmt.Sprintf("test-BlobMetadata-%v", UUID) @@ -61,7 +61,6 @@ func setup(m *testing.M) { teardown() panic("failed to start localstack container") } - } cfg := aws.ClientConfig{ diff --git a/disperser/common/v2/blobstore/dynamo_store.go b/disperser/common/v2/blobstore/dynamo_store.go index d61616b7e..ca4d80474 100644 --- a/disperser/common/v2/blobstore/dynamo_store.go +++ b/disperser/common/v2/blobstore/dynamo_store.go @@ -8,7 +8,7 @@ import ( "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/core" + core "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/disperser" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/Layr-Labs/eigensdk-go/logging" @@ -23,33 +23,29 @@ const ( OperatorDispersalIndexName = "OperatorDispersalIndex" OperatorResponseIndexName = "OperatorResponseIndex" - blobKeyPrefix = "BlobKey#" - dispersalIDPrefix = "DispersalID#" - blobMetadataSK = "BlobMetadata" - certificateSK = "Certificate" + blobKeyPrefix = "BlobKey#" + blobMetadataSK = "BlobMetadata" ) -// blobMetadataStore is a blob metadata storage backed by DynamoDB -type blobMetadataStore struct { +// BlobMetadataStore is a blob metadata storage backed by DynamoDB +type BlobMetadataStore struct { dynamoDBClient *commondynamodb.Client logger logging.Logger tableName string ttl time.Duration } -var _ BlobMetadataStore = (*blobMetadataStore)(nil) - -func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) BlobMetadataStore { +func NewBlobMetadataStore(dynamoDBClient *commondynamodb.Client, logger logging.Logger, tableName string, ttl time.Duration) *BlobMetadataStore { logger.Debugf("creating blob metadata store v2 with table %s with TTL: %s", tableName, ttl) - return &blobMetadataStore{ + return &BlobMetadataStore{ dynamoDBClient: dynamoDBClient, - logger: logger.With("component", "BlobMetadataStoreV2"), + logger: logger.With("component", "blobMetadataStoreV2"), tableName: tableName, ttl: ttl, } } -func (s *blobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { +func (s *BlobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v2.BlobMetadata) error { item, err := MarshalBlobMetadata(blobMetadata) if err != nil { return err @@ -58,7 +54,7 @@ func (s *blobMetadataStore) PutBlobMetadata(ctx context.Context, blobMetadata *v return s.dynamoDBClient.PutItem(ctx, s.tableName, item) } -func (s *blobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { +func (s *BlobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.BlobKey) (*v2.BlobMetadata, error) { item, err := s.dynamoDBClient.GetItem(ctx, s.tableName, map[string]types.AttributeValue{ "PK": &types.AttributeValueMemberS{ Value: blobKeyPrefix + blobKey.Hex(), @@ -86,12 +82,12 @@ func (s *blobMetadataStore) GetBlobMetadata(ctx context.Context, blobKey core.Bl // GetBlobMetadataByStatus returns all the metadata with the given status // Because this function scans the entire index, it should only be used for status with a limited number of items. -func (s *blobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus) ([]*v2.BlobMetadata, error) { - items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ +func (s *BlobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status v2.BlobStatus, lastUpdatedAt uint64) ([]*v2.BlobMetadata, error) { + items, err := s.dynamoDBClient.QueryIndex(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND UpdatedAt > :updatedAt", commondynamodb.ExpressionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), }, - ":expiry": &types.AttributeValueMemberN{ + ":updatedAt": &types.AttributeValueMemberN{ Value: strconv.FormatInt(time.Now().Unix(), 10), }}) if err != nil { @@ -111,14 +107,11 @@ func (s *blobMetadataStore) GetBlobMetadataByStatus(ctx context.Context, status // GetBlobMetadataCountByStatus returns the count of all the metadata with the given status // Because this function scans the entire index, it should only be used for status with a limited number of items. -func (s *blobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { - count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status AND Expiry > :expiry", commondynamodb.ExpressionValues{ +func (s *BlobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, status v2.BlobStatus) (int32, error) { + count, err := s.dynamoDBClient.QueryIndexCount(ctx, s.tableName, StatusIndexName, "BlobStatus = :status", commondynamodb.ExpressionValues{ ":status": &types.AttributeValueMemberN{ Value: strconv.Itoa(int(status)), }, - ":expiry": &types.AttributeValueMemberN{ - Value: strconv.FormatInt(time.Now().Unix(), 10), - }, }) if err != nil { return 0, err @@ -130,10 +123,12 @@ func (s *blobMetadataStore) GetBlobMetadataCountByStatus(ctx context.Context, st func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacityUnits int64) *dynamodb.CreateTableInput { return &dynamodb.CreateTableInput{ AttributeDefinitions: []types.AttributeDefinition{ + // PK is the composite partition key { AttributeName: aws.String("PK"), AttributeType: types.ScalarAttributeTypeS, }, + // SK is the composite sort key { AttributeName: aws.String("SK"), AttributeType: types.ScalarAttributeTypeS, @@ -143,7 +138,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit AttributeType: types.ScalarAttributeTypeN, }, { - AttributeName: aws.String("Expiry"), + AttributeName: aws.String("UpdatedAt"), AttributeType: types.ScalarAttributeTypeN, }, { @@ -179,7 +174,7 @@ func GenerateTableSchema(tableName string, readCapacityUnits int64, writeCapacit KeyType: types.KeyTypeHash, }, { - AttributeName: aws.String("Expiry"), + AttributeName: aws.String("UpdatedAt"), KeyType: types.KeyTypeRange, }, }, @@ -246,7 +241,11 @@ func MarshalBlobMetadata(metadata *v2.BlobMetadata) (commondynamodb.Item, error) } // Add PK and SK fields - fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + metadata.BlobKey.Hex()} + blobKey, err := metadata.BlobHeader.BlobKey() + if err != nil { + return nil, err + } + fields["PK"] = &types.AttributeValueMemberS{Value: blobKeyPrefix + blobKey.Hex()} fields["SK"] = &types.AttributeValueMemberS{Value: blobMetadataSK} return fields, nil @@ -273,11 +272,5 @@ func UnmarshalBlobMetadata(item commondynamodb.Item) (*v2.BlobMetadata, error) { if err != nil { return nil, err } - blobKey, err := UnmarshalBlobKey(item) - if err != nil { - return nil, err - } - metadata.BlobKey = blobKey - return &metadata, nil } diff --git a/disperser/common/v2/blobstore/dynamo_store_test.go b/disperser/common/v2/blobstore/dynamo_store_test.go index 3be5ca20f..249d213aa 100644 --- a/disperser/common/v2/blobstore/dynamo_store_test.go +++ b/disperser/common/v2/blobstore/dynamo_store_test.go @@ -2,11 +2,13 @@ package blobstore_test import ( "context" + "math/big" "testing" "time" commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" - "github.com/Layr-Labs/eigenda/core" + core "github.com/Layr-Labs/eigenda/core" + corev2 "github.com/Layr-Labs/eigenda/core/v2" v2 "github.com/Layr-Labs/eigenda/disperser/common/v2" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/stretchr/testify/assert" @@ -14,45 +16,47 @@ import ( func TestBlobMetadataStoreOperations(t *testing.T) { ctx := context.Background() - blobHeader1 := &core.BlobHeaderV2{ - BlobVersion: 0, - QuorumIDs: []core.QuorumID{0}, - BlobCommitment: mockCommitment, + blobHeader1 := &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{0}, + BlobCommitments: mockCommitment, PaymentMetadata: core.PaymentMetadata{ AccountID: "0x123", BinIndex: 0, - CumulativePayment: 531, + CumulativePayment: big.NewInt(532), }, } - blobKey1 := core.BlobKey([32]byte{1, 2, 3}) - blobHeader2 := &core.BlobHeaderV2{ - BlobVersion: 0, - QuorumIDs: []core.QuorumID{1}, - BlobCommitment: mockCommitment, + blobKey1, err := blobHeader1.BlobKey() + assert.NoError(t, err) + blobHeader2 := &corev2.BlobHeader{ + BlobVersion: 0, + QuorumNumbers: []core.QuorumID{1}, + BlobCommitments: mockCommitment, PaymentMetadata: core.PaymentMetadata{ AccountID: "0x456", BinIndex: 2, - CumulativePayment: 999, + CumulativePayment: big.NewInt(999), }, } - blobKey2 := core.BlobKey([32]byte{4, 5, 6}) + blobKey2, err := blobHeader2.BlobKey() + assert.NoError(t, err) now := time.Now() metadata1 := &v2.BlobMetadata{ - BlobHeaderV2: *blobHeader1, - BlobKey: blobKey1, - BlobStatus: v2.Queued, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, + BlobHeader: *blobHeader1, + BlobStatus: v2.Queued, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), } metadata2 := &v2.BlobMetadata{ - BlobHeaderV2: *blobHeader2, - BlobKey: blobKey2, - BlobStatus: v2.Certified, - Expiry: uint64(now.Add(time.Hour).Unix()), - NumRetries: 0, + BlobHeader: *blobHeader2, + BlobStatus: v2.Certified, + Expiry: uint64(now.Add(time.Hour).Unix()), + NumRetries: 0, + UpdatedAt: uint64(now.UnixNano()), } - err := blobMetadataStore.PutBlobMetadata(ctx, metadata1) + err = blobMetadataStore.PutBlobMetadata(ctx, metadata1) assert.NoError(t, err) err = blobMetadataStore.PutBlobMetadata(ctx, metadata2) assert.NoError(t, err) @@ -64,11 +68,11 @@ func TestBlobMetadataStoreOperations(t *testing.T) { assert.NoError(t, err) assert.Equal(t, metadata2, fetchedMetadata) - queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued) + queued, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Queued, 0) assert.NoError(t, err) assert.Len(t, queued, 1) assert.Equal(t, metadata1, queued[0]) - certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified) + certified, err := blobMetadataStore.GetBlobMetadataByStatus(ctx, v2.Certified, 0) assert.NoError(t, err) assert.Len(t, certified, 1) assert.Equal(t, metadata2, certified[0])