Update metrics #32

Merged
merged 1 commit on Nov 15, 2023
4 changes: 3 additions & 1 deletion core/aggregation.go
@@ -92,7 +92,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState
socket = op.Socket
}
if r.Err != nil {
a.Logger.Warn("Error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err)
a.Logger.Warn("[AggregateSignatures] error returned from messageChan", "operator", operatorIDHex, "socket", socket, "err", r.Err)
continue
}

@@ -110,6 +110,8 @@ func (a *StdSignatureAggregator) AggregateSignatures(state *IndexedOperatorState
continue
}

a.Logger.Info("[AggregateSignatures] received signature from operator", "operator", operatorIDHex, "socket", socket)

for ind, id := range quorumIDs {

// Get stake amounts for operator
8 changes: 4 additions & 4 deletions disperser/batcher/batcher.go
@@ -187,10 +187,9 @@ func (b *Batcher) handleFailure(ctx context.Context, blobMetadatas []*disperser.
// Append the error
result = multierror.Append(result, err)
}
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Failed)
}

b.Metrics.UpdateFailedBatchAndBlobs(len(blobMetadatas))

// Return the error(s)
return result.ErrorOrNil()
}
@@ -318,8 +317,10 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {

if status == disperser.Confirmed {
_, updateConfirmationInfoErr = b.Queue.MarkBlobConfirmed(ctx, metadata, confirmationInfo)
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.Confirmed)
} else if status == disperser.InsufficientSignatures {
_, updateConfirmationInfoErr = b.Queue.MarkBlobInsufficientSignatures(ctx, metadata, confirmationInfo)
b.Metrics.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), disperser.InsufficientSignatures)
} else {
updateConfirmationInfoErr = fmt.Errorf("HandleSingleBatch: trying to update confirmation info for blob in status other than confirmed or insufficient signatures: %s", status.String())
}
@@ -340,8 +341,7 @@ func (b *Batcher) HandleSingleBatch(ctx context.Context) error {

log.Trace("[batcher] Update confirmation info took", "duration", time.Since(stageTimer))
b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds()))

b.Metrics.UpdateCompletedBatchAndBlobs(batch.BlobMetadata, passed)
b.Metrics.IncrementBatchCount(len(batch.BlobMetadata))
return nil
}

6 changes: 4 additions & 2 deletions disperser/batcher/batcher_test.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/common/logging"
cmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/encoding"
@@ -63,14 +64,15 @@ func makeTestBlob(securityParams []*core.SecurityParam) core.Blob {

func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) {
// Common Components
logger := &cmock.Logger{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)

// Core Components
cst, err := coremock.NewChainDataMock(10)
assert.NoError(t, err)
cst.On("GetCurrentBlockNumber").Return(uint(10), nil)
asgn := &core.StdAssignmentCoordinator{}
agg := &core.StdSignatureAggregator{}
agg := core.NewStdSignatureAggregator(logger)
enc, err := makeTestEncoder()
assert.NoError(t, err)

1 change: 0 additions & 1 deletion disperser/batcher/encoded_blob_store.go
@@ -102,7 +102,6 @@ func (e *encodedBlobStore) PutEncodingResult(result *EncodingResult) error {
}
e.encoded[requestID] = result
delete(e.requested, requestID)
e.logger.Trace("[PutEncodingResult]", "referenceBlockNumber", result.ReferenceBlockNumber, "requestID", requestID, "encodedSize", e.encodedResultSize)
Contributor:

Reason for removing? Is it too noisy / not helpful?

Contributor Author:

I included this to debug an issue, but it's very noisy.

return nil
}
57 changes: 24 additions & 33 deletions disperser/batcher/metrics.go
@@ -52,7 +52,7 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
Name: "batches_total",
Help: "the number and size of total dispersal batch",
},
[]string{"state", "data"},
[]string{"data"},
),
BatchProcLatency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
@@ -85,43 +85,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
return metrics
}

func (g *Metrics) UpdateAttestation(signers, nonSigners int) {
g.Attestation.WithLabelValues("signers").Set(float64(signers))
g.Attestation.WithLabelValues("non_signers").Set(float64(nonSigners))
func (g *Metrics) UpdateAttestation(operatorCount, nonSignerCount int) {
g.Attestation.WithLabelValues("signers").Set(float64(operatorCount - nonSignerCount))
g.Attestation.WithLabelValues("non_signers").Set(float64(nonSignerCount))
}

// UpdateFailedBatchAndBlobs updates failed a batch and number of blob within it, it only
// counts the number of blob and batches
func (g *Metrics) UpdateFailedBatchAndBlobs(numBlob int) {
g.Blob.WithLabelValues("failed", "number").Add(float64(numBlob))
g.Batch.WithLabelValues("failed", "number").Inc()
}

// UpdateCompletedBatchAndBlobs updates whenever there is a completed batch. it updates both the
// number for batch and blob, and it updates size of data blob. Moreover, it updates the
// time it takes to process the entire batch from "getting the blobs" to "marking as finished"
func (g *Metrics) UpdateCompletedBatchAndBlobs(blobsInBatch []*disperser.BlobMetadata, succeeded []bool) {
totalBlobSucceeded := 0
totalBlobFailed := 0
totalBlobSize := 0

for ind, metadata := range blobsInBatch {
if succeeded[ind] {
totalBlobSucceeded += 1
totalBlobSize += int(metadata.RequestMetadata.BlobSize)
} else {
totalBlobFailed += 1
}
// UpdateCompletedBlob increments the number and updates size of processed blobs.
func (g *Metrics) UpdateCompletedBlob(size int, status disperser.BlobStatus) {
switch status {
case disperser.Confirmed:
g.Blob.WithLabelValues("confirmed", "number").Inc()
g.Blob.WithLabelValues("confirmed", "size").Add(float64(size))
case disperser.Failed:
g.Blob.WithLabelValues("failed", "number").Inc()
g.Blob.WithLabelValues("failed", "size").Add(float64(size))
case disperser.InsufficientSignatures:
g.Blob.WithLabelValues("insufficient_signature", "number").Inc()
g.Blob.WithLabelValues("insufficient_signature", "size").Add(float64(size))
default:
return
}

// Failed blob
g.UpdateFailedBatchAndBlobs(totalBlobFailed)
g.Blob.WithLabelValues("total", "number").Inc()
g.Blob.WithLabelValues("total", "size").Add(float64(size))
}

// Blob
g.Blob.WithLabelValues("completed", "number").Add(float64(totalBlobSucceeded))
g.Blob.WithLabelValues("completed", "size").Add(float64(totalBlobSize))
// Batch
g.Batch.WithLabelValues("completed", "number").Inc()
func (g *Metrics) IncrementBatchCount(size int) {
g.Batch.WithLabelValues("number").Inc()
g.Batch.WithLabelValues("size").Add(float64(size))
}

func (g *Metrics) ObserveLatency(stage string, latencyMs float64) {
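To summarize the new call pattern for anyone skimming the diff: blob outcomes are now recorded one at a time via UpdateCompletedBlob at the point where each blob's status is resolved, and the batch itself is counted separately via IncrementBatchCount. A minimal sketch of a caller, assuming the disperser/batcher package and a passed slice like the one used in HandleSingleBatch; the helper below is illustrative only and not part of this PR:

// reportBatchOutcome is an illustrative helper (not part of this PR) showing how
// the reworked metrics API is meant to be driven from the batcher.
func reportBatchOutcome(m *batcher.Metrics, blobs []*disperser.BlobMetadata, passed []bool) {
	for i, metadata := range blobs {
		status := disperser.Confirmed
		if !passed[i] {
			status = disperser.Failed
		}
		// Each blob is counted individually, labeled by its final status and size.
		m.UpdateCompletedBlob(int(metadata.RequestMetadata.BlobSize), status)
	}
	// The batch is counted once, with its size recorded as the number of blobs it carried.
	m.IncrementBatchCount(len(blobs))
}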
12 changes: 7 additions & 5 deletions disperser/dispatcher/dispatcher.go
@@ -89,12 +89,13 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
ctx, cancel := context.WithTimeout(ctx, c.Timeout)
defer cancel()

request, err := GetStoreChunksRequest(blobs, header)
request, totalSize, err := GetStoreChunksRequest(blobs, header)
if err != nil {
return nil, err
}

opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 1024)
c.logger.Debug("sending chunks to operator", "operator", op.Socket, "size", totalSize)
reply, err := gc.StoreChunks(ctx, request, opt)

if err != nil {
@@ -106,23 +107,24 @@ func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.BlobMessage,
return sig, nil
}

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, error) {

func GetStoreChunksRequest(blobMessages []*core.BlobMessage, header *core.BatchHeader) (*node.StoreChunksRequest, int, error) {
blobs := make([]*node.Blob, len(blobMessages))
totalSize := 0
for i, blob := range blobMessages {
var err error
blobs[i], err = getBlobMessage(blob)
if err != nil {
return nil, err
return nil, 0, err
}
totalSize += blob.BlobHeader.EncodedSizeAllQuorums()
}

request := &node.StoreChunksRequest{
BatchHeader: getBatchHeaderMessage(header),
Blobs: blobs,
}

return request, nil
return request, totalSize, nil
}

func getBlobMessage(blob *core.BlobMessage) (*node.Blob, error) {
3 changes: 2 additions & 1 deletion node/grpc/server_load_test.go
@@ -93,8 +93,9 @@ func TestStoreChunks(t *testing.T) {
numTotalChunks += len(blobMessagesByOp[opID][i].Bundles[0])
}
t.Logf("Batch numTotalChunks: %d", numTotalChunks)
req, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
req, totalSize, err := dispatcher.GetStoreChunksRequest(blobMessagesByOp[opID], batchHeader)
assert.NoError(t, err)
assert.Equal(t, 50790400, totalSize)

timer := time.Now()
reply, err := server.StoreChunks(context.Background(), req)