Skip to content

Commit

Permalink
refactor(mempool)!: match server/v2/cometbft and sdk mempool interface (
Browse files Browse the repository at this point in the history
#21744)

Co-authored-by: Marko <[email protected]>
  • Loading branch information
julienrbrt and tac0turtle authored Sep 18, 2024
1 parent f1dd03f commit 356df96
Show file tree
Hide file tree
Showing 14 changed files with 53 additions and 36 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i
### Improvements

* (genutil) [#21701](https://github.com/cosmos/cosmos-sdk/pull/21701) Improved error messages for genesis validation.
* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules

### Bug Fixes

Expand All @@ -58,7 +59,7 @@ Every module contains its own CHANGELOG.md. Please refer to the module you are i

### API Breaking Changes

* (sims)[#21613](https://github.com/cosmos/cosmos-sdk/pull/21613) Add sims2 framework and factory methods for simpler message factories in modules
* (types/mempool) [#21744](https://github.com/cosmos/cosmos-sdk/pull/21744) Update types/mempool.Mempool interface to take decoded transactions. This avoid to decode the transaction twice.

### Deprecated

Expand Down
22 changes: 14 additions & 8 deletions baseapp/abci_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,25 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan

defer h.txSelector.Clear()

// decode transactions
decodedTxs := make([]sdk.Tx, len(req.Txs))
for i, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

decodedTxs[i] = tx
}

// If the mempool is nil or NoOp we simply return the transactions
// requested from CometBFT, which, by default, should be in FIFO order.
//
// Note, we still need to ensure the transactions returned respect req.MaxTxBytes.
_, isNoOp := h.mempool.(mempool.NoOpMempool)
if h.mempool == nil || isNoOp {
for _, txBz := range req.Txs {
tx, err := h.txVerifier.TxDecode(txBz)
if err != nil {
return nil, err
}

stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
for i, tx := range decodedTxs {
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, req.Txs[i])
if stop {
break
}
Expand All @@ -291,7 +297,7 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
selectedTxsNums int
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
)
h.mempool.SelectBy(ctx, req.Txs, func(memTx sdk.Tx) bool {
h.mempool.SelectBy(ctx, decodedTxs, func(memTx sdk.Tx) bool {
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
isUnordered := ok && unorderedTx.GetUnordered()
txSignersSeqs := make(map[string]uint64)
Expand Down
1 change: 1 addition & 0 deletions baseapp/abci_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,7 @@ func (s *ABCIUtilsTestSuite) TestDefaultProposalHandler_PriorityNonceMempoolTxSe
ph := baseapp.NewDefaultProposalHandler(mp, app)

for _, v := range tc.txInputs {
app.EXPECT().TxDecode(v.bz).Return(v.tx, nil).AnyTimes()
app.EXPECT().PrepareProposalVerifyTx(v.tx).Return(v.bz, nil).AnyTimes()
s.NoError(mp.Insert(s.ctx.WithPriority(v.priority), v.tx))
tc.req.Txs = append(tc.req.Txs, v.bz)
Expand Down
7 changes: 4 additions & 3 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ func (c *Consensus[T]) FinalizeBlock(
}

// remove txs from the mempool
err = c.mempool.Remove(decodedTxs)
if err != nil {
return nil, fmt.Errorf("unable to remove txs: %w", err)
for _, tx := range decodedTxs {
if err = c.mempool.Remove(tx); err != nil {
return nil, fmt.Errorf("unable to remove tx: %w", err)
}
}

c.lastCommittedHeight.Store(req.Height)
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/handlers/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (h *DefaultProposalHandler[T]) PrepareHandler() PrepareHandler[T] {
// check again.
_, err := app.ValidateTx(ctx, memTx)
if err != nil {
err := h.mempool.Remove([]T{memTx})
err := h.mempool.Remove(memTx)
if err != nil && !errors.Is(err, mempool.ErrTxNotFound) {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion server/v2/cometbft/internal/mock/mock_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ type MockMempool[T transaction.Tx] struct{}

func (MockMempool[T]) Insert(context.Context, T) error { return nil }
func (MockMempool[T]) Select(context.Context, []T) mempool.Iterator[T] { return nil }
func (MockMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (MockMempool[T]) CountTx() int { return 0 }
func (MockMempool[T]) Remove([]T) error { return nil }
func (MockMempool[T]) Remove(T) error { return nil }
11 changes: 8 additions & 3 deletions server/v2/cometbft/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@ type Mempool[T transaction.Tx] interface {
Insert(context.Context, T) error

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator must be
// closed by the caller.
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, []T) Iterator[T]

// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, []T, func(T) bool)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int

// Remove attempts to remove a transaction from the mempool, returning an error
// upon failure.
Remove([]T) error
Remove(T) error
}

// Iterator defines an app-side mempool iterator interface that is as minimal as
Expand Down
12 changes: 8 additions & 4 deletions server/v2/cometbft/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"

"cosmossdk.io/core/transaction"

sdk "github.com/cosmos/cosmos-sdk/types"
)

var _ Mempool[sdk.Tx] = (*NoOpMempool[sdk.Tx])(nil) // verify interface at compile time
var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)

// NoOpMempool defines a no-op mempool. Transactions are completely discarded and
Expand All @@ -16,7 +19,8 @@ var _ Mempool[transaction.Tx] = (*NoOpMempool[transaction.Tx])(nil)
// is FIFO-ordered by default.
type NoOpMempool[T transaction.Tx] struct{}

func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove([]T) error { return nil }
func (NoOpMempool[T]) Insert(context.Context, T) error { return nil }
func (NoOpMempool[T]) Select(context.Context, []T) Iterator[T] { return nil }
func (NoOpMempool[T]) SelectBy(context.Context, []T, func(T) bool) {}
func (NoOpMempool[T]) CountTx() int { return 0 }
func (NoOpMempool[T]) Remove(T) error { return nil }
2 changes: 1 addition & 1 deletion simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/cosmos/cosmos-db v1.0.3-0.20240911104526-ddc3f09bfc22 // indirect
// this version is not used as it is always replaced by the latest Cosmos SDK version
github.com/cosmos/cosmos-sdk v0.53.0
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
Expand Down Expand Up @@ -194,7 +195,6 @@ require (
github.com/sasha-s/go-deadlock v0.3.5 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.13 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
Expand Down
6 changes: 2 additions & 4 deletions simapp/v2/simdv2/cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,11 @@ func initCometConfig() cometbft.CfgOption {
func initCometOptions[T transaction.Tx]() cometbft.ServerOptions[T] {
serverOptions := cometbft.DefaultServerOptions[T]()

// TODO mempool interface doesn't match!

// overwrite app mempool, using max-txs option
// serverOptions.Mempool = func(cfg map[string]any) mempool.Mempool[T] {
// if maxTxs := cast.ToInt(cfg[cometbft.FlagMempoolMaxTxs]); maxTxs >= 0 {
// return mempool.NewSenderNonceMempool(
// mempool.SenderNonceMaxTxOpt(maxTxs),
// return sdkmempool.NewSenderNonceMempool(
// sdkmempool.SenderNonceMaxTxOpt(maxTxs),
// )
// }

Expand Down
4 changes: 2 additions & 2 deletions types/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ type Mempool interface {

// Select returns an Iterator over the app-side mempool. If txs are specified,
// then they shall be incorporated into the Iterator. The Iterator is not thread-safe to use.
Select(context.Context, [][]byte) Iterator
Select(context.Context, []sdk.Tx) Iterator

// SelectBy use callback to iterate over the mempool, it's thread-safe to use.
SelectBy(context.Context, [][]byte, func(sdk.Tx) bool)
SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool)

// CountTx returns the number of transactions currently in the mempool.
CountTx() int
Expand Down
4 changes: 2 additions & 2 deletions types/mempool/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ Mempool = (*NoOpMempool)(nil)
type NoOpMempool struct{}

func (NoOpMempool) Insert(context.Context, sdk.Tx) error { return nil }
func (NoOpMempool) Select(context.Context, [][]byte) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, [][]byte, func(sdk.Tx) bool) {}
func (NoOpMempool) Select(context.Context, []sdk.Tx) Iterator { return nil }
func (NoOpMempool) SelectBy(context.Context, []sdk.Tx, func(sdk.Tx) bool) {}
func (NoOpMempool) CountTx() int { return 0 }
func (NoOpMempool) Remove(sdk.Tx) error { return nil }
6 changes: 3 additions & 3 deletions types/mempool/priority_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,13 @@ func (i *PriorityNonceIterator[C]) Tx() sdk.Tx {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) Select(ctx context.Context, txs []sdk.Tx) Iterator {
mp.mtx.Lock()
defer mp.mtx.Unlock()
return mp.doSelect(ctx, txs)
}

func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Iterator {
func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
if mp.priorityIndex.Len() == 0 {
return nil
}
Expand All @@ -383,7 +383,7 @@ func (mp *PriorityNonceMempool[C]) doSelect(_ context.Context, _ [][]byte) Itera
}

// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (mp *PriorityNonceMempool[C]) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
mp.mtx.Lock()
defer mp.mtx.Unlock()

Expand Down
6 changes: 3 additions & 3 deletions types/mempool/sender_nonce.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,13 @@ func (snm *SenderNonceMempool) Insert(_ context.Context, tx sdk.Tx) error {
//
// NOTE: It is not safe to use this iterator while removing transactions from
// the underlying mempool.
func (snm *SenderNonceMempool) Select(ctx context.Context, txs [][]byte) Iterator {
func (snm *SenderNonceMempool) Select(ctx context.Context, txs []sdk.Tx) Iterator {
snm.mtx.Lock()
defer snm.mtx.Unlock()
return snm.doSelect(ctx, txs)
}

func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator {
func (snm *SenderNonceMempool) doSelect(_ context.Context, _ []sdk.Tx) Iterator {
var senders []string

senderCursors := make(map[string]*skiplist.Element)
Expand Down Expand Up @@ -202,7 +202,7 @@ func (snm *SenderNonceMempool) doSelect(_ context.Context, _ [][]byte) Iterator
}

// SelectBy will hold the mutex during the iteration, callback returns if continue.
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs [][]byte, callback func(sdk.Tx) bool) {
func (snm *SenderNonceMempool) SelectBy(ctx context.Context, txs []sdk.Tx, callback func(sdk.Tx) bool) {
snm.mtx.Lock()
defer snm.mtx.Unlock()

Expand Down

0 comments on commit 356df96

Please sign in to comment.