From bf95c814f802c2ed8af0c6f8935e371c098ed68e Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:25:51 -0400 Subject: [PATCH] feat(server/v2/cometbft,stf): Listener integration in server/v2 (#21917) --- server/v2/cometbft/abci.go | 7 ++ server/v2/cometbft/go.mod | 2 +- server/v2/cometbft/streaming.go | 50 ++++++++++ server/v2/stf/stf.go | 69 ++++++++++---- server/v2/stf/stf_test.go | 158 +++++++++++++++++++++++++++++++- 5 files changed, 264 insertions(+), 22 deletions(-) diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 02966c874770..fd24f2cbe093 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -20,6 +20,7 @@ import ( "cosmossdk.io/core/transaction" errorsmod "cosmossdk.io/errors/v2" "cosmossdk.io/log" + "cosmossdk.io/schema/appdata" "cosmossdk.io/server/v2/appmanager" "cosmossdk.io/server/v2/cometbft/client/grpc/cmtservice" "cosmossdk.io/server/v2/cometbft/handlers" @@ -40,6 +41,7 @@ type Consensus[T transaction.Tx] struct { txCodec transaction.Codec[T] store types.Store streaming streaming.Manager + listener *appdata.Listener snapshotManager *snapshots.Manager mempool mempool.Mempool[T] @@ -104,6 +106,11 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) { c.streaming = sm } +// SetListener sets the listener for the consensus module. +func (c *Consensus[T]) SetListener(l *appdata.Listener) { + c.listener = l +} + // RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager. // It allows additional snapshotter implementations to be used for creating and restoring snapshots. func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error { diff --git a/server/v2/cometbft/go.mod b/server/v2/cometbft/go.mod index a8ee291b2276..f807f2c2d07e 100644 --- a/server/v2/cometbft/go.mod +++ b/server/v2/cometbft/go.mod @@ -22,6 +22,7 @@ require ( cosmossdk.io/core v1.0.0-alpha.4 cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 cosmossdk.io/log v1.4.1 + cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000 cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1 @@ -45,7 +46,6 @@ require ( cosmossdk.io/depinject v1.0.0 // indirect cosmossdk.io/errors v1.0.1 // indirect cosmossdk.io/math v1.3.0 // indirect - cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc // indirect cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91 // indirect cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000 // indirect diff --git a/server/v2/cometbft/streaming.go b/server/v2/cometbft/streaming.go index 2ccb33dffb99..65f52a002af1 100644 --- a/server/v2/cometbft/streaming.go +++ b/server/v2/cometbft/streaming.go @@ -7,6 +7,7 @@ import ( "cosmossdk.io/core/server" "cosmossdk.io/core/store" errorsmod "cosmossdk.io/errors/v2" + "cosmossdk.io/schema/appdata" "cosmossdk.io/server/v2/streaming" ) @@ -57,6 +58,55 @@ func (c *Consensus[T]) streamDeliverBlockChanges( c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err) } } + + if c.listener == nil { + return nil + } + // stream the StartBlockData to the listener. + if c.listener.StartBlock != nil { + if err := c.listener.StartBlock(appdata.StartBlockData{ + Height: uint64(height), + HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + }); err != nil { + return err + } + } + // stream the TxData to the listener. + if c.listener.OnTx != nil { + for i, tx := range txs { + if err := c.listener.OnTx(appdata.TxData{ + TxIndex: int32(i), + Bytes: func() ([]byte, error) { return tx, nil }, + JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + }); err != nil { + return err + } + } + } + // stream the EventData to the listener. + if c.listener.OnEvent != nil { + if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil { + return err + } + } + // stream the KVPairData to the listener. + if c.listener.OnKVPair != nil { + if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil { + return err + } + } + // stream the CommitData to the listener. + if c.listener.Commit != nil { + if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil { + return err + } else if completionCallback != nil { + if err := completionCallback(); err != nil { + return err + } + } + } + return nil } diff --git a/server/v2/stf/stf.go b/server/v2/stf/stf.go index 0dffe561dc98..f665aaf3c6e9 100644 --- a/server/v2/stf/stf.go +++ b/server/v2/stf/stf.go @@ -123,7 +123,6 @@ func (s STF[T]) DeliverBlock( // reset events exCtx.events = make([]event.Event, 0) - // begin block var beginBlockEvents []event.Event if !block.IsGenesis { @@ -147,7 +146,7 @@ func (s STF[T]) DeliverBlock( if err = isCtxCancelled(ctx); err != nil { return nil, nil, err } - txResults[i] = s.deliverTx(exCtx, newState, txBytes, transaction.ExecModeFinalize, hi) + txResults[i] = s.deliverTx(exCtx, newState, txBytes, transaction.ExecModeFinalize, hi, int32(i+1)) } // reset events exCtx.events = make([]event.Event, 0) @@ -173,6 +172,7 @@ func (s STF[T]) deliverTx( tx T, execMode transaction.ExecMode, hi header.Info, + txIndex int32, ) server.TxResult { // recover in the case of a panic var recoveryError error @@ -195,17 +195,32 @@ func (s STF[T]) deliverTx( Error: recoveryError, } } - validateGas, validationEvents, err := s.validateTx(ctx, state, gasLimit, tx, execMode) if err != nil { return server.TxResult{ Error: err, } } + events := make([]event.Event, 0) + // set the event indexes, set MsgIndex to 0 in validation events + for i, e := range validationEvents { + e.BlockStage = appdata.TxProcessingStage + e.TxIndex = txIndex + e.MsgIndex = 0 + e.EventIndex = int32(i + 1) + events = append(events, e) + } execResp, execGas, execEvents, err := s.execTx(ctx, state, gasLimit-validateGas, tx, execMode, hi) + // set the TxIndex in the exec events + for _, e := range execEvents { + e.BlockStage = appdata.TxProcessingStage + e.TxIndex = txIndex + events = append(events, e) + } + return server.TxResult{ - Events: append(validationEvents, execEvents...), + Events: events, GasUsed: execGas + validateGas, GasWanted: gasLimit, Resp: execResp, @@ -271,6 +286,12 @@ func (s STF[T]) execTx( if applyErr != nil { return nil, 0, nil, applyErr } + // set the event indexes, set MsgIndex to -1 in post tx events + for i := range postTxCtx.events { + postTxCtx.events[i].EventIndex = int32(i + 1) + postTxCtx.events[i].MsgIndex = -1 + } + return nil, gasUsed, postTxCtx.events, txErr } // tx execution went fine, now we use the same state to run the post tx exec handler, @@ -290,6 +311,11 @@ func (s STF[T]) execTx( if applyErr != nil { return nil, 0, nil, applyErr } + // set the event indexes, set MsgIndex to -1 in post tx events + for i := range postTxCtx.events { + postTxCtx.events[i].EventIndex = int32(i + 1) + postTxCtx.events[i].MsgIndex = -1 + } return msgsResp, gasUsed, append(runTxMsgsEvents, postTxCtx.events...), nil } @@ -316,17 +342,24 @@ func (s STF[T]) runTxMsgs( execCtx := s.makeContext(ctx, RuntimeIdentity, state, execMode) execCtx.setHeaderInfo(hi) execCtx.setGasLimit(gasLimit) + events := make([]event.Event, 0) for i, msg := range msgs { execCtx.sender = txSenders[i] + execCtx.events = make([]event.Event, 0) // reset events resp, err := s.msgRouter.Invoke(execCtx, msg) if err != nil { return nil, 0, nil, err // do not wrap the error or we lose the original error type } msgResps[i] = resp + for j, e := range execCtx.events { + e.MsgIndex = int32(i + 1) + e.EventIndex = int32(j + 1) + events = append(events, e) + } } consumed := execCtx.meter.Limit() - execCtx.meter.Remaining() - return msgResps, consumed, execCtx.events, nil + return msgResps, consumed, events, nil } // preBlock executes the pre block logic. @@ -341,6 +374,7 @@ func (s STF[T]) preBlock( for i := range ctx.events { ctx.events[i].BlockStage = appdata.PreBlockStage + ctx.events[i].EventIndex = int32(i + 1) } return ctx.events, nil @@ -357,6 +391,7 @@ func (s STF[T]) beginBlock( for i := range ctx.events { ctx.events[i].BlockStage = appdata.BeginBlockStage + ctx.events[i].EventIndex = int32(i + 1) } return ctx.events, nil @@ -370,30 +405,30 @@ func (s STF[T]) endBlock( if err != nil { return nil, nil, err } - - events, valsetUpdates, err := s.validatorUpdates(ctx) + events := ctx.events + ctx.events = make([]event.Event, 0) // reset events + valsetUpdates, err := s.validatorUpdates(ctx) if err != nil { return nil, nil, err } - - ctx.events = append(ctx.events, events...) - - for i := range ctx.events { - ctx.events[i].BlockStage = appdata.EndBlockStage + events = append(events, ctx.events...) + for i := range events { + events[i].BlockStage = appdata.EndBlockStage + events[i].EventIndex = int32(i + 1) } - return ctx.events, valsetUpdates, nil + return events, valsetUpdates, nil } // validatorUpdates returns the validator updates for the current block. It is called by endBlock after the endblock execution has concluded func (s STF[T]) validatorUpdates( ctx *executionContext, -) ([]event.Event, []appmodulev2.ValidatorUpdate, error) { +) ([]appmodulev2.ValidatorUpdate, error) { valSetUpdates, err := s.doValidatorUpdate(ctx) if err != nil { - return nil, nil, err + return nil, err } - return ctx.events, valSetUpdates, nil + return valSetUpdates, nil } // Simulate simulates the execution of a tx on the provided state. @@ -408,7 +443,7 @@ func (s STF[T]) Simulate( if err != nil { return server.TxResult{}, nil } - txr := s.deliverTx(ctx, simulationState, tx, internal.ExecModeSimulate, hi) + txr := s.deliverTx(ctx, simulationState, tx, internal.ExecModeSimulate, hi, 0) return txr, simulationState } diff --git a/server/v2/stf/stf_test.go b/server/v2/stf/stf_test.go index bb2cab62189d..b4c84a62ff9a 100644 --- a/server/v2/stf/stf_test.go +++ b/server/v2/stf/stf_test.go @@ -11,15 +11,19 @@ import ( gogotypes "github.com/cosmos/gogoproto/types" appmodulev2 "cosmossdk.io/core/appmodule/v2" + "cosmossdk.io/core/event" coregas "cosmossdk.io/core/gas" "cosmossdk.io/core/server" "cosmossdk.io/core/store" "cosmossdk.io/core/transaction" + "cosmossdk.io/schema/appdata" "cosmossdk.io/server/v2/stf/branch" "cosmossdk.io/server/v2/stf/gas" "cosmossdk.io/server/v2/stf/mock" ) +const senderAddr = "sender" + func addMsgHandlerToSTF[T any, PT interface { *T transaction.Msg @@ -60,7 +64,7 @@ func addMsgHandlerToSTF[T any, PT interface { func TestSTF(t *testing.T) { state := mock.DB() mockTx := mock.Tx{ - Sender: []byte("sender"), + Sender: []byte(senderAddr), Msg: &gogotypes.BoolValue{Value: true}, GasLimit: 100_000, } @@ -68,22 +72,48 @@ func TestSTF(t *testing.T) { sum := sha256.Sum256([]byte("test-hash")) s := &STF[mock.Tx]{ - doPreBlock: func(ctx context.Context, txs []mock.Tx) error { return nil }, + doPreBlock: func(ctx context.Context, txs []mock.Tx) error { + ctx.(*executionContext).events = append(ctx.(*executionContext).events, event.NewEvent("pre-block")) + return nil + }, doBeginBlock: func(ctx context.Context) error { kvSet(t, ctx, "begin-block") + ctx.(*executionContext).events = append(ctx.(*executionContext).events, event.NewEvent("begin-block")) return nil }, doEndBlock: func(ctx context.Context) error { kvSet(t, ctx, "end-block") + ctx.(*executionContext).events = append(ctx.(*executionContext).events, event.NewEvent("end-block")) return nil }, - doValidatorUpdate: func(ctx context.Context) ([]appmodulev2.ValidatorUpdate, error) { return nil, nil }, + doValidatorUpdate: func(ctx context.Context) ([]appmodulev2.ValidatorUpdate, error) { + ctx.(*executionContext).events = append(ctx.(*executionContext).events, event.NewEvent("validator-update")) + return nil, nil + }, doTxValidation: func(ctx context.Context, tx mock.Tx) error { kvSet(t, ctx, "validate") + ctx.(*executionContext).events = append( + ctx.(*executionContext).events, + event.NewEvent("validate-tx", event.NewAttribute(senderAddr, string(tx.Sender))), + event.NewEvent( + "validate-tx", + event.NewAttribute(senderAddr, string(tx.Sender)), + event.NewAttribute("index", "2"), + ), + ) return nil }, postTxExec: func(ctx context.Context, tx mock.Tx, success bool) error { kvSet(t, ctx, "post-tx-exec") + ctx.(*executionContext).events = append( + ctx.(*executionContext).events, + event.NewEvent("post-tx-exec", event.NewAttribute(senderAddr, string(tx.Sender))), + event.NewEvent( + "post-tx-exec", + event.NewAttribute(senderAddr, string(tx.Sender)), + event.NewAttribute("index", "2"), + ), + ) return nil }, branchFn: branch.DefaultNewWriterMap, @@ -93,6 +123,15 @@ func TestSTF(t *testing.T) { addMsgHandlerToSTF(t, s, func(ctx context.Context, msg *gogotypes.BoolValue) (*gogotypes.BoolValue, error) { kvSet(t, ctx, "exec") + ctx.(*executionContext).events = append( + ctx.(*executionContext).events, + event.NewEvent("handle-msg", event.NewAttribute("msg", msg.String())), + event.NewEvent( + "handle-msg", + event.NewAttribute("msg", msg.String()), + event.NewAttribute("index", "2"), + ), + ) return nil, nil }) @@ -135,13 +174,124 @@ func TestSTF(t *testing.T) { if txResult.GasWanted != mockTx.GasLimit { t.Errorf("Expected GasWanted to be %d, got %d", mockTx.GasLimit, txResult.GasWanted) } + + // Check PreBlockEvents + preBlockEvents := result.PreBlockEvents + if len(preBlockEvents) != 1 { + t.Fatalf("Expected 1 PreBlockEvent, got %d", len(preBlockEvents)) + } + if preBlockEvents[0].Type != "pre-block" { + t.Errorf("Expected PreBlockEvent Type 'pre-block', got %s", preBlockEvents[0].Type) + } + if preBlockEvents[0].BlockStage != appdata.PreBlockStage { + t.Errorf("Expected PreBlockStage %d, got %d", appdata.PreBlockStage, preBlockEvents[0].BlockStage) + } + if preBlockEvents[0].EventIndex != 1 { + t.Errorf("Expected PreBlockEventIndex 1, got %d", preBlockEvents[0].EventIndex) + } + // Check BeginBlockEvents + beginBlockEvents := result.BeginBlockEvents + if len(beginBlockEvents) != 1 { + t.Fatalf("Expected 1 BeginBlockEvent, got %d", len(beginBlockEvents)) + } + if beginBlockEvents[0].Type != "begin-block" { + t.Errorf("Expected BeginBlockEvent Type 'begin-block', got %s", beginBlockEvents[0].Type) + } + if beginBlockEvents[0].BlockStage != appdata.BeginBlockStage { + t.Errorf("Expected BeginBlockStage %d, got %d", appdata.BeginBlockStage, beginBlockEvents[0].BlockStage) + } + if beginBlockEvents[0].EventIndex != 1 { + t.Errorf("Expected BeginBlockEventIndex 1, got %d", beginBlockEvents[0].EventIndex) + } + // Check EndBlockEvents + endBlockEvents := result.EndBlockEvents + if len(endBlockEvents) != 2 { + t.Fatalf("Expected 2 EndBlockEvents, got %d", len(endBlockEvents)) + } + if endBlockEvents[0].Type != "end-block" { + t.Errorf("Expected EndBlockEvent Type 'end-block', got %s", endBlockEvents[0].Type) + } + if endBlockEvents[1].Type != "validator-update" { + t.Errorf("Expected EndBlockEvent Type 'validator-update', got %s", endBlockEvents[1].Type) + } + if endBlockEvents[1].BlockStage != appdata.EndBlockStage { + t.Errorf("Expected EndBlockStage %d, got %d", appdata.EndBlockStage, endBlockEvents[1].BlockStage) + } + if endBlockEvents[0].EventIndex != 1 { + t.Errorf("Expected EndBlockEventIndex 1, got %d", endBlockEvents[0].EventIndex) + } + if endBlockEvents[1].EventIndex != 2 { + t.Errorf("Expected EndBlockEventIndex 2, got %d", endBlockEvents[1].EventIndex) + } + // check TxEvents + events := txResult.Events + if len(events) != 6 { + t.Fatalf("Expected 6 TxEvents, got %d", len(events)) + } + for i, event := range events { + if event.BlockStage != appdata.TxProcessingStage { + t.Errorf("Expected BlockStage %d, got %d", appdata.TxProcessingStage, event.BlockStage) + } + if event.TxIndex != 1 { + t.Errorf("Expected TxIndex 1, got %d", event.TxIndex) + } + if event.EventIndex != int32(i%2+1) { + t.Errorf("Expected EventIndex %d, got %d", i%2+1, event.EventIndex) + } + + attrs, err := event.Attributes() + if err != nil { + t.Fatalf("Error getting event attributes: %v", err) + } + if len(attrs) < 1 || len(attrs) > 2 { + t.Errorf("Expected 1 or 2 attributes, got %d", len(attrs)) + } + + if len(attrs) == 2 { + if attrs[1].Key != "index" || attrs[1].Value != "2" { + t.Errorf("Expected attribute key 'index' and value '2', got key '%s' and value '%s'", attrs[1].Key, attrs[1].Value) + } + } + switch i { + case 0, 1: + if event.Type != "validate-tx" { + t.Errorf("Expected event type 'validate-tx', got %s", event.Type) + } + if event.MsgIndex != 0 { + t.Errorf("Expected MsgIndex 0, got %d", event.MsgIndex) + } + if attrs[0].Key != senderAddr || attrs[0].Value != senderAddr { + t.Errorf("Expected sender attribute key 'sender' and value 'sender', got key '%s' and value '%s'", attrs[0].Key, attrs[0].Value) + } + case 2, 3: + if event.Type != "handle-msg" { + t.Errorf("Expected event type 'handle-msg', got %s", event.Type) + } + if event.MsgIndex != 1 { + t.Errorf("Expected MsgIndex 1, got %d", event.MsgIndex) + } + if attrs[0].Key != "msg" || attrs[0].Value != "&BoolValue{Value:true,XXX_unrecognized:[],}" { + t.Errorf("Expected msg attribute with value '&BoolValue{Value:true,XXX_unrecognized:[],}', got '%s'", attrs[0].Value) + } + case 4, 5: + if event.Type != "post-tx-exec" { + t.Errorf("Expected event type 'post-tx-exec', got %s", event.Type) + } + if event.MsgIndex != -1 { + t.Errorf("Expected MsgIndex -1, got %d", event.MsgIndex) + } + if attrs[0].Key != senderAddr || attrs[0].Value != senderAddr { + t.Errorf("Expected sender attribute key 'sender' and value 'sender', got key '%s' and value '%s'", attrs[0].Key, attrs[0].Value) + } + } + } }) t.Run("exec tx out of gas", func(t *testing.T) { s := s.clone() mockTx := mock.Tx{ - Sender: []byte("sender"), + Sender: []byte(senderAddr), Msg: &gogotypes.BoolValue{Value: true}, // msg does not matter at all because our handler does nothing. GasLimit: 0, // NO GAS! }