Skip to content

Commit

Permalink
feat: payment state API for client
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 24, 2024
1 parent ecce71e commit 9a085ae
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 112 deletions.
55 changes: 38 additions & 17 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
disperser_rpc "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
)
Expand All @@ -18,11 +19,11 @@ type IAccountant interface {

type Accountant struct {
// on-chain states
reservation core.ActiveReservation
onDemand core.OnDemandPayment
reservationWindow uint32
pricePerChargeable uint32
minChargeableSize uint32
reservation core.ActiveReservation
onDemand core.OnDemandPayment
reservationWindow uint32
pricePerSymbol uint32
minNumSymbols uint32

// local accounting
// contains 3 bins; 0 for current bin, 1 for next bin, 2 for overflowed bin
Expand All @@ -33,19 +34,19 @@ type Accountant struct {
paymentSigner core.PaymentSigner
}

func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerChargeable uint32, minChargeableSize uint32, paymentSigner core.PaymentSigner) Accountant {
func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerSymbol uint32, minNumSymbols uint32, paymentSigner core.PaymentSigner) Accountant {
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
a := Accountant{
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerChargeable: pricePerChargeable,
minChargeableSize: minChargeableSize,
binUsages: []uint64{0, 0, 0},
cumulativePayment: big.NewInt(0),
stopRotation: make(chan struct{}),
paymentSigner: paymentSigner,
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerSymbol: pricePerSymbol,
minNumSymbols: minNumSymbols,
binUsages: []uint64{0, 0, 0},
cumulativePayment: big.NewInt(0),
stopRotation: make(chan struct{}),
paymentSigner: paymentSigner,
}
go a.startBinRotation()
return a
Expand Down Expand Up @@ -136,10 +137,30 @@ func (a *Accountant) AccountBlob(ctx context.Context, dataLength uint64, quorums

// PaymentCharged returns the chargeable price for a given data length
func (a *Accountant) PaymentCharged(dataLength uint32) uint64 {
return uint64(core.RoundUpDivide(uint(a.BlobSizeCharged(dataLength)*a.pricePerChargeable), uint(a.minChargeableSize)))
return uint64(core.RoundUpDivide(uint(a.BlobSizeCharged(dataLength)*a.pricePerSymbol), uint(a.minNumSymbols)))
}

// BlobSizeCharged returns the chargeable data length for a given data length
func (a *Accountant) BlobSizeCharged(dataLength uint32) uint32 {
return max(dataLength, uint32(a.minChargeableSize))
return max(dataLength, uint32(a.minNumSymbols))
}

func (a *Accountant) SetPaymentState(paymentState *disperser_rpc.GetPaymentStateReply) {
quorumNumbers := make([]uint8, len(paymentState.Reservation.QuorumNumbers))
for i, quorum := range paymentState.Reservation.QuorumNumbers {
quorumNumbers[i] = uint8(quorum)
}
quorumSplit := make([]uint8, len(paymentState.Reservation.QuorumSplit))
for i, quorum := range paymentState.Reservation.QuorumSplit {
quorumSplit[i] = uint8(quorum)
}
a.onDemand.CumulativePayment = new(big.Int).SetBytes(paymentState.OnChainCumulativePayment)
a.reservation.SymbolsPerSec = uint64(paymentState.PaymentGlobalParams.GlobalSymbolsPerSecond)
a.reservation.StartTimestamp = uint64(paymentState.Reservation.StartTimestamp)
a.reservation.EndTimestamp = uint64(paymentState.Reservation.EndTimestamp)
a.reservation.QuorumNumbers = quorumNumbers
a.reservation.QuorumSplit = quorumSplit
a.reservationWindow = uint32(paymentState.PaymentGlobalParams.ReservationWindow)
a.pricePerSymbol = uint32(paymentState.PaymentGlobalParams.PricePerSymbol)
a.minNumSymbols = uint32(paymentState.PaymentGlobalParams.MinNumSymbols)
}
66 changes: 33 additions & 33 deletions api/clients/accountant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ func TestNewAccountant(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(6)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

assert.Equal(t, reservation, accountant.reservation)
assert.Equal(t, onDemand, accountant.onDemand)
assert.Equal(t, reservationWindow, accountant.reservationWindow)
assert.Equal(t, pricePerChargeable, accountant.pricePerChargeable)
assert.Equal(t, minChargeableSize, accountant.minChargeableSize)
assert.Equal(t, pricePerSymbol, accountant.pricePerSymbol)
assert.Equal(t, minNumSymbols, accountant.minNumSymbols)
assert.Equal(t, []uint64{0, 0, 0}, accountant.binUsages)
assert.Equal(t, big.NewInt(0), accountant.cumulativePayment)
}
Expand All @@ -57,13 +57,13 @@ func TestAccountBlob_Reservation(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -109,13 +109,13 @@ func TestAccountBlob_OnDemand(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand All @@ -124,7 +124,7 @@ func TestAccountBlob_OnDemand(t *testing.T) {

header, _, err := accountant.AccountBlob(ctx, dataLength, quorums)
metadata := core.ConvertPaymentHeader(header)
expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerChargeable) / uint64(minChargeableSize)))
expectedPayment := big.NewInt(int64(dataLength * uint64(pricePerSymbol) / uint64(minNumSymbols)))
assert.NoError(t, err)
assert.Equal(t, uint32(0), header.BinIndex)
assert.Equal(t, expectedPayment, metadata.CumulativePayment)
Expand All @@ -138,13 +138,13 @@ func TestAccountBlob_InsufficientOnDemand(t *testing.T) {
CumulativePayment: big.NewInt(500),
}
reservationWindow := uint32(60)
pricePerChargeable := uint32(100)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(100)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand All @@ -169,13 +169,13 @@ func TestAccountBlobCallSeries(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(100)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(100)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -221,12 +221,12 @@ func TestAccountBlob_BinRotation(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)
privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -263,13 +263,13 @@ func TestBinRotation(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -312,13 +312,13 @@ func TestConcurrentBinRotationAndAccountBlob(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down Expand Up @@ -357,13 +357,13 @@ func TestAccountBlob_ReservationWithOneOverflow(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(5)
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()
ctx := context.Background()
quorums := []uint8{0, 1}
Expand Down Expand Up @@ -405,13 +405,13 @@ func TestAccountBlob_ReservationOverflowReset(t *testing.T) {
CumulativePayment: big.NewInt(1000),
}
reservationWindow := uint32(1) // Set to 1 second for testing
pricePerChargeable := uint32(1)
minChargeableSize := uint32(100)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(100)

privateKey1, err := crypto.GenerateKey()
assert.NoError(t, err)
paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey1.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, reservationWindow, pricePerSymbol, minNumSymbols, paymentSigner)
defer accountant.Stop()

ctx := context.Background()
Expand Down
33 changes: 33 additions & 0 deletions api/clients/disperser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type DisperserClient interface {
DispersePaidBlob(ctx context.Context, data []byte, customQuorums []uint8) (*disperser.BlobStatus, []byte, error)
GetBlobStatus(ctx context.Context, key []byte) (*disperser_rpc.BlobStatusReply, error)
RetrieveBlob(ctx context.Context, batchHeaderHash []byte, blobIndex uint32) ([]byte, error)
GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error)
}

type disperserClient struct {
Expand Down Expand Up @@ -323,3 +324,35 @@ func (c *disperserClient) RetrieveBlob(ctx context.Context, batchHeaderHash []by
}
return reply.Data, nil
}

func (c *disperserClient) GetPaymentState(ctx context.Context) (*disperser_rpc.GetPaymentStateReply, error) {
addr := fmt.Sprintf("%v:%v", c.config.Hostname, c.config.Port)
dialOptions := c.getDialOptions()
conn, err := grpc.Dial(addr, dialOptions...)
if err != nil {
return nil, err
}

disperserClient := disperser_rpc.NewDisperserClient(conn)
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
defer cancel()

accountID, err := c.accountant.paymentSigner.GetAccountID()

signature, err := c.accountant.paymentSigner.SignAccountID(accountID)
if err != nil {
return nil, err
}

request := &disperser_rpc.GetPaymentStateRequest{
AccountId: accountID,
Signature: signature,
}

reply, err := disperserClient.GetPaymentState(ctxTimeout, request)
if err != nil {
return nil, err
}

return reply, nil
}
12 changes: 9 additions & 3 deletions api/clients/eigenda_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
llConfig := NewConfig(host, port, config.ResponseTimeout, !config.DisableTLS)
binInterval := uint32(time.Minute.Seconds())
now := uint64(time.Now().Unix())
pricePerChargeable := uint32(1)
minChargeableSize := uint32(1)
pricePerSymbol := uint32(1)
minNumSymbols := uint32(1)
reservation := core.ActiveReservation{
SymbolsPerSec: 100,
StartTimestamp: now,
Expand All @@ -82,10 +82,16 @@ func NewEigenDAClient(log log.Logger, config EigenDAClientConfig) (*EigenDAClien
}

paymentSigner := auth.NewPaymentSigner(hex.EncodeToString(privateKey.D.Bytes()))
accountant := NewAccountant(reservation, onDemand, binInterval, pricePerChargeable, minChargeableSize, paymentSigner)
accountant := NewAccountant(reservation, onDemand, binInterval, pricePerSymbol, minNumSymbols, paymentSigner)

llClient := NewDisperserClient(llConfig, signer, accountant)

paymentState, err := llClient.GetPaymentState(context.Background())
if err != nil {
return nil, fmt.Errorf("error getting payment state: %w", err)
}
accountant.SetPaymentState(paymentState)

lowLevelCodec, err := codecs.BlobEncodingVersionToCodec(config.PutBlobEncodingVersion)
if err != nil {
return nil, fmt.Errorf("error initializing EigenDA client: %w", err)
Expand Down
Loading

0 comments on commit 9a085ae

Please sign in to comment.