From 7d78e25ad27442dae57291f7856780c9cd7f9e5a Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Fri, 12 Jan 2024 17:23:28 +0000 Subject: [PATCH 1/8] Add eth accounts to allowlist --- disperser/apiserver/rate_config.go | 2 +- disperser/apiserver/server.go | 36 ++++++++++++++++++++++-------- 2 files changed, 28 insertions(+), 10 deletions(-) diff --git a/disperser/apiserver/rate_config.go b/disperser/apiserver/rate_config.go index ebe386852..7ae58e7e4 100644 --- a/disperser/apiserver/rate_config.go +++ b/disperser/apiserver/rate_config.go @@ -86,7 +86,7 @@ func CLIFlags(envPrefix string) []cli.Flag { }, cli.StringSliceFlag{ Name: AllowlistFlagName, - Usage: "Allowlist of IPs and corresponding blob/byte rates to bypass rate limiting. Format: :::. Example: 127.0.0.1:0:10:10485760", + Usage: "Allowlist of IPs or ethereum addresses (including initial \"0x\") and corresponding blob/byte rates to bypass rate limiting. Format: [||]:::. Example: 127.0.0.1:0:10:10485760 0x1234567890123456789012345678901234567890:0:10:10485760", EnvVar: common.PrefixEnvVar(envPrefix, "ALLOWLIST"), Required: false, Value: &cli.StringSlice{}, diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 783ee8a95..33357f67f 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -16,6 +16,8 @@ import ( "github.com/Layr-Labs/eigenda/core" "github.com/Layr-Labs/eigenda/core/auth" "github.com/Layr-Labs/eigenda/disperser" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/crypto" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -128,8 +130,24 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse return fmt.Errorf("failed to authenticate blob request: %v", err) } + // Get the ethereum address associated with the public key. This is just for convenience so we can put addresses instead of public keys in the allowlist. + authenticatedAddress := "" + // Decode public key + publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID) + if err != nil { + s.logger.Warn("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + } else { + pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) + if err != nil { + s.logger.Warn("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + } else { + // Get the address + authenticatedAddress = crypto.PubkeyToAddress(*pubKey).String() + } + } + // Disperse the blob - reply, err := s.disperseBlob(stream.Context(), blob) + reply, err := s.disperseBlob(stream.Context(), blob, authenticatedAddress) if err != nil { return err } @@ -150,11 +168,11 @@ func (s *DispersalServer) DisperseBlob(ctx context.Context, req *pb.DisperseBlob blob := getBlobFromRequest(req) - return s.disperseBlob(ctx, blob) + return s.disperseBlob(ctx, blob, "") } -func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob) (*pb.DisperseBlobReply, error) { +func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, authenticatedAddress string) (*pb.DisperseBlobReply, error) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(f float64) { s.metrics.ObserveLatency("DisperseBlob", f*1000) // make milliseconds })) @@ -219,7 +237,7 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob) (*p } if s.ratelimiter != nil { - err := s.checkRateLimitsAndAddRates(ctx, blob, origin) + err := s.checkRateLimitsAndAddRates(ctx, blob, origin, authenticatedAddress) if err != nil { for _, param := range securityParams { quorumId := string(param.QuorumID) @@ -257,14 +275,14 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob) (*p }, nil } -func (s *DispersalServer) getAccountRate(origin string, quorumID core.QuorumID) (*PerUserRateInfo, error) { +func (s *DispersalServer) getAccountRate(origin, address string, quorumID core.QuorumID) (*PerUserRateInfo, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { return nil, fmt.Errorf("no configured rate exists for quorum %d", quorumID) } - for ip, rateInfoByQuorum := range s.rateConfig.Allowlist { - if !strings.Contains(origin, ip) { + for account, rateInfoByQuorum := range s.rateConfig.Allowlist { + if !strings.Contains(origin, account) && !strings.EqualFold(address, account) { continue } @@ -295,7 +313,7 @@ func (s *DispersalServer) getAccountRate(origin string, quorumID core.QuorumID) }, nil } -func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *core.Blob, origin string) error { +func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *core.Blob, origin, authenticatedAddress string) error { // TODO(robert): Remove these locks once we have resolved ratelimiting approach s.mu.Lock() @@ -307,7 +325,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * if !ok { return fmt.Errorf("no configured rate exists for quorum %d", param.QuorumID) } - accountRates, err := s.getAccountRate(origin, param.QuorumID) + accountRates, err := s.getAccountRate(origin, authenticatedAddress, param.QuorumID) if err != nil { return err } From 9eea2a6a5359276cf749b599441d9f87a49b3e28 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Mon, 15 Jan 2024 22:47:29 +0000 Subject: [PATCH 2/8] Add authenticated ratelimit unit tests --- api/grpc/mock/disperser.go | 60 +++++++ disperser/apiserver/ratelimit_test.go | 249 ++++++++++++++++++++++++++ disperser/apiserver/server.go | 4 +- disperser/apiserver/server_test.go | 94 ++-------- 4 files changed, 320 insertions(+), 87 deletions(-) create mode 100644 api/grpc/mock/disperser.go create mode 100644 disperser/apiserver/ratelimit_test.go diff --git a/api/grpc/mock/disperser.go b/api/grpc/mock/disperser.go new file mode 100644 index 000000000..e396f7022 --- /dev/null +++ b/api/grpc/mock/disperser.go @@ -0,0 +1,60 @@ +package mock + +import ( + "context" + "errors" + + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + + "google.golang.org/grpc" +) + +func MakeStreamMock(ctx context.Context) *StreamMock { + return &StreamMock{ + ctx: ctx, + recvToServer: make(chan *disperser.AuthenticatedRequest, 10), + sentFromServer: make(chan *disperser.AuthenticatedReply, 10), + } +} + +type StreamMock struct { + grpc.ServerStream + ctx context.Context + recvToServer chan *disperser.AuthenticatedRequest + sentFromServer chan *disperser.AuthenticatedReply +} + +func (m *StreamMock) Context() context.Context { + return m.ctx +} + +func (m *StreamMock) Send(resp *disperser.AuthenticatedReply) error { + m.sentFromServer <- resp + return nil +} + +func (m *StreamMock) Recv() (*disperser.AuthenticatedRequest, error) { + req, more := <-m.recvToServer + if !more { + return nil, errors.New("empty") + } + return req, nil +} + +func (m *StreamMock) SendFromClient(req *disperser.AuthenticatedRequest) error { + m.recvToServer <- req + return nil +} + +func (m *StreamMock) RecvToClient() (*disperser.AuthenticatedReply, error) { + response, more := <-m.sentFromServer + if !more { + return nil, errors.New("empty") + } + return response, nil +} + +func (m *StreamMock) Close() { + close(m.recvToServer) + close(m.sentFromServer) +} diff --git a/disperser/apiserver/ratelimit_test.go b/disperser/apiserver/ratelimit_test.go new file mode 100644 index 000000000..6f0d9d7d7 --- /dev/null +++ b/disperser/apiserver/ratelimit_test.go @@ -0,0 +1,249 @@ +package apiserver_test + +import ( + "context" + "crypto/rand" + "net" + "strings" + "testing" + + "github.com/Layr-Labs/eigenda/api/grpc/disperser" + pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" + "github.com/Layr-Labs/eigenda/api/grpc/mock" + "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/auth" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc/peer" +) + +func TestRatelimit(t *testing.T) { + data50KiB := make([]byte, 50*1024) + _, err := rand.Read(data50KiB) + assert.NoError(t, err) + data1KiB := make([]byte, 1024) + _, err = rand.Read(data1KiB) + assert.NoError(t, err) + + // Try with a non-allowlisted IP + p := &peer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP("0.0.0.0"), + Port: 51001, + }, + } + ctx := peer.NewContext(context.Background(), p) + + // Try with non-allowlisted IP + // Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0 + _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ + Data: data50KiB, + SecurityParams: []*pb.SecurityParams{ + { + QuorumId: 0, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, + }) + assert.ErrorContains(t, err, "account throughput limit") + + // Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. + for i := 0; i < 10; i++ { + _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ + Data: data1KiB, + SecurityParams: []*pb.SecurityParams{ + { + QuorumId: 1, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, + }) + } + assert.ErrorContains(t, err, "account blob limit") + + // Now try with an allowlisted IP + // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 + p = &peer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP("1.2.3.4"), + Port: 51001, + }, + } + ctx = peer.NewContext(context.Background(), p) + + _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ + Data: data50KiB, + SecurityParams: []*pb.SecurityParams{ + { + QuorumId: 0, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, + }) + assert.NoError(t, err) + + // This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs. + for i := 0; i < 10; i++ { + _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ + Data: data1KiB, + SecurityParams: []*pb.SecurityParams{ + { + QuorumId: 1, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, + }) + assert.NoError(t, err) + } +} + +func TestAuthRatelimit(t *testing.T) { + + data50KiB := make([]byte, 50*1024) + _, err := rand.Read(data50KiB) + assert.NoError(t, err) + data1KiB := make([]byte, 1024) + _, err = rand.Read(data1KiB) + assert.NoError(t, err) + + // Use an unauthenticated signer + privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb" + signer := auth.NewSigner(privateKeyHex) + + errorChan := make(chan error, 10) + + // Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0 + simulateClient(t, signer, data50KiB, []*pb.SecurityParams{ + { + QuorumId: 0, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, errorChan, false) + + err = <-errorChan + assert.ErrorContains(t, err, "account throughput limit") + + // Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. + for i := 0; i < 10; i++ { + simulateClient(t, signer, data1KiB, []*pb.SecurityParams{ + { + QuorumId: 1, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, errorChan, false) + } + numLimited := 0 + for i := 0; i < 10; i++ { + err = <-errorChan + if err != nil && strings.Contains(err.Error(), "account blob limit") { + numLimited++ + } + } + assert.Greater(t, numLimited, 0) + + // Use an authenticated signer + privateKeyHex = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded" + signer = auth.NewSigner(privateKeyHex) + + // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 + simulateClient(t, signer, data50KiB, []*pb.SecurityParams{ + { + QuorumId: 0, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, errorChan, false) + + err = <-errorChan + assert.NoError(t, err) + + // This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs. + for i := 0; i < 10; i++ { + simulateClient(t, signer, data1KiB, []*pb.SecurityParams{ + { + QuorumId: 1, + AdversaryThreshold: 50, + QuorumThreshold: 100, + }, + }, errorChan, false) + } + numLimited = 0 + for i := 0; i < 10; i++ { + err = <-errorChan + if err != nil && strings.Contains(err.Error(), "account blob limit") { + numLimited++ + } + } + assert.Equal(t, numLimited, 0) + +} + +func simulateClient(t *testing.T, signer core.BlobRequestSigner, data []byte, params []*pb.SecurityParams, errorChan chan error, shouldSucceed bool) { + + p := &peer.Peer{ + Addr: &net.TCPAddr{ + IP: net.ParseIP("0.0.0.0"), + Port: 51001, + }, + } + ctx := peer.NewContext(context.Background(), p) + stream := mock.MakeStreamMock(ctx) + + go func() { + err := dispersalServer.DisperseBlobAuthenticated(stream) + errorChan <- err + stream.Close() + }() + + err := stream.SendFromClient(&pb.AuthenticatedRequest{ + Payload: &pb.AuthenticatedRequest_DisperseRequest{ + DisperseRequest: &pb.DisperseBlobRequest{ + Data: data, + SecurityParams: params, + AccountId: signer.GetAccountID(), + }, + }, + }) + assert.NoError(t, err) + + reply, err := stream.RecvToClient() + assert.NoError(t, err) + + authHeaderReply, ok := reply.Payload.(*disperser.AuthenticatedReply_BlobAuthHeader) + assert.True(t, ok) + + authHeader := core.BlobAuthHeader{ + BlobCommitments: core.BlobCommitments{}, + AccountID: "", + Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter, + } + + authData, err := signer.SignBlobRequest(authHeader) + assert.NoError(t, err) + + // Process challenge and send back challenge_reply + err = stream.SendFromClient(&disperser.AuthenticatedRequest{Payload: &disperser.AuthenticatedRequest_AuthenticationData{ + AuthenticationData: &disperser.AuthenticationData{ + AuthenticationData: authData, + }, + }}) + assert.NoError(t, err) + + if shouldSucceed { + + reply, err = stream.RecvToClient() + assert.NoError(t, err) + + disperseReply, ok := reply.Payload.(*disperser.AuthenticatedReply_DisperseReply) + assert.True(t, ok) + + assert.Equal(t, disperseReply.DisperseReply.Result, disperser.BlobStatus_PROCESSING) + + } + +} diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 33357f67f..d843b3326 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -361,9 +361,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * // Check Account Ratelimit - blob.RequestHeader.AccountID = "ip:" + origin - - userQuorumKey := fmt.Sprintf("%s:%d", blob.RequestHeader.AccountID, param.QuorumID) + userQuorumKey := fmt.Sprintf("%s:%d", "ip:"+origin, param.QuorumID) allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, encodedSize, accountRates.Throughput) if err != nil { return fmt.Errorf("ratelimiter error: %v", err) diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 3df211b0b..01a3c4045 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -279,90 +279,6 @@ func TestDisperseBlobWithExceedSizeLimit(t *testing.T) { assert.Equal(t, err.Error(), "blob size cannot exceed 2 MiB") } -func TestRatelimit(t *testing.T) { - data50KiB := make([]byte, 50*1024) - _, err := rand.Read(data50KiB) - assert.NoError(t, err) - data1KiB := make([]byte, 1024) - _, err = rand.Read(data1KiB) - assert.NoError(t, err) - - // Try with a non-allowlisted IP - p := &peer.Peer{ - Addr: &net.TCPAddr{ - IP: net.ParseIP("0.0.0.0"), - Port: 51001, - }, - } - ctx := peer.NewContext(context.Background(), p) - - // Try with non-allowlisted IP - // Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0 - _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ - Data: data50KiB, - SecurityParams: []*pb.SecurityParams{ - { - QuorumId: 0, - AdversaryThreshold: 50, - QuorumThreshold: 100, - }, - }, - }) - assert.ErrorContains(t, err, "account throughput limit") - - // Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. - for i := 0; i < 10; i++ { - _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ - Data: data1KiB, - SecurityParams: []*pb.SecurityParams{ - { - QuorumId: 1, - AdversaryThreshold: 50, - QuorumThreshold: 100, - }, - }, - }) - } - assert.ErrorContains(t, err, "account blob limit") - - // Now try with an allowlisted IP - // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 - p = &peer.Peer{ - Addr: &net.TCPAddr{ - IP: net.ParseIP("1.2.3.4"), - Port: 51001, - }, - } - ctx = peer.NewContext(context.Background(), p) - - _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ - Data: data50KiB, - SecurityParams: []*pb.SecurityParams{ - { - QuorumId: 0, - AdversaryThreshold: 50, - QuorumThreshold: 100, - }, - }, - }) - assert.NoError(t, err) - - // This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs. - for i := 0; i < 10; i++ { - _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ - Data: data1KiB, - SecurityParams: []*pb.SecurityParams{ - { - QuorumId: 1, - AdversaryThreshold: 50, - QuorumThreshold: 100, - }, - }, - }) - assert.NoError(t, err) - } -} - func setup(m *testing.M) { deployLocalStack = !(os.Getenv("DEPLOY_LOCALSTACK") == "false") @@ -457,6 +373,16 @@ func newTestServer(m *testing.M) *apiserver.DispersalServer { BlobRate: 5 * 1e6, }, }, + "0x1aa8226f6d354380dDE75eE6B634875c4203e522": map[uint8]apiserver.PerUserRateInfo{ + 0: { + Throughput: 100 * 1024, + BlobRate: 5 * 1e6, + }, + 1: { + Throughput: 1024 * 1024, + BlobRate: 5 * 1e6, + }, + }, }, } From 5a243613bb89dbee8e66139bed3ad7f5437ac6ae Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Mon, 15 Jan 2024 23:13:49 +0000 Subject: [PATCH 3/8] Fix ratelimit keys --- disperser/apiserver/server.go | 74 ++++++++++++++++++++++++----------- 1 file changed, 52 insertions(+), 22 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index d843b3326..f5c6e8334 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -275,42 +275,70 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut }, nil } -func (s *DispersalServer) getAccountRate(origin, address string, quorumID core.QuorumID) (*PerUserRateInfo, error) { +type RateKeys struct { + ThroughputKey string + BlobRateKey string +} + +func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, *RateKeys, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { - return nil, fmt.Errorf("no configured rate exists for quorum %d", quorumID) + return nil, nil, fmt.Errorf("no configured rate exists for quorum %d", quorumID) + } + + // Check if the origin is in the allowlist + rates := &PerUserRateInfo{ + Throughput: unauthRates.PerUserUnauthThroughput, + BlobRate: unauthRates.PerUserUnauthBlobRate, + } + + keys := &RateKeys{ + ThroughputKey: "ip:" + origin, + BlobRateKey: "ip:" + origin, } for account, rateInfoByQuorum := range s.rateConfig.Allowlist { - if !strings.Contains(origin, account) && !strings.EqualFold(address, account) { + if !strings.Contains(origin, account) { continue } rateInfo, ok := rateInfoByQuorum[quorumID] if !ok { - continue + break } - throughput := unauthRates.PerUserUnauthThroughput - if rateInfo.Throughput > 0 { - throughput = rateInfo.Throughput + if rateInfo.Throughput > unauthRates.PerUserUnauthThroughput { + rates.Throughput = rateInfo.Throughput } - blobRate := unauthRates.PerUserUnauthBlobRate - if rateInfo.BlobRate > 0 { - blobRate = rateInfo.BlobRate + if rateInfo.BlobRate > unauthRates.PerUserUnauthBlobRate { + rates.BlobRate = rateInfo.BlobRate } - return &PerUserRateInfo{ - Throughput: throughput, - BlobRate: blobRate, - }, nil + break } - return &PerUserRateInfo{ - Throughput: unauthRates.PerUserUnauthThroughput, - BlobRate: unauthRates.PerUserUnauthBlobRate, - }, nil + // Check if the address is in the allowlist + quorumRates, ok := s.rateConfig.Allowlist[authenticatedAddress] + if ok { + rateInfo, ok := quorumRates[quorumID] + if ok { + + if rateInfo.Throughput > rates.Throughput { + rates.Throughput = rateInfo.Throughput + keys.ThroughputKey = "address:" + authenticatedAddress + } + + if rateInfo.BlobRate > rates.BlobRate { + rates.BlobRate = rateInfo.BlobRate + keys.BlobRateKey = "address:" + authenticatedAddress + } + + } + } + + return rates, keys, nil + } func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob *core.Blob, origin, authenticatedAddress string) error { @@ -325,7 +353,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * if !ok { return fmt.Errorf("no configured rate exists for quorum %d", param.QuorumID) } - accountRates, err := s.getAccountRate(origin, authenticatedAddress, param.QuorumID) + accountRates, keys, err := s.getAccountRate(origin, authenticatedAddress, param.QuorumID) if err != nil { return err } @@ -336,7 +364,9 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * encodedLength := core.GetEncodedBlobLength(length, uint8(param.QuorumThreshold), uint8(param.AdversaryThreshold)) encodedSize := core.GetBlobSize(encodedLength) - s.logger.Debug("checking rate limits", "origin", origin, "quorum", param.QuorumID, "encodedSize", encodedSize, "blobSize", blobSize) + s.logger.Debug("checking rate limits", "origin", origin, "address", authenticatedAddress, "quorum", param.QuorumID, "encodedSize", encodedSize, "blobSize", blobSize, + "accountThroughput", accountRates.Throughput, "accountBlobRate", accountRates.BlobRate, + "throughputKey", keys.ThroughputKey, "blobRateKey", keys.BlobRateKey) // Check System Ratelimit systemQuorumKey := fmt.Sprintf("%s:%d", systemAccountKey, param.QuorumID) @@ -361,7 +391,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * // Check Account Ratelimit - userQuorumKey := fmt.Sprintf("%s:%d", "ip:"+origin, param.QuorumID) + userQuorumKey := fmt.Sprintf("%s:%d", keys.ThroughputKey, param.QuorumID) allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, encodedSize, accountRates.Throughput) if err != nil { return fmt.Errorf("ratelimiter error: %v", err) @@ -371,7 +401,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * return errAccountThroughputRateLimit } - userQuorumKey = fmt.Sprintf("%s:%d-blobrate", blob.RequestHeader.AccountID, param.QuorumID) + userQuorumKey = fmt.Sprintf("%s:%d-blobrate", keys.BlobRateKey, param.QuorumID) allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, blobRateMultiplier, accountRates.BlobRate) if err != nil { return fmt.Errorf("ratelimiter error: %v", err) From b5db323fc77e35d9270a8ff2a3b915864c2f3252 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Tue, 16 Jan 2024 18:35:38 +0000 Subject: [PATCH 4/8] Fix unit tests --- common/ratelimit/limiter.go | 4 ++++ disperser/apiserver/ratelimit_test.go | 22 +++++++++++++--------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 30bd57843..560b1ac45 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -53,9 +53,13 @@ func (d *rateLimiter) AllowRequest(ctx context.Context, requesterID common.Reque // Determine bucket deduction deduction := time.Microsecond * time.Duration(1e6*float32(blobSize)/float32(rate)/d.globalRateParams.Multipliers[i]) + // prevLevel := bucketParams.BucketLevels[i] + // Update the bucket level bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 + + // d.logger.Debug("Bucket level", "key", requesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) } // Update the bucket based on blob size and current rate diff --git a/disperser/apiserver/ratelimit_test.go b/disperser/apiserver/ratelimit_test.go index 6f0d9d7d7..fc2f4f185 100644 --- a/disperser/apiserver/ratelimit_test.go +++ b/disperser/apiserver/ratelimit_test.go @@ -47,8 +47,9 @@ func TestRatelimit(t *testing.T) { }) assert.ErrorContains(t, err, "account throughput limit") - // Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. - for i := 0; i < 10; i++ { + // Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 20 blobs. + numLimited := 0 + for i := 0; i < 20; i++ { _, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{ Data: data1KiB, SecurityParams: []*pb.SecurityParams{ @@ -59,8 +60,11 @@ func TestRatelimit(t *testing.T) { }, }, }) + if err != nil && strings.Contains(err.Error(), "account blob limit") { + numLimited++ + } } - assert.ErrorContains(t, err, "account blob limit") + assert.Greater(t, numLimited, 0) // Now try with an allowlisted IP // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 @@ -116,7 +120,7 @@ func TestAuthRatelimit(t *testing.T) { errorChan := make(chan error, 10) // Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0 - simulateClient(t, signer, data50KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "1.1.1.1", data50KiB, []*pb.SecurityParams{ { QuorumId: 0, AdversaryThreshold: 50, @@ -129,7 +133,7 @@ func TestAuthRatelimit(t *testing.T) { // Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. for i := 0; i < 10; i++ { - simulateClient(t, signer, data1KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "2.2.2.2", data1KiB, []*pb.SecurityParams{ { QuorumId: 1, AdversaryThreshold: 50, @@ -151,7 +155,7 @@ func TestAuthRatelimit(t *testing.T) { signer = auth.NewSigner(privateKeyHex) // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 - simulateClient(t, signer, data50KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "3.3.3.3", data50KiB, []*pb.SecurityParams{ { QuorumId: 0, AdversaryThreshold: 50, @@ -164,7 +168,7 @@ func TestAuthRatelimit(t *testing.T) { // This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs. for i := 0; i < 10; i++ { - simulateClient(t, signer, data1KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "4.4.4.4", data1KiB, []*pb.SecurityParams{ { QuorumId: 1, AdversaryThreshold: 50, @@ -183,11 +187,11 @@ func TestAuthRatelimit(t *testing.T) { } -func simulateClient(t *testing.T, signer core.BlobRequestSigner, data []byte, params []*pb.SecurityParams, errorChan chan error, shouldSucceed bool) { +func simulateClient(t *testing.T, signer core.BlobRequestSigner, origin string, data []byte, params []*pb.SecurityParams, errorChan chan error, shouldSucceed bool) { p := &peer.Peer{ Addr: &net.TCPAddr{ - IP: net.ParseIP("0.0.0.0"), + IP: net.ParseIP(origin), Port: 51001, }, } From cb3827dc258e9f96cc135445301787aa55f5ef65 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Tue, 16 Jan 2024 19:51:12 +0000 Subject: [PATCH 5/8] Use separate IPs for ratelimit testing --- common/ratelimit/limiter.go | 4 ++-- disperser/apiserver/ratelimit_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/ratelimit/limiter.go b/common/ratelimit/limiter.go index 560b1ac45..7bda567b8 100644 --- a/common/ratelimit/limiter.go +++ b/common/ratelimit/limiter.go @@ -53,13 +53,13 @@ func (d *rateLimiter) AllowRequest(ctx context.Context, requesterID common.Reque // Determine bucket deduction deduction := time.Microsecond * time.Duration(1e6*float32(blobSize)/float32(rate)/d.globalRateParams.Multipliers[i]) - // prevLevel := bucketParams.BucketLevels[i] + prevLevel := bucketParams.BucketLevels[i] // Update the bucket level bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction) allowed = allowed && bucketParams.BucketLevels[i] > 0 - // d.logger.Debug("Bucket level", "key", requesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) + d.logger.Debug("Bucket level", "key", requesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed) } // Update the bucket based on blob size and current rate diff --git a/disperser/apiserver/ratelimit_test.go b/disperser/apiserver/ratelimit_test.go index fc2f4f185..17e625048 100644 --- a/disperser/apiserver/ratelimit_test.go +++ b/disperser/apiserver/ratelimit_test.go @@ -27,7 +27,7 @@ func TestRatelimit(t *testing.T) { // Try with a non-allowlisted IP p := &peer.Peer{ Addr: &net.TCPAddr{ - IP: net.ParseIP("0.0.0.0"), + IP: net.ParseIP("1.1.1.1"), Port: 51001, }, } @@ -120,7 +120,7 @@ func TestAuthRatelimit(t *testing.T) { errorChan := make(chan error, 10) // Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0 - simulateClient(t, signer, "1.1.1.1", data50KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "2.2.2.2", data50KiB, []*pb.SecurityParams{ { QuorumId: 0, AdversaryThreshold: 50, @@ -133,7 +133,7 @@ func TestAuthRatelimit(t *testing.T) { // Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. for i := 0; i < 10; i++ { - simulateClient(t, signer, "2.2.2.2", data1KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "3.3.3.3", data1KiB, []*pb.SecurityParams{ { QuorumId: 1, AdversaryThreshold: 50, @@ -155,7 +155,7 @@ func TestAuthRatelimit(t *testing.T) { signer = auth.NewSigner(privateKeyHex) // This should succeed because the account throughput limit is 100 KiB/s for quorum 0 - simulateClient(t, signer, "3.3.3.3", data50KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "4.4.4.4", data50KiB, []*pb.SecurityParams{ { QuorumId: 0, AdversaryThreshold: 50, @@ -168,7 +168,7 @@ func TestAuthRatelimit(t *testing.T) { // This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs. for i := 0; i < 10; i++ { - simulateClient(t, signer, "4.4.4.4", data1KiB, []*pb.SecurityParams{ + simulateClient(t, signer, "5.5.5.5", data1KiB, []*pb.SecurityParams{ { QuorumId: 1, AdversaryThreshold: 50, From b9be1ea47af5be7f984d0a86ed167b3f4a1ea411 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Tue, 16 Jan 2024 21:00:37 +0000 Subject: [PATCH 6/8] Increase system blob rate for test --- disperser/apiserver/ratelimit_test.go | 4 ++-- disperser/apiserver/server_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/disperser/apiserver/ratelimit_test.go b/disperser/apiserver/ratelimit_test.go index 17e625048..9bf39976a 100644 --- a/disperser/apiserver/ratelimit_test.go +++ b/disperser/apiserver/ratelimit_test.go @@ -132,7 +132,7 @@ func TestAuthRatelimit(t *testing.T) { assert.ErrorContains(t, err, "account throughput limit") // Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs. - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { simulateClient(t, signer, "3.3.3.3", data1KiB, []*pb.SecurityParams{ { QuorumId: 1, @@ -142,7 +142,7 @@ func TestAuthRatelimit(t *testing.T) { }, errorChan, false) } numLimited := 0 - for i := 0; i < 10; i++ { + for i := 0; i < 20; i++ { err = <-errorChan if err != nil && strings.Contains(err.Error(), "account blob limit") { numLimited++ diff --git a/disperser/apiserver/server_test.go b/disperser/apiserver/server_test.go index 01a3c4045..d85bce379 100644 --- a/disperser/apiserver/server_test.go +++ b/disperser/apiserver/server_test.go @@ -352,13 +352,13 @@ func newTestServer(m *testing.M) *apiserver.DispersalServer { PerUserUnauthThroughput: 20 * 1024, TotalUnauthThroughput: 1048576, PerUserUnauthBlobRate: 3 * 1e6, - TotalUnauthBlobRate: 10 * 1e6, + TotalUnauthBlobRate: 100 * 1e6, }, 1: { PerUserUnauthThroughput: 20 * 1024, TotalUnauthThroughput: 1048576, PerUserUnauthBlobRate: 3 * 1e6, - TotalUnauthBlobRate: 10 * 1e6, + TotalUnauthBlobRate: 100 * 1e6, }, }, ClientIPHeader: "", From c012d5189d4a19f27ee1a62aa41fbe2f064f45e7 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Tue, 16 Jan 2024 21:36:07 +0000 Subject: [PATCH 7/8] Abort on pubkey decoding errors --- disperser/apiserver/rate_config.go | 2 +- disperser/apiserver/server.go | 18 ++++++++---------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/disperser/apiserver/rate_config.go b/disperser/apiserver/rate_config.go index 7ae58e7e4..9a7f00c15 100644 --- a/disperser/apiserver/rate_config.go +++ b/disperser/apiserver/rate_config.go @@ -86,7 +86,7 @@ func CLIFlags(envPrefix string) []cli.Flag { }, cli.StringSliceFlag{ Name: AllowlistFlagName, - Usage: "Allowlist of IPs or ethereum addresses (including initial \"0x\") and corresponding blob/byte rates to bypass rate limiting. Format: [||]:::. Example: 127.0.0.1:0:10:10485760 0x1234567890123456789012345678901234567890:0:10:10485760", + Usage: "Allowlist of IPs or ethereum addresses (including initial \"0x\") and corresponding blob/byte rates to bypass rate limiting. Format: [||]:::. Example: 127.0.0.1:0:10:10485760", EnvVar: common.PrefixEnvVar(envPrefix, "ALLOWLIST"), Required: false, Value: &cli.StringSlice{}, diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index f5c6e8334..70eebfdae 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -131,21 +131,19 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse } // Get the ethereum address associated with the public key. This is just for convenience so we can put addresses instead of public keys in the allowlist. - authenticatedAddress := "" // Decode public key publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID) if err != nil { - s.logger.Warn("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) - } else { - pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) - if err != nil { - s.logger.Warn("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) - } else { - // Get the address - authenticatedAddress = crypto.PubkeyToAddress(*pubKey).String() - } + return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + } + + pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) + if err != nil { + return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) } + authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String() + // Disperse the blob reply, err := s.disperseBlob(stream.Context(), blob, authenticatedAddress) if err != nil { From f6477e65c8bec2235827df98b41898b6eb8324b0 Mon Sep 17 00:00:00 2001 From: Robert Raynor <35671663+mooselumph@users.noreply.github.com> Date: Wed, 17 Jan 2024 22:26:37 +0000 Subject: [PATCH 8/8] Address comments --- api/proto/disperser/disperser.proto | 2 + disperser/apiserver/server.go | 107 +++++++++++++--------------- 2 files changed, 51 insertions(+), 58 deletions(-) diff --git a/api/proto/disperser/disperser.proto b/api/proto/disperser/disperser.proto index 05450d9ea..773164a4a 100644 --- a/api/proto/disperser/disperser.proto +++ b/api/proto/disperser/disperser.proto @@ -80,6 +80,8 @@ message DisperseBlobRequest { // within the same batch. repeated SecurityParams security_params = 2; + // The account ID of the client. This should be a hex-encoded string of the ECSDA public key + // corresponding to the key used by the client to sign the BlobAuthHeader. string account_id = 3; } diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index 70eebfdae..ac5ff0685 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -98,6 +98,22 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse return errors.New("expected DisperseBlobRequest") } + blob := getBlobFromRequest(request.DisperseRequest) + + // Get the ethereum address associated with the public key. This is just for convenience so we can put addresses instead of public keys in the allowlist. + // Decode public key + publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID) + if err != nil { + return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + } + + pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) + if err != nil { + return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) + } + + authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String() + // Send back challenge to client challenge := rand.Uint32() err = stream.Send(&pb.AuthenticatedReply{Payload: &pb.AuthenticatedReply_BlobAuthHeader{ @@ -120,8 +136,6 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse return errors.New("expected AuthenticationData") } - blob := getBlobFromRequest(request.DisperseRequest) - blob.RequestHeader.Nonce = challenge blob.RequestHeader.AuthenticationData = challengeReply.AuthenticationData.AuthenticationData @@ -130,20 +144,6 @@ func (s *DispersalServer) DisperseBlobAuthenticated(stream pb.Disperser_Disperse return fmt.Errorf("failed to authenticate blob request: %v", err) } - // Get the ethereum address associated with the public key. This is just for convenience so we can put addresses instead of public keys in the allowlist. - // Decode public key - publicKeyBytes, err := hexutil.Decode(blob.RequestHeader.AccountID) - if err != nil { - return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) - } - - pubKey, err := crypto.UnmarshalPubkey(publicKeyBytes) - if err != nil { - return fmt.Errorf("failed to decode public key (%v): %v", blob.RequestHeader.AccountID, err) - } - - authenticatedAddress := crypto.PubkeyToAddress(*pubKey).String() - // Disperse the blob reply, err := s.disperseBlob(stream.Context(), blob, authenticatedAddress) if err != nil { @@ -273,28 +273,39 @@ func (s *DispersalServer) disperseBlob(ctx context.Context, blob *core.Blob, aut }, nil } -type RateKeys struct { - ThroughputKey string - BlobRateKey string -} - -func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, *RateKeys, error) { +func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, quorumID core.QuorumID) (*PerUserRateInfo, string, error) { unauthRates, ok := s.rateConfig.QuorumRateInfos[quorumID] if !ok { - return nil, nil, fmt.Errorf("no configured rate exists for quorum %d", quorumID) + return nil, "", fmt.Errorf("no configured rate exists for quorum %d", quorumID) } - // Check if the origin is in the allowlist rates := &PerUserRateInfo{ Throughput: unauthRates.PerUserUnauthThroughput, BlobRate: unauthRates.PerUserUnauthBlobRate, } - keys := &RateKeys{ - ThroughputKey: "ip:" + origin, - BlobRateKey: "ip:" + origin, + // Check if the address is in the allowlist + if len(authenticatedAddress) > 0 { + quorumRates, ok := s.rateConfig.Allowlist[authenticatedAddress] + if ok { + rateInfo, ok := quorumRates[quorumID] + if ok { + key := "address:" + authenticatedAddress + if rateInfo.Throughput > 0 { + rates.Throughput = rateInfo.Throughput + } + if rateInfo.BlobRate > 0 { + rates.BlobRate = rateInfo.BlobRate + } + return rates, key, nil + } + } } + // Check if the origin is in the allowlist + + key := "ip:" + origin + for account, rateInfoByQuorum := range s.rateConfig.Allowlist { if !strings.Contains(origin, account) { continue @@ -305,37 +316,18 @@ func (s *DispersalServer) getAccountRate(origin, authenticatedAddress string, qu break } - if rateInfo.Throughput > unauthRates.PerUserUnauthThroughput { + if rateInfo.Throughput > 0 { rates.Throughput = rateInfo.Throughput } - if rateInfo.BlobRate > unauthRates.PerUserUnauthBlobRate { + if rateInfo.BlobRate > 0 { rates.BlobRate = rateInfo.BlobRate } break } - // Check if the address is in the allowlist - quorumRates, ok := s.rateConfig.Allowlist[authenticatedAddress] - if ok { - rateInfo, ok := quorumRates[quorumID] - if ok { - - if rateInfo.Throughput > rates.Throughput { - rates.Throughput = rateInfo.Throughput - keys.ThroughputKey = "address:" + authenticatedAddress - } - - if rateInfo.BlobRate > rates.BlobRate { - rates.BlobRate = rateInfo.BlobRate - keys.BlobRateKey = "address:" + authenticatedAddress - } - - } - } - - return rates, keys, nil + return rates, key, nil } @@ -351,7 +343,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * if !ok { return fmt.Errorf("no configured rate exists for quorum %d", param.QuorumID) } - accountRates, keys, err := s.getAccountRate(origin, authenticatedAddress, param.QuorumID) + accountRates, accountKey, err := s.getAccountRate(origin, authenticatedAddress, param.QuorumID) if err != nil { return err } @@ -363,8 +355,7 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * encodedSize := core.GetBlobSize(encodedLength) s.logger.Debug("checking rate limits", "origin", origin, "address", authenticatedAddress, "quorum", param.QuorumID, "encodedSize", encodedSize, "blobSize", blobSize, - "accountThroughput", accountRates.Throughput, "accountBlobRate", accountRates.BlobRate, - "throughputKey", keys.ThroughputKey, "blobRateKey", keys.BlobRateKey) + "accountThroughput", accountRates.Throughput, "accountBlobRate", accountRates.BlobRate, "accountKey", accountKey) // Check System Ratelimit systemQuorumKey := fmt.Sprintf("%s:%d", systemAccountKey, param.QuorumID) @@ -389,23 +380,23 @@ func (s *DispersalServer) checkRateLimitsAndAddRates(ctx context.Context, blob * // Check Account Ratelimit - userQuorumKey := fmt.Sprintf("%s:%d", keys.ThroughputKey, param.QuorumID) - allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, encodedSize, accountRates.Throughput) + accountQuorumKey := fmt.Sprintf("%s:%d", accountKey, param.QuorumID) + allowed, err = s.ratelimiter.AllowRequest(ctx, accountQuorumKey, encodedSize, accountRates.Throughput) if err != nil { return fmt.Errorf("ratelimiter error: %v", err) } if !allowed { - s.logger.Warn("account byte ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", accountRates.Throughput) + s.logger.Warn("account byte ratelimit exceeded", "accountQuorumKey", accountQuorumKey, "rate", accountRates.Throughput) return errAccountThroughputRateLimit } - userQuorumKey = fmt.Sprintf("%s:%d-blobrate", keys.BlobRateKey, param.QuorumID) - allowed, err = s.ratelimiter.AllowRequest(ctx, userQuorumKey, blobRateMultiplier, accountRates.BlobRate) + accountQuorumKey = fmt.Sprintf("%s:%d-blobrate", accountKey, param.QuorumID) + allowed, err = s.ratelimiter.AllowRequest(ctx, accountQuorumKey, blobRateMultiplier, accountRates.BlobRate) if err != nil { return fmt.Errorf("ratelimiter error: %v", err) } if !allowed { - s.logger.Warn("account blob ratelimit exceeded", "userQuorumKey", userQuorumKey, "rate", float32(accountRates.BlobRate)/blobRateMultiplier) + s.logger.Warn("account blob ratelimit exceeded", "accountQuorumKey", accountQuorumKey, "rate", float32(accountRates.BlobRate)/blobRateMultiplier) return errAccountBlobRateLimit }