Skip to content

Commit

Permalink
Create a dynamodb exporter on batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
dmanc committed Jan 18, 2024
1 parent 9d4be86 commit 34620fc
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 11 deletions.
2 changes: 1 addition & 1 deletion disperser/batcher/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func makeBatcher(t *testing.T) (*batcherComponents, *bat.Batcher) {
ChainWriteTimeout: 10 * time.Second,
}

metrics := bat.NewMetrics("9100", logger)
metrics := bat.NewMetrics(nil, "9100", logger)

encoderClient := disperser.NewLocalEncoderClient(enc)
finalizer := batchermock.NewFinalizer()
Expand Down
6 changes: 3 additions & 3 deletions disperser/batcher/encoding_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func createEncodingStreamer(t *testing.T, initialBlockNumber uint, batchThreshol
asgn := &core.StdAssignmentCoordinator{}
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), batchThreshold)
workerpool := workerpool.New(5)
metrics := batcher.NewMetrics("9100", logger)
metrics := batcher.NewMetrics(nil, "9100", logger)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger)
assert.Nil(t, err)
encodingStreamer.ReferenceBlockNumber = initialBlockNumber
Expand All @@ -69,7 +69,7 @@ func TestEncodingQueueLimit(t *testing.T) {
asgn := &core.StdAssignmentCoordinator{}
sizeNotifier := batcher.NewEncodedSizeNotifier(make(chan struct{}, 1), 100000)
pool := &cmock.MockWorkerpool{}
metrics := batcher.NewMetrics("9100", logger)
metrics := batcher.NewMetrics(nil, "9100", logger)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, pool, metrics.EncodingStreamerMetrics, logger)
assert.Nil(t, err)
encodingStreamer.ReferenceBlockNumber = 10
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestEncodingFailure(t *testing.T) {
EncodingQueueLimit: 100,
MaxBlobsToFetchFromStore: 10,
}
metrics := batcher.NewMetrics("9100", logger)
metrics := batcher.NewMetrics(nil, "9100", logger)
encodingStreamer, err := batcher.NewEncodingStreamer(streamerConfig, blobStore, cst, encoderClient, asgn, sizeNotifier, workerpool, metrics.EncodingStreamerMetrics, logger)
assert.Nil(t, err)
encodingStreamer.ReferenceBlockNumber = 10
Expand Down
44 changes: 42 additions & 2 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -59,12 +60,12 @@ type Metrics struct {
logger common.Logger
}

func NewMetrics(httpPort string, logger common.Logger) *Metrics {
func NewMetrics(blobMetadataStore *blobstore.BlobMetadataStore, httpPort string, logger common.Logger) *Metrics {
namespace := "eigenda_batcher"
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

reg.MustRegister(NewDynamoDBCollector(blobMetadataStore, logger))
encodingStreamerMetrics := EncodingStreamerMetrics{
EncodedBlobs: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -249,3 +250,42 @@ func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
t.NumTx.WithLabelValues(state).Inc()
}

type DynamoDBCollector struct {
blobMetadataStore *blobstore.BlobMetadataStore
blobStatusMetric *prometheus.Desc
logger common.Logger
}

func NewDynamoDBCollector(blobMetadataStore *blobstore.BlobMetadataStore, logger common.Logger) *DynamoDBCollector {
return &DynamoDBCollector{
blobMetadataStore: blobMetadataStore,
blobStatusMetric: prometheus.NewDesc("dynamodb_blob_metadata_status_count",
"Number of blobs with specific status in DynamoDB",
[]string{"status"},
nil,
),
logger: logger,
}
}

func (collector *DynamoDBCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- collector.blobStatusMetric
}

func (collector *DynamoDBCollector) Collect(ch chan<- prometheus.Metric) {
metadatas, err := collector.blobMetadataStore.GetBlobMetadataByStatus(context.Background(), disperser.Processing)
if err != nil {
collector.logger.Error("failed to get blob metadata by status", "err", err)
return
}

count := len(metadatas)

ch <- prometheus.MustNewConstMetric(
collector.blobStatusMetric,
prometheus.GaugeValue,
float64(count),
disperser.Processing.String(),
)
}
6 changes: 3 additions & 3 deletions disperser/batcher/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestProcessTransaction(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
metrics := batcher.NewMetrics(nil, "9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestReplaceGasFee(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
metrics := batcher.NewMetrics(nil, "9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestTransactionFailure(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
metrics := batcher.NewMetrics(nil, "9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func RunBatcher(ctx *cli.Context) error {
}
}

metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)
metrics := batcher.NewMetrics(blobMetadataStore, config.MetricsConfig.HTTPPort, logger)

if len(config.BatcherConfig.EncoderSocket) == 0 {
return fmt.Errorf("encoder socket must be specified")
Expand Down
2 changes: 1 addition & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
finalizer := batchermock.NewFinalizer()

disperserMetrics := disperser.NewMetrics("9100", logger)
batcherMetrics := batcher.NewMetrics("9100", logger)
batcherMetrics := batcher.NewMetrics(nil, "9100", logger)
txnManager := batchermock.NewTxnManager()

batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics)
Expand Down

0 comments on commit 34620fc

Please sign in to comment.