From 0adbe174609f4cf70154c9f5fdbf1179c852d827 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Mon, 13 Nov 2023 15:59:54 -0800 Subject: [PATCH] Enable local bucket store (#13) --- disperser/apiserver/server.go | 8 +++- disperser/cmd/disperserserver/config.go | 2 + disperser/cmd/disperserserver/flags/flags.go | 12 +++++- disperser/cmd/disperserserver/main.go | 42 ++++++++++++-------- inabox/deploy/config.go | 3 +- inabox/deploy/env_vars.go | 2 + node/grpc/server.go | 7 ++++ 7 files changed, 55 insertions(+), 21 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 6490c8a97..d3c7395cf 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -27,7 +27,7 @@ const maxBlobSize = 1024 * 512 // 512 KiB type DispersalServer struct { pb.UnimplementedDisperserServer - mu sync.Mutex + mu *sync.Mutex config disperser.ServerConfig @@ -64,6 +64,7 @@ func NewDispersalServer( logger: logger, ratelimiter: ratelimiter, rateConfig: rateConfig, + mu: &sync.Mutex{}, } } @@ -164,6 +165,11 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob } func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *core.Blob, origin string) error { + + // TODO(robert): Remove these locks once we have resolved ratelimiting approach + s.mu.Lock() + defer s.mu.Unlock() + for _, param := range blob.RequestHeader.SecurityParams { rates, ok := s.rateConfig.QuorumRateInfos[param.QuorumID] diff --git a/disperser/cmd/disperserserver/config.go b/disperser/cmd/disperserserver/config.go index e6462b946..48887e6d6 100644 --- a/disperser/cmd/disperserserver/config.go +++ b/disperser/cmd/disperserserver/config.go @@ -22,6 +22,7 @@ type Config struct { RateConfig apiserver.RateConfig EnableRatelimiter bool BucketTableName string + BucketStoreSize int EthClientConfig geth.EthClientConfig BLSOperatorStateRetrieverAddr string @@ -53,6 +54,7 @@ func NewConfig(ctx *cli.Context) (Config, error) { RateConfig: apiserver.ReadCLIConfig(ctx), EnableRatelimiter: ctx.GlobalBool(flags.EnableRatelimiter.Name), BucketTableName: ctx.GlobalString(flags.BucketTableName.Name), + BucketStoreSize: ctx.GlobalInt(flags.BucketStoreSize.Name), EthClientConfig: geth.ReadEthClientConfigRPCOnly(ctx), BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name), diff --git a/disperser/cmd/disperserserver/flags/flags.go b/disperser/cmd/disperserserver/flags/flags.go index af30e0fd6..9d046e874 100644 --- a/disperser/cmd/disperserserver/flags/flags.go +++ b/disperser/cmd/disperserserver/flags/flags.go @@ -68,10 +68,17 @@ var ( } BucketTableName = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "rate-bucket-table-name"), - Usage: "name of the dynamodb table to store rate limiter buckets", - Value: "BucketStore", + Usage: "name of the dynamodb table to store rate limiter buckets. If not provided, a local store will be used", + Value: "", EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_TABLE_NAME"), } + BucketStoreSize = cli.UintFlag{ + Name: common.PrefixFlag(FlagPrefix, "rate-bucket-store-size"), + Usage: "size (max number of entries) of the local store to use for rate limiting buckets", + Value: 100_000, + EnvVar: common.PrefixEnvVar(envVarPrefix, "RATE_BUCKET_STORE_SIZE"), + Required: false, + } ) var requiredFlags = []cli.Flag{ @@ -87,6 +94,7 @@ var optionalFlags = []cli.Flag{ MetricsHTTPPort, EnableMetrics, EnableRatelimiter, + BucketStoreSize, } // Flags contains the list of configuration options available to the binary. diff --git a/disperser/cmd/disperserserver/main.go b/disperser/cmd/disperserserver/main.go index d7953b702..066f40d81 100644 --- a/disperser/cmd/disperserserver/main.go +++ b/disperser/cmd/disperserserver/main.go @@ -58,17 +58,6 @@ func RunDisperserServer(ctx *cli.Context) error { return err } - bucketName := config.BlobstoreConfig.BucketName - s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger) - if err != nil { - return err - } - logger.Info("Initialized S3 client", "bucket", bucketName) - - dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger) - if err != nil { - return err - } client, err := geth.NewClient(config.EthClientConfig, logger) if err != nil { logger.Error("Cannot create chain.Client", err) @@ -87,6 +76,19 @@ func RunDisperserServer(ctx *cli.Context) error { if err != nil || storeDurationBlocks == 0 { return fmt.Errorf("failed to get STORE_DURATION_BLOCKS: %w", err) } + + s3Client, err := s3.NewClient(context.Background(), config.AwsClientConfig, logger) + if err != nil { + return err + } + + dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger) + if err != nil { + return err + } + + bucketName := config.BlobstoreConfig.BucketName + logger.Info("Creating blob store", "bucket", bucketName) blobMetadataStore := blobstore.NewBlobMetadataStore(dynamoClient, logger, config.BlobstoreConfig.TableName, time.Duration((storeDurationBlocks+blockStaleMeasure)*12)*time.Second) blobStore := blobstore.NewSharedStorage(bucketName, s3Client, blobMetadataStore, logger) @@ -94,13 +96,19 @@ func RunDisperserServer(ctx *cli.Context) error { if config.EnableRatelimiter { globalParams := config.RatelimiterConfig.GlobalRateParams - dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger) - if err != nil { - return err + var bucketStore common.KVStore[common.RateBucketParams] + if config.BucketTableName != "" { + dynamoClient, err := dynamodb.NewClient(config.AwsClientConfig, logger) + if err != nil { + return err + } + bucketStore = store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName) + } else { + bucketStore, err = store.NewLocalParamStore[common.RateBucketParams](config.BucketStoreSize) + if err != nil { + return err + } } - logger.Info("Initialized dynamodb client") - - bucketStore := store.NewDynamoParamStore[common.RateBucketParams](dynamoClient, config.BucketTableName) ratelimiter = ratelimit.NewRateLimiter(globalParams, bucketStore, logger) } diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index fbdcf21aa..f9bb504f1 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -161,7 +161,8 @@ func (env *Config) generateDisperserVars(ind int, key, address, logPath, dbPath, v := DisperserVars{ DISPERSER_SERVER_S3_BUCKET_NAME: "test-eigenda-blobstore", DISPERSER_SERVER_DYNAMODB_TABLE_NAME: "test-BlobMetadata", - DISPERSER_SERVER_RATE_BUCKET_TABLE_NAME: "test-BucketStore", + DISPERSER_SERVER_RATE_BUCKET_TABLE_NAME: "", + DISPERSER_SERVER_RATE_BUCKET_STORE_SIZE: "100000", DISPERSER_SERVER_GRPC_PORT: grpcPort, DISPERSER_SERVER_ENABLE_METRICS: "true", DISPERSER_SERVER_METRICS_HTTP_PORT: "9093", diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index d19e73d97..6418efaf8 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -21,6 +21,8 @@ type DisperserVars struct { DISPERSER_SERVER_ENABLE_RATELIMITER string + DISPERSER_SERVER_RATE_BUCKET_STORE_SIZE string + DISPERSER_SERVER_CHAIN_RPC string DISPERSER_SERVER_PRIVATE_KEY string diff --git a/node/grpc/server.go b/node/grpc/server.go index f40d7fac5..ddbd78f7f 100644 --- a/node/grpc/server.go +++ b/node/grpc/server.go @@ -3,6 +3,7 @@ package grpc import ( "context" "fmt" + "sync" "net" @@ -29,6 +30,8 @@ type Server struct { logger common.Logger ratelimiter common.RateLimiter + + mu *sync.Mutex } // NewServer creates a new Server instance with the provided parameters. @@ -41,6 +44,7 @@ func NewServer(config *node.Config, node *node.Node, logger common.Logger, ratel logger: logger, node: node, ratelimiter: ratelimiter, + mu: &sync.Mutex{}, } } @@ -187,7 +191,10 @@ func (s *Server) RetrieveChunks(ctx context.Context, in *pb.RetrieveChunksReques encodedBlobSize := core.GetBlobSize(blobHeader.QuorumInfos[in.GetQuorumId()].EncodedBlobLength) rate := blobHeader.QuorumInfos[in.GetQuorumId()].QuorumRate + + s.mu.Lock() allow, err := s.ratelimiter.AllowRequest(ctx, retrieverID, encodedBlobSize, rate) + s.mu.Unlock() if err != nil { return nil, err }