diff --git a/disperser/batcher/metrics.go b/disperser/batcher/metrics.go index 415dddc31..7325e563e 100644 --- a/disperser/batcher/metrics.go +++ b/disperser/batcher/metrics.go @@ -34,15 +34,21 @@ type EncodingStreamerMetrics struct { EncodedBlobs *prometheus.GaugeVec } +type TxnManagerMetrics struct { + Latency prometheus.Summary + GasUsed prometheus.Gauge + SpeedUps prometheus.Gauge +} + type Metrics struct { *EncodingStreamerMetrics + *TxnManagerMetrics registry *prometheus.Registry Blob *prometheus.CounterVec Batch *prometheus.CounterVec BatchProcLatency *prometheus.SummaryVec - GasUsed prometheus.Gauge Attestation *prometheus.GaugeVec BatchError *prometheus.CounterVec @@ -67,8 +73,34 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics { ), } + txnManagerMetrics := TxnManagerMetrics{ + Latency: promauto.With(reg).NewSummary( + prometheus.SummaryOpts{ + Namespace: namespace, + Name: "txn_manager_latency_ms", + Help: "transaction confirmation latency summary in milliseconds", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001}, + }, + ), + GasUsed: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "gas_used", + Help: "gas used for onchain batch confirmation", + }, + ), + SpeedUps: promauto.With(reg).NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Name: "speed_ups", + Help: "number of times the gas price was increased", + }, + ), + } + metrics := &Metrics{ EncodingStreamerMetrics: &encodingStreamerMetrics, + TxnManagerMetrics: &txnManagerMetrics, Blob: promauto.With(reg).NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -94,13 +126,6 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics { }, []string{"stage"}, ), - GasUsed: promauto.With(reg).NewGauge( - prometheus.GaugeOpts{ - Namespace: namespace, - Name: "gas_used", - Help: "gas used for onchain batch confirmation", - }, - ), Attestation: promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, @@ -181,3 +206,15 @@ func (e *EncodingStreamerMetrics) UpdateEncodedBlobs(count int, size uint64) { e.EncodedBlobs.WithLabelValues("size").Set(float64(size)) e.EncodedBlobs.WithLabelValues("number").Set(float64(count)) } + +func (t *TxnManagerMetrics) ObserveLatency(latencyMs float64) { + t.Latency.Observe(latencyMs) +} + +func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) { + t.GasUsed.Set(float64(gasUsed)) +} + +func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) { + t.SpeedUps.Set(float64(speedUps)) +} diff --git a/disperser/batcher/txn_manager.go b/disperser/batcher/txn_manager.go index ee03c4b27..540be1ead 100644 --- a/disperser/batcher/txn_manager.go +++ b/disperser/batcher/txn_manager.go @@ -57,11 +57,12 @@ type txnManager struct { receiptChan chan *ReceiptOrErr queueSize int txnRefreshInterval time.Duration + metrics *TxnManagerMetrics } var _ TxnManager = (*txnManager)(nil) -func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger) TxnManager { +func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval time.Duration, logger common.Logger, metrics *TxnManagerMetrics) TxnManager { return &txnManager{ ethClient: ethClient, requestChan: make(chan *TxnRequest, queueSize), @@ -69,6 +70,7 @@ func NewTxnManager(ethClient common.EthClient, queueSize int, txnRefreshInterval receiptChan: make(chan *ReceiptOrErr, queueSize), queueSize: queueSize, txnRefreshInterval: txnRefreshInterval, + metrics: metrics, } } @@ -103,7 +105,11 @@ func (t *txnManager) Start(ctx context.Context) { Metadata: req.Metadata, Err: nil, } + if receipt.GasUsed > 0 { + t.metrics.UpdateGasUsed(receipt.GasUsed) + } } + t.metrics.ObserveLatency(float64(time.Since(req.requestedAt).Milliseconds())) } } }() @@ -143,6 +149,7 @@ func (t *txnManager) ReceiptChan() chan *ReceiptOrErr { // monitorTransaction monitors the transaction and resends it with a higher gas price if it is not mined without a timeout. // It returns an error if the transaction fails to be sent for reasons other than timeouts. func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*types.Receipt, error) { + numSpeedUps := 0 for { ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval) defer cancel() @@ -154,6 +161,7 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (* req.Tag, ) if err == nil { + t.metrics.UpdateSpeedUps(numSpeedUps) return receipt, nil } @@ -173,6 +181,7 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (* t.logger.Error("failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err) continue } + numSpeedUps++ } else { t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err) return nil, err diff --git a/disperser/batcher/txn_manager_test.go b/disperser/batcher/txn_manager_test.go index ca948019a..a241639a7 100644 --- a/disperser/batcher/txn_manager_test.go +++ b/disperser/batcher/txn_manager_test.go @@ -19,7 +19,8 @@ func TestProcessTransaction(t *testing.T) { ethClient := &mock.MockEthClient{} logger, err := logging.GetLogger(logging.DefaultCLIConfig()) assert.NoError(t, err) - txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger) + metrics := batcher.NewMetrics("9100", logger) + txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) @@ -68,7 +69,8 @@ func TestReplaceGasFee(t *testing.T) { ethClient := &mock.MockEthClient{} logger, err := logging.GetLogger(logging.DefaultCLIConfig()) assert.NoError(t, err) - txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger) + metrics := batcher.NewMetrics("9100", logger) + txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() txnManager.Start(ctx) diff --git a/disperser/cmd/batcher/main.go b/disperser/cmd/batcher/main.go index 1679d9e14..199221510 100644 --- a/disperser/cmd/batcher/main.go +++ b/disperser/cmd/batcher/main.go @@ -141,7 +141,7 @@ func RunBatcher(ctx *cli.Context) error { return err } finalizer := batcher.NewFinalizer(config.TimeoutConfig.ChainReadTimeout, config.BatcherConfig.FinalizerInterval, queue, client, rpcClient, config.BatcherConfig.MaxNumRetriesPerBlob, logger) - txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger) + txnManager := batcher.NewTxnManager(client, 20, config.TimeoutConfig.ChainWriteTimeout, logger, metrics.TxnManagerMetrics) batcher, err := batcher.NewBatcher(config.BatcherConfig, config.TimeoutConfig, queue, dispatcher, ics, asgn, encoderClient, agg, client, finalizer, tx, txnManager, logger, metrics) if err != nil { return err