Skip to content

Commit

Permalink
refactor: server payment state init
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 25, 2024
1 parent f5835a8 commit df6ca37
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 14 deletions.
5 changes: 0 additions & 5 deletions core/meterer/meterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func (m *Meterer) Start(ctx context.Context) {
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()

// initial tick immediately upto Start
if err := m.ChainPaymentState.RefreshOnchainPaymentState(ctx, nil); err != nil {
m.logger.Error("Failed to make initial query to the on-chain state", "error", err)
}

for {
select {
case <-ticker.C:
Expand Down
9 changes: 6 additions & 3 deletions core/meterer/meterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package meterer_test

import (
"context"
"errors"
"fmt"
"math/big"
"os"
Expand Down Expand Up @@ -145,6 +144,10 @@ func setup(_ *testing.M) {
}

paymentChainState.On("RefreshOnchainPaymentState", testifymock.Anything).Return(nil).Maybe()
if err := paymentChainState.RefreshOnchainPaymentState(context.Background(), nil); err != nil {
panic("failed to make initial query to the on-chain state")
}

// add some default sensible configs
mt = meterer.NewMeterer(
config,
Expand Down Expand Up @@ -178,7 +181,7 @@ func TestMetererReservations(t *testing.T) {
paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.MatchedBy(func(account string) bool {
return account == accountID2
})).Return(account2Reservations, nil)
paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.Anything).Return(core.ActiveReservation{}, errors.New("reservation not found"))
paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.Anything).Return(core.ActiveReservation{}, fmt.Errorf("reservation not found"))

// test invalid quorom ID
blob, header := createMetererInput(1, 0, 1000, []uint8{0, 1, 2}, accountID1)
Expand Down Expand Up @@ -262,7 +265,7 @@ func TestMetererOnDemand(t *testing.T) {
paymentChainState.On("GetOnDemandPaymentByAccount", testifymock.Anything, testifymock.MatchedBy(func(account string) bool {
return account == accountID2
})).Return(account2OnDemandPayments, nil)
paymentChainState.On("GetOnDemandPaymentByAccount", testifymock.Anything, testifymock.Anything).Return(core.OnDemandPayment{}, errors.New("payment not found"))
paymentChainState.On("GetOnDemandPaymentByAccount", testifymock.Anything, testifymock.Anything).Return(core.OnDemandPayment{}, fmt.Errorf("payment not found"))
paymentChainState.On("GetOnDemandQuorumNumbers", testifymock.Anything).Return(quorumNumbers, nil)

// test unregistered account
Expand Down
6 changes: 3 additions & 3 deletions core/meterer/onchain_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package meterer

import (
"context"
"errors"
"fmt"
"sync"

"github.com/Layr-Labs/eigenda/core"
Expand Down Expand Up @@ -165,7 +165,7 @@ func (pcs *OnchainPaymentState) GetActiveReservationByAccountOnChain(ctx context
}
res, err := pcs.tx.GetActiveReservationByAccount(ctx, blockNumber, accountID)
if err != nil {
return core.ActiveReservation{}, errors.New("reservation account not found on-chain")
return core.ActiveReservation{}, fmt.Errorf("reservation account not found on-chain: %w", err)
}
return res, nil
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccountOnChain(ctx context.C
}
res, err := pcs.tx.GetOnDemandPaymentByAccount(ctx, blockNumber, accountID)
if err != nil {
return core.OnDemandPayment{}, errors.New("on-demand not found on-chain")
return core.OnDemandPayment{}, fmt.Errorf("on-demand not found on-chain: %w", err)
}
return res, nil
}
Expand Down
6 changes: 3 additions & 3 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
s.logger.Debug("received a new blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", "))

// If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available
if *paymentHeader != (core.PaymentMetadata{}) {
if paymentHeader != nil && *paymentHeader != (core.PaymentMetadata{}) {
err := s.meterer.MeterRequest(ctx, *blob, *paymentHeader)
if err != nil {
return nil, err
Expand Down Expand Up @@ -983,7 +983,7 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb
}

if len(req.GetCustomQuorumNumbers()) > 256 {
return nil, errors.New("number of custom_quorum_numbers must not exceed 256")
return nil, fmt.Errorf("number of custom_quorum_numbers must not exceed 256")
}

// validate every 32 bytes is a valid field element
Expand All @@ -999,7 +999,7 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb
}

if len(req.GetCustomQuorumNumbers()) > int(quorumConfig.QuorumCount) {
return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums")
return nil, fmt.Errorf("number of custom_quorum_numbers must not exceed number of quorums")
}

seenQuorums := make(map[uint8]struct{})
Expand Down
4 changes: 4 additions & 0 deletions disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,10 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal

mockState := &mock.MockOnchainPaymentState{}
mockState.On("RefreshOnchainPaymentState", tmock.Anything).Return(nil).Maybe()
if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil {
panic("failed to make initial query to the on-chain state")
}

// append test name to each table name for an unique store
table_names := []string{"reservations_server_" + testName, "ondemand_server_" + testName, "global_server_" + testName}
err = meterer.CreateReservationTable(awsConfig, table_names[0])
Expand Down
3 changes: 3 additions & 0 deletions disperser/cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ func RunDisperserServer(ctx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to create onchain payment state: %w", err)
}
if err := paymentChainState.RefreshOnchainPaymentState(context.Background(), nil); err != nil {
return fmt.Errorf("failed to make initial query to the on-chain state: %w", err)
}

offchainStore, err := mt.NewOffchainStore(
config.AwsClientConfig,
Expand Down
4 changes: 4 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
}

mockState.On("RefreshOnchainPaymentState", mock.Anything).Return(nil).Maybe()
if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil {
panic("failed to make initial query to the on-chain state")
}

meterer := meterer.NewMeterer(meterer.Config{}, mockState, offchainStore, logger)
server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, meterer, ratelimiter, rateConfig, testMaxBlobSize)

Expand Down

0 comments on commit df6ca37

Please sign in to comment.