From f8ac2a5381733399c02085c4650fb9227339138d Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 25 Oct 2024 11:04:24 -0700 Subject: [PATCH 1/5] feat: client payment integration --- api/clients/accountant.go | 142 +++++++++ api/clients/accountant_test.go | 439 +++++++++++++++++++++++++++ api/clients/disperser_client.go | 79 +++-- api/clients/eigenda_client.go | 135 +++++++- api/clients/eigenda_client_test.go | 2 +- api/clients/mock/disperser_client.go | 20 +- api/clients/retrieval_client.go | 1 + core/data.go | 9 + disperser/apiserver/server_test.go | 4 + inabox/deploy/env_vars.go | 2 + inabox/tests/integration_test.go | 19 +- inabox/tests/payment_test.go | 154 ++++++++++ inabox/tests/ratelimit_test.go | 2 +- test/integration_test.go | 16 +- 14 files changed, 976 insertions(+), 48 deletions(-) create mode 100644 api/clients/accountant.go create mode 100644 api/clients/accountant_test.go create mode 100644 inabox/tests/payment_test.go diff --git a/api/clients/accountant.go b/api/clients/accountant.go new file mode 100644 index 000000000..bf360afb3 --- /dev/null +++ b/api/clients/accountant.go @@ -0,0 +1,142 @@ +package clients + +import ( + "context" + "errors" + "fmt" + "math/big" + "time" + + commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/meterer" +) + +type IAccountant interface { + AccountBlob(ctx context.Context, data []byte, quorums []uint8) (uint32, uint64, error) +} + +type Accountant struct { + // on-chain states + reservation core.ActiveReservation + onDemand core.OnDemandPayment + reservationWindow uint32 + pricePerChargeable uint32 + minChargeableSize uint32 + + // local accounting + // contains 3 bins; 0 for current bin, 1 for next bin, 2 for overflowed bin + binUsages []uint64 + cumulativePayment *big.Int + stopRotation chan struct{} + + paymentSigner core.PaymentSigner +} + +func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerChargeable uint32, minChargeableSize uint32, paymentSigner core.PaymentSigner) Accountant { + //TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense + // Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic + a := Accountant{ + reservation: reservation, + onDemand: onDemand, + reservationWindow: reservationWindow, + pricePerChargeable: pricePerChargeable, + minChargeableSize: minChargeableSize, + binUsages: []uint64{0, 0, 0}, + cumulativePayment: big.NewInt(0), + stopRotation: make(chan struct{}), + paymentSigner: paymentSigner, + } + go a.startBinRotation() + return a +} + +func (a *Accountant) startBinRotation() { + ticker := time.NewTicker(time.Duration(a.reservationWindow) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + a.rotateBins() + case <-a.stopRotation: + return + } + } +} + +func (a *Accountant) rotateBins() { + // Shift bins: bin_i to bin_{i-1}, add 0 to bin2 + a.binUsages[0] = a.binUsages[1] + a.binUsages[1] = a.binUsages[2] + a.binUsages[2] = 0 +} + +func (a *Accountant) Stop() { + close(a.stopRotation) +} + +// accountant calculates and records payment information +func (a *Accountant) BlobPaymentInfo(ctx context.Context, dataLength uint64) (uint32, *big.Int, error) { + //TODO: do we need to lock the binUsages here in case the blob rotation happens in the middle of the function? + // binUsage := a.binUsages[0] + dataLength + a.binUsages[0] += dataLength + now := time.Now().Unix() + currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow) + + // first attempt to use the active reservation + binLimit := a.reservation.SymbolsPerSec * uint64(a.reservationWindow) + if a.binUsages[0] <= binLimit { + return currentBinIndex, big.NewInt(0), nil + } + + // Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit + if a.binUsages[2] == 0 && a.binUsages[0]-dataLength < binLimit && dataLength <= binLimit { + a.binUsages[2] += a.binUsages[0] - binLimit + return currentBinIndex, big.NewInt(0), nil + } + + // reservation not available, attempt on-demand + //todo: rollback if disperser respond with some type of rejection? + a.binUsages[0] -= dataLength + incrementRequired := big.NewInt(int64(a.PaymentCharged(uint32(dataLength)))) + a.cumulativePayment.Add(a.cumulativePayment, incrementRequired) + if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 { + return 0, a.cumulativePayment, nil + } + fmt.Println("Accountant cannot approve payment for this blob") + return 0, big.NewInt(0), errors.New("Accountant cannot approve payment for this blob") +} + +// accountant provides and records payment information +func (a *Accountant) AccountBlob(ctx context.Context, dataLength uint64, quorums []uint8) (*commonpb.PaymentHeader, []byte, error) { + binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, dataLength) + if err != nil { + return nil, nil, err + } + + accountID := a.paymentSigner.GetAccountID() + pm := &core.PaymentMetadata{ + AccountID: accountID, + BinIndex: binIndex, + CumulativePayment: cumulativePayment, + } + protoPaymentHeader := pm.ConvertToProtoPaymentHeader() + + signature, err := a.paymentSigner.SignBlobPayment(protoPaymentHeader) + if err != nil { + return nil, nil, err + } + + return protoPaymentHeader, signature, nil +} + +// PaymentCharged returns the chargeable price for a given data length +func (a *Accountant) PaymentCharged(dataLength uint32) uint64 { + return uint64(core.RoundUpDivide(uint(a.BlobSizeCharged(dataLength)*a.pricePerChargeable), uint(a.minChargeableSize))) +} + +// BlobSizeCharged returns the chargeable data length for a given data length +func (a *Accountant) BlobSizeCharged(dataLength uint32) uint32 { + return max(dataLength, uint32(a.minChargeableSize)) +} diff --git a/api/clients/accountant_test.go b/api/clients/accountant_test.go new file mode 100644 index 000000000..bad2db905 --- /dev/null +++ b/api/clients/accountant_test.go @@ -0,0 +1,439 @@ +package clients + +import ( + "context" + "encoding/hex" + "math/big" + "sync" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/ethereum/go-ethereum/crypto" + "github.com/stretchr/testify/assert" +) + +func TestNewAccountant(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 100, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(6) + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + assert.Equal(t, reservation, accountant.reservation) + assert.Equal(t, onDemand, accountant.onDemand) + assert.Equal(t, reservationWindow, accountant.reservationWindow) + assert.Equal(t, pricePerChargeable, accountant.pricePerChargeable) + assert.Equal(t, minChargeableSize, accountant.minChargeableSize) + assert.Equal(t, []uint64{0, 0, 0}, accountant.binUsages) + assert.Equal(t, big.NewInt(0), accountant.cumulativePayment) +} + +func TestAccountBlob_Reservation(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(5) + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + dataLength := uint64(500) + quorums := []uint8{0, 1} + + header, _, err := accountant.AccountBlob(ctx, dataLength, quorums) + metadata := core.ConvertPaymentHeader(header) + + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(time.Now().Unix()), reservationWindow), header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, []uint64{500, 0, 0}, accountant.binUsages) + + dataLength = uint64(700) + + header, _, err = accountant.AccountBlob(ctx, dataLength, quorums) + metadata = core.ConvertPaymentHeader(header) + + assert.NoError(t, err) + assert.NotEqual(t, 0, header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, []uint64{1200, 0, 200}, accountant.binUsages) + + // Second call should use on-demand payment + header, _, err = accountant.AccountBlob(ctx, 300, quorums) + metadata = core.ConvertPaymentHeader(header) + + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + assert.Equal(t, big.NewInt(3), metadata.CumulativePayment) +} + +func TestAccountBlob_OnDemand(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(5) + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + dataLength := uint64(1500) + quorums := []uint8{0, 1} + + header, _, err := accountant.AccountBlob(ctx, dataLength, quorums) + metadata := core.ConvertPaymentHeader(header) + expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerChargeable) / uint64(minChargeableSize))) + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + assert.Equal(t, expectedPayment, metadata.CumulativePayment) + assert.Equal(t, []uint64{0, 0, 0}, accountant.binUsages) + assert.Equal(t, expectedPayment, accountant.cumulativePayment) +} + +func TestAccountBlob_InsufficientOnDemand(t *testing.T) { + reservation := core.ActiveReservation{} + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(500), + } + reservationWindow := uint32(60) + pricePerChargeable := uint32(100) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + dataLength := uint64(2000) + quorums := []uint8{0, 1} + + _, _, err = accountant.AccountBlob(ctx, dataLength, quorums) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "Accountant cannot approve payment for this blob") +} + +func TestAccountBlobCallSeries(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(5) + pricePerChargeable := uint32(100) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + quorums := []uint8{0, 1} + now := time.Now().Unix() + + // First call: Use reservation + header, _, err := accountant.AccountBlob(ctx, 800, quorums) + metadata := core.ConvertPaymentHeader(header) + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + + // Second call: Use remaining reservation + overflow + header, _, err = accountant.AccountBlob(ctx, 300, quorums) + metadata = core.ConvertPaymentHeader(header) + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + + // Third call: Use on-demand + header, _, err = accountant.AccountBlob(ctx, 500, quorums) + metadata = core.ConvertPaymentHeader(header) + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + assert.Equal(t, big.NewInt(500), metadata.CumulativePayment) + + // Fourth call: Insufficient on-demand + _, _, err = accountant.AccountBlob(ctx, 600, quorums) + assert.Error(t, err) + assert.Contains(t, err.Error(), "Accountant cannot approve payment for this blob") +} + +func TestAccountBlob_BinRotation(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + quorums := []uint8{0, 1} + + // First call + _, _, err = accountant.AccountBlob(ctx, 800, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{800, 0, 0}, accountant.binUsages) + + // Wait for bin rotation + time.Sleep(2 * time.Second) + + // Second call after bin rotation + _, _, err = accountant.AccountBlob(ctx, 300, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{300, 0, 0}, accountant.binUsages) + + // Third call + _, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{800, 0, 0}, accountant.binUsages) +} + +func TestBinRotation(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + quorums := []uint8{0, 1} + + // First call + _, _, err = accountant.AccountBlob(ctx, 800, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{800, 0, 0}, accountant.binUsages) + + // Second call for overflow + _, _, err = accountant.AccountBlob(ctx, 800, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{1600, 0, 600}, accountant.binUsages) + + // Wait for bin rotation + time.Sleep(1200 * time.Millisecond) + + _, _, err = accountant.AccountBlob(ctx, 300, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{300, 600, 0}, accountant.binUsages) + + // another bin rotation + time.Sleep(1200 * time.Millisecond) + + _, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{1100, 0, 100}, accountant.binUsages) +} + +func TestConcurrentBinRotationAndAccountBlob(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + quorums := []uint8{0, 1} + + // Start concurrent AccountBlob calls + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 5; j++ { + _, _, err := accountant.AccountBlob(ctx, 100, quorums) + assert.NoError(t, err) + time.Sleep(500 * time.Millisecond) + } + }() + } + + // Wait for all goroutines to finish + wg.Wait() + + // Check final state + assert.Equal(t, uint64(1000), accountant.binUsages[0]+accountant.binUsages[1]+accountant.binUsages[2]) +} + +func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 200, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(5) + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + ctx := context.Background() + quorums := []uint8{0, 1} + now := time.Now().Unix() + + // Okay reservation + header, _, err := accountant.AccountBlob(ctx, 800, quorums) + assert.NoError(t, err) + assert.Equal(t, meterer.GetBinIndex(uint64(now), reservationWindow), header.BinIndex) + metadata := core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, []uint64{800, 0, 0}, accountant.binUsages) + + // Second call: Allow one overflow + header, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + metadata = core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(0), metadata.CumulativePayment) + assert.Equal(t, []uint64{1300, 0, 300}, accountant.binUsages) + + // Third call: Should use on-demand payment + header, _, err = accountant.AccountBlob(ctx, 200, quorums) + assert.NoError(t, err) + assert.Equal(t, uint32(0), header.BinIndex) + metadata = core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(2), metadata.CumulativePayment) + assert.Equal(t, []uint64{1300, 0, 300}, accountant.binUsages) +} + +func TestAccountBlob_ReservationOverflowReset(t *testing.T) { + reservation := core.ActiveReservation{ + SymbolsPerSec: 1000, + StartTimestamp: 100, + EndTimestamp: 200, + QuorumSplit: []byte{50, 50}, + QuorumNumbers: []uint8{0, 1}, + } + onDemand := core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } + reservationWindow := uint32(1) // Set to 1 second for testing + pricePerChargeable := uint32(1) + minChargeableSize := uint32(100) + + privateKey1, err := crypto.GenerateKey() + assert.NoError(t, err) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + defer accountant.Stop() + + ctx := context.Background() + quorums := []uint8{0, 1} + + // full reservation + _, _, err = accountant.AccountBlob(ctx, 1000, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{1000, 0, 0}, accountant.binUsages) + + // no overflow + header, _, err := accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{1000, 0, 0}, accountant.binUsages) + metadata := core.ConvertPaymentHeader(header) + assert.Equal(t, big.NewInt(5), metadata.CumulativePayment) + + // Wait for bin rotation + time.Sleep(1500 * time.Millisecond) + + // Third call: Should use new bin and allow overflow again + header, _, err = accountant.AccountBlob(ctx, 500, quorums) + assert.NoError(t, err) + assert.Equal(t, []uint64{500, 0, 0}, accountant.binUsages) +} diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index e677b9683..425574a11 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -8,8 +8,8 @@ import ( "sync" "time" - "github.com/Layr-Labs/eigenda/api" disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" @@ -63,37 +63,18 @@ type disperserClient struct { // This means a conservative estimate of 100-1000MB/sec, which should be amply sufficient. // If we ever need to increase this, we could either consider asking the disperser to increase its limit, // or to use a pool of connections here. - conn *grpc.ClientConn - client disperser_rpc.DisperserClient + conn *grpc.ClientConn + client disperser_rpc.DisperserClient + accountant Accountant } var _ DisperserClient = &disperserClient{} -// DisperserClient maintains a single underlying grpc connection to the disperser server, -// through which it sends requests to disperse blobs, get blob status, and retrieve blobs. -// The connection is established lazily on the first method call. Don't forget to call Close(), -// which is safe to call even if the connection was never established. -// -// DisperserClient is safe to be used concurrently by multiple goroutines. -// -// Example usage: -// -// client := NewDisperserClient(config, signer) -// defer client.Close() -// -// // The connection will be established on the first call -// status, requestId, err := client.DisperseBlob(ctx, someData, someQuorums) -// if err != nil { -// // Handle error -// } -// -// // Subsequent calls will use the existing connection -// status2, requestId2, err := client.DisperseBlob(ctx, otherData, otherQuorums) -func NewDisperserClient(config *Config, signer core.BlobRequestSigner) *disperserClient { +func NewDisperserClient(config *Config, signer core.BlobRequestSigner, accountant Accountant) DisperserClient { return &disperserClient{ - config: config, - signer: signer, - // conn and client are initialized lazily + config: config, + signer: signer, + accountant: accountant, } } @@ -146,9 +127,49 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums return blobStatus, reply.GetRequestId(), nil } -// TODO: implemented in subsequent PR func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { - return nil, nil, api.NewErrorInternal("not implemented") + err := c.initOnceGrpcConnection() + if err != nil { + return nil, nil, fmt.Errorf("error initializing connection: %w", err) + } + + ctxTimeout, cancel := context.WithTimeout(ctx, c.config.Timeout) + defer cancel() + + quorumNumbers := make([]uint32, len(quorums)) + for i, q := range quorums { + quorumNumbers[i] = uint32(q) + } + + // check every 32 bytes of data are within the valid range for a bn254 field element + _, err = rs.ToFrArray(data) + if err != nil { + return nil, nil, fmt.Errorf("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617 %w", err) + } + + header, signature, err := c.accountant.AccountBlob(ctx, uint64(len(data)), quorums) + if err != nil { + return nil, nil, err + } + + request := &disperser_rpc.DispersePaidBlobRequest{ + Data: data, + CustomQuorumNumbers: quorumNumbers, + PaymentHeader: header, + PaymentSignature: signature, + } + + reply, err := c.client.DispersePaidBlob(ctxTimeout, request) + if err != nil { + return nil, nil, err + } + + blobStatus, err := disperser.FromBlobStatusProto(reply.GetResult()) + if err != nil { + return nil, nil, err + } + + return blobStatus, reply.GetRequestId(), nil } func (c *disperserClient) DisperseBlobAuthenticated(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go index 173af1a7c..9a76706ae 100644 --- a/api/clients/eigenda_client.go +++ b/api/clients/eigenda_client.go @@ -29,10 +29,11 @@ type IEigenDAClient interface { type EigenDAClient struct { // TODO: all of these should be private, to prevent users from using them directly, // which breaks encapsulation and makes it hard for us to do refactors or changes - Config EigenDAClientConfig - Log log.Logger - Client DisperserClient - Codec codecs.BlobCodec + Config EigenDAClientConfig + Log log.Logger + Client DisperserClient + Codec codecs.BlobCodec + accountant Accountant } var _ IEigenDAClient = &EigenDAClient{} @@ -86,7 +87,12 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien } disperserConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS) - disperserClient := NewDisperserClient(disperserConfig, signer) + + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString([]byte(config.SignerPrivateKeyHex))) + // a subsequent PR contains updates to fill in payment state + accountant := NewAccountant(core.ActiveReservation{}, core.OnDemandPayment{}, 0, 0, 0, paymentSigner) + + disperserClient := NewDisperserClient(disperserConfig, signer, accountant) lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion) if err != nil { @@ -260,6 +266,125 @@ func (m *EigenDAClient) putBlob(ctx context.Context, rawData []byte, resultChan } } +// PaidPutBlob behaves like PutBlob but with authenticated payment. +func (m EigenDAClient) PaidPutBlob(ctx context.Context, data []byte) (*grpcdisperser.BlobInfo, error) { + resultChan, errorChan := m.PaidPutBlobAsync(ctx, data) + select { // no timeout here because we depend on the configured timeout in PutBlobAsync + case result := <-resultChan: + return result, nil + case err := <-errorChan: + return nil, err + } +} + +func (m EigenDAClient) PaidPutBlobAsync(ctx context.Context, data []byte) (resultChan chan *grpcdisperser.BlobInfo, errChan chan error) { + resultChan = make(chan *grpcdisperser.BlobInfo, 1) + errChan = make(chan error, 1) + go m.paidPutBlob(ctx, data, resultChan, errChan) + return +} + +func (m EigenDAClient) paidPutBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) { + m.Log.Info("Attempting to disperse blob to EigenDA") + + // encode blob + if m.Codec == nil { + errChan <- fmt.Errorf("Codec cannot be nil") + return + } + + data, err := m.Codec.EncodeBlob(rawData) + if err != nil { + errChan <- fmt.Errorf("error encoding blob: %w", err) + return + } + + customQuorumNumbers := make([]uint8, len(m.Config.CustomQuorumIDs)) + for i, e := range m.Config.CustomQuorumIDs { + customQuorumNumbers[i] = uint8(e) + } + // disperse blob + blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers) + if err != nil { + errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err) + return + } + + // process response + if *blobStatus == disperser.Failed { + m.Log.Error("Unable to disperse blob to EigenDA, aborting", "err", err) + errChan <- fmt.Errorf("reply status is %d", blobStatus) + return + } + + base64RequestID := base64.StdEncoding.EncodeToString(requestID) + m.Log.Info("Blob dispersed to EigenDA, now waiting for confirmation", "requestID", base64RequestID) + + ticker := time.NewTicker(m.Config.StatusQueryRetryInterval) + defer ticker.Stop() + + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, m.Config.StatusQueryTimeout) + defer cancel() + + alreadyWaitingForDispersal := false + alreadyWaitingForFinalization := false + for { + select { + case <-ctx.Done(): + errChan <- fmt.Errorf("timed out waiting for EigenDA blob to confirm blob with request id=%s: %w", base64RequestID, ctx.Err()) + return + case <-ticker.C: + statusRes, err := m.Client.GetBlobStatus(ctx, requestID) + if err != nil { + m.Log.Error("Unable to retrieve blob dispersal status, will retry", "requestID", base64RequestID, "err", err) + continue + } + + switch statusRes.Status { + case grpcdisperser.BlobStatus_PROCESSING, grpcdisperser.BlobStatus_DISPERSING: + // to prevent log clutter, we only log at info level once + if alreadyWaitingForDispersal { + m.Log.Debug("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID) + } else { + m.Log.Info("Blob submitted, waiting for dispersal from EigenDA", "requestID", base64RequestID) + alreadyWaitingForDispersal = true + } + case grpcdisperser.BlobStatus_FAILED: + m.Log.Error("EigenDA blob dispersal failed in processing", "requestID", base64RequestID, "err", err) + errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing, requestID=%s: %w", base64RequestID, err) + return + case grpcdisperser.BlobStatus_INSUFFICIENT_SIGNATURES: + m.Log.Error("EigenDA blob dispersal failed in processing with insufficient signatures", "requestID", base64RequestID, "err", err) + errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with insufficient signatures, requestID=%s: %w", base64RequestID, err) + return + case grpcdisperser.BlobStatus_CONFIRMED: + if m.Config.WaitForFinalization { + // to prevent log clutter, we only log at info level once + if alreadyWaitingForFinalization { + m.Log.Debug("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID) + } else { + m.Log.Info("EigenDA blob confirmed, waiting for finalization", "requestID", base64RequestID) + alreadyWaitingForFinalization = true + } + } else { + m.Log.Info("EigenDA blob confirmed", "requestID", base64RequestID) + resultChan <- statusRes.Info + return + } + case grpcdisperser.BlobStatus_FINALIZED: + batchHeaderHashHex := fmt.Sprintf("0x%s", hex.EncodeToString(statusRes.Info.BlobVerificationProof.BatchMetadata.BatchHeaderHash)) + m.Log.Info("Successfully dispersed blob to EigenDA", "requestID", base64RequestID, "batchHeaderHash", batchHeaderHashHex) + resultChan <- statusRes.Info + return + default: + errChan <- fmt.Errorf("EigenDA blob dispersal failed in processing with reply status %d", statusRes.Status) + return + } + } + } +} + // Close simply calls Close() on the wrapped disperserClient, to close the grpc connection to the disperser server. // It is thread safe and can be called multiple times. func (c *EigenDAClient) Close() error { diff --git a/api/clients/eigenda_client_test.go b/api/clients/eigenda_client_test.go index 79e8556df..91684d95c 100644 --- a/api/clients/eigenda_client_test.go +++ b/api/clients/eigenda_client_test.go @@ -510,7 +510,7 @@ func TestPutBlobTotalTimeout(t *testing.T) { func TestPutBlobNoopSigner(t *testing.T) { config := clients.NewConfig("nohost", "noport", time.Second, false) - disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner()) + disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), clients.Accountant{}) test := []byte("test") test[0] = 0x00 // make sure the first byte of the requst is always 0 diff --git a/api/clients/mock/disperser_client.go b/api/clients/mock/disperser_client.go index c7ab9627f..6784a1afd 100644 --- a/api/clients/mock/disperser_client.go +++ b/api/clients/mock/disperser_client.go @@ -81,9 +81,25 @@ func (c *MockDisperserClient) DisperseBlob(ctx context.Context, data []byte, quo return status, key, err } -// TODO: implement in the subsequent PR func (c *MockDisperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { - return nil, nil, nil + args := c.Called(data, quorums) + var status *disperser.BlobStatus + if args.Get(0) != nil { + status = (args.Get(0)).(*disperser.BlobStatus) + } + var key []byte + if args.Get(1) != nil { + key = (args.Get(1)).([]byte) + } + var err error + if args.Get(2) != nil { + err = (args.Get(2)).(error) + } + + keyStr := base64.StdEncoding.EncodeToString(key) + c.mockRequestIDStore[keyStr] = data + + return status, key, err } func (c *MockDisperserClient) GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error) { diff --git a/api/clients/retrieval_client.go b/api/clients/retrieval_client.go index 8bc034cd2..e8782a5e6 100644 --- a/api/clients/retrieval_client.go +++ b/api/clients/retrieval_client.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigensdk-go/logging" diff --git a/core/data.go b/core/data.go index 2100be6a1..893c45217 100644 --- a/core/data.go +++ b/core/data.go @@ -519,6 +519,15 @@ func ConvertPaymentHeader(header *commonpb.PaymentHeader) *PaymentMetadata { } } +// Hash returns the Keccak256 hash of the PaymentMetadata +func (pm *PaymentMetadata) ConvertToProtoPaymentHeader() *commonpb.PaymentHeader { + return &commonpb.PaymentHeader{ + AccountId: pm.AccountID, + BinIndex: pm.BinIndex, + CumulativePayment: pm.CumulativePayment.Bytes(), + } +} + // OperatorInfo contains information about an operator which is stored on the blockchain state, // corresponding to a particular quorum type ActiveReservation struct { diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 43fbba993..f1627e444 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -655,6 +655,10 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal panic("failed to make initial query to the on-chain state") } + mockState.On("GetPricePerSymbol").Return(uint32(1), nil) + mockState.On("GetMinNumSymbols").Return(uint32(1), nil) + mockState.On("GetGlobalSymbolsPerSecond").Return(uint32(1000), nil) + mockState.On("GetReservationWindow").Return(uint32(60), nil) // append test name to each table name for an unique store table_names := []string{"reservations_server_" + testName, "ondemand_server_" + testName, "global_server_" + testName} err = meterer.CreateReservationTable(awsConfig, table_names[0]) diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index f701ce64d..9dde51b8f 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -76,6 +76,8 @@ type DisperserVars struct { DISPERSER_SERVER_RETRIEVAL_BLOB_RATE string DISPERSER_SERVER_RETRIEVAL_BYTE_RATE string + + DISPERSER_SERVER_ENABLE_PAYMENT_METERER string } func (vars DisperserVars) getEnvMap() map[string]string { diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index 4bfdfa22f..f90cab5a5 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -4,14 +4,17 @@ import ( "bytes" "context" "crypto/rand" + "encoding/hex" "math/big" "time" "github.com/Layr-Labs/eigenda/api/clients" disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser" rollupbindings "github.com/Layr-Labs/eigenda/contracts/bindings/MockRollup" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/disperser" + "github.com/ethereum/go-ethereum/crypto" "github.com/Layr-Labs/eigenda/encoding/utils/codec" . "github.com/onsi/ginkgo/v2" @@ -25,6 +28,18 @@ func mineAnvilBlocks(numBlocks int) { } } +var ( + dummyActiveReservation = core.ActiveReservation{ + SymbolsPerSec: 100, + StartTimestamp: 1000, + EndTimestamp: 2000, + QuorumSplit: []byte{50, 50}, + } + dummyOnDemandPayment = core.OnDemandPayment{ + CumulativePayment: big.NewInt(1000), + } +) + var _ = Describe("Inabox Integration", func() { It("test end to end scenario", func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) @@ -36,11 +51,13 @@ var _ = Describe("Inabox Integration", func() { privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" signer := auth.NewLocalBlobRequestSigner(privateKeyHex) + privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey.D.Bytes())) disp := clients.NewDisperserClient(&clients.Config{ Hostname: "localhost", Port: "32003", Timeout: 10 * time.Second, - }, signer) + }, signer, clients.NewAccountant(dummyActiveReservation, dummyOnDemandPayment, 60, 128, 128, paymentSigner)) Expect(disp).To(Not(BeNil())) diff --git a/inabox/tests/payment_test.go b/inabox/tests/payment_test.go new file mode 100644 index 000000000..bcfbb8e94 --- /dev/null +++ b/inabox/tests/payment_test.go @@ -0,0 +1,154 @@ +package integration_test + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "time" + + "github.com/Layr-Labs/eigenda/api/clients" + disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/disperser" + "github.com/ethereum/go-ethereum/crypto" + + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Inabox Integration", func() { + It("test payment metering", func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) + defer cancel() + + gasTipCap, gasFeeCap, err := ethClient.GetLatestGasCaps(ctx) + Expect(err).To(BeNil()) + + privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" + signer := auth.NewLocalBlobRequestSigner(privateKeyHex) + // Disperser configs: rsv window 60s, min chargeable size 100 bytes, price per chargeable 100, global limit 500 + // -> need to check the mock, can't just use any account for the disperser client, consider using static wallets... + + // say with dataLength of 150 bytes, within a window, we can send 7 blobs with overflow of 50 bytes + // the later requests is then 250 bytes, try send 4 blobs within a second, 2 of them would fail but not charged for + // wait for a second, retry, and that should allow ondemand to work + privateKey, err := crypto.HexToECDSA(privateKeyHex[2:]) // Remove "0x" prefix + Expect(err).To(BeNil()) + paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey.D.Bytes())) + disp := clients.NewDisperserClient(&clients.Config{ + Hostname: "localhost", + Port: "32003", + Timeout: 10 * time.Second, + }, signer, clients.NewAccountant(dummyActiveReservation, dummyOnDemandPayment, 60, 128, 128, paymentSigner)) + + Expect(disp).To(Not(BeNil())) + + singleBlobSize := uint32(128) + data := make([]byte, singleBlobSize) + _, err = rand.Read(data) + Expect(err).To(BeNil()) + + paddedData := codec.ConvertByPaddingEmptyByte(data) + + // requests that count towards either reservation or payments + paidBlobStatus := []disperser.BlobStatus{} + paidKeys := [][]byte{} + reservationBytesLimit := 1024 + paymentLimit := 512 + // TODO: payment calculation unit consistency + for i := 0; i < (int(reservationBytesLimit+paymentLimit))/int(singleBlobSize); i++ { + blobStatus, key, err := disp.DisperseBlob(ctx, paddedData, []uint8{0}) + Expect(err).To(BeNil()) + Expect(key).To(Not(BeNil())) + Expect(blobStatus).To(Not(BeNil())) + Expect(*blobStatus).To(Equal(disperser.Processing)) + paidBlobStatus = append(paidBlobStatus, *blobStatus) + paidKeys = append(paidKeys, key) + } + + // requests that aren't covered by reservation or on-demand payment + blobStatus, key, err := disp.DispersePaidBlob(ctx, paddedData, []uint8{0}) + Expect(err).To(Not(BeNil())) + Expect(key).To(BeNil()) + Expect(blobStatus).To(BeNil()) + + ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + + var replies = make([]*disperserpb.BlobStatusReply, len(paidBlobStatus)) + // now make sure all the paid blobs get confirmed + loop: + for { + select { + case <-ctx.Done(): + Fail("timed out") + case <-ticker.C: + notConfirmed := false + for i, key := range paidKeys { + reply, err := disp.GetBlobStatus(context.Background(), key) + Expect(err).To(BeNil()) + Expect(reply).To(Not(BeNil())) + status, err := disperser.FromBlobStatusProto(reply.GetStatus()) + Expect(err).To(BeNil()) + if *status != disperser.Confirmed { + notConfirmed = true + } + replies[i] = reply + paidBlobStatus[i] = *status + } + + if notConfirmed { + mineAnvilBlocks(numConfirmations + 1) + continue + } + + for _, reply := range replies { + blobHeader := blobHeaderFromProto(reply.GetInfo().GetBlobHeader()) + verificationProof := blobVerificationProofFromProto(reply.GetInfo().GetBlobVerificationProof()) + opts, err := ethClient.GetNoSendTransactOpts() + Expect(err).To(BeNil()) + tx, err := mockRollup.PostCommitment(opts, blobHeader, verificationProof) + Expect(err).To(BeNil()) + tx, err = ethClient.UpdateGas(ctx, tx, nil, gasTipCap, gasFeeCap) + Expect(err).To(BeNil()) + err = ethClient.SendTransaction(ctx, tx) + Expect(err).To(BeNil()) + mineAnvilBlocks(numConfirmations + 1) + _, err = ethClient.EnsureTransactionEvaled(ctx, tx, "PostCommitment") + Expect(err).To(BeNil()) + } + + break loop + } + } + for _, status := range paidBlobStatus { + Expect(status).To(Equal(disperser.Confirmed)) + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + for _, reply := range replies { + retrieved, err := retrievalClient.RetrieveBlob(ctx, + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()), + reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), + uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()), + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()), + 0, // retrieve blob 1 from quorum 0 + ) + Expect(err).To(BeNil()) + restored := codec.RemoveEmptyByteFromPaddedBytes(retrieved) + Expect(bytes.TrimRight(restored, "\x00")).To(Equal(bytes.TrimRight(data, "\x00"))) + + _, err = retrievalClient.RetrieveBlob(ctx, + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeaderHash()), + reply.GetInfo().GetBlobVerificationProof().GetBlobIndex(), + uint(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetReferenceBlockNumber()), + [32]byte(reply.GetInfo().GetBlobVerificationProof().GetBatchMetadata().GetBatchHeader().GetBatchRoot()), + 1, // retrieve blob 1 from quorum 1 + ) + Expect(err).NotTo(BeNil()) + } + }) +}) diff --git a/inabox/tests/ratelimit_test.go b/inabox/tests/ratelimit_test.go index 47a93decb..42ee49961 100644 --- a/inabox/tests/ratelimit_test.go +++ b/inabox/tests/ratelimit_test.go @@ -111,7 +111,7 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase) Hostname: "localhost", Port: testConfig.Dispersers[0].DISPERSER_SERVER_GRPC_PORT, Timeout: 10 * time.Second, - }, nil) + }, nil, clients.Accountant{}) assert.NotNil(t, disp) data := make([]byte, c.blobSize) diff --git a/test/integration_test.go b/test/integration_test.go index 7dde36695..ffb1a90b7 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -210,18 +210,16 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser tx.On("GetCurrentBlockNumber").Return(uint64(100), nil) tx.On("GetQuorumCount").Return(1, nil) - minimumNumSymbols := uint32(128) - pricePerSymbol := uint32(1) - reservationLimit := uint64(1024) - paymentLimit := big.NewInt(512) - // this is disperser client's private key used in tests privateKey, err := crypto.HexToECDSA("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded") // Remove "0x" prefix if err != nil { panic("failed to convert hex to ECDSA") } publicKey := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() + mockState := &coremock.MockOnchainPaymentState{} + reservationLimit := uint64(1024) + paymentLimit := big.NewInt(512) mockState.On("GetActiveReservationByAccount", mock.Anything, mock.MatchedBy(func(account string) bool { return account == publicKey })).Return(core.ActiveReservation{SymbolsPerSec: reservationLimit, StartTimestamp: 0, EndTimestamp: math.MaxUint32, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}}, nil) @@ -232,11 +230,11 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser })).Return(core.OnDemandPayment{CumulativePayment: paymentLimit}, nil) mockState.On("GetOnDemandPaymentByAccount", mock.Anything, mock.Anything).Return(core.OnDemandPayment{}, errors.New("payment not found")) mockState.On("GetOnDemandQuorumNumbers", mock.Anything).Return([]uint8{0, 1}, nil) - mockState.On("GetMinNumSymbols", mock.Anything).Return(minimumNumSymbols, nil) - mockState.On("GetPricePerSymbol", mock.Anything).Return(pricePerSymbol, nil) - mockState.On("GetReservationWindow", mock.Anything).Return(uint32(60), nil) mockState.On("GetGlobalSymbolsPerSecond", mock.Anything).Return(uint64(1024), nil) - mockState.On("GetOnDemandQuorumNumbers", mock.Anything).Return([]uint8{0, 1}, nil) + mockState.On("GetPricePerSymbol", mock.Anything).Return(uint32(1), nil) + mockState.On("GetMinNumSymbols", mock.Anything).Return(uint32(128), nil) + mockState.On("GetReservationWindow", mock.Anything).Return(uint32(60), nil) + mockState.On("RefreshOnchainPaymentState", mock.Anything).Return(nil).Maybe() deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") if !deployLocalStack { From 76e1aadb8720ce177912825203a6d2874b936160 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 25 Oct 2024 13:20:10 -0700 Subject: [PATCH 2/5] refactor: rebase and update --- api/clients/accountant.go | 66 +++++++++++++++++------------ api/clients/accountant_test.go | 68 +++++++++++++++--------------- api/clients/disperser_client.go | 5 ++- api/clients/eigenda_client.go | 6 +-- api/clients/eigenda_client_test.go | 2 +- inabox/deploy/config.go | 24 ++++++----- inabox/deploy/env_vars.go | 6 +++ inabox/tests/integration_test.go | 14 +----- inabox/tests/payment_test.go | 3 +- inabox/tests/ratelimit_test.go | 2 +- 10 files changed, 104 insertions(+), 92 deletions(-) diff --git a/api/clients/accountant.go b/api/clients/accountant.go index bf360afb3..cd3484bd3 100644 --- a/api/clients/accountant.go +++ b/api/clients/accountant.go @@ -2,9 +2,9 @@ package clients import ( "context" - "errors" "fmt" "math/big" + "sync" "time" commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" @@ -18,37 +18,40 @@ type IAccountant interface { type Accountant struct { // on-chain states - reservation core.ActiveReservation - onDemand core.OnDemandPayment - reservationWindow uint32 - pricePerChargeable uint32 - minChargeableSize uint32 + reservation core.ActiveReservation + onDemand core.OnDemandPayment + reservationWindow uint32 + pricePerSymbol uint32 + minNumSymbols uint32 // local accounting - // contains 3 bins; 0 for current bin, 1 for next bin, 2 for overflowed bin + // contains 3 bins; index 0 for current bin, 1 for next bin, 2 for overflowed bin binUsages []uint64 + usageLock sync.Mutex cumulativePayment *big.Int stopRotation chan struct{} paymentSigner core.PaymentSigner } -func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerChargeable uint32, minChargeableSize uint32, paymentSigner core.PaymentSigner) Accountant { +func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, paymentSigner core.PaymentSigner) *Accountant { //TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense // Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic + // There's a subsequent PR that handles populating the accountant with on-chain state from the disperser a := Accountant{ - reservation: reservation, - onDemand: onDemand, - reservationWindow: reservationWindow, - pricePerChargeable: pricePerChargeable, - minChargeableSize: minChargeableSize, - binUsages: []uint64{0, 0, 0}, - cumulativePayment: big.NewInt(0), - stopRotation: make(chan struct{}), - paymentSigner: paymentSigner, + reservation: reservation, + onDemand: onDemand, + reservationWindow: reservationWindow, + pricePerSymbol: pricePerSymbol, + minNumSymbols: minNumSymbols, + binUsages: []uint64{0, 0, 0}, + cumulativePayment: big.NewInt(0), + stopRotation: make(chan struct{}), + paymentSigner: paymentSigner, } go a.startBinRotation() - return a + // TODO: add a routine to refresh the on-chain state occasionally? + return &a } func (a *Accountant) startBinRotation() { @@ -66,7 +69,9 @@ func (a *Accountant) startBinRotation() { } func (a *Accountant) rotateBins() { - // Shift bins: bin_i to bin_{i-1}, add 0 to bin2 + a.usageLock.Lock() + defer a.usageLock.Unlock() + // Shift bins: bin_i to bin_{i-1}, set 0 to bin2 a.binUsages[0] = a.binUsages[1] a.binUsages[1] = a.binUsages[2] a.binUsages[2] = 0 @@ -80,6 +85,8 @@ func (a *Accountant) Stop() { func (a *Accountant) BlobPaymentInfo(ctx context.Context, dataLength uint64) (uint32, *big.Int, error) { //TODO: do we need to lock the binUsages here in case the blob rotation happens in the middle of the function? // binUsage := a.binUsages[0] + dataLength + a.usageLock.Lock() + defer a.usageLock.Unlock() a.binUsages[0] += dataLength now := time.Now().Unix() currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow) @@ -99,13 +106,12 @@ func (a *Accountant) BlobPaymentInfo(ctx context.Context, dataLength uint64) (ui // reservation not available, attempt on-demand //todo: rollback if disperser respond with some type of rejection? a.binUsages[0] -= dataLength - incrementRequired := big.NewInt(int64(a.PaymentCharged(uint32(dataLength)))) + incrementRequired := big.NewInt(int64(a.PaymentCharged(uint(dataLength)))) a.cumulativePayment.Add(a.cumulativePayment, incrementRequired) if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 { return 0, a.cumulativePayment, nil } - fmt.Println("Accountant cannot approve payment for this blob") - return 0, big.NewInt(0), errors.New("Accountant cannot approve payment for this blob") + return 0, big.NewInt(0), fmt.Errorf("Accountant cannot approve payment for this blob") } // accountant provides and records payment information @@ -131,12 +137,18 @@ func (a *Accountant) AccountBlob(ctx context.Context, dataLength uint64, quorums return protoPaymentHeader, signature, nil } +// TODO: PaymentCharged and SymbolsCharged copied from meterer, should be refactored // PaymentCharged returns the chargeable price for a given data length -func (a *Accountant) PaymentCharged(dataLength uint32) uint64 { - return uint64(core.RoundUpDivide(uint(a.BlobSizeCharged(dataLength)*a.pricePerChargeable), uint(a.minChargeableSize))) +func (a *Accountant) PaymentCharged(dataLength uint) uint64 { + return uint64(a.SymbolsCharged(dataLength)) * uint64(a.pricePerSymbol) } -// BlobSizeCharged returns the chargeable data length for a given data length -func (a *Accountant) BlobSizeCharged(dataLength uint32) uint32 { - return max(dataLength, uint32(a.minChargeableSize)) +// SymbolsCharged returns the number of symbols charged for a given data length +// being at least MinNumSymbols or the nearest rounded-up multiple of MinNumSymbols. +func (a *Accountant) SymbolsCharged(dataLength uint) uint32 { + if dataLength <= uint(a.minNumSymbols) { + return a.minNumSymbols + } + // Round up to the nearest multiple of MinNumSymbols + return uint32(core.RoundUpDivide(uint(dataLength), uint(a.minNumSymbols))) * a.minNumSymbols } diff --git a/api/clients/accountant_test.go b/api/clients/accountant_test.go index bad2db905..84c44fda9 100644 --- a/api/clients/accountant_test.go +++ b/api/clients/accountant_test.go @@ -27,20 +27,20 @@ func TestNewAccountant(t *testing.T) { CumulativePayment: big.NewInt(500), } reservationWindow := uint32(6) - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) - defer accountant.Stop() + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) + assert.NotNil(t, accountant) assert.Equal(t, reservation, accountant.reservation) assert.Equal(t, onDemand, accountant.onDemand) assert.Equal(t, reservationWindow, accountant.reservationWindow) - assert.Equal(t, pricePerChargeable, accountant.pricePerChargeable) - assert.Equal(t, minChargeableSize, accountant.minChargeableSize) + assert.Equal(t, pricePerSymbol, accountant.pricePerSymbol) + assert.Equal(t, minNumSymbols, accountant.minNumSymbols) assert.Equal(t, []uint64{0, 0, 0}, accountant.binUsages) assert.Equal(t, big.NewInt(0), accountant.cumulativePayment) } @@ -57,13 +57,13 @@ func TestAccountBlob_Reservation(t *testing.T) { CumulativePayment: big.NewInt(500), } reservationWindow := uint32(5) - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -109,13 +109,13 @@ func TestAccountBlob_OnDemand(t *testing.T) { CumulativePayment: big.NewInt(500), } reservationWindow := uint32(5) - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -124,7 +124,7 @@ func TestAccountBlob_OnDemand(t *testing.T) { header, _, err := accountant.AccountBlob(ctx, dataLength, quorums) metadata := core.ConvertPaymentHeader(header) - expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerChargeable) / uint64(minChargeableSize))) + expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerSymbol) / uint64(minNumSymbols))) assert.NoError(t, err) assert.Equal(t, uint32(0), header.BinIndex) assert.Equal(t, expectedPayment, metadata.CumulativePayment) @@ -138,13 +138,13 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) { CumulativePayment: big.NewInt(500), } reservationWindow := uint32(60) - pricePerChargeable := uint32(100) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(100) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -169,13 +169,13 @@ func TestAccountBlobCallSeries(t *testing.T) { CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(5) - pricePerChargeable := uint32(100) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(100) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -221,12 +221,12 @@ func TestAccountBlob_BinRotation(t *testing.T) { CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -263,13 +263,13 @@ func TestBinRotation(t *testing.T) { CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -312,13 +312,13 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) { CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() @@ -357,13 +357,13 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) { CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(5) - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() quorums := []uint8{0, 1} @@ -405,13 +405,13 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) { CumulativePayment: big.NewInt(1000), } reservationWindow := uint32(1) // Set to 1 second for testing - pricePerChargeable := uint32(1) - minChargeableSize := uint32(100) + pricePerSymbol := uint32(1) + minNumSymbols := uint32(100) privateKey1, err := crypto.GenerateKey() assert.NoError(t, err) paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes())) - accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner) + accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner) defer accountant.Stop() ctx := context.Background() diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index 425574a11..c0f0e4419 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -65,12 +65,12 @@ type disperserClient struct { // or to use a pool of connections here. conn *grpc.ClientConn client disperser_rpc.DisperserClient - accountant Accountant + accountant *Accountant } var _ DisperserClient = &disperserClient{} -func NewDisperserClient(config *Config, signer core.BlobRequestSigner, accountant Accountant) DisperserClient { +func NewDisperserClient(config *Config, signer core.BlobRequestSigner, accountant *Accountant) DisperserClient { return &disperserClient{ config: config, signer: signer, @@ -127,6 +127,7 @@ func (c *disperserClient) DisperseBlob(ctx context.Context, data []byte, quorums return blobStatus, reply.GetRequestId(), nil } +// DispersePaidBlob disperses a blob with a payment header and signature. Similar to DisperseBlob but with signed payment header. func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quorums []uint8) (*disperser.BlobStatus, []byte, error) { err := c.initOnceGrpcConnection() if err != nil { diff --git a/api/clients/eigenda_client.go b/api/clients/eigenda_client.go index 9a76706ae..a6955f87b 100644 --- a/api/clients/eigenda_client.go +++ b/api/clients/eigenda_client.go @@ -285,7 +285,7 @@ func (m EigenDAClient) PaidPutBlobAsync(ctx context.Context, data []byte) (resul } func (m EigenDAClient) paidPutBlob(ctx context.Context, rawData []byte, resultChan chan *grpcdisperser.BlobInfo, errChan chan error) { - m.Log.Info("Attempting to disperse blob to EigenDA") + m.Log.Info("Attempting to disperse blob to EigenDA with payment") // encode blob if m.Codec == nil { @@ -304,9 +304,9 @@ func (m EigenDAClient) paidPutBlob(ctx context.Context, rawData []byte, resultCh customQuorumNumbers[i] = uint8(e) } // disperse blob - blobStatus, requestID, err := m.Client.DisperseBlobAuthenticated(ctx, data, customQuorumNumbers) + blobStatus, requestID, err := m.Client.DispersePaidBlob(ctx, data, customQuorumNumbers) if err != nil { - errChan <- fmt.Errorf("error initializing DisperseBlobAuthenticated() client: %w", err) + errChan <- fmt.Errorf("error initializing DispersePaidBlob() client: %w", err) return } diff --git a/api/clients/eigenda_client_test.go b/api/clients/eigenda_client_test.go index 91684d95c..96d2d4ac0 100644 --- a/api/clients/eigenda_client_test.go +++ b/api/clients/eigenda_client_test.go @@ -510,7 +510,7 @@ func TestPutBlobTotalTimeout(t *testing.T) { func TestPutBlobNoopSigner(t *testing.T) { config := clients.NewConfig("nohost", "noport", time.Second, false) - disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), clients.Accountant{}) + disperserClient := clients.NewDisperserClient(config, auth.NewLocalNoopSigner(), &clients.Accountant{}) test := []byte("test") test[0] = 0x00 // make sure the first byte of the requst is always 0 diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index 01795d65f..1de79f003 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -159,16 +159,20 @@ func (env *Config) generateChurnerVars(ind int, graphUrl, logPath, grpcPort stri // Generates disperser .env func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath, grpcPort string) DisperserVars { v := DisperserVars{ - DISPERSER_SERVER_S3_BUCKET_NAME: "test-eigenda-blobstore", - DISPERSER_SERVER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", - DISPERSER_SERVER_RATE_BUCKET_TABLE_NAME: "", - DISPERSER_SERVER_RATE_BUCKET_STORE_SIZE: "100000", - DISPERSER_SERVER_GRPC_PORT: grpcPort, - DISPERSER_SERVER_ENABLE_METRICS: "true", - DISPERSER_SERVER_METRICS_HTTP_PORT: "9093", - DISPERSER_SERVER_CHAIN_RPC: "", - DISPERSER_SERVER_PRIVATE_KEY: "123", - DISPERSER_SERVER_NUM_CONFIRMATIONS: "0", + DISPERSER_SERVER_S3_BUCKET_NAME: "test-eigenda-blobstore", + DISPERSER_SERVER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", + DISPERSER_SERVER_RATE_BUCKET_TABLE_NAME: "", + DISPERSER_SERVER_RATE_BUCKET_STORE_SIZE: "100000", + DISPERSER_SERVER_GRPC_PORT: grpcPort, + DISPERSER_SERVER_ENABLE_METRICS: "true", + DISPERSER_SERVER_ENABLE_PAYMENT_METERER: "true", + DISPERSER_SERVER_RESERVATIONS_TABLE_NAME: "reservations_inabox", + DISPERSER_SERVER_ON_DEMAND_TABLE_NAME: "on_demand_inabox", + DISPERSER_SERVER_GLOBAL_RATE_TABLE_NAME: "global_rate_inabox", + DISPERSER_SERVER_METRICS_HTTP_PORT: "9093", + DISPERSER_SERVER_CHAIN_RPC: "", + DISPERSER_SERVER_PRIVATE_KEY: "123", + DISPERSER_SERVER_NUM_CONFIRMATIONS: "0", DISPERSER_SERVER_REGISTERED_QUORUM_ID: "0,1", DISPERSER_SERVER_TOTAL_UNAUTH_BYTE_RATE: "10000000,10000000", diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index 9dde51b8f..2c654b17d 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -78,6 +78,12 @@ type DisperserVars struct { DISPERSER_SERVER_RETRIEVAL_BYTE_RATE string DISPERSER_SERVER_ENABLE_PAYMENT_METERER string + + DISPERSER_SERVER_RESERVATIONS_TABLE_NAME string + + DISPERSER_SERVER_ON_DEMAND_TABLE_NAME string + + DISPERSER_SERVER_GLOBAL_RATE_TABLE_NAME string } func (vars DisperserVars) getEnvMap() map[string]string { diff --git a/inabox/tests/integration_test.go b/inabox/tests/integration_test.go index f90cab5a5..953dd9493 100644 --- a/inabox/tests/integration_test.go +++ b/inabox/tests/integration_test.go @@ -28,18 +28,6 @@ func mineAnvilBlocks(numBlocks int) { } } -var ( - dummyActiveReservation = core.ActiveReservation{ - SymbolsPerSec: 100, - StartTimestamp: 1000, - EndTimestamp: 2000, - QuorumSplit: []byte{50, 50}, - } - dummyOnDemandPayment = core.OnDemandPayment{ - CumulativePayment: big.NewInt(1000), - } -) - var _ = Describe("Inabox Integration", func() { It("test end to end scenario", func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) @@ -57,7 +45,7 @@ var _ = Describe("Inabox Integration", func() { Hostname: "localhost", Port: "32003", Timeout: 10 * time.Second, - }, signer, clients.NewAccountant(dummyActiveReservation, dummyOnDemandPayment, 60, 128, 128, paymentSigner)) + }, signer, clients.NewAccountant(core.ActiveReservation{}, core.OnDemandPayment{}, 60, 128, 128, paymentSigner)) Expect(disp).To(Not(BeNil())) diff --git a/inabox/tests/payment_test.go b/inabox/tests/payment_test.go index bcfbb8e94..6da9c40c7 100644 --- a/inabox/tests/payment_test.go +++ b/inabox/tests/payment_test.go @@ -9,6 +9,7 @@ import ( "github.com/Layr-Labs/eigenda/api/clients" disperserpb "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/disperser" "github.com/ethereum/go-ethereum/crypto" @@ -41,7 +42,7 @@ var _ = Describe("Inabox Integration", func() { Hostname: "localhost", Port: "32003", Timeout: 10 * time.Second, - }, signer, clients.NewAccountant(dummyActiveReservation, dummyOnDemandPayment, 60, 128, 128, paymentSigner)) + }, signer, clients.NewAccountant(core.ActiveReservation{}, core.OnDemandPayment{}, 60, 128, 128, paymentSigner)) Expect(disp).To(Not(BeNil())) diff --git a/inabox/tests/ratelimit_test.go b/inabox/tests/ratelimit_test.go index 42ee49961..99fa2f655 100644 --- a/inabox/tests/ratelimit_test.go +++ b/inabox/tests/ratelimit_test.go @@ -111,7 +111,7 @@ func testRatelimit(t *testing.T, testConfig *deploy.Config, c ratelimitTestCase) Hostname: "localhost", Port: testConfig.Dispersers[0].DISPERSER_SERVER_GRPC_PORT, Timeout: 10 * time.Second, - }, nil, clients.Accountant{}) + }, nil, &clients.Accountant{}) assert.NotNil(t, disp) data := make([]byte, c.blobSize) From 4a42e963b6042c4f9095c2ec595a737532a9d64e Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 25 Oct 2024 14:35:58 -0700 Subject: [PATCH 3/5] refactor: rebase with new proto def --- api/clients/disperser_client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/api/clients/disperser_client.go b/api/clients/disperser_client.go index c0f0e4419..302deb5ba 100644 --- a/api/clients/disperser_client.go +++ b/api/clients/disperser_client.go @@ -154,10 +154,10 @@ func (c *disperserClient) DispersePaidBlob(ctx context.Context, data []byte, quo } request := &disperser_rpc.DispersePaidBlobRequest{ - Data: data, - CustomQuorumNumbers: quorumNumbers, - PaymentHeader: header, - PaymentSignature: signature, + Data: data, + QuorumNumbers: quorumNumbers, + PaymentHeader: header, + PaymentSignature: signature, } reply, err := c.client.DispersePaidBlob(ctxTimeout, request) From 0e4438459bb976ec95c700f964cf747b806a1fd5 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 25 Oct 2024 16:26:59 -0700 Subject: [PATCH 4/5] chore: adding integration test --- disperser/apiserver/payment_test.go | 115 ++++++++++++++++++++++++++++ disperser/apiserver/server_test.go | 22 +++++- 2 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 disperser/apiserver/payment_test.go diff --git a/disperser/apiserver/payment_test.go b/disperser/apiserver/payment_test.go new file mode 100644 index 000000000..f66525feb --- /dev/null +++ b/disperser/apiserver/payment_test.go @@ -0,0 +1,115 @@ +package apiserver_test + +import ( + "context" + "crypto/rand" + "math/big" + "net" + "testing" + "time" + + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" + "github.com/Layr-Labs/eigenda/core/mock" + "github.com/Layr-Labs/eigenda/encoding/utils/codec" + + pbcommon "github.com/Layr-Labs/eigenda/api/grpc/common" + pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/core" + "github.com/stretchr/testify/assert" + tmock "github.com/stretchr/testify/mock" + "google.golang.org/grpc/peer" +) + +func TestDispersePaidBlob(t *testing.T) { + + transactor := &mock.MockWriter{} + transactor.On("GetCurrentBlockNumber").Return(uint32(100), nil) + transactor.On("GetQuorumCount").Return(uint8(2), nil) + quorumParams := []core.SecurityParam{ + {QuorumID: 0, AdversaryThreshold: 80, ConfirmationThreshold: 100}, + {QuorumID: 1, AdversaryThreshold: 80, ConfirmationThreshold: 100}, + } + transactor.On("GetQuorumSecurityParams", tmock.Anything).Return(quorumParams, nil) + transactor.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil) + + quorums := []uint32{0, 1} + + dispersalServer := newTestServer(transactor, t.Name()) + + data := make([]byte, 1024) + _, err := rand.Read(data) + assert.NoError(t, err) + + data = codec.ConvertByPaddingEmptyByte(data) + + p := &peer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP("0.0.0.0"), + Port: 51001, + }, + } + ctx := peer.NewContext(context.Background(), p) + + transactor.On("GetRequiredQuorumNumbers", tmock.Anything).Return([]uint8{0, 1}, nil).Twice() + + pk := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" + signer := auth.NewPaymentSigner(pk) + // disperse paid reservation (any quorum number) + // Something is weird about setting the correct bin index so can't get reservation to work yet + binIndex := meterer.GetBinIndex(uint64(time.Now().Unix())-1, 2) + pm := pbcommon.PaymentHeader{ + AccountId: signer.GetAccountID(), + BinIndex: binIndex, + CumulativePayment: big.NewInt(0).Bytes(), + } + sig, err := signer.SignBlobPayment(&pm) + assert.NoError(t, err) + _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ + Data: data, + QuorumNumbers: []uint32{1}, + PaymentHeader: &pm, + PaymentSignature: sig, + }) + assert.Error(t, err) + + // disperse on-demand payment + for i := 1; i < 3; i++ { + shortData := make([]byte, 512) + _, err := rand.Read(shortData) + assert.NoError(t, err) + pm := pbcommon.PaymentHeader{ + AccountId: signer.GetAccountID(), + BinIndex: 0, + CumulativePayment: big.NewInt(512 * int64(i)).Bytes(), + } + sig, err := signer.SignBlobPayment(&pm) + assert.NoError(t, err) + reply, err := dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ + Data: data, + QuorumNumbers: quorums, + PaymentHeader: &pm, + PaymentSignature: sig, + }) + assert.NoError(t, err) + assert.Equal(t, reply.GetResult(), pb.BlobStatus_PROCESSING) + assert.NotNil(t, reply.GetRequestId()) + } + + // exceeded payment limit + pm := pbcommon.PaymentHeader{ + AccountId: signer.GetAccountID(), + BinIndex: 0, + CumulativePayment: big.NewInt(1025).Bytes(), + } + sig, err := signer.SignBlobPayment(&pm) + assert.NoError(t, err) + _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ + Data: data, + QuorumNumbers: quorums, + PaymentHeader: &pm, + PaymentSignature: sig, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "request claims a cumulative payment greater than the on-chain deposit") +} diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index f1627e444..02841f5d6 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "flag" "fmt" + "math/big" "net" "os" "testing" @@ -657,8 +658,20 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal mockState.On("GetPricePerSymbol").Return(uint32(1), nil) mockState.On("GetMinNumSymbols").Return(uint32(1), nil) - mockState.On("GetGlobalSymbolsPerSecond").Return(uint32(1000), nil) - mockState.On("GetReservationWindow").Return(uint32(60), nil) + mockState.On("GetGlobalSymbolsPerSecond").Return(uint64(4096), nil) + mockState.On("GetRequiredQuorumNumbers").Return([]uint8{0, 1}, nil) + mockState.On("GetOnDemandQuorumNumbers").Return([]uint8{0, 1}, nil) + mockState.On("GetReservationWindow").Return(uint32(2), nil) + mockState.On("GetOnDemandPaymentByAccount", tmock.Anything, tmock.Anything).Return(core.OnDemandPayment{ + CumulativePayment: big.NewInt(1024), + }, nil) + mockState.On("GetActiveReservationByAccount", tmock.Anything, tmock.Anything).Return(core.ActiveReservation{ + SymbolsPerSec: 1024, + StartTimestamp: 1000, + EndTimestamp: 2000, + QuorumNumbers: []uint8{0, 1}, + QuorumSplit: []byte{50, 50}, + }, nil) // append test name to each table name for an unique store table_names := []string{"reservations_server_" + testName, "ondemand_server_" + testName, "global_server_" + testName} err = meterer.CreateReservationTable(awsConfig, table_names[0]) @@ -688,7 +701,8 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal teardown() panic("failed to create offchain store") } - meterer := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) + mt := meterer.NewMeterer(meterer.Config{}, mockState, store, logger) + mt.ChainPaymentState.RefreshOnchainPaymentState(context.Background(), nil) ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ @@ -745,7 +759,7 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), meterer, ratelimiter, rateConfig, testMaxBlobSize) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), mt, ratelimiter, rateConfig, testMaxBlobSize) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { From 313a2cf3594f1027113505c7d8e9edfe4af2e90d Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 25 Oct 2024 17:56:23 -0700 Subject: [PATCH 5/5] chore: adding integration test --- core/meterer/meterer.go | 8 +-- core/meterer/meterer_test.go | 96 ++++++++++++----------------- disperser/apiserver/payment_test.go | 79 +++++++++++++++++------- disperser/apiserver/server.go | 3 +- disperser/apiserver/server_test.go | 11 ++-- 5 files changed, 106 insertions(+), 91 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 3e4fb97b8..07ba79024 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -67,15 +67,14 @@ func (m *Meterer) Start(ctx context.Context) { // MeterRequest validates a blob header and adds it to the meterer's state // TODO: return error if there's a rejection (with reasoning) or internal error (should be very rare) -func (m *Meterer) MeterRequest(ctx context.Context, blob core.Blob, header core.PaymentMetadata) error { - headerQuorums := blob.GetQuorumNumbers() +func (m *Meterer) MeterRequest(ctx context.Context, header core.PaymentMetadata, blobLength uint, quorumNumbers []uint8) error { // Validate against the payment method if header.CumulativePayment.Sign() == 0 { reservation, err := m.ChainPaymentState.GetActiveReservationByAccount(ctx, header.AccountID) if err != nil { return fmt.Errorf("failed to get active reservation by account: %w", err) } - if err := m.ServeReservationRequest(ctx, header, &reservation, blob.RequestHeader.BlobAuthHeader.Length, headerQuorums); err != nil { + if err := m.ServeReservationRequest(ctx, header, &reservation, blobLength, quorumNumbers); err != nil { return fmt.Errorf("invalid reservation: %w", err) } } else { @@ -83,7 +82,7 @@ func (m *Meterer) MeterRequest(ctx context.Context, blob core.Blob, header core. if err != nil { return fmt.Errorf("failed to get on-demand payment by account: %w", err) } - if err := m.ServeOnDemandRequest(ctx, header, &onDemandPayment, blob.RequestHeader.BlobAuthHeader.Length, headerQuorums); err != nil { + if err := m.ServeOnDemandRequest(ctx, header, &onDemandPayment, blobLength, quorumNumbers); err != nil { return fmt.Errorf("invalid on-demand request: %w", err) } } @@ -249,7 +248,6 @@ func (m *Meterer) SymbolsCharged(dataLength uint) uint32 { return m.ChainPaymentState.GetMinNumSymbols() } // Round up to the nearest multiple of MinNumSymbols - fmt.Println("return ", uint32(core.RoundUpDivide(uint(dataLength), uint(m.ChainPaymentState.GetMinNumSymbols())))*m.ChainPaymentState.GetMinNumSymbols()) return uint32(core.RoundUpDivide(uint(dataLength), uint(m.ChainPaymentState.GetMinNumSymbols()))) * m.ChainPaymentState.GetMinNumSymbols() } diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index dfe56b536..5ef8643a8 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -15,7 +15,6 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/core/mock" - "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/ethereum/go-ethereum/crypto" @@ -184,17 +183,17 @@ func TestMetererReservations(t *testing.T) { paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.Anything).Return(core.ActiveReservation{}, fmt.Errorf("reservation not found")) // test invalid quorom ID - blob, header := createMetererInput(1, 0, 1000, []uint8{0, 1, 2}, accountID1) - err := mt.MeterRequest(ctx, *blob, *header) + header := createPaymentHeader(1, 0, accountID1) + err := mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2}) assert.ErrorContains(t, err, "quorum number mismatch") // overwhelming bin overflow for empty bins - blob, header = createMetererInput(binIndex-1, 0, 10, quoromNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex-1, 0, accountID2) + err = mt.MeterRequest(ctx, *header, 10, quoromNumbers) assert.NoError(t, err) // overwhelming bin overflow for empty bins - blob, header = createMetererInput(binIndex-1, 0, 1000, quoromNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex-1, 0, accountID2) + err = mt.MeterRequest(ctx, *header, 1000, quoromNumbers) assert.ErrorContains(t, err, "overflow usage exceeds bin limit") // test non-existent account @@ -202,22 +201,22 @@ func TestMetererReservations(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - blob, header = createMetererInput(1, 0, 1000, []uint8{0, 1, 2}, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) + header = createPaymentHeader(1, 0, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2}) assert.ErrorContains(t, err, "failed to get active reservation by account: reservation not found") // test invalid bin index - blob, header = createMetererInput(binIndex, 0, 2000, quoromNumbers, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 0, accountID1) + err = mt.MeterRequest(ctx, *header, 2000, quoromNumbers) assert.ErrorContains(t, err, "invalid bin index for reservation") // test bin usage metering dataLength := uint(20) requiredLength := uint(21) // 21 should be charged for length of 20 since minNumSymbols is 3 for i := 0; i < 9; i++ { - blob, header = createMetererInput(binIndex, 0, dataLength, quoromNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 0, accountID2) + err = mt.MeterRequest(ctx, *header, dataLength, quoromNumbers) assert.NoError(t, err) item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{ "AccountID": &types.AttributeValueMemberS{Value: accountID2}, @@ -230,9 +229,9 @@ func TestMetererReservations(t *testing.T) { } // first over flow is allowed - blob, header = createMetererInput(binIndex, 0, 25, quoromNumbers, accountID2) + header = createPaymentHeader(binIndex, 0, accountID2) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 25, quoromNumbers) assert.NoError(t, err) overflowedBinIndex := binIndex + 2 item, err := dynamoClient.GetItem(ctx, reservationTableName, commondynamodb.Key{ @@ -246,9 +245,9 @@ func TestMetererReservations(t *testing.T) { assert.Equal(t, strconv.Itoa(int(16)), item["BinUsage"].(*types.AttributeValueMemberN).Value) // second over flow - blob, header = createMetererInput(binIndex, 0, 1, quoromNumbers, accountID2) + header = createPaymentHeader(binIndex, 0, accountID2) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 1, quoromNumbers) assert.ErrorContains(t, err, "bin has already been filled") } @@ -273,19 +272,19 @@ func TestMetererOnDemand(t *testing.T) { if err != nil { t.Fatalf("Failed to generate key: %v", err) } - blob, header := createMetererInput(binIndex, 2, 1000, quorumNumbers, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) + header := createPaymentHeader(binIndex, 2, crypto.PubkeyToAddress(unregisteredUser.PublicKey).Hex()) assert.NoError(t, err) - err = mt.MeterRequest(ctx, *blob, *header) + err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers) assert.ErrorContains(t, err, "failed to get on-demand payment by account: payment not found") // test invalid quorom ID - blob, header = createMetererInput(binIndex, 1, 1000, []uint8{0, 1, 2}, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 1, accountID1) + err = mt.MeterRequest(ctx, *header, 1000, []uint8{0, 1, 2}) assert.ErrorContains(t, err, "invalid quorum for On-Demand Request") // test insufficient cumulative payment - blob, header = createMetererInput(binIndex, 1, 2000, quorumNumbers, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 1, accountID1) + err = mt.MeterRequest(ctx, *header, 1000, quorumNumbers) assert.ErrorContains(t, err, "insufficient cumulative payment increment") // No rollback after meter request result, err := dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ @@ -299,37 +298,37 @@ func TestMetererOnDemand(t *testing.T) { dataLength := uint(100) priceCharged := mt.PaymentCharged(dataLength) assert.Equal(t, uint64(102*mt.ChainPaymentState.GetPricePerSymbol()), priceCharged) - blob, header = createMetererInput(binIndex, priceCharged, dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, priceCharged, accountID2) + err = mt.MeterRequest(ctx, *header, dataLength, quorumNumbers) assert.NoError(t, err) - blob, header = createMetererInput(binIndex, priceCharged, dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, priceCharged, accountID2) + err = mt.MeterRequest(ctx, *header, dataLength, quorumNumbers) assert.ErrorContains(t, err, "exact payment already exists") // test valid payments for i := 1; i < 9; i++ { - blob, header = createMetererInput(binIndex, uint64(priceCharged)*uint64(i+1), dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, uint64(priceCharged)*uint64(i+1), accountID2) + err = mt.MeterRequest(ctx, *header, dataLength, quorumNumbers) assert.NoError(t, err) } // test cumulative payment on-chain constraint - blob, header = createMetererInput(binIndex, 2023, 1, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, 2023, accountID2) + err = mt.MeterRequest(ctx, *header, 1, quorumNumbers) assert.ErrorContains(t, err, "invalid on-demand payment: request claims a cumulative payment greater than the on-chain deposit") // test insufficient increment in cumulative payment previousCumulativePayment := uint64(priceCharged) * uint64(9) dataLength = uint(2) priceCharged = mt.PaymentCharged(dataLength) - blob, header = createMetererInput(binIndex, previousCumulativePayment+priceCharged-1, dataLength, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, previousCumulativePayment+priceCharged-1, accountID2) + err = mt.MeterRequest(ctx, *header, dataLength, quorumNumbers) assert.ErrorContains(t, err, "invalid on-demand payment: insufficient cumulative payment increment") previousCumulativePayment = previousCumulativePayment + priceCharged // test cannot insert cumulative payment in out of order - blob, header = createMetererInput(binIndex, mt.PaymentCharged(50), 50, quorumNumbers, accountID2) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, mt.PaymentCharged(50), accountID2) + err = mt.MeterRequest(ctx, *header, 50, quorumNumbers) assert.ErrorContains(t, err, "invalid on-demand payment: breaking cumulative payment invariants") numPrevRecords := 12 @@ -340,9 +339,8 @@ func TestMetererOnDemand(t *testing.T) { assert.NoError(t, err) assert.Equal(t, numPrevRecords, len(result)) // test failed global rate limit (previously payment recorded: 2, global limit: 1009) - fmt.Println("need ", previousCumulativePayment+mt.PaymentCharged(1010)) - blob, header = createMetererInput(binIndex, previousCumulativePayment+mt.PaymentCharged(1010), 1010, quorumNumbers, accountID1) - err = mt.MeterRequest(ctx, *blob, *header) + header = createPaymentHeader(binIndex, previousCumulativePayment+mt.PaymentCharged(1010), accountID1) + err = mt.MeterRequest(ctx, *header, 1010, quorumNumbers) assert.ErrorContains(t, err, "failed global rate limiting") // Correct rollback result, err = dynamoClient.Query(ctx, ondemandTableName, "AccountID = :account", commondynamodb.ExpressionValues{ @@ -464,28 +462,10 @@ func TestMeterer_symbolsCharged(t *testing.T) { } } -func createMetererInput(binIndex uint32, cumulativePayment uint64, dataLength uint, quorumNumbers []uint8, accountID string) (blob *core.Blob, header *core.PaymentMetadata) { - sp := make([]*core.SecurityParam, len(quorumNumbers)) - for i, quorumID := range quorumNumbers { - sp[i] = &core.SecurityParam{ - QuorumID: quorumID, - } - } - blob = &core.Blob{ - RequestHeader: core.BlobRequestHeader{ - BlobAuthHeader: core.BlobAuthHeader{ - AccountID: accountID2, - BlobCommitments: encoding.BlobCommitments{ - Length: dataLength, - }, - }, - SecurityParams: sp, - }, - } - header = &core.PaymentMetadata{ +func createPaymentHeader(binIndex uint32, cumulativePayment uint64, accountID string) *core.PaymentMetadata { + return &core.PaymentMetadata{ AccountID: accountID, BinIndex: binIndex, CumulativePayment: big.NewInt(int64(cumulativePayment)), } - return blob, header } diff --git a/disperser/apiserver/payment_test.go b/disperser/apiserver/payment_test.go index f66525feb..8f57e7ea0 100644 --- a/disperser/apiserver/payment_test.go +++ b/disperser/apiserver/payment_test.go @@ -55,33 +55,14 @@ func TestDispersePaidBlob(t *testing.T) { pk := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" signer := auth.NewPaymentSigner(pk) - // disperse paid reservation (any quorum number) - // Something is weird about setting the correct bin index so can't get reservation to work yet - binIndex := meterer.GetBinIndex(uint64(time.Now().Unix())-1, 2) - pm := pbcommon.PaymentHeader{ - AccountId: signer.GetAccountID(), - BinIndex: binIndex, - CumulativePayment: big.NewInt(0).Bytes(), - } - sig, err := signer.SignBlobPayment(&pm) - assert.NoError(t, err) - _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ - Data: data, - QuorumNumbers: []uint32{1}, - PaymentHeader: &pm, - PaymentSignature: sig, - }) - assert.Error(t, err) + dataLength := len(data) // disperse on-demand payment for i := 1; i < 3; i++ { - shortData := make([]byte, 512) - _, err := rand.Read(shortData) - assert.NoError(t, err) pm := pbcommon.PaymentHeader{ AccountId: signer.GetAccountID(), BinIndex: 0, - CumulativePayment: big.NewInt(512 * int64(i)).Bytes(), + CumulativePayment: big.NewInt(int64(dataLength * i)).Bytes(), } sig, err := signer.SignBlobPayment(&pm) assert.NoError(t, err) @@ -100,7 +81,7 @@ func TestDispersePaidBlob(t *testing.T) { pm := pbcommon.PaymentHeader{ AccountId: signer.GetAccountID(), BinIndex: 0, - CumulativePayment: big.NewInt(1025).Bytes(), + CumulativePayment: big.NewInt(int64(dataLength*3) - 1).Bytes(), } sig, err := signer.SignBlobPayment(&pm) assert.NoError(t, err) @@ -112,4 +93,58 @@ func TestDispersePaidBlob(t *testing.T) { }) assert.Error(t, err) assert.Contains(t, err.Error(), "request claims a cumulative payment greater than the on-chain deposit") + + // disperse paid reservation (any quorum number) + // TODO: somehow meterer is not defined as a method or field in dispersalServer; reservationWindow we set was 1 + for i := 0; i < 2; i++ { + binIndex := meterer.GetBinIndex(uint64(time.Now().Unix()), 1) + pm = pbcommon.PaymentHeader{ + AccountId: signer.GetAccountID(), + BinIndex: binIndex, + CumulativePayment: big.NewInt(0).Bytes(), + } + sig, err = signer.SignBlobPayment(&pm) + assert.NoError(t, err) + reply, err := dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ + Data: data, + QuorumNumbers: []uint32{1}, + PaymentHeader: &pm, + PaymentSignature: sig, + }) + assert.NoError(t, err) + assert.Equal(t, reply.GetResult(), pb.BlobStatus_PROCESSING) + assert.NotNil(t, reply.GetRequestId()) + + } + binIndex := meterer.GetBinIndex(uint64(time.Now().Unix()), 1) + pm = pbcommon.PaymentHeader{ + AccountId: signer.GetAccountID(), + BinIndex: binIndex, + CumulativePayment: big.NewInt(0).Bytes(), + } + sig, err = signer.SignBlobPayment(&pm) + assert.NoError(t, err) + _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ + Data: data, + QuorumNumbers: []uint32{1}, + PaymentHeader: &pm, + PaymentSignature: sig, + }) + assert.Contains(t, err.Error(), "bin has already been filled") + + // invalid bin index + binIndex = meterer.GetBinIndex(uint64(time.Now().Unix())/2, 1) + pm = pbcommon.PaymentHeader{ + AccountId: signer.GetAccountID(), + BinIndex: binIndex, + CumulativePayment: big.NewInt(0).Bytes(), + } + sig, err = signer.SignBlobPayment(&pm) + _, err = dispersalServer.DispersePaidBlob(ctx, &pb.DispersePaidBlobRequest{ + Data: data, + QuorumNumbers: []uint32{1}, + PaymentHeader: &pm, + PaymentSignature: sig, + }) + assert.Contains(t, err.Error(), "invalid bin index for reservation") } diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 3ebcc18a2..0305d3984 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -285,7 +285,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut // If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available if paymentHeader != nil && *paymentHeader != (core.PaymentMetadata{}) { - err := s.meterer.MeterRequest(ctx, *blob, *paymentHeader) + err := s.meterer.MeterRequest(ctx, *paymentHeader, uint(blobSize), blob.GetQuorumNumbers()) if err != nil { return nil, api.NewErrorResourceExhausted(err.Error()) } @@ -1073,6 +1073,7 @@ func (s *DispersalServer) validatePaidRequestAndGetBlob(ctx context.Context, req data := req.GetData() blobSize := len(data) + // The blob size in bytes must be in range [1, maxBlobSize]. if blobSize > s.maxBlobSize { return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize) diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 02841f5d6..a5cd38fe8 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "flag" "fmt" + "math" "math/big" "net" "os" @@ -661,14 +662,14 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal mockState.On("GetGlobalSymbolsPerSecond").Return(uint64(4096), nil) mockState.On("GetRequiredQuorumNumbers").Return([]uint8{0, 1}, nil) mockState.On("GetOnDemandQuorumNumbers").Return([]uint8{0, 1}, nil) - mockState.On("GetReservationWindow").Return(uint32(2), nil) + mockState.On("GetReservationWindow").Return(uint32(1), nil) mockState.On("GetOnDemandPaymentByAccount", tmock.Anything, tmock.Anything).Return(core.OnDemandPayment{ - CumulativePayment: big.NewInt(1024), + CumulativePayment: big.NewInt(3000), }, nil) mockState.On("GetActiveReservationByAccount", tmock.Anything, tmock.Anything).Return(core.ActiveReservation{ - SymbolsPerSec: 1024, - StartTimestamp: 1000, - EndTimestamp: 2000, + SymbolsPerSec: 2048, + StartTimestamp: 0, + EndTimestamp: math.MaxUint32, QuorumNumbers: []uint8{0, 1}, QuorumSplit: []byte{50, 50}, }, nil)