From e2081edb10132db11e00c6660ba94b668eed5163 Mon Sep 17 00:00:00 2001 From: Jian Xiao <99709935+jianoaix@users.noreply.github.com> Date: Fri, 20 Sep 2024 17:14:57 -0700 Subject: [PATCH] Instrument the retrieval latency stages (#771) --- disperser/apiserver/server.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/disperser/apiserver/server.go b/disperser/apiserver/server.go index b2c14220e..440586605 100644 --- a/disperser/apiserver/server.go +++ b/disperser/apiserver/server.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "google.golang.org/grpc/status" "math/rand" "net" "slices" @@ -12,6 +11,8 @@ import ( "sync" "time" + "google.golang.org/grpc/status" + "github.com/Layr-Labs/eigenda/api" commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" pb "github.com/Layr-Labs/eigenda/api/grpc/disperser" @@ -677,6 +678,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewInvalidArgError(err.Error()) } + stageTimer := time.Now() // Check blob rate limit if s.ratelimiter != nil { allowed, param, err := s.ratelimiter.AllowRequest(ctx, []common.RequestParams{ @@ -702,7 +704,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewResourceExhaustedError(errorString) } } - + s.logger.Debug("checked retrieval blob rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalBlobRateType.Plug()), "duration", time.Since(stageTimer).String()) s.logger.Info("received a new blob retrieval request", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex) batchHeaderHash := req.GetBatchHeaderHash() @@ -712,6 +714,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob blobIndex := req.GetBlobIndex() + stageTimer = time.Now() blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex) if err != nil { s.logger.Error("Failed to retrieve blob metadata", "err", err) @@ -731,6 
+734,9 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewNotFoundError("no metadata found for the given batch header hash and blob index") } + s.logger.Debug("fetched blob metadata", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "duration", time.Since(stageTimer).String()) + + stageTimer = time.Now() // Check throughout rate limit blobSize := encoding.GetBlobSize(blobMetadata.ConfirmationInfo.BlobCommitment.Length) @@ -758,7 +764,9 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob return nil, api.NewResourceExhaustedError(errorString) } } + s.logger.Debug("checked retrieval throughput rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalThroughputType.Plug()), "duration", time.Since(stageTimer).String()) + stageTimer = time.Now() data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash) if err != nil { s.logger.Error("Failed to retrieve blob", "err", err) @@ -769,6 +777,8 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob s.metrics.HandleSuccessfulRpcRequest("RetrieveBlob") s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob") + s.logger.Debug("fetched blob content", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "data size (bytes)", len(data), "duration", time.Since(stageTimer).String()) + return &pb.RetrieveBlobReply{ Data: data, }, nil