Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2/cometbft,stf): Listener integration in server/v2 #21917

Merged
merged 8 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/log"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/client/grpc/cmtservice"
"cosmossdk.io/server/v2/cometbft/handlers"
Expand All @@ -40,6 +41,7 @@ type Consensus[T transaction.Tx] struct {
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]

Expand Down Expand Up @@ -104,6 +106,11 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like to add setters tbh, I feel like it pollutes the server instantiation in root.go
Additionally, Consensus is wired from the cometbft server and not manually.
Let's instead add it to the Consensus contructor if possible and have the CometBFT ServerOptions take a listener.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of the separated setters is to make the listener a nil default since all nodes are not open to an off-chain indexer.

Copy link
Member

@julienrbrt julienrbrt Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, the option can be nil by default then. But a user won't have to call NewConsensus manually, so putting it in a setter isn't useful

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When init the consensus from the config, if the indexer is set as on, then call the setter API.
I think it is more flexible.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

works too, could you wire that in cometbft server.go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can I do it in the next PR after baseapp integration? it seems not trivial, and have no idea of the indexer config, not exist???

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a TODO in cometbft/server.go after NewConsensus then?

func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.3
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1
Expand All @@ -46,7 +47,6 @@ require (
cosmossdk.io/depinject v1.0.0 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/math v1.3.0 // indirect
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc // indirect
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91 // indirect
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect
Expand Down
50 changes: 50 additions & 0 deletions server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/streaming"
)

Expand Down Expand Up @@ -57,6 +58,55 @@ func (c *Consensus[T]) streamDeliverBlockChanges(
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err)
}
}

if c.listener == nil {
return nil
}
// stream the StartBlockData to the listener.
if c.listener.StartBlock != nil {
if err := c.listener.StartBlock(appdata.StartBlockData{
Height: uint64(height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
// stream the TxData to the listener.
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
}
// stream the EventData to the listener.
if c.listener.OnEvent != nil {
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}
// stream the KVPairData to the listener.
if c.listener.OnKVPair != nil {
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
return err
}
}
// stream the CommitData to the listener.
if c.listener.Commit != nil {
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {
return err
}
}
}
Comment on lines +101 to +108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor to simplify control flow and enhance readability

The else block after a return statement is unnecessary and can be refactored for clarity. According to the Uber Go Style Guide, it's preferable to avoid else statements when the if block ends with a return.

Apply this diff to refactor the code:

-		if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
-			return err
-		} else if completionCallback != nil {
-			if err := completionCallback(); err != nil {
-				return err
-			}
-		}
+		completionCallback, err := c.listener.Commit(appdata.CommitData{})
+		if err != nil {
+			return err
+		}
+		if completionCallback != nil {
+			if err := completionCallback(); err != nil {
+				return err
+			}
+		}

This refactoring removes the unnecessary else and enhances the readability of the control flow.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {
return err
}
}
}
completionCallback, err := c.listener.Commit(appdata.CommitData{})
if err != nil {
return err
}
if completionCallback != nil {
if err := completionCallback(); err != nil {
return err
}
}
}


return nil
}

Expand Down
4 changes: 4 additions & 0 deletions server/v2/stf/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@ require (
cosmossdk.io/core v1.0.0-alpha.3
cosmossdk.io/schema v0.3.0
github.com/cosmos/gogoproto v1.7.0
github.com/stretchr/testify v1.8.4
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
github.com/tidwall/btree v1.7.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions server/v2/stf/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,21 @@ cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
github.com/cosmos/gogoproto v1.7.0 h1:79USr0oyXAbxg3rspGh/m4SWNyoz/GLaAh0QlCe2fro=
github.com/cosmos/gogoproto v1.7.0/go.mod h1:yWChEv5IUEYURQasfyBW5ffkMHR/90hiHgbNgrtp4j0=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
69 changes: 52 additions & 17 deletions server/v2/stf/stf.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func (s STF[T]) DeliverBlock(

// reset events
exCtx.events = make([]event.Event, 0)

// begin block
var beginBlockEvents []event.Event
if !block.IsGenesis {
Expand All @@ -147,7 +146,7 @@ func (s STF[T]) DeliverBlock(
if err = isCtxCancelled(ctx); err != nil {
return nil, nil, err
}
txResults[i] = s.deliverTx(exCtx, newState, txBytes, transaction.ExecModeFinalize, hi)
txResults[i] = s.deliverTx(exCtx, newState, txBytes, transaction.ExecModeFinalize, hi, int32(i+1))
}
// reset events
exCtx.events = make([]event.Event, 0)
Expand All @@ -173,6 +172,7 @@ func (s STF[T]) deliverTx(
tx T,
execMode transaction.ExecMode,
hi header.Info,
txIndex int32,
) server.TxResult {
// recover in the case of a panic
var recoveryError error
Expand All @@ -195,17 +195,32 @@ func (s STF[T]) deliverTx(
Error: recoveryError,
}
}

validateGas, validationEvents, err := s.validateTx(ctx, state, gasLimit, tx, execMode)
if err != nil {
return server.TxResult{
Error: err,
}
}
events := make([]event.Event, 0)
// set the event indexes, set MsgIndex to 0 in validation events
for i, e := range validationEvents {
e.BlockStage = appdata.TxProcessingStage
e.TxIndex = txIndex
e.MsgIndex = 0
e.EventIndex = int32(i + 1)
events = append(events, e)
}

execResp, execGas, execEvents, err := s.execTx(ctx, state, gasLimit-validateGas, tx, execMode, hi)
// set the TxIndex in the exec events
for _, e := range execEvents {
e.BlockStage = appdata.TxProcessingStage
e.TxIndex = txIndex
events = append(events, e)
}

return server.TxResult{
Events: append(validationEvents, execEvents...),
Events: events,
GasUsed: execGas + validateGas,
GasWanted: gasLimit,
Resp: execResp,
Expand Down Expand Up @@ -271,6 +286,12 @@ func (s STF[T]) execTx(
if applyErr != nil {
return nil, 0, nil, applyErr
}
// set the event indexes, set MsgIndex to -1 in post tx events
for i := range postTxCtx.events {
postTxCtx.events[i].EventIndex = int32(i + 1)
postTxCtx.events[i].MsgIndex = -1
}

return nil, gasUsed, postTxCtx.events, txErr
}
// tx execution went fine, now we use the same state to run the post tx exec handler,
Expand All @@ -290,6 +311,11 @@ func (s STF[T]) execTx(
if applyErr != nil {
return nil, 0, nil, applyErr
}
// set the event indexes, set MsgIndex to -1 in post tx events
for i := range postTxCtx.events {
postTxCtx.events[i].EventIndex = int32(i + 1)
postTxCtx.events[i].MsgIndex = -1
}

return msgsResp, gasUsed, append(runTxMsgsEvents, postTxCtx.events...), nil
}
Expand All @@ -316,17 +342,24 @@ func (s STF[T]) runTxMsgs(
execCtx := s.makeContext(ctx, RuntimeIdentity, state, execMode)
execCtx.setHeaderInfo(hi)
execCtx.setGasLimit(gasLimit)
events := make([]event.Event, 0)
for i, msg := range msgs {
execCtx.sender = txSenders[i]
execCtx.events = make([]event.Event, 0) // reset events
resp, err := s.msgRouter.Invoke(execCtx, msg)
if err != nil {
return nil, 0, nil, err // do not wrap the error or we lose the original error type
}
msgResps[i] = resp
for j, e := range execCtx.events {
e.MsgIndex = int32(i + 1)
e.EventIndex = int32(j + 1)
events = append(events, e)
}
}

consumed := execCtx.meter.Limit() - execCtx.meter.Remaining()
return msgResps, consumed, execCtx.events, nil
return msgResps, consumed, events, nil
}

// preBlock executes the pre block logic.
Expand All @@ -341,6 +374,7 @@ func (s STF[T]) preBlock(

for i := range ctx.events {
ctx.events[i].BlockStage = appdata.PreBlockStage
ctx.events[i].EventIndex = int32(i + 1)
}

return ctx.events, nil
Expand All @@ -357,6 +391,7 @@ func (s STF[T]) beginBlock(

for i := range ctx.events {
ctx.events[i].BlockStage = appdata.BeginBlockStage
ctx.events[i].EventIndex = int32(i + 1)
}

return ctx.events, nil
Expand All @@ -370,30 +405,30 @@ func (s STF[T]) endBlock(
if err != nil {
return nil, nil, err
}

events, valsetUpdates, err := s.validatorUpdates(ctx)
events := ctx.events
ctx.events = make([]event.Event, 0) // reset events
valsetUpdates, err := s.validatorUpdates(ctx)
if err != nil {
return nil, nil, err
}

ctx.events = append(ctx.events, events...)

for i := range ctx.events {
ctx.events[i].BlockStage = appdata.EndBlockStage
events = append(events, ctx.events...)
for i := range events {
events[i].BlockStage = appdata.EndBlockStage
events[i].EventIndex = int32(i + 1)
}

return ctx.events, valsetUpdates, nil
return events, valsetUpdates, nil
}

// validatorUpdates returns the validator updates for the current block. It is called by endBlock after the endblock execution has concluded
func (s STF[T]) validatorUpdates(
ctx *executionContext,
) ([]event.Event, []appmodulev2.ValidatorUpdate, error) {
) ([]appmodulev2.ValidatorUpdate, error) {
valSetUpdates, err := s.doValidatorUpdate(ctx)
if err != nil {
return nil, nil, err
return nil, err
}
return ctx.events, valSetUpdates, nil
return valSetUpdates, nil
}

// Simulate simulates the execution of a tx on the provided state.
Expand All @@ -408,7 +443,7 @@ func (s STF[T]) Simulate(
if err != nil {
return server.TxResult{}, nil
}
txr := s.deliverTx(ctx, simulationState, tx, internal.ExecModeSimulate, hi)
txr := s.deliverTx(ctx, simulationState, tx, internal.ExecModeSimulate, hi, 0)

return txr, simulationState
}
Expand Down
1 change: 0 additions & 1 deletion server/v2/stf/stf_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,5 @@ func (r coreRouterImpl) Invoke(ctx context.Context, req transaction.Msg) (res tr
if !exists {
return nil, fmt.Errorf("%w: %s", ErrNoHandler, typeName)
}

return handler(ctx, req)
}
Loading
Loading