Skip to content

Commit

Permalink
feat(server/v2): init the indexer in server/v2 (#22218)
Browse files Browse the repository at this point in the history
Co-authored-by: Julien Robert <[email protected]>
  • Loading branch information
cool-develope and julienrbrt authored Oct 10, 2024
1 parent dd2369d commit e84c0eb
Show file tree
Hide file tree
Showing 15 changed files with 58 additions and 26 deletions.
10 changes: 10 additions & 0 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cosmossdk.io/core/registry"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/server/v2/stf"
)
Expand Down Expand Up @@ -96,3 +97,12 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler {
return a.QueryHandlers
}

// GetSchemaDecoderResolver returns the module schema resolver.
func (a *App[T]) GetSchemaDecoderResolver() decoding.DecoderResolver {
moduleSet := map[string]any{}
for moduleName, module := range a.moduleManager.Modules() {
moduleSet[moduleName] = module
}
return decoding.ModuleSetDecoderResolver(moduleSet)
}
2 changes: 1 addition & 1 deletion runtime/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/depinject v1.0.0
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.0
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/stf v0.0.0-00010101000000-000000000000
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
Expand All @@ -30,7 +31,6 @@ require (
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.0 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions schema/indexer/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ type IndexingOptions struct {
// IndexingConfig is the configuration of the indexer manager and contains the configuration for each indexer target.
type IndexingConfig struct {
// Target is a map of named indexer targets to their configuration.
Target map[string]Config
Target map[string]Config `mapstructure:"target" toml:"target" json:"target" comment:"Target is a map of named indexer targets to their configuration."`

// ChannelBufferSize is the buffer size of the channels used for buffering data sent to indexer go routines.
// It defaults to 1024.
ChannelBufferSize *int `json:"channel_buffer_size,omitempty"`
ChannelBufferSize int `mapstructure:"channel_buffer_size" toml:"channel_buffer_size" json:"channel_buffer_size,omitempty" comment:"Buffer size of the channels used for buffering data sent to indexer go routines."`
}

// IndexingTarget returns the indexing target listener and associated data.
Expand Down Expand Up @@ -142,8 +142,8 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) {
}

bufSize := 1024
if cfg.ChannelBufferSize != nil {
bufSize = *cfg.ChannelBufferSize
if cfg.ChannelBufferSize != 0 {
bufSize = cfg.ChannelBufferSize
}
asyncOpts := appdata.AsyncListenerOptions{
Context: ctx,
Expand Down
5 changes: 0 additions & 5 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

// SetListener sets the listener for the consensus module.
func (c *Consensus[T]) SetListener(l *appdata.Listener) {
c.listener = l
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down
8 changes: 7 additions & 1 deletion server/v2/cometbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cometbft
import (
cmtcfg "github.com/cometbft/cometbft/config"

"cosmossdk.io/schema/indexer"
"cosmossdk.io/server/v2/cometbft/mempool"
)

Expand All @@ -23,6 +24,10 @@ func DefaultAppTomlConfig() *AppTomlConfig {
Trace: false,
Standalone: false,
Mempool: mempool.DefaultConfig(),
Indexer: indexer.IndexingConfig{
Target: make(map[string]indexer.Config),
ChannelBufferSize: 1024,
},
}
}

Expand All @@ -37,7 +42,8 @@ type AppTomlConfig struct {
Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."`

// Sub configs
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."`
Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."`
}

// CfgOption is a function that allows to overwrite the default server configuration.
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1
Expand Down
4 changes: 2 additions & 2 deletions server/v2/cometbft/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
14 changes: 14 additions & 0 deletions server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/indexer"
serverv2 "cosmossdk.io/server/v2"
cometlog "cosmossdk.io/server/v2/cometbft/log"
"cosmossdk.io/server/v2/cometbft/mempool"
Expand Down Expand Up @@ -131,6 +132,19 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg
}
consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions(cfg), sc, ss, nil, s.logger)

// initialize the indexer
if indexerCfg := s.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 {
listener, err := indexer.StartIndexing(indexer.IndexingOptions{
Config: indexerCfg,
Resolver: appI.GetSchemaDecoderResolver(),
Logger: s.logger.With(log.ModuleKey, "indexer"),
})
if err != nil {
return fmt.Errorf("failed to start indexing: %w", err)
}
consensus.listener = &listener.Listener
}

s.Consensus = consensus

return nil
Expand Down
7 changes: 2 additions & 5 deletions server/v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,8 @@ func ReadConfig(configPath string) (*viper.Viper, error) {
func UnmarshalSubConfig(cfg map[string]any, subName string, target any) error {
var sub any
if subName != "" {
for k, val := range cfg {
if k == subName {
sub = val
break
}
if val, ok := cfg[subName]; ok {
sub = val
}
} else {
sub = cfg
Expand Down
2 changes: 1 addition & 1 deletion server/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
cosmossdk.io/core v1.0.0-alpha.4
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29
cosmossdk.io/log v1.4.1
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac
cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000
cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000
github.com/cosmos/cosmos-proto v1.0.0-beta.5
Expand All @@ -42,7 +43,6 @@ require (

require (
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.0 // indirect
github.com/DataDog/datadog-go v4.8.3+incompatible // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions server/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqF
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5/go.mod h1:0CuYKkFHxc1vw2JC+t21THBCALJVROrWVR/3PQ1urpc=
cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c=
cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q=
Expand Down
2 changes: 2 additions & 0 deletions server/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"cosmossdk.io/core/server"
"cosmossdk.io/core/transaction"
"cosmossdk.io/log"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/server/v2/appmanager"
"cosmossdk.io/store/v2"
)
Expand All @@ -19,4 +20,5 @@ type AppI[T transaction.Tx] interface {
GetAppManager() *appmanager.AppManager[T]
GetQueryHandlers() map[string]appmodulev2.Handler
GetStore() store.RootStore
GetSchemaDecoderResolver() decoding.DecoderResolver
}
2 changes: 1 addition & 1 deletion simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac // indirect
cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d // indirect
cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1 // indirect
cosmossdk.io/store v1.1.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions simapp/v2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM=
cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU=
cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE=
cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU=
cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc=
cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
Expand Down
10 changes: 9 additions & 1 deletion tools/confix/data/v2-app.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ standalone = false
# max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool.
max-txs = -1

# indexer defines the configuration for the SDK built-in indexer implementation.
[comet.indexer]
# Buffer size of the channels used for buffering data sent to indexer go routines.
channel_buffer_size = 1024

# Target is a map of named indexer targets to their configuration.
[comet.indexer.target]

[grpc]
# Enable defines if the gRPC server should be enabled.
enable = true
Expand Down Expand Up @@ -71,7 +79,7 @@ skip-fast-storage-upgrade = true
# Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus.
enable = true
# Address defines the metrics server address to bind to.
address = 'localhost:1338'
address = 'localhost:1318'
# Prefixed with keys to separate services.
service-name = ''
# Enable prefixing gauge values with hostname.
Expand Down

0 comments on commit e84c0eb

Please sign in to comment.