Skip to content

Commit

Permalink
refactor: rebase with new types
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 23, 2024
1 parent e1796cd commit 072e007
Show file tree
Hide file tree
Showing 11 changed files with 174 additions and 120 deletions.
56 changes: 37 additions & 19 deletions api/clients/accountant.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package clients

import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"math/big"
"time"

commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/meterer"
)
Expand All @@ -26,23 +27,25 @@ type Accountant struct {
// local accounting
// contains 3 bins; 0 for current bin, 1 for next bin, 2 for overflowed bin
binUsages []uint64
cumulativePayment uint64
cumulativePayment *big.Int
stopRotation chan struct{}

accountantSigner *ecdsa.PrivateKey
paymentSigner core.PaymentSigner
}

func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerChargeable uint32, minChargeableSize uint32, accountantSigner *ecdsa.PrivateKey) Accountant {
func NewAccountant(reservation core.ActiveReservation, onDemand core.OnDemandPayment, reservationWindow uint32, pricePerChargeable uint32, minChargeableSize uint32, paymentSigner core.PaymentSigner) Accountant {
//TODO: client storage; currently every instance starts fresh but on-chain or a small store makes more sense
// Also client is currently responsible for supplying network params, we need to add RPC in order to be automatic
a := Accountant{
reservation: reservation,
onDemand: onDemand,
reservationWindow: reservationWindow,
pricePerChargeable: pricePerChargeable,
minChargeableSize: minChargeableSize,
binUsages: []uint64{0, 0, 0},
cumulativePayment: 0,
cumulativePayment: big.NewInt(0),
stopRotation: make(chan struct{}),
accountantSigner: accountantSigner,
paymentSigner: paymentSigner,
}
go a.startBinRotation()
return a
Expand Down Expand Up @@ -76,44 +79,59 @@ func (a *Accountant) Stop() {
// accountant calculates and records payment information
func (a *Accountant) BlobPaymentInfo(ctx context.Context, dataLength uint64) (uint32, *big.Int, error) {
//TODO: do we need to lock the binUsages here in case the blob rotation happens in the middle of the function?
binUsage := a.binUsages[0] + dataLength
// binUsage := a.binUsages[0] + dataLength
a.binUsages[0] += dataLength
now := time.Now().Unix()
currentBinIndex := meterer.GetBinIndex(uint64(now), a.reservationWindow)

// first attempt to use the active reservation
binLimit := a.reservation.SymbolsPerSec * uint64(a.reservationWindow)
if binUsage <= binLimit {
if a.binUsages[0] <= binLimit {
return currentBinIndex, big.NewInt(0), nil
}

// Allow one overflow when the overflow bin is empty, the current usage and new length are both less than the limit
if a.binUsages[2] == 0 && binUsage-dataLength < binLimit && dataLength <= binLimit {
a.binUsages[2] += binUsage - binLimit
if a.binUsages[2] == 0 && a.binUsages[0]-dataLength < binLimit && dataLength <= binLimit {
a.binUsages[2] += a.binUsages[0] - binLimit
return currentBinIndex, big.NewInt(0), nil
}

// reservation not available, attempt on-demand
//todo: rollback if disperser respond with some type of rejection?
incrementRequired := a.PaymentCharged(uint32(dataLength))
a.cumulativePayment += incrementRequired
if a.cumulativePayment <= a.onDemand.CumulativePayment.Uint64() {
return 0, big.NewInt(int64(a.cumulativePayment)), nil
a.binUsages[0] -= dataLength
incrementRequired := big.NewInt(int64(a.PaymentCharged(uint32(dataLength))))
a.cumulativePayment.Add(a.cumulativePayment, incrementRequired)
if a.cumulativePayment.Cmp(a.onDemand.CumulativePayment) <= 0 {
return 0, a.cumulativePayment, nil
}
fmt.Println("Accountant cannot approve payment for this blob")
return 0, big.NewInt(0), errors.New("Accountant cannot approve payment for this blob")
}

// accountant provides and records payment information
func (a *Accountant) AccountBlob(ctx context.Context, dataLength uint64, quorums []uint8) (*core.PaymentMetadata, error) {
func (a *Accountant) AccountBlob(ctx context.Context, dataLength uint64, quorums []uint8) (*commonpb.PaymentHeader, []byte, error) {
binIndex, cumulativePayment, err := a.BlobPaymentInfo(ctx, dataLength)
if err != nil {
return nil, err
return nil, nil, err
}

return &core.PaymentMetadata{
AccountID: string(a.accountantSigner.PublicKey.X.Bytes()),
accountID, err := a.paymentSigner.GetAccountID()
if err != nil {
return nil, nil, err
}
pm := &core.PaymentMetadata{
AccountID: accountID,
BinIndex: binIndex,
CumulativePayment: cumulativePayment,
}, nil
}
protoPaymentHeader := pm.ConvertToProtoPaymentHeader()

signature, err := a.paymentSigner.SignBlobPayment(protoPaymentHeader)
if err != nil {
return nil, nil, err
}

return protoPaymentHeader, signature, nil
}

// PaymentCharged returns the chargeable price for a given data length
Expand Down
Loading

0 comments on commit 072e007

Please sign in to comment.