Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server/v2): init the indexer in server/v2 #22218

Merged
merged 8 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"cosmossdk.io/core/registry"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/stf"
)
Expand Down Expand Up @@ -96,3 +97,12 @@
func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
return a.QueryHandlers
}

// GetSchemaDecoderResolver returns the module schema resolver.
func (a *App[T]) GetSchemaDecoderResolver() decoding.DecoderResolver {
moduleSet := map[string]any{}
for moduleName, module := range a.moduleManager.Modules() {
moduleSet[moduleName] = module
}
Comment on lines +104 to +106

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
return decoding.ModuleSetDecoderResolver(moduleSet)
}
2 changes: 1 addition & 1 deletion runtime/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/depinject v1.0.0
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.0
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/stf v0.0.0-00010101000000-000000000000
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
Expand All @@ -30,7 +31,6 @@ require (
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.0 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions schema/indexer/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type IndexingOptions struct {
// IndexingConfig is the configuration of the indexer manager and contains the configuration for each indexer target.
type IndexingConfig struct {
// Target is a map of named indexer targets to their configuration.
Target map[string]Config
Target map[string]Config `mapstructure:"target" toml:"target" json:"target" comment:"Target is a map of named indexer targets to their configuration."`

// ChannelBufferSize is the buffer size of the channels used for buffering data sent to indexer go routines.
// It defaults to 1024.
ChannelBufferSize *int `json:"channel_buffer_size,omitempty"`
ChannelBufferSize int `mapstructure:"channel_buffer_size" toml:"channel_buffer_size" json:"channel_buffer_size,omitempty" comment:"Buffer size of the channels used for buffering data sent to indexer go routines."`
}

// IndexingTarget returns the indexing target listener and associated data.
Expand Down Expand Up @@ -142,8 +142,8 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) {
}

bufSize := 1024
if cfg.ChannelBufferSize != nil {
bufSize = *cfg.ChannelBufferSize
if cfg.ChannelBufferSize != 0 {
bufSize = cfg.ChannelBufferSize
}
asyncOpts := appdata.AsyncListenerOptions{
Context: ctx,
Expand Down
5 changes: 0 additions & 5 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
8 changes: 7 additions & 1 deletion server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cometbft
import (
cmtcfg "github.com/cometbft/cometbft/config"

"cosmossdk.io/schema/indexer"
"cosmossdk.io/server/v2/cometbft/mempool"
)

Expand All @@ -23,6 +24,10 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Trace: false,
Standalone: false,
Mempool: mempool.DefaultConfig(),
Indexer: indexer.IndexingConfig{
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
}
}

Expand All @@ -37,7 +42,8 @@ type AppTomlConfig struct {
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1
Expand Down
4 changes: 2 additions & 2 deletions server/v2/cometbft/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
14 changes: 14 additions & 0 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/indexer"
serverv2 "cosmossdk.io/server/v2"
cometlog "cosmossdk.io/server/v2/cometbft/log"
"cosmossdk.io/server/v2/cometbft/mempool"
Expand Down Expand Up @@ -131,6 +132,19 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
}
consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions(cfg), sc, ss, nil, s.logger)

// initialize the indexer
if indexerCfg := s.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 {
listener, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexerCfg,
Resolver: appI.GetSchemaDecoderResolver(),
Logger: s.logger.With(log.ModuleKey, "indexer"),
})
if err != nil {
return fmt.Errorf("failed to start indexing: %w", err)
}
consensus.listener = &listener.Listener
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
}

s.Consensus = consensus

return nil
Expand Down
7 changes: 2 additions & 5 deletions server/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,8 @@ func ReadConfig(configPath string) (*viper.Viper, error) {
func UnmarshalSubConfig(cfg map[string]any, subName string, target any) error {
var sub any
if subName != "" {
for k, val := range cfg {
if k == subName {
sub = val
break
}
if val, ok := cfg[subName]; ok {
sub = val
julienrbrt marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
sub = cfg
Expand Down
2 changes: 1 addition & 1 deletion server/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
github.com/cosmos/cosmos-proto v1.0.0-beta.5
Expand All @@ -42,7 +43,6 @@ require (

require (
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.0 // indirect
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions server/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqF
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5/go.mod h1:0CuYKkFHxc1vw2JC+t21THBCALJVROrWVR/3PQ1urpc=
cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q=
Expand Down
2 changes: 2 additions & 0 deletions server/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/store/v2"
)
Expand All @@ -19,4 +20,5 @@ type AppI[T transaction.Tx] interface {
GetAppManager() *appmanager.AppManager[T]
GetQueryHandlers() map[string]appmodulev2.Handler
GetStore() store.RootStore
GetSchemaDecoderResolver() decoding.DecoderResolver
}
2 changes: 1 addition & 1 deletion simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac // indirect
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d // indirect
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1 // indirect
cosmossdk.io/store v1.1.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions simapp/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
Expand Down
10 changes: 9 additions & 1 deletion tools/confix/data/v2-app.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ standalone = false
# max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool.
max-txs = -1

# indexer defines the configuration for the SDK built-in indexer implementation.
[comet.indexer]
# Buffer size of the channels used for buffering data sent to indexer go routines.
channel_buffer_size = 1024

# Target is a map of named indexer targets to their configuration.
[comet.indexer.target]

[grpc]
# Enable defines if the gRPC server should be enabled.
enable = true
Expand Down Expand Up @@ -71,7 +79,7 @@ skip-fast-storage-upgrade = true
# Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus.
enable = true
# Address defines the metrics server address to bind to.
address = 'localhost:1338'
address = 'localhost:1318'
# Prefixed with keys to separate services.
service-name = ''
# Enable prefixing gauge values with hostname.
Expand Down
Loading