Skip to content

Commit

Permalink
Restart SN services after neo-go is down (#2434)
Browse files Browse the repository at this point in the history
  • Loading branch information
cthulhu-rider authored Jul 21, 2023
2 parents 16fe138 + 25522ce commit f751d71
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 8 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Changelog for NeoFS Node
- `renew-domain` command for adm

### Fixed
- `neo-go` RPC connection lost handling by IR (#1337)
- `neo-go` RPC connection loss handling (#1337)
- Concurrent morph cache misses (#1248)

### Removed
Expand Down
27 changes: 26 additions & 1 deletion cmd/neofs-node/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (c *ttlNetCache[K, V]) remove(key K) {
c.cache.Remove(key)
}

// reset removes every cached value.
func (c *ttlNetCache[K, V]) reset() {
c.cache.Purge()
}

// entity that provides LRU cache interface.
type lruNetCache struct {
cache *lru.Cache[uint64, *netmapSDK.NetMap]
Expand Down Expand Up @@ -168,6 +173,10 @@ func (c *lruNetCache) get(key uint64) (*netmapSDK.NetMap, error) {
return val, nil
}

func (c *lruNetCache) reset() {
c.cache.Purge()
}

// wrapper over TTL cache of values read from the network
// that implements container storage.
type ttlContainerStorage struct {
Expand Down Expand Up @@ -199,6 +208,10 @@ func (s *ttlContainerStorage) Get(cnr cid.ID) (*container.Container, error) {
return val, nil
}

func (s *ttlContainerStorage) reset() {
s.tc.reset()
}

type ttlEACLStorage struct {
tc ttlNetCache[cid.ID, *container.EACL]
}
Expand Down Expand Up @@ -229,13 +242,17 @@ func (s *ttlEACLStorage) InvalidateEACL(cnr cid.ID) {
s.tc.remove(cnr)
}

func (s *ttlEACLStorage) reset() {
s.tc.reset()
}

type lruNetmapSource struct {
netState netmap.State

cache *lruNetCache
}

func newCachedNetmapStorage(s netmap.State, v netmap.Source) netmap.Source {
func newCachedNetmapStorage(s netmap.State, v netmap.Source) *lruNetmapSource {
const netmapCacheSize = 10

return &lruNetmapSource{
Expand Down Expand Up @@ -267,6 +284,10 @@ func (s *lruNetmapSource) Epoch() (uint64, error) {
return s.netState.CurrentEpoch(), nil
}

func (s *lruNetmapSource) reset() {
s.cache.reset()
}

// wrapper over TTL cache of values read from the network
// that implements container lister.
type ttlContainerLister struct {
Expand Down Expand Up @@ -378,6 +399,10 @@ func (s *ttlContainerLister) update(owner user.ID, cnr cid.ID, add bool) {
item.mtx.Unlock()
}

func (s *ttlContainerLister) reset() {
s.inner.reset()
}

type cachedIRFetcher struct {
tc ttlNetCache[struct{}, [][]byte]
}
Expand Down
20 changes: 20 additions & 0 deletions cmd/neofs-node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ type shared struct {
putClientCache *cache.ClientCache
localAddr network.AddressGroup

containerCache *ttlContainerStorage
eaclCache *ttlEACLStorage
containerListCache *ttlContainerLister
netmapCache *lruNetmapSource

key *keys.PrivateKey
binPublicKey []byte
ownerIDFromKey user.ID // user ID calculated from key
Expand All @@ -371,6 +376,21 @@ type shared struct {
metricsCollector *metrics.NodeMetrics
}

func (s shared) resetCaches() {
if s.containerCache != nil {
s.containerCache.reset()
}
if s.eaclCache != nil {
s.eaclCache.reset()
}
if s.containerListCache != nil {
s.containerListCache.reset()
}
if s.netmapCache != nil {
s.netmapCache.reset()
}
}

// dynamicConfiguration stores parameters of the
// components that supports runtime reconfigurations.
type dynamicConfiguration struct {
Expand Down
4 changes: 4 additions & 0 deletions cmd/neofs-node/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func initContainerService(c *cfg) {
cachedEACLStorage := newCachedEACLStorage(eACLFetcher, c.cfgMorph.cacheTTL)
cachedContainerLister := newCachedContainerLister(wrap, c.cfgMorph.cacheTTL)

c.shared.containerCache = cachedContainerStorage
c.shared.eaclCache = cachedEACLStorage
c.shared.containerListCache = cachedContainerLister

subscribeToContainerCreation(c, func(e event.Event) {
ev := e.(containerEvent.PutSuccess)

Expand Down
34 changes: 34 additions & 0 deletions cmd/neofs-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,37 @@ func shutdown(c *cfg) {
func (c *cfg) onShutdown(f func()) {
c.closers = append(c.closers, f)
}

func (c *cfg) restartMorph() error {
c.log.Info("restarting internal services because of RPC connection loss...")

c.shared.resetCaches()

epoch, ni, err := getNetworkState(c)
if err != nil {
return fmt.Errorf("getting network state: %w", err)
}

updateLocalState(c, epoch, ni)

// drop expired sessions if any has appeared while node was sleeping
c.shared.privateTokenStore.RemoveOld(epoch)

// bootstrap node after every reconnection cause the longevity of
// a connection downstate is unpredictable and bootstrap TX is a
// way to make a heartbeat so nothing is wrong in making sure the
// node is online (if it should be)

if !c.needBootstrap() || c.cfgNetmap.reBoostrapTurnedOff.Load() {
return nil
}

err = c.bootstrap()
if err != nil {
c.log.Warn("failed to re-bootstrap", zap.Error(err))
}

c.log.Info("internal services have been restarted after RPC connection loss")

return nil
}
10 changes: 9 additions & 1 deletion cmd/neofs-node/morph.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func initMorphComponents(c *cfg) {
client.WithEndpoints(addresses),
client.WithReconnectionRetries(morphconfig.ReconnectionRetriesNumber(c.appCfg)),
client.WithReconnectionsDelay(morphconfig.ReconnectionRetriesDelay(c.appCfg)),
client.WithConnSwitchCallback(func() {
err = c.restartMorph()
if err != nil {
c.internalErr <- fmt.Errorf("restarting after morph connection was lost: %w", err)
}
}),
client.WithConnLostCallback(func() {
c.internalErr <- errors.New("morph connection has been lost")
}),
Expand Down Expand Up @@ -84,8 +90,10 @@ func initMorphComponents(c *cfg) {
if c.cfgMorph.cacheTTL < 0 {
netmapSource = wrap
} else {
c.shared.netmapCache = newCachedNetmapStorage(c.cfgNetmap.state, wrap)

// use RPC node as source of netmap (with caching)
netmapSource = newCachedNetmapStorage(c.cfgNetmap.state, wrap)
netmapSource = c.shared.netmapCache
}

c.netMapSource = netmapSource
Expand Down
25 changes: 20 additions & 5 deletions cmd/neofs-node/netmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,8 @@ func setNetmapNotificationParser(c *cfg, sTyp string, p event.NotificationParser
// initNetmapState inits current Network map state.
// Must be called after Morph components initialization.
func initNetmapState(c *cfg) {
epoch, err := c.cfgNetmap.wrapper.Epoch()
fatalOnErrDetails("could not initialize current epoch number", err)

ni, err := c.netmapLocalNodeState(epoch)
fatalOnErrDetails("could not init network state", err)
epoch, ni, err := getNetworkState(c)
fatalOnErrDetails("getting network state", err)

stateWord := "undefined"

Expand All @@ -269,6 +266,24 @@ func initNetmapState(c *cfg) {
zap.String("state", stateWord),
)

updateLocalState(c, epoch, ni)
}

func getNetworkState(c *cfg) (uint64, *netmapSDK.NodeInfo, error) {
epoch, err := c.cfgNetmap.wrapper.Epoch()
if err != nil {
return 0, nil, fmt.Errorf("could not get current epoch number: %w", err)
}

ni, err := c.netmapLocalNodeState(epoch)
if err != nil {
return 0, nil, fmt.Errorf("could not init network state: %w", err)
}

return epoch, ni, nil
}

func updateLocalState(c *cfg, epoch uint64, ni *netmapSDK.NodeInfo) {
c.cfgNetmap.state.setCurrentEpoch(epoch)
c.cfgNetmap.startEpoch = epoch
c.handleLocalNodeInfo(ni)
Expand Down

0 comments on commit f751d71

Please sign in to comment.