Skip to content

Commit

Permalink
refactor(mempool)!: match server/v2/cometbft and sdk mempool interfac…
Browse files Browse the repository at this point in the history
…e (backport #21744) (#21803)

Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
mergify[bot] and julienrbrt authored Sep 18, 2024
1 parent fea175a commit 93959d6
Show file tree
Hide file tree
Showing 13 changed files with 54 additions and 34 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
* (runtime) [#21704](https://github.com/cosmos/cosmos-sdk/pull/21704) Move `upgradetypes.StoreLoader` to runtime and alias it in upgrade for backward compatibility.
* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules

### API Breaking Changes

* (types/mempool) [#21744](https://github.com/cosmos/cosmos-sdk/pull/21744) Update types/mempool.Mempool interface to take decoded transactions. This avoid to decode the transaction twice.

## [v0.52.0](https://github.com/cosmos/cosmos-sdk/releases/tag/v0.52.0) - 2024-XX-XX

Every module contains its own CHANGELOG.md. Please refer to the module you are interested in.
Expand Down
22 changes: 14 additions & 8 deletions baseapp/abci_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,25 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan

defer h.txSelector.Clear()

// decode transactions
decodedTxs := make([]sdk.Tx, len(req.Txs))
for i, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

decodedTxs[i] = tx
}

// If the mempool is nil or NoOp we simply return the transactions
// requested from CometBFT, which, by default, should be in FIFO order.
//
// Note, we still need to ensure the transactions returned respect req.MaxTxBytes.
_, isNoOp := h.mempool.(mempool.NoOpMempool)
if h.mempool == nil || isNoOp {
for _, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
for i, tx := range decodedTxs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, req.Txs[i])
if stop {
break
}
Expand All @@ -291,7 +297,7 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
selectedTxsNums int
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
)
h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool {
h.mempool.SelectBy(ctx, decodedTxs, func(memTx sdk.Tx) bool {
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
isUnordered := ok && unorderedTx.GetUnordered()
txSignersSeqs := make(map[string]uint64)
Expand Down
1 change: 1 addition & 0 deletions baseapp/abci_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func (s *ABCIUtilsTestSuite) TestDefaultProposalHandler_PriorityNonceMempoolTxSe
ph := baseapp.NewDefaultProposalHandler(mp, app)

for _, v := range tc.txInputs {
app.EXPECT().TxDecode(v.bz).Return(v.tx, nil).AnyTimes()
app.EXPECT().PrepareProposalVerifyTx(v.tx).Return(v.bz, nil).AnyTimes()
s.NoError(mp.Insert(s.ctx.WithPriority(v.priority), v.tx))
tc.req.Txs = append(tc.req.Txs, v.bz)
Expand Down
7 changes: 4 additions & 3 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ func (c *Consensus[T]) FinalizeBlock(
}

// remove txs from the mempool
err = c.mempool.Remove(decodedTxs)
if err != nil {
return nil, fmt.Errorf("unable to remove txs: %w", err)
for _, tx := range decodedTxs {
if err = c.mempool.Remove(tx); err != nil {
return nil, fmt.Errorf("unable to remove tx: %w", err)
}
}

c.lastCommittedHeight.Store(req.Height)
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/handlers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
// check again.
_, err := app.ValidateTx(ctx, memTx)
if err != nil {
err := h.mempool.Remove([]T{memTx})
err := h.mempool.Remove(memTx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion server/v2/cometbft/internal/mock/mock_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ type MockMempool[T transaction.Tx] struct{}

func (MockMempool[T]) Insert(context.Context, T) error { return nil }
func (MockMempool[T]) Select(context.Context, []T) mempool.Iterator[T] { return nil }
func (MockMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (MockMempool[T]) CountTx() int { return 0 }
func (MockMempool[T]) Remove([]T) error { return nil }
func (MockMempool[T]) Remove(T) error { return nil }
11 changes: 8 additions & 3 deletions server/v2/cometbft/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ type Mempool[T transaction.Tx] interface {
Insert(context.Context, T) error

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator must be
// closed by the caller.
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, []T) Iterator[T]

// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, []T, func(T) bool)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int

// Remove attempts to remove a transaction from the mempool, returning an error
// upon failure.
Remove([]T) error
Remove(T) error
}

// Iterator defines an app-side mempool iterator interface that is as minimal as
Expand Down
12 changes: 8 additions & 4 deletions server/v2/cometbft/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"

"cosmossdk.io/core/transaction"

sdk "github.com/cosmos/cosmos-sdk/types"
)

var _ Mempool[sdk.Tx] = (*NoOpMempool[sdk.Tx])(nil) // verify interface at compile time
var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)

// NoOpMempool defines a no-op mempool. Transactions are completely discarded and
Expand All @@ -16,7 +19,8 @@ var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)
// is FIFO-ordered by default.
type NoOpMempool[T transaction.Tx] struct{}

func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove([]T) error { return nil }
func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove(T) error { return nil }
6 changes: 2 additions & 4 deletions simapp/v2/simdv2/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ func initCometConfig() cometbft.CfgOption {
func initCometOptions[T transaction.Tx]() cometbft.ServerOptions[T] {
serverOptions := cometbft.DefaultServerOptions[T]()

// TODO mempool interface doesn't match!

// overwrite app mempool, using max-txs option
// serverOptions.Mempool = func(cfg map[string]any) mempool.Mempool[T] {
// if maxTxs := cast.ToInt(cfg[cometbft.FlagMempoolMaxTxs]); maxTxs >= 0 {
// return mempool.NewSenderNonceMempool(
// mempool.SenderNonceMaxTxOpt(maxTxs),
// return sdkmempool.NewSenderNonceMempool(
// sdkmempool.SenderNonceMaxTxOpt(maxTxs),
// )
// }

Expand Down
4 changes: 2 additions & 2 deletions types/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ type Mempool interface {

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, [][]byte) Iterator
Select(context.Context, []sdk.Tx) Iterator

// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, [][]byte, func(sdk.Tx) bool)
SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int
Expand Down
4 changes: 2 additions & 2 deletions types/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ Mempool = (*NoOpMempool)(nil)
type NoOpMempool struct{}

func (NoOpMempool) Insert(context.Context, sdk.Tx) error { return nil }
func (NoOpMempool) Select(context.Context, [][]byte) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, [][]byte, func(sdk.Tx) bool) {}
func (NoOpMempool) Select(context.Context, []sdk.Tx) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool) {}
func (NoOpMempool) CountTx() int { return 0 }
func (NoOpMempool) Remove(sdk.Tx) error { return nil }
6 changes: 3 additions & 3 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,13 @@ func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs []sdk.Tx) Iterator {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.doSelect(ctx, txs)
}

func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}
Expand All @@ -382,7 +382,7 @@ func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Itera
}

// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
mp.mtx.Lock()
defer mp.mtx.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (snm *SenderNonceMempool) Select(ctx context.Context, txs [][]byte) Iterator {
func (snm *SenderNonceMempool) Select(ctx context.Context, txs []sdk.Tx) Iterator {
snm.mtx.Lock()
defer snm.mtx.Unlock()
return snm.doSelect(ctx, txs)
}

func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator {
func (snm *SenderNonceMempool) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
var senders []string

senderCursors := make(map[string]*skiplist.Element)
Expand Down Expand Up @@ -202,7 +202,7 @@ func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator
}

// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
snm.mtx.Lock()
defer snm.mtx.Unlock()

Expand Down

0 comments on commit 93959d6

Please sign in to comment.