Add TxnManager metrics #170

Merged · 1 commit · Jan 11, 2024
53 changes: 45 additions & 8 deletions disperser/batcher/metrics.go
@@ -34,15 +34,21 @@ type EncodingStreamerMetrics struct {
	EncodedBlobs *prometheus.GaugeVec
}

type TxnManagerMetrics struct {
	Latency  prometheus.Summary
	GasUsed  prometheus.Gauge
	SpeedUps prometheus.Gauge
}

type Metrics struct {
	*EncodingStreamerMetrics
	*TxnManagerMetrics

	registry *prometheus.Registry

	Blob             *prometheus.CounterVec
	Batch            *prometheus.CounterVec
	BatchProcLatency *prometheus.SummaryVec
	GasUsed          prometheus.Gauge
	Attestation      *prometheus.GaugeVec
	BatchError       *prometheus.CounterVec

@@ -67,8 +73,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
		),
	}

	txnManagerMetrics := TxnManagerMetrics{
		Latency: promauto.With(reg).NewSummary(
			prometheus.SummaryOpts{
				Namespace:  namespace,
				Name:       "txn_manager_latency_ms",
				Help:       "transaction confirmation latency summary in milliseconds",
				Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
			},
		),
		GasUsed: promauto.With(reg).NewGauge(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Name:      "gas_used",
				Help:      "gas used for onchain batch confirmation",
			},
		),
		SpeedUps: promauto.With(reg).NewGauge(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Name:      "speed_ups",
				Help:      "number of times the gas price was increased",
			},
		),
	}

	metrics := &Metrics{
		EncodingStreamerMetrics: &encodingStreamerMetrics,
		TxnManagerMetrics:       &txnManagerMetrics,
		Blob: promauto.With(reg).NewCounterVec(
			prometheus.CounterOpts{
				Namespace: namespace,
@@ -94,13 +126,6 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
			},
			[]string{"stage"},
		),
		GasUsed: promauto.With(reg).NewGauge(
			prometheus.GaugeOpts{
				Namespace: namespace,
				Name:      "gas_used",
				Help:      "gas used for onchain batch confirmation",
			},
		),
		Attestation: promauto.With(reg).NewGaugeVec(
			prometheus.GaugeOpts{
				Namespace: namespace,
@@ -181,3 +206,15 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) {
	e.EncodedBlobs.WithLabelValues("size").Set(float64(size))
	e.EncodedBlobs.WithLabelValues("number").Set(float64(count))
}

func (t *TxnManagerMetrics) ObserveLatency(latencyMs float64) {
	t.Latency.Observe(latencyMs)
}

func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
	t.GasUsed.Set(float64(gasUsed))
}

func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) {
	t.SpeedUps.Set(float64(speedUps))
}
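For context on the metric types used above: the Latency summary's Objectives map each tracked quantile (p50/p90/p95/p99) to its allowed estimation error, while GasUsed and SpeedUps are plain set-on-update gauges. Below is a minimal, self-contained sketch of the same client_golang pattern; the "example" namespace, the sleep, and the numeric values are illustrative assumptions, not code from this PR.

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same shape as TxnManagerMetrics.Latency: quantile -> allowed error.
	latency := promauto.With(reg).NewSummary(prometheus.SummaryOpts{
		Namespace:  "example",
		Name:       "txn_manager_latency_ms",
		Help:       "transaction confirmation latency summary in milliseconds",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
	})
	gasUsed := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Namespace: "example",
		Name:      "gas_used",
		Help:      "gas used for onchain batch confirmation",
	})
	speedUps := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Namespace: "example",
		Name:      "speed_ups",
		Help:      "number of times the gas price was increased",
	})

	// Mirrors what txnManager does once a transaction is confirmed.
	start := time.Now()
	time.Sleep(5 * time.Millisecond) // stand-in for waiting on the receipt
	latency.Observe(float64(time.Since(start).Milliseconds()))
	gasUsed.Set(21000) // illustrative value
	speedUps.Set(1)    // illustrative value

	// Print the registered metric family names to confirm registration.
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName())
	}
}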
11 changes: 10 additions & 1 deletion disperser/batcher/txn_manager.go
@@ -57,18 +57,20 @@ type txnManager struct {
	receiptChan        chan *ReceiptOrErr
	queueSize          int
	txnRefreshInterval time.Duration
	metrics            *TxnManagerMetrics
}

var _ TxnManager = (*txnManager)(nil)

func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) TxnManager {
func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger, metrics *TxnManagerMetrics) TxnManager {
	return &txnManager{
		ethClient:          ethClient,
		requestChan:        make(chan *TxnRequest, queueSize),
		logger:             logger,
		receiptChan:        make(chan *ReceiptOrErr, queueSize),
		queueSize:          queueSize,
		txnRefreshInterval: txnRefreshInterval,
		metrics:            metrics,
	}
}

@@ -103,7 +105,11 @@ func (t *txnManager) Start(ctx context.Context) {
						Metadata: req.Metadata,
						Err:      nil,
					}
					if receipt.GasUsed > 0 {
						t.metrics.UpdateGasUsed(receipt.GasUsed)
					}
				}
				t.metrics.ObserveLatency(float64(time.Since(req.requestedAt).Milliseconds()))
			}
		}
	}()
@@ -143,6 +149,7 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr {
// monitorTransaction monitors the transaction and resends it with a higher gas price if it is not mined, without an overall timeout.
// It returns an error if the transaction fails to be sent for reasons other than timeouts.
func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
	numSpeedUps := 0
	for {
Contributor:

nit: should there be some sort of backoff here, to handle the case of network congestion and avoid an immediate retry?

Contributor (Author):

It always waits for t.txnRefreshInterval on every retry.
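In other words, each confirmation attempt runs under its own deadline equal to the refresh interval, so retries are paced rather than immediate. A rough sketch of that shape follows; waitForReceipt, refresh, and tryOnce are hypothetical names for illustration, not this PR's code, which calls the eth client inline as shown in the resumed diff below.

package sketch

import (
	"context"
	"errors"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
)

// waitForReceipt illustrates the pacing described above: each attempt gets a
// deadline of `refresh`, a timed-out attempt loops again (the real code also
// bumps the gas price and resends before the next wait), and any other error
// is treated as fatal.
func waitForReceipt(ctx context.Context, refresh time.Duration,
	tryOnce func(context.Context) (*types.Receipt, error)) (*types.Receipt, error) {
	for {
		attemptCtx, cancel := context.WithTimeout(ctx, refresh)
		receipt, err := tryOnce(attemptCtx)
		cancel()
		if err == nil {
			return receipt, nil
		}
		if errors.Is(err, context.DeadlineExceeded) {
			continue // not mined within the refresh interval; retry
		}
		return nil, err // RPC error, reverted txn, canceled context, ...
	}
}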

		ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
		defer cancel()
@@ -154,6 +161,7 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
			req.Tag,
		)
		if err == nil {
			t.metrics.UpdateSpeedUps(numSpeedUps)
			return receipt, nil
		}

@@ -173,6 +181,7 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) {
t.logger.Error("failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
continue
}
numSpeedUps++
} else {
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
Contributor:

Do we want to track how many transactions failed because of a timeout?

Contributor (Author):

This condition doesn't mean the transaction failed due to a timeout, since there is no longer a timeout that fails an onchain confirmation. We can hit this condition for any other error (e.g. an RPC error, a networking issue, a transaction failure), hence we report the error and exit right away.
Since this condition isn't expected, I'm not sure tracking it as a metric makes sense. It should be alerted on, like any other error log.

			return nil, err
6 changes: 4 additions & 2 deletions disperser/batcher/txn_manager_test.go
@@ -19,7 +19,8 @@ func TestProcessTransaction(t *testing.T) {
	ethClient := &mock.MockEthClient{}
	logger, err := logging.GetLogger(logging.DefaultCLIConfig())
	assert.NoError(t, err)
	txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger)
	metrics := batcher.NewMetrics("9100", logger)
	txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()
	txnManager.Start(ctx)
@@ -68,7 +69,8 @@ func TestReplaceGasFee(t *testing.T) {
	ethClient := &mock.MockEthClient{}
	logger, err := logging.GetLogger(logging.DefaultCLIConfig())
	assert.NoError(t, err)
	txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger)
	metrics := batcher.NewMetrics("9100", logger)
	txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
	defer cancel()
	txnManager.Start(ctx)
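The tests above only wire the metrics through; a hypothetical follow-up test (not part of this PR) could read the gauges back with client_golang's testutil package. The sketch below assumes the Layr-Labs/eigenda import paths implied by the existing test file.

package batcher_test

import (
	"testing"

	"github.com/Layr-Labs/eigenda/common/logging"
	"github.com/Layr-Labs/eigenda/disperser/batcher"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/assert"
)

// Hypothetical test: testutil.ToFloat64 reads back the gauge values that the
// new helpers set. Values here are illustrative.
func TestTxnManagerMetricsGauges(t *testing.T) {
	logger, err := logging.GetLogger(logging.DefaultCLIConfig())
	assert.NoError(t, err)

	metrics := batcher.NewMetrics("9100", logger)
	metrics.TxnManagerMetrics.UpdateGasUsed(21000)
	metrics.TxnManagerMetrics.UpdateSpeedUps(2)

	assert.Equal(t, float64(21000), testutil.ToFloat64(metrics.TxnManagerMetrics.GasUsed))
	assert.Equal(t, float64(2), testutil.ToFloat64(metrics.TxnManagerMetrics.SpeedUps))
}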
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
@@ -141,7 +141,7 @@ func RunBatcher(ctx *cli.Context) error {
		return err
	}
	finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger)
	txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger)
	txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics)
	batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics)
	if err != nil {
		return err