From 4b3ef510d6dd056b80c5f16b087526a0b8ef8a04 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 27 Jun 2023 20:44:26 +0300 Subject: [PATCH 1/7] morph: Do not call callbacks with the switch mutex taken It can lead to a deadlock if a user tries to make some operation with the `Client` in a callback. Signed-off-by: Pavel Karpy --- pkg/morph/client/multi.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index 1b5c21a56c..f5187580f4 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -15,21 +15,23 @@ type Endpoint struct { // SwitchRPC performs reconnection and returns true if it was successful. func (c *Client) SwitchRPC() bool { c.switchLock.Lock() - defer c.switchLock.Unlock() for attempt := 0; attempt < c.cfg.reconnectionRetries; attempt++ { if c.switchPRC() { + c.switchLock.Unlock() return true } select { case <-time.After(c.cfg.reconnectionDelay): case <-c.closeChan: + c.switchLock.Unlock() return false } } c.inactive = true + c.switchLock.Unlock() if c.cfg.inactiveModeCb != nil { c.cfg.inactiveModeCb() From bdd3d3cda6b801e6b3e31e223cab975f8c8a99b7 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Fri, 23 Jun 2023 00:14:12 +0300 Subject: [PATCH 2/7] morph: Provide `WithConnSwitchCallback` option It allows providing a flexible feedback from the `Client` when it has established a connection with a new (next from the list provided on the `Client`'s creation) `neo-go` node. Signed-off-by: Pavel Karpy --- pkg/morph/client/constructor.go | 11 +++++++++++ pkg/morph/client/multi.go | 5 +++++ 2 files changed, 16 insertions(+) diff --git a/pkg/morph/client/constructor.go b/pkg/morph/client/constructor.go index 3b5919adcb..03638a17dd 100644 --- a/pkg/morph/client/constructor.go +++ b/pkg/morph/client/constructor.go @@ -42,6 +42,7 @@ type cfg struct { singleCli *rpcclient.WSClient // neo-go client for single client mode inactiveModeCb Callback + rpcSwitchCb Callback reconnectionRetries int reconnectionDelay time.Duration @@ -297,3 +298,13 @@ func WithConnLostCallback(cb Callback) Option { c.inactiveModeCb = cb } } + +// WithConnSwitchCallback returns a client constructor option +// that specifies a callback that is called when the Client +// reconnected to a new RPC (from [WithEndpoints] list) +// successfully. +func WithConnSwitchCallback(cb Callback) Option { + return func(c *cfg) { + c.rpcSwitchCb = cb + } +} diff --git a/pkg/morph/client/multi.go b/pkg/morph/client/multi.go index f5187580f4..8b7b7aa155 100644 --- a/pkg/morph/client/multi.go +++ b/pkg/morph/client/multi.go @@ -19,6 +19,11 @@ func (c *Client) SwitchRPC() bool { for attempt := 0; attempt < c.cfg.reconnectionRetries; attempt++ { if c.switchPRC() { c.switchLock.Unlock() + + if c.cfg.rpcSwitchCb != nil { + c.cfg.rpcSwitchCb() + } + return true } From fa389b804f363a625b38792523eb4d4132c10e4b Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 26 Jun 2023 18:38:03 +0300 Subject: [PATCH 3/7] ir: Remove intermediate option variable It used to store flexible number of options but now is useless after some changes. Signed-off-by: Pavel Karpy --- pkg/innerring/innerring.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index c77ba34423..024afe41ab 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -535,13 +535,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } - // form morph container client's options - morphCnrOpts := make([]cntClient.Option, 0, 3) - morphCnrOpts = append(morphCnrOpts, - cntClient.AsAlphabet(), - ) - - cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, 0, morphCnrOpts...) + cnrClient, err := cntClient.NewFromMorph(server.morphClient, server.contracts.container, 0, cntClient.AsAlphabet()) if err != nil { return nil, err } From 7b994174d2ae3d3996deaad28a5b5e4592b01965 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 28 Jun 2023 17:27:09 +0300 Subject: [PATCH 4/7] ir: Reorder fields Do not keep `*Client` fields under `// global state` comment, it confuses. Signed-off-by: Pavel Karpy --- pkg/innerring/innerring.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 024afe41ab..b8c91d0dfd 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -71,17 +71,18 @@ type ( blockTimers []*timer.BlockTimer epochTimer *timer.BlockTimer - // global state morphClient *client.Client mainnetClient *client.Client + auditClient *auditClient.Client + balanceClient *balanceClient.Client + netmapClient *nmClient.Client + + // global state epochCounter atomic.Uint64 epochDuration atomic.Uint64 statusIndex *innerRingIndexer precision precision.Fixed8Converter - auditClient *auditClient.Client healthStatus atomic.Value - balanceClient *balanceClient.Client - netmapClient *nmClient.Client persistate *state.PersistentStorage // metrics From 1e014c24fd9398307a40062c1d57f53559ea03f9 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 29 Jun 2023 21:05:04 +0300 Subject: [PATCH 5/7] ir: Do not consider dynamic balance precision It is not likely to be changeable, but it breaks some connection loss handling and confuses (what does it mean to change a precision of the token?). Signed-off-by: Pavel Karpy --- pkg/innerring/innerring.go | 22 +++++++++++----------- pkg/util/precision/converter.go | 27 +++++++++++---------------- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index b8c91d0dfd..d3dc7a3e56 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -81,7 +81,7 @@ type ( epochCounter atomic.Uint64 epochDuration atomic.Uint64 statusIndex *innerRingIndexer - precision precision.Fixed8Converter + precision uint32 // not changeable healthStatus atomic.Value persistate *state.PersistentStorage @@ -551,6 +551,11 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } + server.precision, err = server.balanceClient.Decimals() + if err != nil { + return nil, fmt.Errorf("can't read balance contract precision: %w", err) + } + repClient, err := repClient.NewFromMorph(server.morphClient, server.contracts.reputation, 0, repClient.AsAlphabet()) if err != nil { return nil, err @@ -771,6 +776,8 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return nil, err } + precisionConverter := precision.NewConverter(server.precision) + // create balance processor balanceProcessor, err := balance.New(&balance.Params{ Log: log, @@ -778,7 +785,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan NeoFSClient: neofsCli, BalanceSC: server.contracts.balance, AlphabetState: server, - Converter: &server.precision, + Converter: precisionConverter, }) if err != nil { return nil, err @@ -801,7 +808,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan MorphClient: server.morphClient, EpochState: server, AlphabetState: server, - Converter: &server.precision, + Converter: precisionConverter, MintEmitCacheSize: cfg.GetInt("emit.mint.cache_size"), MintEmitThreshold: cfg.GetUint64("emit.mint.threshold"), MintEmitValue: fixedn.Fixed8(cfg.GetInt64("emit.mint.value")), @@ -1036,15 +1043,8 @@ func (s *Server) initConfigFromBlockchain() error { return fmt.Errorf("can't read epoch duration: %w", err) } - // get balance precision - balancePrecision, err := s.balanceClient.Decimals() - if err != nil { - return fmt.Errorf("can't read balance contract precision: %w", err) - } - s.epochCounter.Store(epoch) s.epochDuration.Store(epochDuration) - s.precision.SetBalancePrecision(balancePrecision) // get next epoch delta tick s.initialEpochTickDelta, err = s.nextEpochBlockDelta() @@ -1056,7 +1056,7 @@ func (s *Server) initConfigFromBlockchain() error { zap.Bool("active", s.IsActive()), zap.Bool("alphabet", s.IsAlphabet()), zap.Uint64("epoch", epoch), - zap.Uint32("precision", balancePrecision), + zap.Uint32("precision", s.precision), zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta), ) diff --git a/pkg/util/precision/converter.go b/pkg/util/precision/converter.go index bd2948f0a4..a7b80468ef 100644 --- a/pkg/util/precision/converter.go +++ b/pkg/util/precision/converter.go @@ -39,11 +39,18 @@ func convert(n, factor *big.Int, decreasePrecision bool) *big.Int { // NewConverter returns Fixed8Converter. func NewConverter(precision uint32) Fixed8Converter { - var c Fixed8Converter - - c.SetBalancePrecision(precision) + exp := int(precision) - fixed8Precision + if exp < 0 { + exp = -exp + } - return c + return Fixed8Converter{ + converter: converter{ + base: fixed8Precision, + target: precision, + factor: new(big.Int).SetInt64(int64(math.Pow10(exp))), + }, + } } func (c converter) toTarget(n *big.Int) *big.Int { @@ -64,18 +71,6 @@ func (c Fixed8Converter) ToBalancePrecision(n int64) int64 { return c.toTarget(new(big.Int).SetInt64(n)).Int64() } -// SetBalancePrecision prepares converter to work. -func (c *Fixed8Converter) SetBalancePrecision(precision uint32) { - exp := int(precision) - fixed8Precision - if exp < 0 { - exp = -exp - } - - c.base = fixed8Precision - c.target = precision - c.factor = new(big.Int).SetInt64(int64(math.Pow10(exp))) -} - // Convert is a wrapper of convert function. Use cached `converter` struct // if fromPrecision and toPrecision are constant. func Convert(fromPrecision, toPrecision uint32, n *big.Int) *big.Int { From 464c4724dd205122d0b54c3d9cb17520bf7cb5f7 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Mon, 26 Jun 2023 18:51:17 +0300 Subject: [PATCH 6/7] ir: Reload internals on connection switch Update cached side chain configurations, reset timers and audit tasks queue. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/innerring/indexer.go | 10 ++++++ pkg/innerring/innerring.go | 73 ++++++++++++++++++++++++++++++-------- 3 files changed, 70 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d631981caf..ef825fcb30 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changelog for NeoFS Node - `renew-domain` command for adm ### Fixed +- `neo-go` RPC connection lost handling by IR (#1337) ### Removed - Deprecated `morph.rpc_endpoint` SN and `morph.endpoint.client` IR config sections (#2400) diff --git a/pkg/innerring/indexer.go b/pkg/innerring/indexer.go index ac5fb93efb..78c50ca29e 100644 --- a/pkg/innerring/indexer.go +++ b/pkg/innerring/indexer.go @@ -82,6 +82,16 @@ func (s *innerRingIndexer) update() (ind indexes, err error) { return s.ind, nil } +func (s *innerRingIndexer) reset() { + s.Lock() + defer s.Unlock() + + // zero time, every real time is expected to + // be _much later_ after that time; `update` + // will be forced to make RPC calls + s.lastAccess = time.Time{} +} + func (s *innerRingIndexer) InnerRingIndex() (int32, error) { ind, err := s.update() if err != nil { diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index d3dc7a3e56..4713251d75 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -77,6 +77,8 @@ type ( balanceClient *balanceClient.Client netmapClient *nmClient.Client + auditTaskManager *audittask.Manager + // global state epochCounter atomic.Uint64 epochDuration atomic.Uint64 @@ -97,7 +99,7 @@ type ( pubKey []byte contracts *contracts predefinedValidators keys.PublicKeys - initialEpochTickDelta uint32 + initialEpochTickDelta atomic.Uint32 withoutMainNet bool // runtime processors @@ -208,7 +210,9 @@ func (s *Server) Start(ctx context.Context, intError chan<- error) (err error) { // tick initial epoch initialEpochTicker := timer.NewOneTickTimer( - timer.StaticBlockMeter(s.initialEpochTickDelta), + func() (uint32, error) { + return s.initialEpochTickDelta.Load(), nil + }, func() { s.netmapProcessor.HandleNewEpochTick(timerEvent.NewEpochTick{}) }) @@ -435,7 +439,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan server.key = acc.PrivateKey() morphChain.key = server.key - server.morphClient, err = createClient(ctx, morphChain, errChan) + server.morphClient, err = server.createClient(ctx, morphChain, errChan) if err != nil { return nil, err } @@ -471,7 +475,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan mainnetChain.from = fromMainChainBlock // create mainnet client - server.mainnetClient, err = createClient(ctx, mainnetChain, errChan) + server.mainnetClient, err = server.createClient(ctx, mainnetChain, errChan) if err != nil { return nil, err } @@ -604,7 +608,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan porPoolSize := cfg.GetInt("audit.por.pool_size") // create audit processor dependencies - auditTaskManager := audittask.New( + server.auditTaskManager = audittask.New( audittask.WithQueueCapacity(cfg.GetUint32("audit.task.queue_capacity")), audittask.WithWorkerPool(auditPool), audittask.WithLogger(log), @@ -618,7 +622,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan }), ) - server.workers = append(server.workers, auditTaskManager.Listen) + server.workers = append(server.workers, server.auditTaskManager.Listen) // create audit processor auditProcessor, err := audit.New(&audit.Params{ @@ -630,7 +634,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan SGSource: clientCache, Key: &server.key.PrivateKey, RPCSearchTimeout: cfg.GetDuration("audit.timeout.search"), - TaskManager: auditTaskManager, + TaskManager: server.auditTaskManager, Reporter: server, }) if err != nil { @@ -984,8 +988,9 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev return listener, err } -func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { - endpoints := p.cfg.GetStringSlice(p.name + ".endpoints") +func (s *Server) createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { + name := p.name + endpoints := p.cfg.GetStringSlice(name + ".endpoints") if len(endpoints) == 0 { return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) } @@ -999,6 +1004,18 @@ func createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*c client.WithEndpoints(endpoints), client.WithReconnectionRetries(p.cfg.GetInt(p.name+".reconnections_number")), client.WithReconnectionsDelay(p.cfg.GetDuration(p.name+".reconnections_delay")), + client.WithConnSwitchCallback(func() { + var err error + + if name == morphPrefix { + err = s.restartMorph() + } else { + err = s.restartMainChain() + } + if err != nil { + errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", name, err) + } + }), client.WithConnLostCallback(func() { errChan <- fmt.Errorf("%s chain connection has been lost", p.name) }), @@ -1043,21 +1060,22 @@ func (s *Server) initConfigFromBlockchain() error { return fmt.Errorf("can't read epoch duration: %w", err) } - s.epochCounter.Store(epoch) - s.epochDuration.Store(epochDuration) - // get next epoch delta tick - s.initialEpochTickDelta, err = s.nextEpochBlockDelta() + delta, err := s.nextEpochBlockDelta() if err != nil { return err } + s.epochCounter.Store(epoch) + s.epochDuration.Store(epochDuration) + s.initialEpochTickDelta.Store(delta) + s.log.Debug("read config from blockchain", zap.Bool("active", s.IsActive()), zap.Bool("alphabet", s.IsAlphabet()), zap.Uint64("epoch", epoch), zap.Uint32("precision", s.precision), - zap.Uint32("init_epoch_tick_delta", s.initialEpochTickDelta), + zap.Uint32("init_epoch_tick_delta", delta), ) return nil @@ -1111,3 +1129,30 @@ func (s *Server) newEpochTickHandlers() []newEpochHandler { return newEpochHandlers } + +func (s *Server) restartMorph() error { + s.log.Info("restarting internal services because of RPC connection loss...") + + s.auditTaskManager.Reset() + s.statusIndex.reset() + + err := s.initConfigFromBlockchain() + if err != nil { + return fmt.Errorf("side chain config reinitialization: %w", err) + } + + for _, t := range s.blockTimers { + err = t.Reset() + if err != nil { + return fmt.Errorf("could not reset block timers: %w", err) + } + } + + s.log.Info("internal services have been restarted after RPC connection loss...") + + return nil +} + +func (s *Server) restartMainChain() error { + return nil +} From 8bbbec02e91f9161def227b2e5b47fc512ab9433 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 27 Jun 2023 02:33:05 +0300 Subject: [PATCH 7/7] ir: Fix neo-go connection loss message It printed "mainnet" for every connection loss before. Pass parameters as a copy, not as a pointer. It has been that way since the beginning of time, no one knows why. Signed-off-by: Pavel Karpy --- pkg/innerring/innerring.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/innerring/innerring.go b/pkg/innerring/innerring.go index 4713251d75..5ff0ee3c20 100644 --- a/pkg/innerring/innerring.go +++ b/pkg/innerring/innerring.go @@ -344,7 +344,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan log.Warn("can't get last processed side chain block number", zap.String("error", err.Error())) } - morphChain := &chainParams{ + morphChain := chainParams{ log: log, cfg: cfg, name: morphPrefix, @@ -953,7 +953,7 @@ func New(ctx context.Context, log *logger.Logger, cfg *viper.Viper, errChan chan return server, nil } -func createListener(ctx context.Context, cli *client.Client, p *chainParams) (event.Listener, error) { +func createListener(ctx context.Context, cli *client.Client, p chainParams) (event.Listener, error) { // listenerPoolCap is a capacity of a // worker pool inside the listener. It // is used to prevent blocking in neo-go: @@ -988,9 +988,8 @@ func createListener(ctx context.Context, cli *client.Client, p *chainParams) (ev return listener, err } -func (s *Server) createClient(ctx context.Context, p *chainParams, errChan chan<- error) (*client.Client, error) { - name := p.name - endpoints := p.cfg.GetStringSlice(name + ".endpoints") +func (s *Server) createClient(ctx context.Context, p chainParams, errChan chan<- error) (*client.Client, error) { + endpoints := p.cfg.GetStringSlice(p.name + ".endpoints") if len(endpoints) == 0 { return nil, fmt.Errorf("%s chain client endpoints not provided", p.name) } @@ -1007,13 +1006,13 @@ func (s *Server) createClient(ctx context.Context, p *chainParams, errChan chan< client.WithConnSwitchCallback(func() { var err error - if name == morphPrefix { + if p.name == morphPrefix { err = s.restartMorph() } else { err = s.restartMainChain() } if err != nil { - errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", name, err) + errChan <- fmt.Errorf("internal services' restart after RPC reconnection to the %s: %w", p.name, err) } }), client.WithConnLostCallback(func() {