Skip to content

Commit

Permalink
Merge branch 'master' into eth-account-allowlist
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph committed Jan 16, 2024
2 parents b5db323 + 08cd40e commit 3584557
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 18 deletions.
2 changes: 1 addition & 1 deletion churner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
FailReasonInsufficientStakeToChurn FailReason = "insufficient_stake_to_churn" // Operator doesn't have enough stake to be churned
FailReasonQuorumIdOutOfRange FailReason = "quorum_id_out_of_range" // Quorum ID out of range: quorum is not in the range of [0, QuorumCount]
FailReasonPrevApprovalNotExpired FailReason = "prev_approval_not_expired" // Expiry: previous approval hasn't expired
FailReasonInvalidSignature FailReason = "invalid_signature" // Invalid signature: operator's signature is wong
FailReasonInvalidSignature FailReason = "invalid_signature" // Invalid signature: operator's signature is wrong
FailReasonProcessChurnRequestFailed FailReason = "failed_process_churn_request" // Failed to process churn request
FailReasonInvalidRequest FailReason = "invalid_request" // Invalid request: request is malformed
)
Expand Down
13 changes: 11 additions & 2 deletions common/aws/dynamodb/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func TestQueryIndexPaginationItemNoLimit(t *testing.T) {
ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()
requestedAt := time.Now().Add(-time.Duration(3*i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
Expand Down Expand Up @@ -409,7 +409,16 @@ func TestQueryIndexPagination(t *testing.T) {
ctx := context.Background()
numItems := 30
for i := 0; i < numItems; i += 1 {
requestedAt := time.Now().Add(-time.Duration(i) * time.Second).Unix()
// Noticed same timestamp for multiple items which resulted in key28
// being returned when 10 items were queried as first item,hence multiplying
// by random number 3 here to avoid such a situation
// requestedAt: 1705040877
// metadataKey: key28
// BlobKey: blob28
// requestedAt: 1705040877
// metadataKey: key29
// BlobKey: blob29
requestedAt := time.Now().Add(-time.Duration(3*i) * time.Second).Unix()

// Create new item
item := commondynamodb.Item{
Expand Down
4 changes: 2 additions & 2 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (c *EthClient) EnsureTransactionEvaled(ctx context.Context, tx *types.Trans
c.Logger.Error("Transaction Failed", "tag", tag, "txHash", tx.Hash().Hex(), "status", receipt.Status, "GasUsed", receipt.GasUsed)
return nil, ErrTransactionFailed
}
c.Logger.Trace("successfully submitted transaction", "txHash", tx.Hash().Hex(), "tag", tag, "gasUsed", receipt.GasUsed)
c.Logger.Trace("transaction confirmed", "txHash", tx.Hash().Hex(), "tag", tag, "gasUsed", receipt.GasUsed)
return receipt, nil
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func (c *EthClient) waitMined(ctx context.Context, tx *types.Transaction) (*type
}

if errors.Is(err, ethereum.NotFound) {
c.Logger.Trace("Transaction not yet mined")
c.Logger.Trace("Transaction not yet mined", "txHash", tx.Hash().Hex())
} else if err != nil {
c.Logger.Trace("Receipt retrieval failed", "err", err)
}
Expand Down
7 changes: 5 additions & 2 deletions common/mock/ethclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,11 @@ func (mock *MockEthClient) GetLatestGasCaps(ctx context.Context) (gasTipCap, gas

func (mock *MockEthClient) UpdateGas(ctx context.Context, tx *types.Transaction, value, gasTipCap, gasFeeCap *big.Int) (*types.Transaction, error) {
args := mock.Called()
result := args.Get(0)
return result.(*types.Transaction), args.Error(1)
var newTx *types.Transaction
if args.Get(0) != nil {
newTx = args.Get(0).(*types.Transaction)
}
return newTx, args.Error(1)
}

func (mock *MockEthClient) EstimateGasPriceAndLimitAndSendTx(ctx context.Context, tx *types.Transaction, tag string, value *big.Int) (*types.Receipt, error) {
Expand Down
25 changes: 25 additions & 0 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type TxnManagerMetrics struct {
Latency prometheus.Summary
GasUsed prometheus.Gauge
SpeedUps prometheus.Gauge
TxQueue prometheus.Gauge
NumTx *prometheus.CounterVec
}

type Metrics struct {
Expand Down Expand Up @@ -97,6 +99,21 @@ func NewMetrics(httpPort string, logger common.Logger) *Metrics {
Help: "number of times the gas price was increased",
},
),
TxQueue: promauto.With(reg).NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "tx_queue",
Help: "number of transactions in transaction queue",
},
),
NumTx: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Name: "tx_total",
Help: "number of transactions processed",
},
[]string{"state"},
),
}

metrics := &Metrics{
Expand Down Expand Up @@ -224,3 +241,11 @@ func (t *TxnManagerMetrics) UpdateGasUsed(gasUsed uint64) {
func (t *TxnManagerMetrics) UpdateSpeedUps(speedUps int) {
t.SpeedUps.Set(float64(speedUps))
}

func (t *TxnManagerMetrics) UpdateTxQueue(txQueue int) {
t.TxQueue.Set(float64(txQueue))
}

func (t *TxnManagerMetrics) IncrementTxnCount(state string) {
t.NumTx.WithLabelValues(state).Inc()
}
29 changes: 19 additions & 10 deletions disperser/batcher/txn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (t *txnManager) Start(ctx context.Context) {
func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) error {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Debug("[ProcessTransaction] new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
t.logger.Debug("[TxnManager] new transaction", "tag", req.Tag, "nonce", req.Tx.Nonce(), "gasFeeCap", req.Tx.GasFeeCap(), "gasTipCap", req.Tx.GasTipCap())
gasTipCap, gasFeeCap, err := t.ethClient.GetLatestGasCaps(ctx)
if err != nil {
return fmt.Errorf("failed to get latest gas caps: %w", err)
Expand All @@ -135,10 +135,13 @@ func (t *txnManager) ProcessTransaction(ctx context.Context, req *TxnRequest) er
err = t.ethClient.SendTransaction(ctx, txn)
if err != nil {
return fmt.Errorf("failed to send txn (%s) %s: %w", req.Tag, req.Tx.Hash().Hex(), err)
} else {
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", txn.Hash().Hex())
}
req.Tx = txn

t.requestChan <- req
t.metrics.UpdateTxQueue(len(t.requestChan))
return nil
}

Expand All @@ -154,37 +157,43 @@ func (t *txnManager) monitorTransaction(ctx context.Context, req *TxnRequest) (*
ctxWithTimeout, cancel := context.WithTimeout(ctx, t.txnRefreshInterval)
defer cancel()

t.logger.Debug("[monitorTransaction] monitoring transaction", "tag", req.Tag, "nonce", req.Tx.Nonce())
t.logger.Debug("[TxnManager] monitoring transaction", "txHash", req.Tx.Hash().Hex(), "tag", req.Tag, "nonce", req.Tx.Nonce())
receipt, err := t.ethClient.EnsureTransactionEvaled(
ctxWithTimeout,
req.Tx,
req.Tag,
)
if err == nil {
t.metrics.UpdateSpeedUps(numSpeedUps)
t.metrics.IncrementTxnCount("success")
return receipt, nil
}

if errors.Is(err, context.DeadlineExceeded) {
if receipt != nil {
t.logger.Warn("transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("[TxnManager] transaction has been mined, but hasn't accumulated the required number of confirmations", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
continue
}
t.logger.Warn("transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
t.logger.Warn("[TxnManager] transaction not mined within timeout, resending with higher gas price", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "nonce", req.Tx.Nonce())
newTx, err := t.speedUpTxn(ctx, req.Tx, req.Tag)
if err != nil {
t.logger.Error("failed to speed up transaction", "err", err)
continue
t.logger.Error("[TxnManager] failed to speed up transaction", "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
err = t.ethClient.SendTransaction(ctx, newTx)
if err != nil {
t.logger.Error("failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
continue
t.logger.Error("[TxnManager] failed to send txn", "tag", req.Tag, "txn", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
} else {
t.logger.Debug("[TxnManager] successfully sent txn", "tag", req.Tag, "txn", newTx.Hash().Hex())
}
req.Tx = newTx
numSpeedUps++
} else {
t.logger.Error("transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.logger.Error("[TxnManager] transaction failed", "tag", req.Tag, "txHash", req.Tx.Hash().Hex(), "err", err)
t.metrics.IncrementTxnCount("failure")
return nil, err
}
}
Expand Down Expand Up @@ -215,7 +224,7 @@ func (t *txnManager) speedUpTxn(ctx context.Context, tx *types.Transaction, tag
newGasFeeCap = increasedGasFeeCap
}

t.logger.Debug("[speedUpTxn] increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
t.logger.Info("[TxnManager] increasing gas price", "tag", tag, "txHash", tx.Hash().Hex(), "nonce", tx.Nonce(), "prevGasTipCap", prevGasTipCap, "prevGasFeeCap", prevGasFeeCap, "newGasTipCap", newGasTipCap, "newGasFeeCap", newGasFeeCap)
return t.ethClient.UpdateGas(ctx, tx, tx.Value(), newGasTipCap, newGasFeeCap)
}

Expand Down
33 changes: 33 additions & 0 deletions disperser/batcher/txn_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,36 @@ func TestReplaceGasFee(t *testing.T) {
ethClient.AssertNumberOfCalls(t, "SendTransaction", 2)
ethClient.AssertNumberOfCalls(t, "EnsureTransactionEvaled", 2)
}

func TestTransactionFailure(t *testing.T) {
ethClient := &mock.MockEthClient{}
logger, err := logging.GetLogger(logging.DefaultCLIConfig())
assert.NoError(t, err)
metrics := batcher.NewMetrics("9100", logger)
txnManager := batcher.NewTxnManager(ethClient, 5, 48*time.Second, logger, metrics.TxnManagerMetrics)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
defer cancel()
txnManager.Start(ctx)
txn := types.NewTransaction(0, common.HexToAddress("0x1"), big.NewInt(1e18), 100000, big.NewInt(1e9), []byte{})
ethClient.On("GetLatestGasCaps").Return(big.NewInt(1e9), big.NewInt(1e9), nil)
ethClient.On("UpdateGas").Return(txn, nil).Once()
// now assume that the transaction fails on retry
speedUpFailure := fmt.Errorf("speed up failure")
ethClient.On("UpdateGas").Return(nil, speedUpFailure).Once()
ethClient.On("SendTransaction").Return(nil)
// assume that the transaction is not mined within the timeout
ethClient.On("EnsureTransactionEvaled").Return(nil, context.DeadlineExceeded).Once()
ethClient.On("EnsureTransactionEvaled").Return(&types.Receipt{
BlockNumber: new(big.Int).SetUint64(1),
}, nil)

err = txnManager.ProcessTransaction(ctx, &batcher.TxnRequest{
Tx: txn,
Tag: "test transaction",
Value: nil,
})
<-ctx.Done()
assert.NoError(t, err)
res := <-txnManager.ReceiptChan()
assert.Error(t, res.Err, speedUpFailure)
}
2 changes: 1 addition & 1 deletion disperser/cmd/batcher/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ var (
EnvVar: common.PrefixEnvVar(envVarPrefix, "TARGET_NUM_CHUNKS"),
Value: 0,
}

MaxBlobsToFetchFromStoreFlag = cli.IntFlag{
Name: common.PrefixFlag(FlagPrefix, "max-blobs-to-fetch-from-store"),
Usage: "Limit used to specify how many blobs to fetch from store at time when used with dynamodb pagination",
Expand Down Expand Up @@ -200,6 +199,7 @@ var optionalFlags = []cli.Flag{
EncodingRequestQueueSizeFlag,
MaxNumRetriesPerBlobFlag,
TargetNumChunksFlag,
MaxBlobsToFetchFromStoreFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down

0 comments on commit 3584557

Please sign in to comment.