Skip to content

Commit

Permalink
Enable local bucket store (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Nov 13, 2023
1 parent 045ed3f commit 0adbe17
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 21 deletions.
8 changes: 7 additions & 1 deletion disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const maxBlobSize = 1024 * 512 // 512 KiB

type DispersalServer struct {
pb.UnimplementedDisperserServer
mu sync.Mutex
mu *sync.Mutex

config disperser.ServerConfig

Expand Down Expand Up @@ -64,6 +64,7 @@ func NewDispersalServer(
logger: logger,
ratelimiter: ratelimiter,
rateConfig: rateConfig,
mu: &sync.Mutex{},
}
}

Expand Down Expand Up @@ -164,6 +165,11 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob
}

func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *core.Blob, origin string) error {

// TODO(robert): Remove these locks once we have resolved ratelimiting approach
s.mu.Lock()
defer s.mu.Unlock()

for _, param := range blob.RequestHeader.SecurityParams {

rates, ok := s.rateConfig.QuorumRateInfos[param.QuorumID]
Expand Down
2 changes: 2 additions & 0 deletions disperser/cmd/disperserserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Config struct {
RateConfig apiserver.RateConfig
EnableRatelimiter bool
BucketTableName string
BucketStoreSize int
EthClientConfig geth.EthClientConfig

BLSOperatorStateRetrieverAddr string
Expand Down Expand Up @@ -53,6 +54,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
RateConfig: apiserver.ReadCLIConfig(ctx),
EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name),
BucketTableName: ctx.GlobalString(flags.BucketTableName.Name),
BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name),
EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx),

BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
Expand Down
12 changes: 10 additions & 2 deletions disperser/cmd/disperserserver/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,17 @@ var (
}
BucketTableName = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "rate-bucket-table-name"),
Usage: "name of the dynamodb table to store rate limiter buckets",
Value: "BucketStore",
Usage: "name of the dynamodb table to store rate limiter buckets. If not provided, a local store will be used",
Value: "",
EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_TABLE_NAME"),
}
BucketStoreSize = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "rate-bucket-store-size"),
Usage: "size (max number of entries) of the local store to use for rate limiting buckets",
Value: 100_000,
EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_STORE_SIZE"),
Required: false,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -87,6 +94,7 @@ var optionalFlags = []cli.Flag{
MetricsHTTPPort,
EnableMetrics,
EnableRatelimiter,
BucketStoreSize,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
42 changes: 25 additions & 17 deletions disperser/cmd/disperserserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,6 @@ func RunDisperserServer(ctx *cli.Context) error {
return err
}

bucketName := config.BlobstoreConfig.BucketName
s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger)
if err != nil {
return err
}
logger.Info("Initialized S3 client", "bucket", bucketName)

dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}
client, err := geth.NewClient(config.EthClientConfig, logger)
if err != nil {
logger.Error("Cannot create chain.Client", err)
Expand All @@ -87,20 +76,39 @@ func RunDisperserServer(ctx *cli.Context) error {
if err != nil || storeDurationBlocks == 0 {
return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err)
}

s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger)
if err != nil {
return err
}

dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}

bucketName := config.BlobstoreConfig.BucketName
logger.Info("Creating blob store", "bucket", bucketName)
blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second)
blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger)

var ratelimiter common.RateLimiter
if config.EnableRatelimiter {
globalParams := config.RatelimiterConfig.GlobalRateParams

dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
var bucketStore common.KVStore[common.RateBucketParams]
if config.BucketTableName != "" {
dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger)
if err != nil {
return err
}
bucketStore = store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName)
} else {
bucketStore, err = store.NewLocalParamStore[common.RateBucketParams](config.BucketStoreSize)
if err != nil {
return err
}
}
logger.Info("Initialized dynamodb client")

bucketStore := store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName)
ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger)
}

Expand Down
3 changes: 2 additions & 1 deletion inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath,
v := DisperserVars{
DISPERSER_SERVER_S3_BUCKET_NAME: "test-eigenda-blobstore",
DISPERSER_SERVER_DYNAMODB_TABLE_NAME: "test-BlobMetadata",
DISPERSER_SERVER_RATE_BUCKET_TABLE_NAME: "test-BucketStore",
DISPERSER_SERVER_RATE_BUCKET_TABLE_NAME: "",
DISPERSER_SERVER_RATE_BUCKET_STORE_SIZE: "100000",
DISPERSER_SERVER_GRPC_PORT: grpcPort,
DISPERSER_SERVER_ENABLE_METRICS: "true",
DISPERSER_SERVER_METRICS_HTTP_PORT: "9093",
Expand Down
2 changes: 2 additions & 0 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type DisperserVars struct {

DISPERSER_SERVER_ENABLE_RATELIMITER string

DISPERSER_SERVER_RATE_BUCKET_STORE_SIZE string

DISPERSER_SERVER_CHAIN_RPC string

DISPERSER_SERVER_PRIVATE_KEY string
Expand Down
7 changes: 7 additions & 0 deletions node/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"context"
"fmt"
"sync"

"net"

Expand All @@ -29,6 +30,8 @@ type Server struct {
logger common.Logger

ratelimiter common.RateLimiter

mu *sync.Mutex
}

// NewServer creates a new Server instance with the provided parameters.
Expand All @@ -41,6 +44,7 @@ func NewServer(config *node.Config, node *node.Node, logger common.Logger, ratel
logger: logger,
node: node,
ratelimiter: ratelimiter,
mu: &sync.Mutex{},
}
}

Expand Down Expand Up @@ -187,7 +191,10 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques

encodedBlobSize := core.GetBlobSize(blobHeader.QuorumInfos[in.GetQuorumId()].EncodedBlobLength)
rate := blobHeader.QuorumInfos[in.GetQuorumId()].QuorumRate

s.mu.Lock()
allow, err := s.ratelimiter.AllowRequest(ctx, retrieverID, encodedBlobSize, rate)
s.mu.Unlock()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0adbe17

Please sign in to comment.