Skip to content

Commit

Permalink
Instrument the retrieval latency stages (#771)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Sep 21, 2024
1 parent 8c6617f commit e2081ed
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions disperser/apiserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/status"
"math/rand"
"net"
"slices"
"strings"
"sync"
"time"

"google.golang.org/grpc/status"

"github.com/Layr-Labs/eigenda/api"
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
Expand Down Expand Up @@ -677,6 +678,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewInvalidArgError(err.Error())
}

stageTimer := time.Now()
// Check blob rate limit
if s.ratelimiter != nil {
allowed, param, err := s.ratelimiter.AllowRequest(ctx, []common.RequestParams{
Expand All @@ -702,7 +704,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewResourceExhaustedError(errorString)
}
}

s.logger.Debug("checked retrieval blob rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalBlobRateType.Plug()), "duration", time.Since(stageTimer).String())
s.logger.Info("received a new blob retrieval request", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex)

batchHeaderHash := req.GetBatchHeaderHash()
Expand All @@ -712,6 +714,7 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob

blobIndex := req.GetBlobIndex()

stageTimer = time.Now()
blobMetadata, err := s.blobStore.GetMetadataInBatch(ctx, batchHeaderHash32, blobIndex)
if err != nil {
s.logger.Error("Failed to retrieve blob metadata", "err", err)
Expand All @@ -731,6 +734,9 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewNotFoundError("no metadata found for the given batch header hash and blob index")
}

s.logger.Debug("fetched blob metadata", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "duration", time.Since(stageTimer).String())

stageTimer = time.Now()
// Check throughout rate limit
blobSize := encoding.GetBlobSize(blobMetadata.ConfirmationInfo.BlobCommitment.Length)

Expand Down Expand Up @@ -758,7 +764,9 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
return nil, api.NewResourceExhaustedError(errorString)
}
}
s.logger.Debug("checked retrieval throughput rate limiting", "requesterID", fmt.Sprintf("%s:%s", origin, RetrievalThroughputType.Plug()), "duration (ms)", time.Since(stageTimer).String())

stageTimer = time.Now()
data, err := s.blobStore.GetBlobContent(ctx, blobMetadata.BlobHash)
if err != nil {
s.logger.Error("Failed to retrieve blob", "err", err)
Expand All @@ -769,6 +777,8 @@ func (s *DispersalServer) RetrieveBlob(ctx context.Context, req *pb.RetrieveBlob
s.metrics.HandleSuccessfulRpcRequest("RetrieveBlob")
s.metrics.HandleSuccessfulRequest("", len(data), "RetrieveBlob")

s.logger.Debug("fetched blob content", "batchHeaderHash", req.BatchHeaderHash, "blobIndex", req.BlobIndex, "data size (bytes)", len(data), "duration", time.Since(stageTimer).String())

return &pb.RetrieveBlobReply{
Data: data,
}, nil
Expand Down

0 comments on commit e2081ed

Please sign in to comment.