From 5c33742ac0b8c495bd44aa36a1594a00fbc86d02 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Tue, 22 Oct 2024 14:50:11 -0700 Subject: [PATCH] feat: init disperser server payment metering --- core/meterer/onchain_state.go | 48 ++++-- disperser/apiserver/server.go | 193 +++++++++++++++++++++++++ disperser/apiserver/server_test.go | 32 +++- disperser/cmd/apiserver/config.go | 56 ++++--- disperser/cmd/apiserver/flags/flags.go | 50 +++++++ disperser/cmd/apiserver/main.go | 38 +++++ test/integration_test.go | 90 +++++++++++- 7 files changed, 474 insertions(+), 33 deletions(-) diff --git a/core/meterer/onchain_state.go b/core/meterer/onchain_state.go index 952872ce8..75b851f06 100644 --- a/core/meterer/onchain_state.go +++ b/core/meterer/onchain_state.go @@ -19,6 +19,8 @@ type OnchainPayment interface { GetOnDemandQuorumNumbers(ctx context.Context) ([]uint8, error) } +var _ OnchainPayment = (*OnchainPaymentState)(nil) + type OnchainPaymentState struct { tx *eth.Transactor @@ -85,14 +87,13 @@ func (pcs *OnchainPaymentState) RefreshOnchainPaymentState(ctx context.Context, } // GetActiveReservationByAccount returns a pointer to the active reservation for the given account ID; no writes will be made to the reservation -func (pcs *OnchainPaymentState) GetActiveReservationByAccount(ctx context.Context, blockNumber uint32, accountID string) (core.ActiveReservation, error) { +func (pcs *OnchainPaymentState) GetActiveReservationByAccount(ctx context.Context, accountID string) (core.ActiveReservation, error) { if reservation, ok := pcs.ActiveReservations[accountID]; ok { return reservation, nil } - // pulls the chain state - res, err := pcs.tx.GetActiveReservationByAccount(ctx, blockNumber, accountID) + res, err := pcs.GetActiveReservationByAccountOnChain(ctx, accountID) if err != nil { - return core.ActiveReservation{}, errors.New("payment not found") + return core.ActiveReservation{}, err } pcs.ReservationsLock.Lock() @@ -101,15 +102,27 @@ func (pcs *OnchainPaymentState) GetActiveReservationByAccount(ctx context.Contex return res, nil } +// GetActiveReservationByAccountOnChain returns on-chain reservation for the given account ID +func (pcs *OnchainPaymentState) GetActiveReservationByAccountOnChain(ctx context.Context, accountID string) (core.ActiveReservation, error) { + blockNumber, err := pcs.tx.GetCurrentBlockNumber(ctx) + if err != nil { + return core.ActiveReservation{}, err + } + res, err := pcs.tx.GetActiveReservationByAccount(ctx, blockNumber, accountID) + if err != nil { + return core.ActiveReservation{}, errors.New("reservation account not found on-chain") + } + return res, nil +} + // GetOnDemandPaymentByAccount returns a pointer to the on-demand payment for the given account ID; no writes will be made to the payment -func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint32, accountID string) (core.OnDemandPayment, error) { +func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccount(ctx context.Context, accountID string) (core.OnDemandPayment, error) { if payment, ok := pcs.OnDemandPayments[accountID]; ok { return payment, nil } - // pulls the chain state - res, err := pcs.tx.GetOnDemandPaymentByAccount(ctx, blockNumber, accountID) + res, err := pcs.GetOnDemandPaymentByAccountOnChain(ctx, accountID) if err != nil { - return core.OnDemandPayment{}, errors.New("payment not found") + return core.OnDemandPayment{}, err } pcs.OnDemandLocks.Lock() @@ -118,6 +131,23 @@ func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccount(ctx context.Context, return res, nil } -func (pcs *OnchainPaymentState) GetOnDemandQuorumNumbers(ctx context.Context, blockNumber uint32) ([]uint8, error) { +func (pcs *OnchainPaymentState) GetOnDemandPaymentByAccountOnChain(ctx context.Context, accountID string) (core.OnDemandPayment, error) { + // pulls the chain state + blockNumber, err := pcs.tx.GetCurrentBlockNumber(ctx) + if err != nil { + return core.OnDemandPayment{}, err + } + res, err := pcs.tx.GetOnDemandPaymentByAccount(ctx, blockNumber, accountID) + if err != nil { + return core.OnDemandPayment{}, errors.New("on-demand not found on-chain") + } + return res, nil +} + +func (pcs *OnchainPaymentState) GetOnDemandQuorumNumbers(ctx context.Context) ([]uint8, error) { + blockNumber, err := pcs.tx.GetCurrentBlockNumber(ctx) + if err != nil { + return nil, err + } return pcs.tx.GetRequiredQuorumNumbers(ctx, blockNumber) } diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 3c47cf982..af553f1c3 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/big" "math/rand" "net" "slices" @@ -18,6 +19,7 @@ import ( healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/rs" @@ -43,6 +45,7 @@ type DispersalServer struct { tx core.Transactor quorumConfig QuorumConfig + meterer *meterer.Meterer ratelimiter common.RateLimiter authenticator core.BlobRequestAuthenticator @@ -68,6 +71,7 @@ func NewDispersalServer( tx core.Transactor, _logger logging.Logger, metrics *disperser.Metrics, + meterer *meterer.Meterer, ratelimiter common.RateLimiter, rateConfig RateConfig, maxBlobSize int, @@ -89,6 +93,7 @@ func NewDispersalServer( tx: tx, metrics: metrics, logger: logger, + meterer: meterer, ratelimiter: ratelimiter, authenticator: authenticator, mu: &sync.RWMutex{}, @@ -303,6 +308,106 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut }, nil } +func (s *DispersalServer) DispersePaidBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*pb.DisperseBlobReply, error) { + blob, err := s.validatePaidRequestAndGetBlob(ctx, req) + quorumNumbers := req.CustomQuorumNumbers + binIndex := req.BinIndex + cumulativePayment := req.CumulativePayment + signature := req.Signature + + if err != nil { + for _, quorumID := range req.CustomQuorumNumbers { + s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "DispersePaidBlob") + } + s.metrics.HandleInvalidArgRpcRequest("DispersePaidBlob") + return nil, api.NewInvalidArgError(err.Error()) + } + + reply, err := s.dispersePaidBlob(ctx, blob, quorumNumbers, binIndex, big.NewInt(int64(cumulativePayment)), signature, "", "DispersePaidBlob") + if err != nil { + // Note the DispersePaidBlob already updated metrics for this error. + s.logger.Info("failed to disperse blob", "err", err) + } else { + s.metrics.HandleSuccessfulRpcRequest("DispersePaidBlob") + } + return reply, err +} + +// dispersePaidBlob checks for payment metering, otherwise does the same thing as disperseBlob +func (s *DispersalServer) dispersePaidBlob(ctx context.Context, blob *core.Blob, quorumNumbers []uint32, binIndex uint32, cumulativePayment *big.Int, signature []byte, authenticatedAddress string, apiMethodName string) (*pb.DisperseBlobReply, error) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("DispersePaidBlob", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + securityParams := blob.RequestHeader.SecurityParams + securityParamsStrings := make([]string, len(securityParams)) + for i, sp := range securityParams { + securityParamsStrings[i] = sp.String() + } + + blobSize := len(blob.Data) + + origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true) + if err != nil { + for _, param := range securityParams { + s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName) + } + s.metrics.HandleInvalidArgRpcRequest(apiMethodName) + return nil, api.NewInvalidArgError(err.Error()) + } + + s.logger.Debug("received a new paid blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", ")) + + // payments before ratelimits + if s.meterer != nil { + fmt.Println("Metering the request; lots of temporarily code") + //TODO: blob request header needs to be updated for payments; here we create a placeholder + qn := make([]uint8, len(quorumNumbers)) + // don't care about higher bites. need to unify quorum number types + for i, v := range quorumNumbers { + qn[i] = uint8(v) + } + paymentHeader := core.PaymentMetadata{ + AccountID: blob.RequestHeader.AccountID, + BinIndex: binIndex, + // TODO: we are thinking the contract can use uint128 for cumulative payment, + // but the definition on v2 uses uint64. Double check with team. + CumulativePayment: cumulativePayment, + } + err := s.meterer.MeterRequest(ctx, *blob, paymentHeader) + if err != nil { + return nil, err + } + } else if s.ratelimiter != nil { + err := s.checkRateLimitsAndAddRatesToHeader(ctx, blob, origin, authenticatedAddress, apiMethodName) + if err != nil { + // Note checkRateLimitsAndAddRatesToHeader already updated the metrics for this error. + return nil, err + } + } + + requestedAt := uint64(time.Now().UnixNano()) + metadataKey, err := s.blobStore.StoreBlob(ctx, blob, requestedAt) + if err != nil { + for _, param := range securityParams { + s.metrics.HandleBlobStoreFailedRequest(fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName) + } + s.metrics.HandleStoreFailureRpcRequest(apiMethodName) + s.logger.Error("failed to store blob", "err", err) + return nil, api.NewInternalError("failed to store blob, please try again later") + } + + for _, param := range securityParams { + s.metrics.HandleSuccessfulRequest(fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName) + } + + return &pb.DisperseBlobReply{ + Result: pb.BlobStatus_PROCESSING, + RequestId: []byte(metadataKey.String()), + }, nil +} + func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, string, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { @@ -1020,3 +1125,91 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb return blob, nil } + +func (s *DispersalServer) validatePaidRequestAndGetBlob(ctx context.Context, req *pb.DispersePaidBlobRequest) (*core.Blob, error) { + + data := req.GetData() + blobSize := len(data) + // The blob size in bytes must be in range [1, maxBlobSize]. + if blobSize > s.maxBlobSize { + return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize) + } + if blobSize == 0 { + return nil, fmt.Errorf("blob size must be greater than 0") + } + + if len(req.GetCustomQuorumNumbers()) > 256 { + return nil, errors.New("number of custom_quorum_numbers must not exceed 256") + } + + // validate every 32 bytes is a valid field element + _, err := rs.ToFrArray(data) + if err != nil { + s.logger.Error("failed to convert a 32bytes as a field element", "err", err) + return nil, api.NewInvalidArgError("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617") + } + + quorumConfig, err := s.updateQuorumConfig(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get quorum config: %w", err) + } + + if len(req.GetCustomQuorumNumbers()) > int(quorumConfig.QuorumCount) { + return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums") + } + + seenQuorums := make(map[uint8]struct{}) + // The quorum ID must be in range [0, 254]. It'll actually be converted + // to uint8, so it cannot be greater than 254. + // No check with required quorums + for i := range req.GetCustomQuorumNumbers() { + + if req.GetCustomQuorumNumbers()[i] > core.MaxQuorumID { + return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, 254], but found %d", req.GetCustomQuorumNumbers()[i]) + } + + quorumID := uint8(req.GetCustomQuorumNumbers()[i]) + if quorumID >= quorumConfig.QuorumCount { + return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, %d], but found %d", s.quorumConfig.QuorumCount-1, quorumID) + } + + if _, ok := seenQuorums[quorumID]; ok { + return nil, fmt.Errorf("custom_quorum_numbers must not contain duplicates") + } + seenQuorums[quorumID] = struct{}{} + + } + + if len(seenQuorums) == 0 { + return nil, fmt.Errorf("the blob must be sent to at least one quorum") + } + + params := make([]*core.SecurityParam, len(seenQuorums)) + i := 0 + for quorumID := range seenQuorums { + params[i] = &core.SecurityParam{ + QuorumID: core.QuorumID(quorumID), + AdversaryThreshold: quorumConfig.SecurityParams[quorumID].AdversaryThreshold, + ConfirmationThreshold: quorumConfig.SecurityParams[quorumID].ConfirmationThreshold, + } + err = params[i].Validate() + if err != nil { + return nil, fmt.Errorf("invalid request: %w", err) + } + i++ + } + + header := core.BlobRequestHeader{ + BlobAuthHeader: core.BlobAuthHeader{ + AccountID: req.AccountId, + }, + SecurityParams: params, + } + + blob := &core.Blob{ + RequestHeader: header, + Data: data, + } + + return blob, nil +} diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 1a7d5f7e7..ca87619a8 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -10,7 +10,9 @@ import ( "testing" "time" + commonaws "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" @@ -647,6 +649,34 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create bucket store") } + meterConfig := meterer.Config{ + PricePerSymbol: 1, + MinNumSymbols: 1, + GlobalSymbolsPerSecond: 1000, + ReservationWindow: 60, + } + + mockState := &mock.MockOnchainPaymentState{} + + clientConfig := commonaws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"), + } + + store, err := meterer.NewOffchainStore( + clientConfig, + "reservations", + "ondemand", + "global", + logger, + ) + if err != nil { + teardown() + panic("failed to create offchain store") + } + meterer := meterer.NewMeterer(meterConfig, mockState, store, logger) ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ @@ -703,7 +733,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig, testMaxBlobSize) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), meterer, ratelimiter, rateConfig, testMaxBlobSize) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index 480a0fb4a..5ca3f7e65 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -22,20 +22,27 @@ const ( ) type Config struct { - DisperserVersion DisperserVersion - AwsClientConfig aws.ClientConfig - BlobstoreConfig blobstore.Config - ServerConfig disperser.ServerConfig - LoggerConfig common.LoggerConfig - MetricsConfig disperser.MetricsConfig - RatelimiterConfig ratelimit.Config - RateConfig apiserver.RateConfig - EnableRatelimiter bool - BucketTableName string - ShadowTableName string - BucketStoreSize int - EthClientConfig geth.EthClientConfig - MaxBlobSize int + DisperserVersion DisperserVersion + AwsClientConfig aws.ClientConfig + BlobstoreConfig blobstore.Config + ServerConfig disperser.ServerConfig + LoggerConfig common.LoggerConfig + MetricsConfig disperser.MetricsConfig + RatelimiterConfig ratelimit.Config + RateConfig apiserver.RateConfig + EnableRatelimiter bool + EnablePaymentMeterer bool + MinNumSymbols uint32 + PricePerSymbol uint32 + OnDemandGlobalLimit uint64 + ReservationWindow uint32 // in seconds + PaymentChainID uint64 + PaymentContractAddress string + BucketTableName string + ShadowTableName string + BucketStoreSize int + EthClientConfig geth.EthClientConfig + MaxBlobSize int BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string @@ -79,13 +86,20 @@ func NewConfig(ctx *cli.Context) (Config, error) { HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name), }, - RatelimiterConfig: ratelimiterConfig, - RateConfig: rateConfig, - EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name), - BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), - BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), - EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), - MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), + RatelimiterConfig: ratelimiterConfig, + RateConfig: rateConfig, + EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name), + EnablePaymentMeterer: ctx.GlobalBool(flags.EnablePaymentMeterer.Name), + ReservationWindow: uint32(ctx.GlobalUint64(flags.ReservationWindow.Name)), + MinNumSymbols: uint32(ctx.GlobalUint64(flags.MinNumSymbols.Name)), + PricePerSymbol: uint32(ctx.GlobalUint64(flags.PricePerSymbol.Name)), + OnDemandGlobalLimit: ctx.GlobalUint64(flags.OnDemandGlobalLimit.Name), + PaymentChainID: ctx.GlobalUint64(flags.PaymentChainID.Name), + PaymentContractAddress: ctx.GlobalString(flags.PaymentContractAddress.Name), + BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), + BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), + EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), + MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index 9402f7f9f..a65236d9b 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -83,6 +83,49 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_METRICS"), } + EnablePaymentMeterer = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-payment-meterer"), + Usage: "enable payment meterer", + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_PAYMENT_METERER"), + } + ReservationWindow = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "reservation-window"), + Usage: "duration of all reservation bins in seconds, used to calculate bin indices", + Value: 375, // Interval that allows 3 32MB blobs at minimal throughput + EnvVar: common.PrefixEnvVar(envVarPrefix, "RESERVATION_WINDOW"), + } + MinNumSymbols = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "min-chargeable-size"), + Usage: "minimum number of symbols charged", + Value: 1024, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MIN_NUM_SYMBOLS"), + } + PricePerSymbol = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "price-per-symbol"), + Usage: "price per symbol in gwei, used for on-demand payments", + Value: 1000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "PRICE_PER_SYMBOL"), + } + OnDemandGlobalLimit = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "on-demand-global-limit"), + Usage: "on demand global limit (bytes per second)", + Value: 1000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ON_DEMAND_GLOBAL_LIMIT"), + } + PaymentChainID = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "payment-chain-id"), + Usage: "chain id of the payment chain", + Value: 17000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "PAYMENT_CHAIN_ID"), + Required: false, + } + PaymentContractAddress = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "payment-contract-address"), + Usage: "address of the payment contract", + Value: "", + EnvVar: common.PrefixEnvVar(envVarPrefix, "PAYMENT_CONTRACT_ADDRESS"), + Required: false, + } EnableRatelimiter = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-ratelimiter"), Usage: "enable rate limiter", @@ -124,6 +167,13 @@ var optionalFlags = []cli.Flag{ MetricsHTTPPort, EnableMetrics, EnableRatelimiter, + EnablePaymentMeterer, + ReservationWindow, + MinNumSymbols, + PricePerSymbol, + OnDemandGlobalLimit, + PaymentChainID, + PaymentContractAddress, BucketStoreSize, GrpcTimeoutFlag, ShadowTableNameFlag, diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index d205e44fc..12e1f4c18 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -8,9 +8,11 @@ import ( "time" "github.com/Layr-Labs/eigenda/common" + mt "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" "github.com/Layr-Labs/eigenda/encoding/fft" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" @@ -101,6 +103,41 @@ func RunDisperserServer(ctx *cli.Context) error { reg := prometheus.NewRegistry() + var meterer *mt.Meterer + if config.EnablePaymentMeterer { + mtConfig := mt.Config{ + PricePerSymbol: config.PricePerSymbol, + GlobalSymbolsPerSecond: config.OnDemandGlobalLimit, + MinNumSymbols: config.MinNumSymbols, + ReservationWindow: config.ReservationWindow, + ChainReadTimeout: 3 * time.Second, + } + + paymentChainState, err := mt.NewOnchainPaymentState(context.Background(), transactor) + if err != nil { + return fmt.Errorf("failed to create onchain payment state: %w", err) + } + + offchainStore, err := mt.NewOffchainStore( + config.AwsClientConfig, + "reservations", + "ondemand", + "global", + logger, + ) + if err != nil { + return fmt.Errorf("failed to create offchain store: %w", err) + } + // add some default sensible configs + meterer = mt.NewMeterer( + mtConfig, + &paymentChainState, + offchainStore, + logging.NewNoopLogger(), + // metrics.NewNoopMetrics(), + ) + } + var ratelimiter common.RateLimiter if config.EnableRatelimiter { globalParams := config.RatelimiterConfig.GlobalRateParams @@ -136,6 +173,7 @@ func RunDisperserServer(ctx *cli.Context) error { transactor, logger, metrics, + meterer, ratelimiter, config.RateConfig, config.MaxBlobSize, diff --git a/test/integration_test.go b/test/integration_test.go index 693f75e49..8d1b476e7 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -4,8 +4,10 @@ import ( "bytes" "context" "encoding/hex" + "errors" "fmt" "log" + "math" "math/big" "net" "net/http" @@ -20,9 +22,14 @@ import ( "github.com/Layr-Labs/eigenda/encoding/kzg/prover" "github.com/Layr-Labs/eigenda/encoding/kzg/verifier" "github.com/Layr-Labs/eigenda/encoding/utils/codec" + "github.com/Layr-Labs/eigenda/inabox/deploy" "github.com/consensys/gnark-crypto/ecc/bn254/fp" + "github.com/ory/dockertest/v3" clientsmock "github.com/Layr-Labs/eigenda/api/clients/mock" + commonaws "github.com/Layr-Labs/eigenda/common/aws" + commondynamodb "github.com/Layr-Labs/eigenda/common/aws/dynamodb" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/disperser/apiserver" dispatcher "github.com/Layr-Labs/eigenda/disperser/batcher/grpc" "github.com/Layr-Labs/eigenda/disperser/encoder" @@ -52,6 +59,7 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -65,6 +73,14 @@ var ( gettysburgAddressBytes = []byte("Fourscore and seven years ago our fathers brought forth, on this continent, a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation so conceived, and so dedicated, can long endure. We are met on a great battle-field of that war. We have come to dedicate a portion of that field, as a final resting-place for those who here gave their lives, that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate, we cannot consecrate—we cannot hallow—this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note, nor long remember what we say here, but it can never forget what they did here. It is for us the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us—that from these honored dead we take increased devotion to that cause for which they here gave the last full measure of devotion—that we here highly resolve that these dead shall not have died in vain—that this nation, under God, shall have a new birth of freedom, and that government of the people, by the people, for the people, shall not perish from the earth.") serviceManagerAddress = gethcommon.HexToAddress("0x0000000000000000000000000000000000000000") handleBatchLivenessChan = make(chan time.Time, 1) + + dockertestPool *dockertest.Pool + dockertestResource *dockertest.Resource + dynamoClient *commondynamodb.Client + clientConfig commonaws.ClientConfig + + deployLocalStack bool + localStackPort = "4565" ) const ( @@ -195,7 +211,69 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser tx := &coremock.MockTransactor{} tx.On("GetCurrentBlockNumber").Return(uint64(100), nil) tx.On("GetQuorumCount").Return(1, nil) - server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, ratelimiter, rateConfig, testMaxBlobSize) + + minimumNumSymbols := uint32(128) + pricePerSymbol := uint32(1) + reservationLimit := uint64(1024) + paymentLimit := big.NewInt(512) + meterConfig := meterer.Config{ + GlobalSymbolsPerSecond: 1024, + PricePerSymbol: pricePerSymbol, + MinNumSymbols: minimumNumSymbols, + ReservationWindow: uint32(60), + } + + // this is disperser client's private key used in tests + privateKey, err := crypto.HexToECDSA("0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded") // Remove "0x" prefix + if err != nil { + panic("failed to convert hex to ECDSA") + } + publicKey := crypto.PubkeyToAddress(privateKey.PublicKey).Hex() + mockState := &coremock.MockOnchainPaymentState{} + mockState.On("GetActiveReservationByAccount", mock.Anything, mock.MatchedBy(func(account string) bool { + return account == publicKey + })).Return(core.ActiveReservation{SymbolsPerSec: reservationLimit, StartTimestamp: 0, EndTimestamp: math.MaxUint32, QuorumSplit: []byte{50, 50}, QuorumNumbers: []uint8{0, 1}}, nil) + mockState.On("GetActiveReservationByAccount", mock.Anything, mock.Anything).Return(core.ActiveReservation{}, errors.New("reservation not found")) + + mockState.On("GetOnDemandPaymentByAccount", mock.Anything, mock.MatchedBy(func(account string) bool { + return account == publicKey + })).Return(core.OnDemandPayment{CumulativePayment: paymentLimit}, nil) + mockState.On("GetOnDemandPaymentByAccount", mock.Anything, mock.Anything).Return(core.OnDemandPayment{}, errors.New("payment not found")) + mockState.On("GetOnDemandQuorumNumbers", mock.Anything).Return([]uint8{0, 1}, nil) + + deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") + if !deployLocalStack { + localStackPort = os.Getenv("LOCALSTACK_PORT") + } + + if deployLocalStack { + var err error + dockertestPool, dockertestResource, err = deploy.StartDockertestWithLocalstackContainer(localStackPort) + if err != nil { + teardown() + panic("failed to start localstack container") + } + } + + clientConfig = commonaws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:%s", localStackPort), + } + + offchainStore, err := meterer.NewOffchainStore( + clientConfig, + "reservations", + "ondemand", + "global", + logging.NewNoopLogger(), + ) + if err != nil { + panic("failed to create offchain store") + } + meterer := meterer.NewMeterer(meterConfig, mockState, offchainStore, logger) + server := apiserver.NewDispersalServer(serverConfig, store, tx, logger, disperserMetrics, meterer, ratelimiter, rateConfig, testMaxBlobSize) return TestDisperser{ batcher: batcher, @@ -349,7 +427,9 @@ func mustMakeRetriever(cst core.IndexedChainState, logger logging.Logger) (*comm } func TestMain(m *testing.M) { - os.Exit(m.Run()) + code := m.Run() + teardown() + os.Exit(code) } func TestDispersalAndRetrieval(t *testing.T) { @@ -582,3 +662,9 @@ func TestDispersalAndRetrieval(t *testing.T) { restored = bytes.TrimRight(restored, "\x00") assert.Equal(t, gettysburgAddressBytes, restored[:len(gettysburgAddressBytes)]) } + +func teardown() { + if deployLocalStack { + deploy.PurgeDockertestResources(dockertestPool, dockertestResource) + } +}