From 934fe55fe65d2eabde6e4c296958f753dce25017 Mon Sep 17 00:00:00 2001 From: hopeyen Date: Mon, 7 Oct 2024 20:45:24 -0700 Subject: [PATCH] feat: disperser server integrated with payments metering --- disperser/apiserver/server.go | 212 +++++++++++++++++++++++-- disperser/apiserver/server_test.go | 36 ++++- disperser/cmd/apiserver/config.go | 50 +++--- disperser/cmd/apiserver/flags/flags.go | 34 ++++ disperser/cmd/apiserver/main.go | 41 ++++- 5 files changed, 341 insertions(+), 32 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 440586605..852e8b6fb 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/big" "math/rand" "net" "slices" @@ -20,6 +21,7 @@ import ( healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck" "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/disperser" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/rs" @@ -45,6 +47,7 @@ type DispersalServer struct { tx core.Transactor quorumConfig QuorumConfig + meterer *meterer.Meterer ratelimiter common.RateLimiter authenticator core.BlobRequestAuthenticator @@ -70,6 +73,7 @@ func NewDispersalServer( tx core.Transactor, _logger logging.Logger, metrics *disperser.Metrics, + meterer *meterer.Meterer, ratelimiter common.RateLimiter, rateConfig RateConfig, maxBlobSize int, @@ -91,6 +95,7 @@ func NewDispersalServer( tx: tx, metrics: metrics, logger: logger, + meterer: meterer, ratelimiter: ratelimiter, authenticator: authenticator, mu: &sync.RWMutex{}, @@ -301,6 +306,114 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut }, nil } +func (s *DispersalServer) PaidDisperseBlob(ctx context.Context, req *pb.PaidDisperseBlobRequest) (*pb.DisperseBlobReply, error) { + blob, err := s.validatePaidRequestAndGetBlob(ctx, req) + quorumNumbers := req.CustomQuorumNumbers + binIndex := req.BinIndex + cumulativePayment := req.CumulativePayment + signature := req.Signature + + if err != nil { + for _, quorumID := range req.CustomQuorumNumbers { + s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "PaidDisperseBlob") + } + s.metrics.HandleInvalidArgRpcRequest("PaidDisperseBlob") + return nil, api.NewInvalidArgError(err.Error()) + } + + reply, err := s.paidDisperseBlob(ctx, blob, quorumNumbers, binIndex, cumulativePayment, signature, "", "PaidDisperseBlob") + if err != nil { + // Note the PaidDisperseBlob already updated metrics for this error. + s.logger.Info("failed to disperse blob", "err", err) + } else { + s.metrics.HandleSuccessfulRpcRequest("PaidDisperseBlob") + } + return reply, err +} + +// Note: disperseBlob will internally update metrics upon an error; the caller doesn't need +// to track the error again. +func (s *DispersalServer) paidDisperseBlob(ctx context.Context, blob *core.Blob, quorumNumbers []uint32, binIndex uint32, cumulativePayment uint64, signature []byte, authenticatedAddress string, apiMethodName string) (*pb.DisperseBlobReply, error) { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { + s.metrics.ObserveLatency("PaidDisperseBlob", f*1000) // make milliseconds + })) + defer timer.ObserveDuration() + + securityParams := blob.RequestHeader.SecurityParams + securityParamsStrings := make([]string, len(securityParams)) + for i, sp := range securityParams { + securityParamsStrings[i] = sp.String() + } + + blobSize := len(blob.Data) + + origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true) + if err != nil { + for _, param := range securityParams { + s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName) + } + s.metrics.HandleInvalidArgRpcRequest(apiMethodName) + return nil, api.NewInvalidArgError(err.Error()) + } + + s.logger.Debug("received a new paid blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", ")) + + // payments before ratelimits + if s.meterer != nil { + fmt.Println("Metering the request; lots of temporarily code") + //TODO: blob request header needs to be updated for payments; here we create a placeholder + commitment := core.NewG1Point(big.NewInt(0), big.NewInt(1)) + qn := make([]uint8, len(quorumNumbers)) + // don't care about higher bites. need to unify quorum number types + for i, v := range quorumNumbers { + qn[i] = uint8(v) + } + paymentHeader := meterer.BlobHeader{ + Commitment: *commitment, + DataLength: uint32(blobSize), + QuorumNumbers: qn, + AccountID: blob.RequestHeader.AccountID, + + // New fields + BinIndex: binIndex, + // TODO: we are thinking the contract can use uint128 for cumulative payment, + // but the definition on v2 uses uint64. Double check with team. + CumulativePayment: cumulativePayment, + Signature: signature, + } + err := s.meterer.MeterRequest(ctx, paymentHeader) + if err != nil { + return nil, err + } + } else if s.ratelimiter != nil { + err := s.checkRateLimitsAndAddRatesToHeader(ctx, blob, origin, authenticatedAddress, apiMethodName) + if err != nil { + // Note checkRateLimitsAndAddRatesToHeader already updated the metrics for this error. + return nil, err + } + } + + requestedAt := uint64(time.Now().UnixNano()) + metadataKey, err := s.blobStore.StoreBlob(ctx, blob, requestedAt) + if err != nil { + for _, param := range securityParams { + s.metrics.HandleBlobStoreFailedRequest(fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName) + } + s.metrics.HandleStoreFailureRpcRequest(apiMethodName) + s.logger.Error("failed to store blob", "err", err) + return nil, api.NewInternalError("failed to store blob, please try again later") + } + + for _, param := range securityParams { + s.metrics.HandleSuccessfulRequest(fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName) + } + + return &pb.DisperseBlobReply{ + Result: pb.BlobStatus_PROCESSING, + RequestId: []byte(metadataKey.String()), + }, nil +} + func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, string, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { @@ -678,7 +791,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewInvalidArgError(err.Error()) } - stageTimer := time.Now() // Check blob rate limit if s.ratelimiter != nil { allowed, param, err := s.ratelimiter.AllowRequest(ctx, []common.RequestParams{ @@ -704,7 +816,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewResourceExhaustedError(errorString) } } - s.logger.Debug("checked retrieval blob rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalBlobRateType.Plug()), "duration", time.Since(stageTimer).String()) + s.logger.Info("received a new blob retrieval request", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex) batchHeaderHash := req.GetBatchHeaderHash() @@ -714,7 +826,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob blobIndex := req.GetBlobIndex() - stageTimer = time.Now() blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex) if err != nil { s.logger.Error("Failed to retrieve blob metadata", "err", err) @@ -734,9 +845,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewNotFoundError("no metadata found for the given batch header hash and blob index") } - s.logger.Debug("fetched blob metadata", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "duration", time.Since(stageTimer).String()) - - stageTimer = time.Now() // Check throughout rate limit blobSize := encoding.GetBlobSize(blobMetadata.ConfirmationInfo.BlobCommitment.Length) @@ -764,9 +872,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewResourceExhaustedError(errorString) } } - s.logger.Debug("checked retrieval throughput rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalThroughputType.Plug()), "duration (ms)", time.Since(stageTimer).String()) - stageTimer = time.Now() data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash) if err != nil { s.logger.Error("Failed to retrieve blob", "err", err) @@ -777,8 +883,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob s.metrics.HandleSuccessfulRpcRequest("RetrieveBlob") s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob") - s.logger.Debug("fetched blob content", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "data size (bytes)", len(data), "duration", time.Since(stageTimer).String()) - return &pb.RetrieveBlobReply{ Data: data, }, nil @@ -1022,3 +1126,91 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb return blob, nil } + +func (s *DispersalServer) validatePaidRequestAndGetBlob(ctx context.Context, req *pb.PaidDisperseBlobRequest) (*core.Blob, error) { + + data := req.GetData() + blobSize := len(data) + // The blob size in bytes must be in range [1, maxBlobSize]. + if blobSize > s.maxBlobSize { + return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize) + } + if blobSize == 0 { + return nil, fmt.Errorf("blob size must be greater than 0") + } + + if len(req.GetCustomQuorumNumbers()) > 256 { + return nil, errors.New("number of custom_quorum_numbers must not exceed 256") + } + + // validate every 32 bytes is a valid field element + _, err := rs.ToFrArray(data) + if err != nil { + s.logger.Error("failed to convert a 32bytes as a field element", "err", err) + return nil, api.NewInvalidArgError("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617") + } + + quorumConfig, err := s.updateQuorumConfig(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get quorum config: %w", err) + } + + if len(req.GetCustomQuorumNumbers()) > int(quorumConfig.QuorumCount) { + return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums") + } + + seenQuorums := make(map[uint8]struct{}) + // The quorum ID must be in range [0, 254]. It'll actually be converted + // to uint8, so it cannot be greater than 254. + // No check with required quorums + for i := range req.GetCustomQuorumNumbers() { + + if req.GetCustomQuorumNumbers()[i] > core.MaxQuorumID { + return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, 254], but found %d", req.GetCustomQuorumNumbers()[i]) + } + + quorumID := uint8(req.GetCustomQuorumNumbers()[i]) + if quorumID >= quorumConfig.QuorumCount { + return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, %d], but found %d", s.quorumConfig.QuorumCount-1, quorumID) + } + + if _, ok := seenQuorums[quorumID]; ok { + return nil, fmt.Errorf("custom_quorum_numbers must not contain duplicates") + } + seenQuorums[quorumID] = struct{}{} + + } + + if len(seenQuorums) == 0 { + return nil, fmt.Errorf("the blob must be sent to at least one quorum") + } + + params := make([]*core.SecurityParam, len(seenQuorums)) + i := 0 + for quorumID := range seenQuorums { + params[i] = &core.SecurityParam{ + QuorumID: core.QuorumID(quorumID), + AdversaryThreshold: quorumConfig.SecurityParams[quorumID].AdversaryThreshold, + ConfirmationThreshold: quorumConfig.SecurityParams[quorumID].ConfirmationThreshold, + } + err = params[i].Validate() + if err != nil { + return nil, fmt.Errorf("invalid request: %w", err) + } + i++ + } + + header := core.BlobRequestHeader{ + BlobAuthHeader: core.BlobAuthHeader{ + AccountID: req.AccountId, + }, + SecurityParams: params, + } + + blob := &core.Blob{ + RequestHeader: header, + Data: data, + } + + return blob, nil +} diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 40a7a6747..25385632b 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -10,7 +10,9 @@ import ( "testing" "time" + commonaws "github.com/Layr-Labs/eigenda/common/aws" "github.com/Layr-Labs/eigenda/core/auth" + "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" @@ -645,6 +647,38 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { if err != nil { panic("failed to create bucket store") } + meterConfig := meterer.Config{ + PricePerChargeable: 1, + GlobalBytesPerSecond: 1000, + ReservationWindow: 60, + } + + paymentChainState := meterer.NewOnchainPaymentState() + + paymentChainState.InitializeOnchainPaymentState() + + clientConfig := commonaws.ClientConfig{ + Region: "us-east-1", + AccessKey: "localstack", + SecretAccessKey: "localstack", + EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"), + } + + store, err := meterer.NewOffchainStore( + clientConfig, + "reservations", + "ondemand", + "global", + logger, + ) + if err != nil { + teardown() + panic("failed to create offchain store") + } + meterer, err := meterer.NewMeterer(meterConfig, meterer.TimeoutConfig{}, paymentChainState, store, logger) + if err != nil { + panic("failed to create meterer") + } ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger) rateConfig := apiserver.RateConfig{ @@ -701,7 +735,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer { return apiserver.NewDispersalServer(disperser.ServerConfig{ GrpcPort: "51001", GrpcTimeout: 1 * time.Second, - }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig, testMaxBlobSize) + }, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), meterer, ratelimiter, rateConfig, testMaxBlobSize) } func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) { diff --git a/disperser/cmd/apiserver/config.go b/disperser/cmd/apiserver/config.go index db5740c34..a31ee0d72 100644 --- a/disperser/cmd/apiserver/config.go +++ b/disperser/cmd/apiserver/config.go @@ -13,19 +13,24 @@ import ( ) type Config struct { - AwsClientConfig aws.ClientConfig - BlobstoreConfig blobstore.Config - ServerConfig disperser.ServerConfig - LoggerConfig common.LoggerConfig - MetricsConfig disperser.MetricsConfig - RatelimiterConfig ratelimit.Config - RateConfig apiserver.RateConfig - EnableRatelimiter bool - BucketTableName string - ShadowTableName string - BucketStoreSize int - EthClientConfig geth.EthClientConfig - MaxBlobSize int + AwsClientConfig aws.ClientConfig + BlobstoreConfig blobstore.Config + ServerConfig disperser.ServerConfig + LoggerConfig common.LoggerConfig + MetricsConfig disperser.MetricsConfig + RatelimiterConfig ratelimit.Config + RateConfig apiserver.RateConfig + EnableRatelimiter bool + EnablePaymentMeterer bool + MinChargeableSize uint32 // in bytes + PricePerChargeable uint32 + OnDemandGlobalLimit uint64 + ReservationWindow uint32 // in seconds + BucketTableName string + ShadowTableName string + BucketStoreSize int + EthClientConfig geth.EthClientConfig + MaxBlobSize int BLSOperatorStateRetrieverAddr string EigenDAServiceManagerAddr string @@ -64,13 +69,18 @@ func NewConfig(ctx *cli.Context) (Config, error) { HTTPPort: ctx.GlobalString(flags.MetricsHTTPPort.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetrics.Name), }, - RatelimiterConfig: ratelimiterConfig, - RateConfig: rateConfig, - EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name), - BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), - BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), - EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), - MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), + RatelimiterConfig: ratelimiterConfig, + RateConfig: rateConfig, + EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name), + EnablePaymentMeterer: ctx.GlobalBool(flags.EnablePaymentMeterer.Name), + ReservationWindow: uint32(ctx.GlobalUint64(flags.ReservationWindow.Name)), + MinChargeableSize: uint32(ctx.GlobalUint64(flags.MinChargeableSize.Name)), + PricePerChargeable: uint32(ctx.GlobalUint64(flags.PricePerChargeable.Name)), + OnDemandGlobalLimit: ctx.GlobalUint64(flags.OnDemandGlobalLimit.Name), + BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), + BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), + EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), + MaxBlobSize: ctx.GlobalInt(flags.MaxBlobSize.Name), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name), diff --git a/disperser/cmd/apiserver/flags/flags.go b/disperser/cmd/apiserver/flags/flags.go index 1211af62c..c377eb7f8 100644 --- a/disperser/cmd/apiserver/flags/flags.go +++ b/disperser/cmd/apiserver/flags/flags.go @@ -76,6 +76,35 @@ var ( Required: true, EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_METRICS"), } + EnablePaymentMeterer = cli.BoolFlag{ + Name: common.PrefixFlag(FlagPrefix, "enable-payment-meterer"), + Usage: "enable payment meterer", + EnvVar: common.PrefixEnvVar(envVarPrefix, "ENABLE_PAYMENT_METERER"), + } + ReservationWindow = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "reservation-window"), + Usage: "reservation window (seconds)", + Value: 375, // Interval that allows 3 32MB blobs at minimal throughput + EnvVar: common.PrefixEnvVar(envVarPrefix, "RESERVATION_WINDOW"), + } + MinChargeableSize = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "min-chargeable-size"), + Usage: "min chargeable size", + Value: 1000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "MIN_CHARGEABLE_SIZE"), + } + PricePerChargeable = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "price-per-chargeable"), + Usage: "price per chargeable", + Value: 1000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "PRICE_PER_CHARGEABLE"), + } + OnDemandGlobalLimit = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "on-demand-global-limit"), + Usage: "on demand global limit (bytes per second)", + Value: 1000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "ON_DEMAND_GLOBAL_LIMIT"), + } EnableRatelimiter = cli.BoolFlag{ Name: common.PrefixFlag(FlagPrefix, "enable-ratelimiter"), Usage: "enable rate limiter", @@ -116,6 +145,11 @@ var optionalFlags = []cli.Flag{ MetricsHTTPPort, EnableMetrics, EnableRatelimiter, + EnablePaymentMeterer, + ReservationWindow, + MinChargeableSize, + OnDemandGlobalLimit, + PricePerChargeable, BucketStoreSize, GrpcTimeoutFlag, ShadowTableNameFlag, diff --git a/disperser/cmd/apiserver/main.go b/disperser/cmd/apiserver/main.go index 5394ac1d8..6431c794e 100644 --- a/disperser/cmd/apiserver/main.go +++ b/disperser/cmd/apiserver/main.go @@ -8,9 +8,11 @@ import ( "time" "github.com/Layr-Labs/eigenda/common" + mt "github.com/Layr-Labs/eigenda/core/meterer" "github.com/Layr-Labs/eigenda/disperser/apiserver" "github.com/Layr-Labs/eigenda/disperser/common/blobstore" "github.com/Layr-Labs/eigenda/encoding/fft" + "github.com/Layr-Labs/eigensdk-go/logging" "github.com/prometheus/client_golang/prometheus" "github.com/Layr-Labs/eigenda/common/aws/dynamodb" @@ -59,7 +61,6 @@ func RunDisperserServer(ctx *cli.Context) error { if err != nil { return err } - client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.Address{}, logger) if err != nil { logger.Error("Cannot create chain.Client", "err", err) @@ -96,6 +97,43 @@ func RunDisperserServer(ctx *cli.Context) error { reg := prometheus.NewRegistry() + var meterer *mt.Meterer + if config.EnablePaymentMeterer { + mtConfig := mt.Config{ + PricePerChargeable: config.PricePerChargeable, + GlobalBytesPerSecond: config.OnDemandGlobalLimit, + MinChargeableSize: config.MinChargeableSize, + ReservationWindow: config.ReservationWindow, + } + + paymentChainState := mt.NewOnchainPaymentState() + + paymentChainState.InitializeOnchainPaymentState() + + store, err := mt.NewOffchainStore( + config.AwsClientConfig, + "reservations", + "ondemand", + "global", + logger, + ) + if err != nil { + return fmt.Errorf("failed to create offchain store: %w", err) + } + // add some default sensible configs + meterer, err = mt.NewMeterer( + mtConfig, + mt.TimeoutConfig{}, + paymentChainState, + store, + logging.NewNoopLogger(), + // metrics.NewNoopMetrics(), + ) + if err != nil { + return fmt.Errorf("failed to create meterer: %w", err) + } + } + var ratelimiter common.RateLimiter if config.EnableRatelimiter { globalParams := config.RatelimiterConfig.GlobalRateParams @@ -131,6 +169,7 @@ func RunDisperserServer(ctx *cli.Context) error { transactor, logger, metrics, + meterer, ratelimiter, config.RateConfig, config.MaxBlobSize,