Skip to content

Store tree service sync height in the metabase #82

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Changelog for FrostFS Node
- Reload config for pprof and metrics on SIGHUP in `neofs-node` (#1868)
- Multiple configs support (#44)
- Parameters `nns-name` and `nns-zone` for command `frostfs-cli container create` (#37)
- Tree service now saves the last synchronization height which persists across restarts (#82)

### Changed
- Change `frostfs_node_engine_container_size` to counting sizes of logical objects
Expand Down
38 changes: 38 additions & 0 deletions pkg/local_object_storage/engine/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,44 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return err == nil, err
}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
index, lst, err := e.getTreeShard(cid, treeID)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
return err
}

err = lst[index].TreeUpdateLastSyncHeight(cid, treeID, height)
if err != nil && !errors.Is(err, shard.ErrReadOnlyMode) && err != shard.ErrPiloramaDisabled {
e.reportShardError(lst[index], "can't update tree synchronization height", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
}
return err
}

// TreeLastSyncHeight implements the pilorama.Forest interface.
func (e *StorageEngine) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
var err error
var height uint64
for _, sh := range e.sortShardsByWeight(cid) {
height, err = sh.TreeLastSyncHeight(cid, treeID)
if err != nil {
if err == shard.ErrPiloramaDisabled {
break
}
if !errors.Is(err, pilorama.ErrTreeNotFound) {
e.reportShardError(sh, "can't read tree synchronization height", err,
zap.Stringer("cid", cid),
zap.String("tree", treeID))
}
continue
}
return height, err
}
return height, err
}

func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) {
lst := e.sortShardsByWeight(cid)
for i, sh := range lst {
Expand Down
40 changes: 40 additions & 0 deletions pkg/local_object_storage/pilorama/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,46 @@ func (t *boltForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
return exists, err
}

var syncHeightKey = []byte{'h'}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (t *boltForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
rawHeight := make([]byte, 8)
binary.LittleEndian.PutUint64(rawHeight, height)

buck := bucketName(cid, treeID)
return t.db.Batch(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(buck)
if treeRoot == nil {
return ErrTreeNotFound
}

b := treeRoot.Bucket(dataBucket)
return b.Put(syncHeightKey, rawHeight)
})
}

// TreeLastSyncHeight implements the pilorama.Forest interface.
func (t *boltForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
var height uint64

buck := bucketName(cid, treeID)
err := t.db.View(func(tx *bbolt.Tx) error {
treeRoot := tx.Bucket(buck)
if treeRoot == nil {
return ErrTreeNotFound
}

b := treeRoot.Bucket(dataBucket)
data := b.Get(syncHeightKey)
if len(data) == 8 {
height = binary.LittleEndian.Uint64(data)
}
return nil
})
return height, err
}

// TreeAddByPath implements the Forest interface.
func (t *boltForest) TreeAddByPath(d CIDDescriptor, treeID string, attr string, path []string, meta []KeyValue) ([]Move, error) {
if !d.checkValid() {
Expand Down
21 changes: 21 additions & 0 deletions pkg/local_object_storage/pilorama/forest.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,24 @@ func (f *memoryForest) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
_, ok := f.treeMap[fullID]
return ok, nil
}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (f *memoryForest) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
fullID := cid.EncodeToString() + "/" + treeID
t, ok := f.treeMap[fullID]
if !ok {
return ErrTreeNotFound
}
t.syncHeight = height
return nil
}

// TreeLastSyncHeight implements the pilorama.Forest interface.
func (f *memoryForest) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
fullID := cid.EncodeToString() + "/" + treeID
t, ok := f.treeMap[fullID]
if !ok {
return 0, ErrTreeNotFound
}
return t.syncHeight, nil
}
49 changes: 49 additions & 0 deletions pkg/local_object_storage/pilorama/forest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1030,3 +1030,52 @@ func testTreeGetTrees(t *testing.T, s Forest) {
require.ElementsMatch(t, treeIDs[cid], trees)
}
}

func TestTreeLastSyncHeight(t *testing.T) {
for i := range providers {
t.Run(providers[i].name, func(t *testing.T) {
testTreeLastSyncHeight(t, providers[i].construct(t))
})
}
}

func testTreeLastSyncHeight(t *testing.T, f Forest) {
cnr := cidtest.ID()
treeID := "someTree"

t.Run("ErrNotFound if no log operations are stored for a tree", func(t *testing.T) {
_, err := f.TreeLastSyncHeight(cnr, treeID)
require.ErrorIs(t, err, ErrTreeNotFound)

err = f.TreeUpdateLastSyncHeight(cnr, treeID, 1)
require.ErrorIs(t, err, ErrTreeNotFound)
})

_, err := f.TreeMove(CIDDescriptor{CID: cnr, Size: 1}, treeID, &Move{
Parent: RootID,
Child: 1,
})
require.NoError(t, err)

h, err := f.TreeLastSyncHeight(cnr, treeID)
require.NoError(t, err)
require.EqualValues(t, 0, h)

t.Run("separate storages for separate containers", func(t *testing.T) {
_, err := f.TreeLastSyncHeight(cidtest.ID(), treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
})

require.NoError(t, f.TreeUpdateLastSyncHeight(cnr, treeID, 10))

h, err = f.TreeLastSyncHeight(cnr, treeID)
require.NoError(t, err)
require.EqualValues(t, 10, h)

t.Run("removed correctly", func(t *testing.T) {
require.NoError(t, f.TreeDrop(cnr, treeID))

_, err := f.TreeLastSyncHeight(cnr, treeID)
require.ErrorIs(t, err, ErrTreeNotFound)
})
}
5 changes: 3 additions & 2 deletions pkg/local_object_storage/pilorama/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ func (s *state) findSpareID() Node {

// tree is a mapping from the child nodes to their parent and metadata.
type tree struct {
infoMap map[Node]nodeInfo
childMap map[Node][]Node
syncHeight uint64
infoMap map[Node]nodeInfo
childMap map[Node][]Node
}

func newTree() *tree {
Expand Down
4 changes: 4 additions & 0 deletions pkg/local_object_storage/pilorama/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ type Forest interface {
// TreeExists checks if a tree exists locally.
// If the tree is not found, false and a nil error should be returned.
TreeExists(cid cidSDK.ID, treeID string) (bool, error)
// TreeUpdateLastSyncHeight updates last log height synchronized with _all_ container nodes.
TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error
// TreeLastSyncHeight returns last log height synchronized with _all_ container nodes.
TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error)
}

type ForestStorage interface {
Expand Down
16 changes: 16 additions & 0 deletions pkg/local_object_storage/shard/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,19 @@ func (s *Shard) TreeExists(cid cidSDK.ID, treeID string) (bool, error) {
}
return s.pilorama.TreeExists(cid, treeID)
}

// TreeUpdateLastSyncHeight implements the pilorama.Forest interface.
func (s *Shard) TreeUpdateLastSyncHeight(cid cidSDK.ID, treeID string, height uint64) error {
if s.pilorama == nil {
return ErrPiloramaDisabled
}
return s.pilorama.TreeUpdateLastSyncHeight(cid, treeID, height)
}

// TreeLastSyncHeight implements the pilorama.Forest interface.
func (s *Shard) TreeLastSyncHeight(cid cidSDK.ID, treeID string) (uint64, error) {
if s.pilorama == nil {
return 0, ErrPiloramaDisabled
}
return s.pilorama.TreeLastSyncHeight(cid, treeID)
}
8 changes: 3 additions & 5 deletions pkg/services/tree/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ type Service struct {
syncChan chan struct{}
syncPool *ants.Pool

// cnrMap maps contrainer and tree ID to the minimum height which was fetched from _each_ client.
// This allows us to better handle split-brain scenario, because we always synchronize
// from the last seen height. The inner map is read-only and should not be modified in-place.
cnrMap map[cidSDK.ID]map[string]uint64
// cnrMap contains existing (used) container IDs.
cnrMap map[cidSDK.ID]struct{}
// cnrMapMtx protects cnrMap
cnrMapMtx sync.Mutex
}
Expand Down Expand Up @@ -63,7 +61,7 @@ func New(opts ...Option) *Service {
s.replicateLocalCh = make(chan applyOp)
s.replicationTasks = make(chan replicationTask, s.replicatorWorkerCount)
s.containerCache.init(s.containerCacheSize)
s.cnrMap = make(map[cidSDK.ID]map[string]uint64)
s.cnrMap = make(map[cidSDK.ID]struct{})
s.syncChan = make(chan struct{})
s.syncPool, _ = ants.NewPool(defaultSyncWorkerCount)

Expand Down
35 changes: 14 additions & 21 deletions pkg/services/tree/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,31 +86,24 @@ func (s *Service) synchronizeAllTrees(ctx context.Context, cid cid.ID) error {
return fmt.Errorf("could not fetch tree ID list: %w", outErr)
}

s.cnrMapMtx.Lock()
oldStatus := s.cnrMap[cid]
s.cnrMapMtx.Unlock()

syncStatus := map[string]uint64{}
for i := range treesToSync {
syncStatus[treesToSync[i]] = 0
}
for tid := range oldStatus {
if _, ok := syncStatus[tid]; ok {
syncStatus[tid] = oldStatus[tid]
}
}

for _, tid := range treesToSync {
h := s.synchronizeTree(ctx, d, syncStatus[tid], tid, nodes)
if syncStatus[tid] < h {
syncStatus[tid] = h
h, err := s.forest.TreeLastSyncHeight(d.CID, tid)
if err != nil && !errors.Is(err, pilorama.ErrTreeNotFound) {
s.log.Warn("could not get last synchronized height for a tree",
zap.Stringer("cid", d.CID),
zap.String("tree", tid))
continue
}
newHeight := s.synchronizeTree(ctx, d, h, tid, nodes)
if h < newHeight {
if err := s.forest.TreeUpdateLastSyncHeight(d.CID, tid, newHeight); err != nil {
s.log.Warn("could not update last synchronized height for a tree",
zap.Stringer("cid", d.CID),
zap.String("tree", tid))
}
}
}

s.cnrMapMtx.Lock()
s.cnrMap[cid] = syncStatus
s.cnrMapMtx.Unlock()

return nil
}

Expand Down