Skip to content

Commit

Permalink
feat: disperser server integrated with payments metering
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 8, 2024
1 parent 45df40e commit 934fe55
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 32 deletions.
212 changes: 202 additions & 10 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"math/rand"
"net"
"slices"
Expand All @@ -20,6 +21,7 @@ import (
healthcheck "github.com/Layr-Labs/eigenda/common/healthcheck"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/rs"
Expand All @@ -45,6 +47,7 @@ type DispersalServer struct {
tx core.Transactor
quorumConfig QuorumConfig

meterer *meterer.Meterer
ratelimiter common.RateLimiter
authenticator core.BlobRequestAuthenticator

Expand All @@ -70,6 +73,7 @@ func NewDispersalServer(
tx core.Transactor,
_logger logging.Logger,
metrics *disperser.Metrics,
meterer *meterer.Meterer,
ratelimiter common.RateLimiter,
rateConfig RateConfig,
maxBlobSize int,
Expand All @@ -91,6 +95,7 @@ func NewDispersalServer(
tx: tx,
metrics: metrics,
logger: logger,
meterer: meterer,
ratelimiter: ratelimiter,
authenticator: authenticator,
mu: &sync.RWMutex{},
Expand Down Expand Up @@ -301,6 +306,114 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut
}, nil
}

func (s *DispersalServer) PaidDisperseBlob(ctx context.Context, req *pb.PaidDisperseBlobRequest) (*pb.DisperseBlobReply, error) {
blob, err := s.validatePaidRequestAndGetBlob(ctx, req)
quorumNumbers := req.CustomQuorumNumbers
binIndex := req.BinIndex
cumulativePayment := req.CumulativePayment
signature := req.Signature

if err != nil {
for _, quorumID := range req.CustomQuorumNumbers {
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprint(quorumID), len(req.GetData()), "PaidDisperseBlob")
}
s.metrics.HandleInvalidArgRpcRequest("PaidDisperseBlob")
return nil, api.NewInvalidArgError(err.Error())
}

reply, err := s.paidDisperseBlob(ctx, blob, quorumNumbers, binIndex, cumulativePayment, signature, "", "PaidDisperseBlob")
if err != nil {
// Note the PaidDisperseBlob already updated metrics for this error.
s.logger.Info("failed to disperse blob", "err", err)
} else {
s.metrics.HandleSuccessfulRpcRequest("PaidDisperseBlob")
}
return reply, err
}

// Note: disperseBlob will internally update metrics upon an error; the caller doesn't need
// to track the error again.
func (s *DispersalServer) paidDisperseBlob(ctx context.Context, blob *core.Blob, quorumNumbers []uint32, binIndex uint32, cumulativePayment uint64, signature []byte, authenticatedAddress string, apiMethodName string) (*pb.DisperseBlobReply, error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) {
s.metrics.ObserveLatency("PaidDisperseBlob", f*1000) // make milliseconds
}))
defer timer.ObserveDuration()

securityParams := blob.RequestHeader.SecurityParams
securityParamsStrings := make([]string, len(securityParams))
for i, sp := range securityParams {
securityParamsStrings[i] = sp.String()
}

blobSize := len(blob.Data)

origin, err := common.GetClientAddress(ctx, s.rateConfig.ClientIPHeader, 2, true)
if err != nil {
for _, param := range securityParams {
s.metrics.HandleFailedRequest(codes.InvalidArgument.String(), fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName)
}
s.metrics.HandleInvalidArgRpcRequest(apiMethodName)
return nil, api.NewInvalidArgError(err.Error())
}

s.logger.Debug("received a new paid blob dispersal request", "authenticatedAddress", authenticatedAddress, "origin", origin, "blobSizeBytes", blobSize, "securityParams", strings.Join(securityParamsStrings, ", "))

// payments before ratelimits
if s.meterer != nil {
fmt.Println("Metering the request; lots of temporarily code")
//TODO: blob request header needs to be updated for payments; here we create a placeholder
commitment := core.NewG1Point(big.NewInt(0), big.NewInt(1))
qn := make([]uint8, len(quorumNumbers))
// don't care about higher bites. need to unify quorum number types
for i, v := range quorumNumbers {
qn[i] = uint8(v)
}
paymentHeader := meterer.BlobHeader{
Commitment: *commitment,
DataLength: uint32(blobSize),
QuorumNumbers: qn,
AccountID: blob.RequestHeader.AccountID,

// New fields
BinIndex: binIndex,
// TODO: we are thinking the contract can use uint128 for cumulative payment,
// but the definition on v2 uses uint64. Double check with team.
CumulativePayment: cumulativePayment,
Signature: signature,
}
err := s.meterer.MeterRequest(ctx, paymentHeader)
if err != nil {
return nil, err
}
} else if s.ratelimiter != nil {
err := s.checkRateLimitsAndAddRatesToHeader(ctx, blob, origin, authenticatedAddress, apiMethodName)
if err != nil {
// Note checkRateLimitsAndAddRatesToHeader already updated the metrics for this error.
return nil, err
}
}

requestedAt := uint64(time.Now().UnixNano())
metadataKey, err := s.blobStore.StoreBlob(ctx, blob, requestedAt)
if err != nil {
for _, param := range securityParams {
s.metrics.HandleBlobStoreFailedRequest(fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName)
}
s.metrics.HandleStoreFailureRpcRequest(apiMethodName)
s.logger.Error("failed to store blob", "err", err)
return nil, api.NewInternalError("failed to store blob, please try again later")
}

for _, param := range securityParams {
s.metrics.HandleSuccessfulRequest(fmt.Sprintf("%d", param.QuorumID), blobSize, apiMethodName)
}

return &pb.DisperseBlobReply{
Result: pb.BlobStatus_PROCESSING,
RequestId: []byte(metadataKey.String()),
}, nil
}

func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, string, error) {
unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID]
if !ok {
Expand Down Expand Up @@ -678,7 +791,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewInvalidArgError(err.Error())
}

stageTimer := time.Now()
// Check blob rate limit
if s.ratelimiter != nil {
allowed, param, err := s.ratelimiter.AllowRequest(ctx, []common.RequestParams{
Expand All @@ -704,7 +816,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewResourceExhaustedError(errorString)
}
}
s.logger.Debug("checked retrieval blob rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalBlobRateType.Plug()), "duration", time.Since(stageTimer).String())

s.logger.Info("received a new blob retrieval request", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex)

batchHeaderHash := req.GetBatchHeaderHash()
Expand All @@ -714,7 +826,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob

blobIndex := req.GetBlobIndex()

stageTimer = time.Now()
blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
Expand All @@ -734,9 +845,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewNotFoundError("no metadata found for the given batch header hash and blob index")
}

s.logger.Debug("fetched blob metadata", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "duration", time.Since(stageTimer).String())

stageTimer = time.Now()
// Check throughout rate limit
blobSize := encoding.GetBlobSize(blobMetadata.ConfirmationInfo.BlobCommitment.Length)

Expand Down Expand Up @@ -764,9 +872,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewResourceExhaustedError(errorString)
}
}
s.logger.Debug("checked retrieval throughput rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalThroughputType.Plug()), "duration (ms)", time.Since(stageTimer).String())

stageTimer = time.Now()
data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash)
if err != nil {
s.logger.Error("Failed to retrieve blob", "err", err)
Expand All @@ -777,8 +883,6 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
s.metrics.HandleSuccessfulRpcRequest("RetrieveBlob")
s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob")

s.logger.Debug("fetched blob content", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "data size (bytes)", len(data), "duration", time.Since(stageTimer).String())

return &pb.RetrieveBlobReply{
Data: data,
}, nil
Expand Down Expand Up @@ -1022,3 +1126,91 @@ func (s *DispersalServer) validateRequestAndGetBlob(ctx context.Context, req *pb

return blob, nil
}

func (s *DispersalServer) validatePaidRequestAndGetBlob(ctx context.Context, req *pb.PaidDisperseBlobRequest) (*core.Blob, error) {

data := req.GetData()
blobSize := len(data)
// The blob size in bytes must be in range [1, maxBlobSize].
if blobSize > s.maxBlobSize {
return nil, fmt.Errorf("blob size cannot exceed %v Bytes", s.maxBlobSize)
}
if blobSize == 0 {
return nil, fmt.Errorf("blob size must be greater than 0")
}

if len(req.GetCustomQuorumNumbers()) > 256 {
return nil, errors.New("number of custom_quorum_numbers must not exceed 256")
}

// validate every 32 bytes is a valid field element
_, err := rs.ToFrArray(data)
if err != nil {
s.logger.Error("failed to convert a 32bytes as a field element", "err", err)
return nil, api.NewInvalidArgError("encountered an error to convert a 32-bytes into a valid field element, please use the correct format where every 32bytes(big-endian) is less than 21888242871839275222246405745257275088548364400416034343698204186575808495617")
}

quorumConfig, err := s.updateQuorumConfig(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get quorum config: %w", err)
}

if len(req.GetCustomQuorumNumbers()) > int(quorumConfig.QuorumCount) {
return nil, errors.New("number of custom_quorum_numbers must not exceed number of quorums")
}

seenQuorums := make(map[uint8]struct{})
// The quorum ID must be in range [0, 254]. It'll actually be converted
// to uint8, so it cannot be greater than 254.
// No check with required quorums
for i := range req.GetCustomQuorumNumbers() {

if req.GetCustomQuorumNumbers()[i] > core.MaxQuorumID {
return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, 254], but found %d", req.GetCustomQuorumNumbers()[i])
}

quorumID := uint8(req.GetCustomQuorumNumbers()[i])
if quorumID >= quorumConfig.QuorumCount {
return nil, fmt.Errorf("custom_quorum_numbers must be in range [0, %d], but found %d", s.quorumConfig.QuorumCount-1, quorumID)
}

if _, ok := seenQuorums[quorumID]; ok {
return nil, fmt.Errorf("custom_quorum_numbers must not contain duplicates")
}
seenQuorums[quorumID] = struct{}{}

}

if len(seenQuorums) == 0 {
return nil, fmt.Errorf("the blob must be sent to at least one quorum")
}

params := make([]*core.SecurityParam, len(seenQuorums))
i := 0
for quorumID := range seenQuorums {
params[i] = &core.SecurityParam{
QuorumID: core.QuorumID(quorumID),
AdversaryThreshold: quorumConfig.SecurityParams[quorumID].AdversaryThreshold,
ConfirmationThreshold: quorumConfig.SecurityParams[quorumID].ConfirmationThreshold,
}
err = params[i].Validate()
if err != nil {
return nil, fmt.Errorf("invalid request: %w", err)
}
i++
}

header := core.BlobRequestHeader{
BlobAuthHeader: core.BlobAuthHeader{
AccountID: req.AccountId,
},
SecurityParams: params,
}

blob := &core.Blob{
RequestHeader: header,
Data: data,
}

return blob, nil
}
36 changes: 35 additions & 1 deletion disperser/apiserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"testing"
"time"

commonaws "github.com/Layr-Labs/eigenda/common/aws"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/Layr-Labs/eigenda/core/meterer"
"github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/disperser/apiserver"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
Expand Down Expand Up @@ -645,6 +647,38 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
if err != nil {
panic("failed to create bucket store")
}
meterConfig := meterer.Config{
PricePerChargeable: 1,
GlobalBytesPerSecond: 1000,
ReservationWindow: 60,
}

paymentChainState := meterer.NewOnchainPaymentState()

paymentChainState.InitializeOnchainPaymentState()

clientConfig := commonaws.ClientConfig{
Region: "us-east-1",
AccessKey: "localstack",
SecretAccessKey: "localstack",
EndpointURL: fmt.Sprintf("http://0.0.0.0:4566"),
}

store, err := meterer.NewOffchainStore(
clientConfig,
"reservations",
"ondemand",
"global",
logger,
)
if err != nil {
teardown()
panic("failed to create offchain store")
}
meterer, err := meterer.NewMeterer(meterConfig, meterer.TimeoutConfig{}, paymentChainState, store, logger)
if err != nil {
panic("failed to create meterer")
}
ratelimiter := ratelimit.NewRateLimiter(prometheus.NewRegistry(), globalParams, bucketStore, logger)

rateConfig := apiserver.RateConfig{
Expand Down Expand Up @@ -701,7 +735,7 @@ func newTestServer(transactor core.Transactor) *apiserver.DispersalServer {
return apiserver.NewDispersalServer(disperser.ServerConfig{
GrpcPort: "51001",
GrpcTimeout: 1 * time.Second,
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), ratelimiter, rateConfig, testMaxBlobSize)
}, queue, transactor, logger, disperser.NewMetrics(prometheus.NewRegistry(), "9001", logger), meterer, ratelimiter, rateConfig, testMaxBlobSize)
}

func disperseBlob(t *testing.T, server *apiserver.DispersalServer, data []byte) (pb.BlobStatus, uint, []byte) {
Expand Down
Loading

0 comments on commit 934fe55

Please sign in to comment.