From e84c0eb86b20dc95be413b21b0da7377a9bbedc6 Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Thu, 10 Oct 2024 17:40:14 -0400 Subject: [PATCH] feat(server/v2): init the indexer in server/v2 (#22218) Co-authored-by: Julien Robert --- runtime/v2/app.go | 10 ++++++++++ runtime/v2/go.mod | 2 +- schema/indexer/start.go | 8 ++++---- server/v2/cometbft/abci.go | 5 ----- server/v2/cometbft/config.go | 8 +++++++- server/v2/cometbft/go.mod | 2 +- server/v2/cometbft/go.sum | 4 ++-- server/v2/cometbft/server.go | 14 ++++++++++++++ server/v2/config.go | 7 ++----- server/v2/go.mod | 2 +- server/v2/go.sum | 4 ++-- server/v2/types.go | 2 ++ simapp/v2/go.mod | 2 +- simapp/v2/go.sum | 4 ++-- tools/confix/data/v2-app.toml | 10 +++++++++- 15 files changed, 58 insertions(+), 26 deletions(-) diff --git a/runtime/v2/app.go b/runtime/v2/app.go index d5999f3bd99f..e52ea26c6e9f 100644 --- a/runtime/v2/app.go +++ b/runtime/v2/app.go @@ -8,6 +8,7 @@ import ( "cosmossdk.io/core/registry" "cosmossdk.io/core/transaction" "cosmossdk.io/log" + "cosmossdk.io/schema/decoding" "cosmossdk.io/server/v2/appmanager" "cosmossdk.io/server/v2/stf" ) @@ -96,3 +97,12 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] { func (a *App[T]) GetQueryHandlers() map[string]appmodulev2.Handler { return a.QueryHandlers } + +// GetSchemaDecoderResolver returns the module schema resolver. +func (a *App[T]) GetSchemaDecoderResolver() decoding.DecoderResolver { + moduleSet := map[string]any{} + for moduleName, module := range a.moduleManager.Modules() { + moduleSet[moduleName] = module + } + return decoding.ModuleSetDecoderResolver(moduleSet) +} diff --git a/runtime/v2/go.mod b/runtime/v2/go.mod index c28b2b52a05b..5147e8ad1362 100644 --- a/runtime/v2/go.mod +++ b/runtime/v2/go.mod @@ -16,6 +16,7 @@ require ( cosmossdk.io/core v1.0.0-alpha.4 cosmossdk.io/depinject v1.0.0 cosmossdk.io/log v1.4.1 + cosmossdk.io/schema v0.3.0 cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000 cosmossdk.io/server/v2/stf v0.0.0-00010101000000-000000000000 cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000 @@ -30,7 +31,6 @@ require ( buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect - cosmossdk.io/schema v0.3.0 // indirect github.com/DataDog/zstd v1.5.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/schema/indexer/start.go b/schema/indexer/start.go index 909db71ed61a..f675c2916026 100644 --- a/schema/indexer/start.go +++ b/schema/indexer/start.go @@ -51,11 +51,11 @@ type IndexingOptions struct { // IndexingConfig is the configuration of the indexer manager and contains the configuration for each indexer target. type IndexingConfig struct { // Target is a map of named indexer targets to their configuration. - Target map[string]Config + Target map[string]Config `mapstructure:"target" toml:"target" json:"target" comment:"Target is a map of named indexer targets to their configuration."` // ChannelBufferSize is the buffer size of the channels used for buffering data sent to indexer go routines. // It defaults to 1024. - ChannelBufferSize *int `json:"channel_buffer_size,omitempty"` + ChannelBufferSize int `mapstructure:"channel_buffer_size" toml:"channel_buffer_size" json:"channel_buffer_size,omitempty" comment:"Buffer size of the channels used for buffering data sent to indexer go routines."` } // IndexingTarget returns the indexing target listener and associated data. @@ -142,8 +142,8 @@ func StartIndexing(opts IndexingOptions) (IndexingTarget, error) { } bufSize := 1024 - if cfg.ChannelBufferSize != nil { - bufSize = *cfg.ChannelBufferSize + if cfg.ChannelBufferSize != 0 { + bufSize = cfg.ChannelBufferSize } asyncOpts := appdata.AsyncListenerOptions{ Context: ctx, diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index cbcf387168a2..95eeec9bb5da 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -114,11 +114,6 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) { c.streaming = sm } -// SetListener sets the listener for the consensus module. -func (c *Consensus[T]) SetListener(l *appdata.Listener) { - c.listener = l -} - // RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager. // It allows additional snapshotter implementations to be used for creating and restoring snapshots. func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error { diff --git a/server/v2/cometbft/config.go b/server/v2/cometbft/config.go index 4525e7563c27..d8e591a9695c 100644 --- a/server/v2/cometbft/config.go +++ b/server/v2/cometbft/config.go @@ -3,6 +3,7 @@ package cometbft import ( cmtcfg "github.com/cometbft/cometbft/config" + "cosmossdk.io/schema/indexer" "cosmossdk.io/server/v2/cometbft/mempool" ) @@ -23,6 +24,10 @@ func DefaultAppTomlConfig() *AppTomlConfig { Trace: false, Standalone: false, Mempool: mempool.DefaultConfig(), + Indexer: indexer.IndexingConfig{ + Target: make(map[string]indexer.Config), + ChannelBufferSize: 1024, + }, } } @@ -37,7 +42,8 @@ type AppTomlConfig struct { Standalone bool `mapstructure:"standalone" toml:"standalone" comment:"standalone starts the application without the CometBFT node. The node should be started separately."` // Sub configs - Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` + Mempool mempool.Config `mapstructure:"mempool" toml:"mempool" comment:"mempool defines the configuration for the SDK built-in app-side mempool implementations."` + Indexer indexer.IndexingConfig `mapstructure:"indexer" toml:"indexer" comment:"indexer defines the configuration for the SDK built-in indexer implementation."` } // CfgOption is a function that allows to overwrite the default server configuration. diff --git a/server/v2/cometbft/go.mod b/server/v2/cometbft/go.mod index c95f22af817a..46a403768850 100644 --- a/server/v2/cometbft/go.mod +++ b/server/v2/cometbft/go.mod @@ -22,7 +22,7 @@ require ( cosmossdk.io/core v1.0.0-alpha.4 cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 cosmossdk.io/log v1.4.1 - cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 + cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000 cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1 diff --git a/server/v2/cometbft/go.sum b/server/v2/cometbft/go.sum index 4343a0df72ca..311e601f7d43 100644 --- a/server/v2/cometbft/go.sum +++ b/server/v2/cometbft/go.sum @@ -20,8 +20,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM= cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU= cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE= cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k= -cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU= -cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ= +cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc= +cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs= diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index e5076b897073..8bd2935149e7 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -20,6 +20,7 @@ import ( "cosmossdk.io/core/transaction" "cosmossdk.io/log" + "cosmossdk.io/schema/indexer" serverv2 "cosmossdk.io/server/v2" cometlog "cosmossdk.io/server/v2/cometbft/log" "cosmossdk.io/server/v2/cometbft/mempool" @@ -131,6 +132,19 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], cfg map[string]any, logg } consensus.snapshotManager = snapshots.NewManager(snapshotStore, s.serverOptions.SnapshotOptions(cfg), sc, ss, nil, s.logger) + // initialize the indexer + if indexerCfg := s.config.AppTomlConfig.Indexer; len(indexerCfg.Target) > 0 { + listener, err := indexer.StartIndexing(indexer.IndexingOptions{ + Config: indexerCfg, + Resolver: appI.GetSchemaDecoderResolver(), + Logger: s.logger.With(log.ModuleKey, "indexer"), + }) + if err != nil { + return fmt.Errorf("failed to start indexing: %w", err) + } + consensus.listener = &listener.Listener + } + s.Consensus = consensus return nil diff --git a/server/v2/config.go b/server/v2/config.go index 0670bc374a24..1ab396f0c148 100644 --- a/server/v2/config.go +++ b/server/v2/config.go @@ -44,11 +44,8 @@ func ReadConfig(configPath string) (*viper.Viper, error) { func UnmarshalSubConfig(cfg map[string]any, subName string, target any) error { var sub any if subName != "" { - for k, val := range cfg { - if k == subName { - sub = val - break - } + if val, ok := cfg[subName]; ok { + sub = val } } else { sub = cfg diff --git a/server/v2/go.mod b/server/v2/go.mod index 1fca8cbb4416..11389921eb76 100644 --- a/server/v2/go.mod +++ b/server/v2/go.mod @@ -16,6 +16,7 @@ require ( cosmossdk.io/core v1.0.0-alpha.4 cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 cosmossdk.io/log v1.4.1 + cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac cosmossdk.io/server/v2/appmanager v0.0.0-00010101000000-000000000000 cosmossdk.io/store/v2 v2.0.0-00010101000000-000000000000 github.com/cosmos/cosmos-proto v1.0.0-beta.5 @@ -42,7 +43,6 @@ require ( require ( cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect - cosmossdk.io/schema v0.3.0 // indirect github.com/DataDog/datadog-go v4.8.3+incompatible // indirect github.com/DataDog/zstd v1.5.5 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect diff --git a/server/v2/go.sum b/server/v2/go.sum index 26e67ab39a4c..4a7c52d6f39a 100644 --- a/server/v2/go.sum +++ b/server/v2/go.sum @@ -8,8 +8,8 @@ cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 h1:IQNdY2kB+k+1OM2DvqF cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5/go.mod h1:0CuYKkFHxc1vw2JC+t21THBCALJVROrWVR/3PQ1urpc= cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM= cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU= -cosmossdk.io/schema v0.3.0 h1:01lcaM4trhzZ1HQTfTV8z6Ma1GziOZ/YmdzBN3F720c= -cosmossdk.io/schema v0.3.0/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ= +cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc= +cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v4.8.3+incompatible h1:fNGaYSuObuQb5nzeTQqowRAd9bpDIRRV4/gUtIBjh8Q= diff --git a/server/v2/types.go b/server/v2/types.go index 8bd04d3221af..e628fb30c90f 100644 --- a/server/v2/types.go +++ b/server/v2/types.go @@ -7,6 +7,7 @@ import ( "cosmossdk.io/core/server" "cosmossdk.io/core/transaction" "cosmossdk.io/log" + "cosmossdk.io/schema/decoding" "cosmossdk.io/server/v2/appmanager" "cosmossdk.io/store/v2" ) @@ -19,4 +20,5 @@ type AppI[T transaction.Tx] interface { GetAppManager() *appmanager.AppManager[T] GetQueryHandlers() map[string]appmodulev2.Handler GetStore() store.RootStore + GetSchemaDecoderResolver() decoding.DecoderResolver } diff --git a/simapp/v2/go.mod b/simapp/v2/go.mod index 448a98a8928f..1ae8b374709f 100644 --- a/simapp/v2/go.mod +++ b/simapp/v2/go.mod @@ -61,7 +61,7 @@ require ( cosmossdk.io/core/testing v0.0.0-20240923163230-04da382a9f29 // indirect cosmossdk.io/errors v1.0.1 // indirect cosmossdk.io/errors/v2 v2.0.0-20240731132947-df72853b3ca5 // indirect - cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 // indirect + cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac // indirect cosmossdk.io/server/v2/appmanager v0.0.0-20240802110823-cffeedff643d // indirect cosmossdk.io/server/v2/stf v0.0.0-20240708142107-25e99c54bac1 // indirect cosmossdk.io/store v1.1.1 // indirect diff --git a/simapp/v2/go.sum b/simapp/v2/go.sum index f29f693d6539..c64c8320652f 100644 --- a/simapp/v2/go.sum +++ b/simapp/v2/go.sum @@ -206,8 +206,8 @@ cosmossdk.io/log v1.4.1 h1:wKdjfDRbDyZRuWa8M+9nuvpVYxrEOwbD/CA8hvhU8QM= cosmossdk.io/log v1.4.1/go.mod h1:k08v0Pyq+gCP6phvdI6RCGhLf/r425UT6Rk/m+o74rU= cosmossdk.io/math v1.3.0 h1:RC+jryuKeytIiictDslBP9i1fhkVm6ZDmZEoNP316zE= cosmossdk.io/math v1.3.0/go.mod h1:vnRTxewy+M7BtXBNFybkuhSH4WfedVAAnERHgVFhp3k= -cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9 h1:DmOoS/1PeY6Ih0hAVlJ69kLMUrLV+TCbfICrZtB1vdU= -cosmossdk.io/schema v0.3.1-0.20240930054013-7c6e0388a3f9/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ= +cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac h1:3joNZZWZ3k7fMsrBDL1ktuQ2xQwYLZOaDhkruadDFmc= +cosmossdk.io/schema v0.3.1-0.20241010135032-192601639cac/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= diff --git a/tools/confix/data/v2-app.toml b/tools/confix/data/v2-app.toml index c4f19348ce52..ef144a0b90dd 100644 --- a/tools/confix/data/v2-app.toml +++ b/tools/confix/data/v2-app.toml @@ -21,6 +21,14 @@ standalone = false # max-txs defines the maximum number of transactions that can be in the mempool. A value of 0 indicates an unbounded mempool, a negative value disables the app-side mempool. max-txs = -1 +# indexer defines the configuration for the SDK built-in indexer implementation. +[comet.indexer] +# Buffer size of the channels used for buffering data sent to indexer go routines. +channel_buffer_size = 1024 + +# Target is a map of named indexer targets to their configuration. +[comet.indexer.target] + [grpc] # Enable defines if the gRPC server should be enabled. enable = true @@ -71,7 +79,7 @@ skip-fast-storage-upgrade = true # Enable enables the application telemetry functionality. When enabled, an in-memory sink is also enabled by default. Operators may also enabled other sinks such as Prometheus. enable = true # Address defines the metrics server address to bind to. -address = 'localhost:1338' +address = 'localhost:1318' # Prefixed with keys to separate services. service-name = '' # Enable prefixing gauge values with hostname.