From df6ca37e16b98716b64f701d082f99c7da615f4a Mon Sep 17 00:00:00 2001 From: hopeyen Date: Fri, 25 Oct 2024 11:37:35 -0700 Subject: [PATCH] refactor: server payment state init --- core/meterer/meterer.go | 5 ----- core/meterer/meterer_test.go | 9 ++++++--- core/meterer/onchain_state.go | 6 +++--- disperser/apiserver/server.go | 6 +++--- disperser/apiserver/server_test.go | 4 ++++ disperser/cmd/apiserver/main.go | 3 +++ test/integration_test.go | 4 ++++ 7 files changed, 23 insertions(+), 14 deletions(-) diff --git a/core/meterer/meterer.go b/core/meterer/meterer.go index 6ac0e29fb..94972b1f9 100644 --- a/core/meterer/meterer.go +++ b/core/meterer/meterer.go @@ -52,11 +52,6 @@ func (m *Meterer) Start(ctx context.Context) { ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() - // initial tick immediately upto Start - if err := m.ChainPaymentState.RefreshOnchainPaymentState(ctx, nil); err != nil { - m.logger.Error("Failed to make initial query to the on-chain state", "error", err) - } - for { select { case <-ticker.C: diff --git a/core/meterer/meterer_test.go b/core/meterer/meterer_test.go index ffe25fc28..693422b8a 100644 --- a/core/meterer/meterer_test.go +++ b/core/meterer/meterer_test.go @@ -2,7 +2,6 @@ package meterer_test import ( "context" - "errors" "fmt" "math/big" "os" @@ -145,6 +144,10 @@ func setup(_ *testing.M) { } paymentChainState.On("RefreshOnchainPaymentState", testifymock.Anything).Return(nil).Maybe() + if err := paymentChainState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + panic("failed to make initial query to the on-chain state") + } + // add some default sensible configs mt = meterer.NewMeterer( config, @@ -178,7 +181,7 @@ func TestMetererReservations(t *testing.T) { paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.MatchedBy(func(account string) bool { return account == accountID2 })).Return(account2Reservations, nil) - paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.Anything).Return(core.ActiveReservation{}, errors.New("reservation not found")) + paymentChainState.On("GetActiveReservationByAccount", testifymock.Anything, testifymock.Anything).Return(core.ActiveReservation{}, fmt.Errorf("reservation not found")) // test invalid quorom ID blob, header := createMetererInput(1, 0, 1000, []uint8{0, 1, 2}, accountID1) @@ -262,7 +265,7 @@ func TestMetererOnDemand(t *testing.T) { paymentChainState.On("GetOnDemandPaymentByAccount", testifymock.Anything, testifymock.MatchedBy(func(account string) bool { return account == accountID2 })).Return(account2OnDemandPayments, nil) - paymentChainState.On("GetOnDemandPaymentByAccount", testifymock.Anything, testifymock.Anything).Return(core.OnDemandPayment{}, errors.New("payment not found")) + paymentChainState.On("GetOnDemandPaymentByAccount", testifymock.Anything, testifymock.Anything).Return(core.OnDemandPayment{}, fmt.Errorf("payment not found")) paymentChainState.On("GetOnDemandQuorumNumbers", testifymock.Anything).Return(quorumNumbers, nil) // test unregistered account diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 4d1892809..c1c77c2c0 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -2,7 +2,7 @@ package meterer import ( "context" - "errors" + "fmt" "sync" "github.com/Layr-Labs/eigenda/core" @@ -165,7 +165,7 @@ func (pcs *OnchainPaymentState) GetActiveReservationByAccountOnChain(ctx context } res, err := pcs.tx.GetActiveReservationByAccount(ctx, blockNumber, accountID) if err != nil { - return core.ActiveReservation{}, errors.New("reservation account not found on-chain") + return core.ActiveReservation{}, fmt.Errorf("reservation account not found on-chain: %w", err) } return res, nil } @@ -194,7 +194,7 @@ func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccountOnChain(ctx context.C } res, err := pcs.tx.GetOnDemandPaymentByAccount(ctx, blockNumber, accountID) if err != nil { - return core.OnDemandPayment{}, errors.New("on-demand not found on-chain") + return core.OnDemandPayment{}, fmt.Errorf("on-demand not found on-chain: %w", err) } return res, nil } diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 6697af0c9..93a4a943b 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -284,7 +284,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut s.logger.Debug("received a new blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", ")) // If paymentHeader is not empty, we use the meterer, otherwise we use the ratelimiter if the ratelimiter is available - if *paymentHeader != (core.PaymentMetadata{}) { + if paymentHeader != nil && *paymentHeader != (core.PaymentMetadata{}) { err := s.meterer.MeterRequest(ctx, *blob, *paymentHeader) if err != nil { return nil, err @@ -983,7 +983,7 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb } if len(req.GetCustomQuorumNumbers()) > 256 { - return nil, errors.New("number of custom_quorum_numbers must not exceed 256") + return nil, fmt.Errorf("number of custom_quorum_numbers must not exceed 256") } // validate every 32 bytes is a valid field element @@ -999,7 +999,7 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb } if len(req.GetCustomQuorumNumbers()) > int(quorumConfig.QuorumCount) { - return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums") + return nil, fmt.Errorf("number of custom_quorum_numbers must not exceed number of quorums") } seenQuorums := make(map[uint8]struct{}) diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 26462fa8e..4b06fd1d4 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -652,6 +652,10 @@ func newTestServer(transactor core.Writer, testName string) *apiserver.Dispersal mockState := &mock.MockOnchainPaymentState{} mockState.On("RefreshOnchainPaymentState", tmock.Anything).Return(nil).Maybe() + if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + panic("failed to make initial query to the on-chain state") + } + // append test name to each table name for an unique store table_names := []string{"reservations_server_" + testName, "ondemand_server_" + testName, "global_server_" + testName} err = meterer.CreateReservationTable(awsConfig, table_names[0]) diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index eb01b5bfc..74941e140 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -113,6 +113,9 @@ func RunDisperserServer(ctx *cli.Context) error { if err != nil { return fmt.Errorf("failed to create onchain payment state: %w", err) } + if err := paymentChainState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + return fmt.Errorf("failed to make initial query to the on-chain state: %w", err) + } offchainStore, err := mt.NewOffchainStore( config.AwsClientConfig, diff --git a/test/integration_test.go b/test/integration_test.go index 374c70211..668a4d71f 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -289,6 +289,10 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser } mockState.On("RefreshOnchainPaymentState", mock.Anything).Return(nil).Maybe() + if err := mockState.RefreshOnchainPaymentState(context.Background(), nil); err != nil { + panic("failed to make initial query to the on-chain state") + } + meterer := meterer.NewMeterer(meterer.Config{}, mockState, offchainStore, logger) server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, meterer, ratelimiter, rateConfig, testMaxBlobSize)