diff --git a/audit/collect.go b/audit/collect.go new file mode 100644 index 00000000..6b4ddfee --- /dev/null +++ b/audit/collect.go @@ -0,0 +1,76 @@ +package audit + +import ( + "context" + "fmt" + + "github.com/TrueCloudLab/frostfs-sdk-go/checksum" + cid "github.com/TrueCloudLab/frostfs-sdk-go/container/id" + "github.com/TrueCloudLab/frostfs-sdk-go/object" + oid "github.com/TrueCloudLab/frostfs-sdk-go/object/id" + "github.com/TrueCloudLab/frostfs-sdk-go/object/relations" + "github.com/TrueCloudLab/frostfs-sdk-go/storagegroup" + "github.com/TrueCloudLab/tzhash/tz" +) + +type Collector interface { + Head(ctx context.Context, addr oid.Address) (*object.Object, error) + relations.Relations +} + +// CollectMembers creates new storage group structure and fills it +// with information about members collected via HeadReceiver. +// +// Resulting storage group consists of physically stored objects only. +func CollectMembers(ctx context.Context, collector Collector, cnr cid.ID, members []oid.ID, tokens relations.Tokens, calcHomoHash bool) (*storagegroup.StorageGroup, error) { + var ( + err error + sumPhySize uint64 + phyMembers []oid.ID + phyHashes [][]byte + addr oid.Address + sg storagegroup.StorageGroup + ) + + addr.SetContainer(cnr) + + for i := range members { + if phyMembers, err = relations.ListRelations(ctx, collector, cnr, members[i], tokens, false); err != nil { + return nil, err + } + + for _, phyMember := range phyMembers { + addr.SetObject(phyMember) + leaf, err := collector.Head(ctx, addr) + if err != nil { + return nil, fmt.Errorf("head phy member '%s': %w", phyMember.EncodeToString(), err) + } + + sumPhySize += leaf.PayloadSize() + cs, _ := leaf.PayloadHomomorphicHash() + + if calcHomoHash { + phyHashes = append(phyHashes, cs.Value()) + } + } + } + + sg.SetMembers(phyMembers) + sg.SetValidationDataSize(sumPhySize) + + if calcHomoHash { + sumHash, err := tz.Concat(phyHashes) + if err != nil { + return nil, err + } + + var cs checksum.Checksum + tzHash := [64]byte{} + copy(tzHash[:], sumHash) + cs.SetTillichZemor(tzHash) + + sg.SetValidationDataHash(cs) + } + + return &sg, nil +} diff --git a/object/relations/relations.go b/object/relations/relations.go index c1de85c3..3e024eb3 100644 --- a/object/relations/relations.go +++ b/object/relations/relations.go @@ -19,8 +19,9 @@ type Tokens struct { } type Relations interface { - // GetSplitInfo tries to get split info by root object id. - // If object isn't virtual it returns ErrNoSplitInfo. + // GetSplitInfo tries to get split info by some object id. + // This method must return split info on any object from split chain as well as on parent/linking object. + // If object doesn't have any split information returns ErrNoSplitInfo. GetSplitInfo(ctx context.Context, cnrID cid.ID, rootID oid.ID, tokens Tokens) (*object.SplitInfo, error) // ListChildrenByLinker returns list of children for link object. @@ -31,9 +32,6 @@ type Relations interface { // If no previous object it returns ErrNoLeftSibling. GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens Tokens) (oid.ID, error) - // FindSiblingBySplitID returns all objects that relates to the provided split id. - FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens Tokens) ([]oid.ID, error) - // FindSiblingByParentID returns all object that relates to the provided parent id. FindSiblingByParentID(ctx context.Context, cnrID cid.ID, parentID oid.ID, tokens Tokens) ([]oid.ID, error) } @@ -46,9 +44,15 @@ var ( ErrNoSplitInfo = errors.New("no split info") ) -// ListAllRelations return all related phy objects for provided root object ID. -// Result doesn't include root object ID itself. +// ListAllRelations return all related phy objects for provided root object ID in split-chain order. +// Result doesn't include root object ID itself. If linking object is found its id will be the last one. func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens) ([]oid.ID, error) { + return ListRelations(ctx, rels, cnrID, rootObjID, tokens, true) +} + +// ListRelations return all related phy objects for provided root object ID in split-chain order. +// Result doesn't include root object ID itself. +func ListRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObjID oid.ID, tokens Tokens, includeLinking bool) ([]oid.ID, error) { splitInfo, err := rels.GetSplitInfo(ctx, cnrID, rootObjID, tokens) if err != nil { if errors.Is(err, ErrNoSplitInfo) { @@ -59,22 +63,40 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj // collect split chain by the descending ease of operations (ease is evaluated heuristically). // If any approach fails, we don't try the next since we assume that it will fail too. - if idLinking, ok := splitInfo.Link(); ok { - children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens) + if _, ok := splitInfo.Link(); !ok { + // the list is expected to contain last part and (probably) split info + list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens) if err != nil { - return nil, fmt.Errorf("failed to get linking object's header: %w", err) + return nil, fmt.Errorf("failed to find object children: %w", err) } - // include linking object - return append(children, idLinking), nil + for _, id := range list { + split, err := rels.GetSplitInfo(ctx, cnrID, id, tokens) + if err != nil { + if errors.Is(err, ErrNoSplitInfo) { + continue + } + return nil, fmt.Errorf("failed to get split info: %w", err) + } + if link, ok := split.Link(); ok { + splitInfo.SetLink(link) + } + if last, ok := split.LastPart(); ok { + splitInfo.SetLastPart(last) + } + } } - if idSplit := splitInfo.SplitID(); idSplit != nil { - members, err := rels.FindSiblingBySplitID(ctx, cnrID, idSplit, tokens) + if idLinking, ok := splitInfo.Link(); ok { + children, err := rels.ListChildrenByLinker(ctx, cnrID, idLinking, tokens) if err != nil { - return nil, fmt.Errorf("failed to search objects by split ID: %w", err) + return nil, fmt.Errorf("failed to get linking object's header: %w", err) } - return members, nil + + if includeLinking { + children = append(children, idLinking) + } + return children, nil } idMember, ok := splitInfo.LastPart() @@ -85,9 +107,6 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj chain := []oid.ID{idMember} chainSet := map[oid.ID]struct{}{idMember: {}} - // prmHead.SetRawFlag(false) - // split members are almost definitely singular, but don't get hung up on it - for { idMember, err = rels.GetLeftSibling(ctx, cnrID, idMember, tokens) if err != nil { @@ -101,20 +120,9 @@ func ListAllRelations(ctx context.Context, rels Relations, cnrID cid.ID, rootObj return nil, fmt.Errorf("duplicated member in the split chain %s", idMember) } - chain = append(chain, idMember) + chain = append([]oid.ID{idMember}, chain...) chainSet[idMember] = struct{}{} } - list, err := rels.FindSiblingByParentID(ctx, cnrID, rootObjID, tokens) - if err != nil { - return nil, fmt.Errorf("failed to find object children: %w", err) - } - - for i := range list { - if _, ok = chainSet[list[i]]; !ok { - chain = append(chain, list[i]) - } - } - return chain, nil } diff --git a/pool/pool.go b/pool/pool.go index 2bb45990..3c9b44bd 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -2538,7 +2538,7 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok } prm.MarkRaw() - _, err := p.HeadObject(ctx, prm) + res, err := p.HeadObject(ctx, prm) var errSplit *object.SplitInfoError @@ -2546,7 +2546,21 @@ func (p *Pool) GetSplitInfo(ctx context.Context, cnrID cid.ID, objID oid.ID, tok case errors.As(err, &errSplit): return errSplit.SplitInfo(), nil case err == nil: - return nil, relations.ErrNoSplitInfo + if res.SplitID() == nil { + return nil, relations.ErrNoSplitInfo + } + + splitInfo := object.NewSplitInfo() + splitInfo.SetSplitID(res.SplitID()) + if res.HasParent() { + if len(res.Children()) > 0 { + splitInfo.SetLink(objID) + } else { + splitInfo.SetLastPart(objID) + } + } + + return splitInfo, nil default: return nil, fmt.Errorf("failed to get raw object header: %w", err) } @@ -2602,38 +2616,6 @@ func (p *Pool) GetLeftSibling(ctx context.Context, cnrID cid.ID, objID oid.ID, t return idMember, nil } -// FindSiblingBySplitID implements relations.Relations. -func (p *Pool) FindSiblingBySplitID(ctx context.Context, cnrID cid.ID, splitID *object.SplitID, tokens relations.Tokens) ([]oid.ID, error) { - var query object.SearchFilters - query.AddSplitIDFilter(object.MatchStringEqual, splitID) - - var prm PrmObjectSearch - prm.SetContainerID(cnrID) - prm.SetFilters(query) - if tokens.Bearer != nil { - prm.UseBearer(*tokens.Bearer) - } - if tokens.Session != nil { - prm.UseSession(*tokens.Session) - } - - res, err := p.SearchObjects(ctx, prm) - if err != nil { - return nil, fmt.Errorf("failed to search objects by split ID: %w", err) - } - - var members []oid.ID - err = res.Iterate(func(id oid.ID) bool { - members = append(members, id) - return false - }) - if err != nil { - return nil, fmt.Errorf("failed to iterate found objects: %w", err) - } - - return members, nil -} - // FindSiblingByParentID implements relations.Relations. func (p *Pool) FindSiblingByParentID(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) { var query object.SearchFilters