Skip to content

Commit

Permalink
feat(server/v2/cometbft,stf): Listener integration in server/v2 (#21917)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Oct 2, 2024
1 parent deec679 commit bf95c81
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 22 deletions.
7 changes: 7 additions & 0 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"cosmossdk.io/core/transaction"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/log"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/cometbft/client/grpc/cmtservice"
"cosmossdk.io/server/v2/cometbft/handlers"
Expand All @@ -40,6 +41,7 @@ type Consensus[T transaction.Tx] struct {
txCodec transaction.Codec[T]
store types.Store
streaming streaming.Manager
listener *appdata.Listener
snapshotManager *snapshots.Manager
mempool mempool.Mempool[T]

Expand Down Expand Up @@ -104,6 +106,11 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1
Expand All @@ -45,7 +46,6 @@ require (
cosmossdk.io/depinject v1.0.0 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/math v1.3.0 // indirect
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc // indirect
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91 // indirect
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect
Expand Down
50 changes: 50 additions & 0 deletions server/v2/cometbft/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/store"
errorsmod "cosmossdk.io/errors/v2"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/server/v2/streaming"
)

Expand Down Expand Up @@ -57,6 +58,55 @@ func (c *Consensus[T]) streamDeliverBlockChanges(
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err)
}
}

if c.listener == nil {
return nil
}
// stream the StartBlockData to the listener.
if c.listener.StartBlock != nil {
if err := c.listener.StartBlock(appdata.StartBlockData{
Height: uint64(height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
// stream the TxData to the listener.
if c.listener.OnTx != nil {
for i, tx := range txs {
if err := c.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
}
// stream the EventData to the listener.
if c.listener.OnEvent != nil {
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}
// stream the KVPairData to the listener.
if c.listener.OnKVPair != nil {
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil {
return err
}
}
// stream the CommitData to the listener.
if c.listener.Commit != nil {
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
return err
} else if completionCallback != nil {
if err := completionCallback(); err != nil {
return err
}
}
}

return nil
}

Expand Down
69 changes: 52 additions & 17 deletions server/v2/stf/stf.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func (s STF[T]) DeliverBlock(

// reset events
exCtx.events = make([]event.Event, 0)

// begin block
var beginBlockEvents []event.Event
if !block.IsGenesis {
Expand All @@ -147,7 +146,7 @@ func (s STF[T]) DeliverBlock(
if err = isCtxCancelled(ctx); err != nil {
return nil, nil, err
}
txResults[i] = s.deliverTx(exCtx, newState, txBytes, transaction.ExecModeFinalize, hi)
txResults[i] = s.deliverTx(exCtx, newState, txBytes, transaction.ExecModeFinalize, hi, int32(i+1))
}
// reset events
exCtx.events = make([]event.Event, 0)
Expand All @@ -173,6 +172,7 @@ func (s STF[T]) deliverTx(
tx T,
execMode transaction.ExecMode,
hi header.Info,
txIndex int32,
) server.TxResult {
// recover in the case of a panic
var recoveryError error
Expand All @@ -195,17 +195,32 @@ func (s STF[T]) deliverTx(
Error: recoveryError,
}
}

validateGas, validationEvents, err := s.validateTx(ctx, state, gasLimit, tx, execMode)
if err != nil {
return server.TxResult{
Error: err,
}
}
events := make([]event.Event, 0)
// set the event indexes, set MsgIndex to 0 in validation events
for i, e := range validationEvents {
e.BlockStage = appdata.TxProcessingStage
e.TxIndex = txIndex
e.MsgIndex = 0
e.EventIndex = int32(i + 1)
events = append(events, e)
}

execResp, execGas, execEvents, err := s.execTx(ctx, state, gasLimit-validateGas, tx, execMode, hi)
// set the TxIndex in the exec events
for _, e := range execEvents {
e.BlockStage = appdata.TxProcessingStage
e.TxIndex = txIndex
events = append(events, e)
}

return server.TxResult{
Events: append(validationEvents, execEvents...),
Events: events,
GasUsed: execGas + validateGas,
GasWanted: gasLimit,
Resp: execResp,
Expand Down Expand Up @@ -271,6 +286,12 @@ func (s STF[T]) execTx(
if applyErr != nil {
return nil, 0, nil, applyErr
}
// set the event indexes, set MsgIndex to -1 in post tx events
for i := range postTxCtx.events {
postTxCtx.events[i].EventIndex = int32(i + 1)
postTxCtx.events[i].MsgIndex = -1
}

return nil, gasUsed, postTxCtx.events, txErr
}
// tx execution went fine, now we use the same state to run the post tx exec handler,
Expand All @@ -290,6 +311,11 @@ func (s STF[T]) execTx(
if applyErr != nil {
return nil, 0, nil, applyErr
}
// set the event indexes, set MsgIndex to -1 in post tx events
for i := range postTxCtx.events {
postTxCtx.events[i].EventIndex = int32(i + 1)
postTxCtx.events[i].MsgIndex = -1
}

return msgsResp, gasUsed, append(runTxMsgsEvents, postTxCtx.events...), nil
}
Expand All @@ -316,17 +342,24 @@ func (s STF[T]) runTxMsgs(
execCtx := s.makeContext(ctx, RuntimeIdentity, state, execMode)
execCtx.setHeaderInfo(hi)
execCtx.setGasLimit(gasLimit)
events := make([]event.Event, 0)
for i, msg := range msgs {
execCtx.sender = txSenders[i]
execCtx.events = make([]event.Event, 0) // reset events
resp, err := s.msgRouter.Invoke(execCtx, msg)
if err != nil {
return nil, 0, nil, err // do not wrap the error or we lose the original error type
}
msgResps[i] = resp
for j, e := range execCtx.events {
e.MsgIndex = int32(i + 1)
e.EventIndex = int32(j + 1)
events = append(events, e)
}
}

consumed := execCtx.meter.Limit() - execCtx.meter.Remaining()
return msgResps, consumed, execCtx.events, nil
return msgResps, consumed, events, nil
}

// preBlock executes the pre block logic.
Expand All @@ -341,6 +374,7 @@ func (s STF[T]) preBlock(

for i := range ctx.events {
ctx.events[i].BlockStage = appdata.PreBlockStage
ctx.events[i].EventIndex = int32(i + 1)
}

return ctx.events, nil
Expand All @@ -357,6 +391,7 @@ func (s STF[T]) beginBlock(

for i := range ctx.events {
ctx.events[i].BlockStage = appdata.BeginBlockStage
ctx.events[i].EventIndex = int32(i + 1)
}

return ctx.events, nil
Expand All @@ -370,30 +405,30 @@ func (s STF[T]) endBlock(
if err != nil {
return nil, nil, err
}

events, valsetUpdates, err := s.validatorUpdates(ctx)
events := ctx.events
ctx.events = make([]event.Event, 0) // reset events
valsetUpdates, err := s.validatorUpdates(ctx)
if err != nil {
return nil, nil, err
}

ctx.events = append(ctx.events, events...)

for i := range ctx.events {
ctx.events[i].BlockStage = appdata.EndBlockStage
events = append(events, ctx.events...)
for i := range events {
events[i].BlockStage = appdata.EndBlockStage
events[i].EventIndex = int32(i + 1)
}

return ctx.events, valsetUpdates, nil
return events, valsetUpdates, nil
}

// validatorUpdates returns the validator updates for the current block. It is called by endBlock after the endblock execution has concluded
func (s STF[T]) validatorUpdates(
ctx *executionContext,
) ([]event.Event, []appmodulev2.ValidatorUpdate, error) {
) ([]appmodulev2.ValidatorUpdate, error) {
valSetUpdates, err := s.doValidatorUpdate(ctx)
if err != nil {
return nil, nil, err
return nil, err
}
return ctx.events, valSetUpdates, nil
return valSetUpdates, nil
}

// Simulate simulates the execution of a tx on the provided state.
Expand All @@ -408,7 +443,7 @@ func (s STF[T]) Simulate(
if err != nil {
return server.TxResult{}, nil
}
txr := s.deliverTx(ctx, simulationState, tx, internal.ExecModeSimulate, hi)
txr := s.deliverTx(ctx, simulationState, tx, internal.ExecModeSimulate, hi, 0)

return txr, simulationState
}
Expand Down
Loading

0 comments on commit bf95c81

Please sign in to comment.