-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server/v2/cometbft,stf): Listener integration in server/v2 #21917
Changes from all commits
6d8c45c
4197471
5a9091f
28e4f40
d6023a8
aa56d0c
ef73f4f
00c39c8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |||||||||||||||||||||||||||||||||||||
"cosmossdk.io/core/server" | ||||||||||||||||||||||||||||||||||||||
"cosmossdk.io/core/store" | ||||||||||||||||||||||||||||||||||||||
errorsmod "cosmossdk.io/errors/v2" | ||||||||||||||||||||||||||||||||||||||
"cosmossdk.io/schema/appdata" | ||||||||||||||||||||||||||||||||||||||
"cosmossdk.io/server/v2/streaming" | ||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
@@ -57,6 +58,55 @@ func (c *Consensus[T]) streamDeliverBlockChanges( | |||||||||||||||||||||||||||||||||||||
c.logger.Error("ListenStateChanges listening hook failed", "height", height, "err", err) | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
if c.listener == nil { | ||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
// stream the StartBlockData to the listener. | ||||||||||||||||||||||||||||||||||||||
if c.listener.StartBlock != nil { | ||||||||||||||||||||||||||||||||||||||
if err := c.listener.StartBlock(appdata.StartBlockData{ | ||||||||||||||||||||||||||||||||||||||
Height: uint64(height), | ||||||||||||||||||||||||||||||||||||||
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 | ||||||||||||||||||||||||||||||||||||||
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 | ||||||||||||||||||||||||||||||||||||||
}); err != nil { | ||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
// stream the TxData to the listener. | ||||||||||||||||||||||||||||||||||||||
if c.listener.OnTx != nil { | ||||||||||||||||||||||||||||||||||||||
for i, tx := range txs { | ||||||||||||||||||||||||||||||||||||||
if err := c.listener.OnTx(appdata.TxData{ | ||||||||||||||||||||||||||||||||||||||
TxIndex: int32(i), | ||||||||||||||||||||||||||||||||||||||
Bytes: func() ([]byte, error) { return tx, nil }, | ||||||||||||||||||||||||||||||||||||||
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 | ||||||||||||||||||||||||||||||||||||||
}); err != nil { | ||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
// stream the EventData to the listener. | ||||||||||||||||||||||||||||||||||||||
if c.listener.OnEvent != nil { | ||||||||||||||||||||||||||||||||||||||
if err := c.listener.OnEvent(appdata.EventData{Events: events}); err != nil { | ||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
// stream the KVPairData to the listener. | ||||||||||||||||||||||||||||||||||||||
if c.listener.OnKVPair != nil { | ||||||||||||||||||||||||||||||||||||||
if err := c.listener.OnKVPair(appdata.KVPairData{Updates: stateChanges}); err != nil { | ||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
// stream the CommitData to the listener. | ||||||||||||||||||||||||||||||||||||||
if c.listener.Commit != nil { | ||||||||||||||||||||||||||||||||||||||
if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil { | ||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||
} else if completionCallback != nil { | ||||||||||||||||||||||||||||||||||||||
if err := completionCallback(); err != nil { | ||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
Comment on lines
+101
to
+108
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Refactor to simplify control flow and enhance readability The Apply this diff to refactor the code: - if completionCallback, err := c.listener.Commit(appdata.CommitData{}); err != nil {
- return err
- } else if completionCallback != nil {
- if err := completionCallback(); err != nil {
- return err
- }
- }
+ completionCallback, err := c.listener.Commit(appdata.CommitData{})
+ if err != nil {
+ return err
+ }
+ if completionCallback != nil {
+ if err := completionCallback(); err != nil {
+ return err
+ }
+ } This refactoring removes the unnecessary 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like to add setters tbh, I feel like it pollutes the server instantiation in root.go
Additionally, Consensus is wired from the cometbft server and not manually.
Let's instead add it to the Consensus contructor if possible and have the CometBFT ServerOptions take a listener.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea of the separated setters is to make the listener a nil default since all nodes are not open to an off-chain indexer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense, the option can be nil by default then. But a user won't have to call NewConsensus manually, so putting it in a setter isn't useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When init the consensus from the config, if the indexer is set as on, then call the setter API.
I think it is more flexible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
works too, could you wire that in cometbft server.go?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can I do it in the next PR after baseapp integration? it seems not trivial, and have no idea of the indexer config, not exist???
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a TODO in cometbft/server.go after NewConsensus then?