Skip to content

Commit

Permalink
node/policy: Cache container policy application results
Browse files Browse the repository at this point in the history
Application result of container (C) storage policy to the network map
(N) does not change for fixed C and N. Previously, `Search` and
`Replicate` object server handlers always calculated the list of
container nodes from scratch. This resulted in excessive node resource
consumption when there was a dense flow of requests for a small number
of containers per epoch. The obvious solution is to cache the latest
results.

A similar attempt had already been made earlier with
9269ed3, but it turned out to be
incorrect and did not change anything.

This adds a caching component for up to 1000 recently requested lists of
container nodes over two epochs - current and previous. At the cost of
retaining somewhat more memory, the component mitigates load spikes on a
small number of containers. The volume limit of 1000 was chosen
heuristically as a first approximation and applies to each of the two epochs.

Tests on the development environment showed a pretty good improvement,
but results on real load tests are yet to be obtained. Based on this,
similar optimization for other layers and queries will be done later.

Refs #2692.

Signed-off-by: Leonard Lyubich <[email protected]>
  • Loading branch information
cthulhu-rider committed Jul 11, 2024
1 parent d68aa9f commit 13a115d
Show file tree
Hide file tree
Showing 3 changed files with 519 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Changelog for NeoFS Node
### Changed
- neofs-cli allows several objects deletion at a time (#2774)
- `ObjectService.Put` server of in-container node places objects using new `ObjectService.Replicate` RPC (#2802)
- `ObjectService`'s `Put` and `Replicate` RPC handlers cache up to 1000 lists of container nodes (#2892)

### Removed

Expand Down
199 changes: 167 additions & 32 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,113 @@ package main

import (
"fmt"
"sync"

"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// storagePolicyRes structures persistent storage policy application result for
// particular container and network map incl. error.
//
// The error is part of the result on purpose: for a fixed container and
// network map, a policy application failure is persistent too, so it is
// cached alongside successful results.
type storagePolicyRes struct {
	nodeSets [][]netmapsdk.NodeInfo // selected nodes grouped per policy replica
	err      error                  // persistent policy application error, if any
}

// containerNodesAtEpoch is a thread-safe LRU cache mapping containers into
// storage policy application results for particular network map.
type containerNodesAtEpoch struct {
	mtx sync.RWMutex // guards lru: simplelru types are not safe for concurrent use
	lru simplelru.LRUCache[cid.ID, storagePolicyRes]
}

// containerNodesCaches groups caches for all tracked epochs: the current one
// and the previous one. The prev field is nil until an epoch switch is seen.
type containerNodesCaches struct{ cur, prev *containerNodesAtEpoch }

// max number of container storage policy application results cached for each
// epoch.
const cachedContainerNodesPerEpochNum = 1000

// containerNodes wraps NeoFS network state to apply container storage policies.
//
// Since policy application results are consistent for fixed container and
// network map, they could be cached. The containerNodes caches up to
// cachedContainerNodesPerEpochNum results for the latest and the previous
// epochs. The previous one is required to support data migration after the
// epoch tick. Older epochs are not cached.
type containerNodes struct {
	containers container.Source // read-only source of container descriptors
	network    netmap.Source    // source of the current epoch and network maps

	lastSeenCurrentEpochMtx sync.Mutex // guards the two fields below
	lastSeenCurrentEpoch    uint64
	epochCaches             containerNodesCaches
}

func newContainerNodesAtEpochLRUCache() (simplelru.LRUCache[cid.ID, storagePolicyRes], error) {
lru, err := simplelru.NewLRU[cid.ID, storagePolicyRes](cachedContainerNodesPerEpochNum, nil)
if err != nil {
return nil, fmt.Errorf("create LRU container node cache for one epoch: %w", err)

Check warning on line 54 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L54

Added line #L54 was not covered by tests
}
return lru, nil
}

func newContainerNodes(containers container.Source, network netmap.Source) (*containerNodes, error) {
lru, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return nil, err

Check warning on line 62 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L62

Added line #L62 was not covered by tests
}
return &containerNodes{
containers: containers,
network: network,
epochCaches: containerNodesCaches{
cur: &containerNodesAtEpoch{lru: lru},
},
}, nil
}

func (x *containerNodes) updateAndGetCachesForCurrentEpoch(curEpoch uint64) (containerNodesCaches, error) {
x.lastSeenCurrentEpochMtx.Lock()
defer x.lastSeenCurrentEpochMtx.Unlock()

if curEpoch == x.lastSeenCurrentEpoch {
return x.epochCaches, nil
}

lru, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return containerNodesCaches{}, err

Check warning on line 83 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L83

Added line #L83 was not covered by tests
}
if curEpoch > x.lastSeenCurrentEpoch {
if curEpoch == x.lastSeenCurrentEpoch+1 {
x.epochCaches.prev = x.epochCaches.cur
} else {
lruPrev, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return containerNodesCaches{}, err

Check warning on line 91 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L91

Added line #L91 was not covered by tests
}
x.epochCaches.prev = &containerNodesAtEpoch{lru: lruPrev}
}
x.epochCaches.cur = &containerNodesAtEpoch{lru: lru}
} else if curEpoch < x.lastSeenCurrentEpoch { // not really expected in practice
if curEpoch == x.lastSeenCurrentEpoch-1 {
x.epochCaches.cur = x.epochCaches.prev
} else {
lruCur, err := newContainerNodesAtEpochLRUCache()
if err != nil {
return containerNodesCaches{}, err

Check warning on line 102 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L102

Added line #L102 was not covered by tests
}
x.epochCaches.cur = &containerNodesAtEpoch{lru: lruCur}
}
x.epochCaches.prev = &containerNodesAtEpoch{lru: lru}
}
x.lastSeenCurrentEpoch = curEpoch
return x.epochCaches, nil
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
Expand All @@ -32,58 +119,106 @@ func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.
}

func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error {
epoch, err := x.network.Epoch()
curEpoch, err := x.network.Epoch()
if err != nil {
return fmt.Errorf("read current NeoFS epoch: %w", err)
}

cnr, err := x.containers.Get(cnrID)
caches, err := x.updateAndGetCachesForCurrentEpoch(curEpoch)
if err != nil {
return fmt.Errorf("read container by ID: %w", err)
return err

Check warning on line 129 in cmd/neofs-node/policy.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/policy.go#L129

Added line #L129 was not covered by tests
}

networkMap, err := x.network.GetNetMapByEpoch(epoch)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
}
// TODO(#2692): node sets remain unchanged for fixed container and network map,
// so recently calculated results worth caching
ns, err := networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
}
cnrCtx := containerPolicyContext{id: cnrID, containers: x.containers, network: x.network}

for i := range ns {
for j := range ns[i] {
if !f(ns[i][j]) {
return nil
resCur, err := cnrCtx.applyAtEpoch(curEpoch, caches.cur)
if err != nil {
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, err)
} else if resCur.err == nil { // error case handled below
for i := range resCur.nodeSets {
for j := range resCur.nodeSets[i] {
if !f(resCur.nodeSets[i][j]) {
return nil
}
}
}
}

if !withPrevEpoch || epoch == 0 {
if !withPrevEpoch || curEpoch == 0 {
if resCur.err != nil {
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err)
}
return nil
}

epoch--

networkMap, err = x.network.GetNetMapByEpoch(epoch)
resPrev, err := cnrCtx.applyAtEpoch(curEpoch-1, caches.prev)
if err != nil {
return fmt.Errorf("read network map at epoch #%d: %w", epoch, err)
if resCur.err != nil {
return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w",
curEpoch, resCur.err, curEpoch-1, err)
}
return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, err)
} else if resPrev.err == nil { // error case handled below
for i := range resPrev.nodeSets {
for j := range resPrev.nodeSets[i] {
if !f(resPrev.nodeSets[i][j]) {
return nil
}
}
}
}

ns, err = networkMap.ContainerNodes(cnr.Value.PlacementPolicy(), cnrID)
if err != nil {
return fmt.Errorf("apply container's storage policy to the network map at epoch #%d: %w", epoch, err)
if resCur.err != nil {
if resPrev.err != nil {
return fmt.Errorf("select container nodes for both epochs: (current#%d) %w; (previous#%d) %w",
curEpoch, resCur.err, curEpoch-1, resPrev.err)
}
return fmt.Errorf("select container nodes for current epoch #%d: %w", curEpoch, resCur.err)
} else if resPrev.err != nil {
return fmt.Errorf("select container nodes for previous epoch #%d: %w", curEpoch-1, resPrev.err)
}
return nil
}

for i := range ns {
for j := range ns[i] {
if !f(ns[i][j]) {
return nil
}
// containerPolicyContext preserves context of storage policy processing for
// the particular container.
type containerPolicyContext struct {
	// static
	id         cid.ID           // container whose policy is being applied
	containers container.Source // used to fetch the container descriptor once
	network    netmap.Source    // used to fetch network maps by epoch
	// dynamic
	cnr *container.Container // lazily fetched container, nil until first use
}

// applyAtEpoch applies the storage policy of the container referenced by x.id
// to the network map at the specified epoch. When cache is non-nil,
// applyAtEpoch returns an already cached result if present and stores freshly
// computed ones for future calls. Transient read failures (container or
// network map unavailable) are returned as the second value and never cached.
func (x *containerPolicyContext) applyAtEpoch(epoch uint64, cache *containerNodesAtEpoch) (storagePolicyRes, error) {
	if cache != nil {
		cache.mtx.Lock()
		defer cache.mtx.Unlock()
		if res, hit := cache.lru.Get(x.id); hit {
			return res, nil
		}
	}

	var res storagePolicyRes

	if x.cnr == nil {
		cnr, err := x.containers.Get(x.id)
		if err != nil {
			// non-persistent error => do not cache
			return res, fmt.Errorf("read container by ID: %w", err)
		}
		x.cnr = cnr
	}

	nm, err := x.network.GetNetMapByEpoch(epoch)
	if err != nil {
		// non-persistent error => do not cache
		return res, fmt.Errorf("read network map by epoch: %w", err)
	}

	res.nodeSets, res.err = nm.ContainerNodes(x.cnr.Value.PlacementPolicy(), x.id)
	if cache != nil {
		cache.lru.Add(x.id, res) // mutex acquired above is still held here
	}
	return res, nil
}
Loading

0 comments on commit 13a115d

Please sign in to comment.