Skip to content

Feature/308 frostfs add forming phy storage group #9

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions audit/collect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package audit

import (
"context"
"fmt"

"github.com/TrueCloudLab/frostfs-sdk-go/checksum"
cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id"
"github.com/TrueCloudLab/frostfs-sdk-go/object"
oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/TrueCloudLab/frostfs-sdk-go/object/relations"
"github.com/TrueCloudLab/frostfs-sdk-go/storagegroup"
"github.com/TrueCloudLab/tzhash/tz"
)

type Collector interface {
Head(ctx context.Context, addr oid.Address) (*object.Object, error)
relations.Relations
}

// CollectMembers creates new storage group structure and fills it
// with information about members collected via HeadReceiver.
//
// Resulting storage group consists of physically stored objects only.
func CollectMembers(ctx context.Context, collector Collector, cnr cid.ID, members []oid.ID, tokens relations.Tokens, calcHomoHash bool) (*storagegroup.StorageGroup, error) {
var (
err error
sumPhySize uint64
phyMembers []oid.ID
phyHashes [][]byte
addr oid.Address
sg storagegroup.StorageGroup
)

addr.SetContainer(cnr)

for i := range members {
if phyMembers, err = relations.ListRelations(ctx, collector, cnr, members[i], tokens, false); err != nil {
return nil, err
}

for _, phyMember := range phyMembers {
addr.SetObject(phyMember)
leaf, err := collector.Head(ctx, addr)
if err != nil {
return nil, fmt.Errorf("head phy member '%s': %w", phyMember.EncodeToString(), err)
}

sumPhySize += leaf.PayloadSize()
cs, _ := leaf.PayloadHomomorphicHash()

if calcHomoHash {
phyHashes = append(phyHashes, cs.Value())
}
}
}

sg.SetMembers(phyMembers)
sg.SetValidationDataSize(sumPhySize)

if calcHomoHash {
sumHash, err := tz.Concat(phyHashes)
if err != nil {
return nil, err
}

var cs checksum.Checksum
tzHash := [64]byte{}
copy(tzHash[:], sumHash)
cs.SetTillichZemor(tzHash)

sg.SetValidationDataHash(cs)
}

return &sg, nil
}
70 changes: 39 additions & 31 deletions object/relations/relations.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ type Tokens struct {
}

type Relations interface {
// GetSplitInfo tries to get split info by root object id.
// If object isn't virtual it returns ErrNoSplitInfo.
// GetSplitInfo tries to get split info by some object id.
// This method must return split info on any object from split chain as well as on parent/linking object.
// If object doesn't have any split information returns ErrNoSplitInfo.
GetSplitInfo(ctx context.Context, cnrID cid.ID, rootID oid.ID, tokens Tokens) (*object.SplitInfo, error)

// ListChildrenByLinker returns list of children for link object.
Expand All @@ -31,9 +32,6 @@ type Relations interface {
// If no previous object it returns ErrNoLeftSibling.
GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error)

// FindSiblingBySplitID returns all objects that relates to the provided split id.
FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens Tokens) ([]oid.ID, error)

// FindSiblingByParentID returns all object that relates to the provided parent id.
FindSiblingByParentID(ctx context.Context, cnrID cid.ID, parentID oid.ID, tokens Tokens) ([]oid.ID, error)
}
Expand All @@ -46,9 +44,15 @@ var (
ErrNoSplitInfo = errors.New("no split info")
)

// ListAllRelations return all related phy objects for provided root object ID.
// Result doesn't include root object ID itself.
// ListAllRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself. If linking object is found its id will be the last one.
func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) {
return ListRelations(ctx, rels, cnrID, rootObjID, tokens, true)
}

// ListRelations return all related phy objects for provided root object ID in split-chain order.
// Result doesn't include root object ID itself.
func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens, includeLinking bool) ([]oid.ID, error) {
splitInfo, err := rels.GetSplitInfo(ctx, cnrID, rootObjID, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
Expand All @@ -59,22 +63,40 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj

// collect split chain by the descending ease of operations (ease is evaluated heuristically).
// If any approach fails, we don't try the next since we assume that it will fail too.
if idLinking, ok := splitInfo.Link(); ok {
children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens)
if _, ok := splitInfo.Link(); !ok {
// the list is expected to contain last part and (probably) split info
list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens)
if err != nil {
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
return nil, fmt.Errorf("failed to find object children: %w", err)
}

// include linking object
return append(children, idLinking), nil
for _, id := range list {
split, err := rels.GetSplitInfo(ctx, cnrID, id, tokens)
if err != nil {
if errors.Is(err, ErrNoSplitInfo) {
continue
}
return nil, fmt.Errorf("failed to get split info: %w", err)
}
if link, ok := split.Link(); ok {
splitInfo.SetLink(link)
}
if last, ok := split.LastPart(); ok {
splitInfo.SetLastPart(last)
}
}
}

if idSplit := splitInfo.SplitID(); idSplit != nil {
members, err := rels.FindSiblingBySplitID(ctx, cnrID, idSplit, tokens)
if idLinking, ok := splitInfo.Link(); ok {
children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens)
if err != nil {
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
return nil, fmt.Errorf("failed to get linking object's header: %w", err)
}
return members, nil

if includeLinking {
children = append(children, idLinking)
}
return children, nil
}

idMember, ok := splitInfo.LastPart()
Expand All @@ -85,9 +107,6 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj
chain := []oid.ID{idMember}
chainSet := map[oid.ID]struct{}{idMember: {}}

// prmHead.SetRawFlag(false)
// split members are almost definitely singular, but don't get hung up on it

for {
idMember, err = rels.GetLeftSibling(ctx, cnrID, idMember, tokens)
if err != nil {
Expand All @@ -101,20 +120,9 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj
return nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
}

chain = append(chain, idMember)
chain = append([]oid.ID{idMember}, chain...)
chainSet[idMember] = struct{}{}
}

list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens)
if err != nil {
return nil, fmt.Errorf("failed to find object children: %w", err)
}

for i := range list {
if _, ok = chainSet[list[i]]; !ok {
chain = append(chain, list[i])
}
}

return chain, nil
}
50 changes: 16 additions & 34 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2538,15 +2538,29 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok
}
prm.MarkRaw()

_, err := p.HeadObject(ctx, prm)
res, err := p.HeadObject(ctx, prm)

var errSplit *object.SplitInfoError

switch {
case errors.As(err, &errSplit):
return errSplit.SplitInfo(), nil
case err == nil:
return nil, relations.ErrNoSplitInfo
if res.SplitID() == nil {
return nil, relations.ErrNoSplitInfo
}

splitInfo := object.NewSplitInfo()
splitInfo.SetSplitID(res.SplitID())
if res.HasParent() {
if len(res.Children()) > 0 {
splitInfo.SetLink(objID)
} else {
splitInfo.SetLastPart(objID)
}
}

return splitInfo, nil
default:
return nil, fmt.Errorf("failed to get raw object header: %w", err)
}
Expand Down Expand Up @@ -2602,38 +2616,6 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t
return idMember, nil
}

// FindSiblingBySplitID implements relations.Relations.
func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
query.AddSplitIDFilter(object.MatchStringEqual, splitID)

var prm PrmObjectSearch
prm.SetContainerID(cnrID)
prm.SetFilters(query)
if tokens.Bearer != nil {
prm.UseBearer(*tokens.Bearer)
}
if tokens.Session != nil {
prm.UseSession(*tokens.Session)
}

res, err := p.SearchObjects(ctx, prm)
if err != nil {
return nil, fmt.Errorf("failed to search objects by split ID: %w", err)
}

var members []oid.ID
err = res.Iterate(func(id oid.ID) bool {
members = append(members, id)
return false
})
if err != nil {
return nil, fmt.Errorf("failed to iterate found objects: %w", err)
}

return members, nil
}

// FindSiblingByParentID implements relations.Relations.
func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
var query object.SearchFilters
Expand Down