Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context fixes #2930

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,10 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.A
})
}

func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.Address) {
func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleDeletedLocks(lockers)

select {
case <-ctx.Done():
e.log.Info("interrupt processing the deleted locks by context")
return true
default:
return false
}
return false
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/shard/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epoch),
),
shard.WithDeletedLockCallback(func(_ context.Context, aa []oid.Address) {
shard.WithDeletedLockCallback(func(aa []oid.Address) {
sh.HandleDeletedLocks(aa)
}),
shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) {
Expand Down
3 changes: 1 addition & 2 deletions pkg/local_object_storage/shard/inhume.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard

import (
"context"
"errors"
"fmt"

Expand Down Expand Up @@ -122,7 +121,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
s.decObjectCounterBy(logical, res.AvailableInhumed())

if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
s.deletedLockCallBack(context.Background(), deletedLockObjs)
s.deletedLockCallBack(deletedLockObjs)
}

return InhumeRes{}, nil
Expand Down
3 changes: 1 addition & 2 deletions pkg/local_object_storage/shard/lock_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard_test

import (
"context"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -45,7 +44,7 @@ func TestShard_Lock(t *testing.T) {
meta.WithPath(filepath.Join(rootPath, "meta")),
meta.WithEpochState(epochState{}),
),
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
shard.WithDeletedLockCallback(func(addresses []oid.Address) {
sh.HandleDeletedLocks(addresses)
}),
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
type ExpiredObjectsCallback func(context.Context, []oid.Address)

// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func(context.Context, []oid.Address)
type DeletedLockCallback func([]oid.Address)

// MetricsWriter is an interface that must store shard's metrics.
type MetricsWriter interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
exec.setLogger(s.log)
}

exec.execute()
exec.execute() //nolint:contextcheck // It is in fact passed via execCtx

return exec.statusError
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/services/object/search/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
"go.uber.org/zap"
)

func (exec *execCtx) executeOnContainer() {
func (exec *execCtx) executeOnContainer(ectx context.Context) {
if exec.isLocal() {
exec.log.Debug("return result directly")
return
}

exec.log.Debug("trying to execute in container...")

ctx, cancel := context.WithCancel(exec.context())
ctx, cancel := context.WithCancel(ectx)
defer cancel()

mProcessedNodes := make(map[string]struct{})
Expand Down Expand Up @@ -85,7 +85,7 @@ func (exec *execCtx) executeOnContainer() {
return
}

ids, err := c.searchObjects(exec, info)
ids, err := c.searchObjects(ctx, exec, info)
if err != nil {
lg.Debug("remote operation failed",
zap.String("error", err.Error()))
Expand Down
8 changes: 0 additions & 8 deletions pkg/services/object/search/exec.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package searchsvc

import (
"context"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
Expand All @@ -17,8 +15,6 @@ type statusError struct {
type execCtx struct {
svc *Service

ctx context.Context

prm Prm

statusError
Expand Down Expand Up @@ -47,10 +43,6 @@ func (exec *execCtx) setLogger(l *zap.Logger) {
)
}

func (exec execCtx) context() context.Context {
return exec.ctx
}

func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
Expand Down
13 changes: 6 additions & 7 deletions pkg/services/object/search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,28 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {

exec := &execCtx{
svc: s,
ctx: ctx,
prm: prm,
}

exec.prepare()

exec.setLogger(s.log)

exec.execute()
exec.execute(ctx)

return exec.statusError.err
}

func (exec *execCtx) execute() {
func (exec *execCtx) execute(ctx context.Context) {
exec.log.Debug("serving request...")

// perform local operation
exec.executeLocal()

exec.analyzeStatus(true)
exec.analyzeStatus(ctx, true)
}

func (exec *execCtx) analyzeStatus(execCnr bool) {
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result
switch exec.status {
default:
Expand All @@ -59,8 +58,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
}

if execCnr {
exec.executeOnContainer()
exec.analyzeStatus(false)
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/services/object/search/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
return v.ids, v.err
}

func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
func (c *testStorage) searchObjects(_ context.Context, exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
v, ok := c.items[exec.containerID().EncodeToString()]
if !ok {
return nil, nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/services/object/search/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package searchsvc

import (
"context"

"github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
Expand All @@ -23,7 +25,7 @@ type Option func(*cfg)
type searchClient interface {
// searchObjects searches objects on the specified node.
// MUST NOT modify execCtx as it can be accessed concurrently.
searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error)
searchObjects(context.Context, *execCtx, client.NodeInfo) ([]oid.ID, error)
}

// Containers provides information about NeoFS containers necessary for the
Expand Down
5 changes: 3 additions & 2 deletions pkg/services/object/search/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package searchsvc

import (
"context"
"sync"

"github.com/nspcc-dev/neofs-node/pkg/core/client"
Expand Down Expand Up @@ -68,7 +69,7 @@
}, nil
}

func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {

Check warning on line 72 in pkg/services/object/search/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/search/util.go#L72

Added line #L72 was not covered by tests
if exec.prm.forwarder != nil {
return exec.prm.forwarder(info, c.client)
}
Expand All @@ -89,7 +90,7 @@

var prm internalclient.SearchObjectsPrm

prm.SetContext(exec.context())
prm.SetContext(ctx)

Check warning on line 93 in pkg/services/object/search/util.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/search/util.go#L93

Added line #L93 was not covered by tests
prm.SetClient(c.client)
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down
37 changes: 17 additions & 20 deletions pkg/services/policer/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@
}

c := &processPlacementContext{
Context: ctx,
object: addrWithType,
checkedNodes: newNodeCache(),
}
Expand All @@ -129,7 +128,7 @@
default:
}

p.processNodes(c, nn[i], policy.ReplicaNumberByIndex(i))
p.processNodes(ctx, c, nn[i], policy.ReplicaNumberByIndex(i))

Check warning on line 131 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L131

Added line #L131 was not covered by tests
}

// if context is done, needLocalCopy might not be able to calculate
Expand Down Expand Up @@ -182,8 +181,6 @@
}

type processPlacementContext struct {
context.Context

// whether the local node is in the object container
localNodeInContainer bool

Expand All @@ -200,8 +197,8 @@
checkedNodes *nodeCache
}

func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address)
func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(plc.object.Address)

Check warning on line 201 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L200-L201

Added lines #L200 - L201 were not covered by tests

p.cfg.RLock()
headTimeout := p.headTimeout
Expand All @@ -216,7 +213,7 @@
// prevent spam with new replicas.
// However, additional copies should not be removed in this case,
// because we can remove the only copy this way.
ctx.checkedNodes.submitReplicaHolder(node)
plc.checkedNodes.submitReplicaHolder(node)

Check warning on line 216 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L216

Added line #L216 was not covered by tests
shortage--
uncheckedCopies++

Expand All @@ -225,7 +222,7 @@
)
}

if ctx.object.Type == object.TypeLock || ctx.object.Type == object.TypeLink {
if plc.object.Type == object.TypeLock || plc.object.Type == object.TypeLink {

Check warning on line 225 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L225

Added line #L225 was not covered by tests
// all nodes of a container must store the `LOCK` and `LINK` objects
// for correct object relations handling:
// - `LINK` objects allows treating all children as root object;
Expand All @@ -234,7 +231,7 @@
shortage = uint32(len(nodes))
}

for i := 0; (!ctx.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {
for i := 0; (!plc.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {

Check warning on line 234 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L234

Added line #L234 was not covered by tests
select {
case <-ctx.Done():
return
Expand All @@ -243,20 +240,20 @@

isLocalNode := p.netmapKeys.IsLocalKey(nodes[i].PublicKey())

if !ctx.localNodeInContainer {
ctx.localNodeInContainer = isLocalNode
if !plc.localNodeInContainer {
plc.localNodeInContainer = isLocalNode

Check warning on line 244 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L243-L244

Added lines #L243 - L244 were not covered by tests
}

if shortage == 0 {
continue
} else if isLocalNode {
ctx.needLocalCopy = true
plc.needLocalCopy = true

Check warning on line 250 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L250

Added line #L250 was not covered by tests

shortage--
} else if nodes[i].IsMaintenance() {
handleMaintenance(nodes[i])
} else {
if status := ctx.checkedNodes.processStatus(nodes[i]); status >= 0 {
if status := plc.checkedNodes.processStatus(nodes[i]); status >= 0 {

Check warning on line 256 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L256

Added line #L256 was not covered by tests
if status == 0 {
// node already contains replica, no need to replicate
nodes = append(nodes[:i], nodes[i+1:]...)
Expand All @@ -274,20 +271,20 @@
cancel()

if errors.Is(err, apistatus.ErrObjectNotFound) {
ctx.checkedNodes.submitReplicaCandidate(nodes[i])
plc.checkedNodes.submitReplicaCandidate(nodes[i])

Check warning on line 274 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L274

Added line #L274 was not covered by tests
continue
}

if errors.Is(err, apistatus.ErrNodeUnderMaintenance) {
handleMaintenance(nodes[i])
} else if err != nil {
p.log.Error("receive object header to check policy compliance",
zap.Stringer("object", ctx.object.Address),
zap.Stringer("object", plc.object.Address),

Check warning on line 282 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L282

Added line #L282 was not covered by tests
zap.String("error", err.Error()),
)
} else {
shortage--
ctx.checkedNodes.submitReplicaHolder(nodes[i])
plc.checkedNodes.submitReplicaHolder(nodes[i])

Check warning on line 287 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L287

Added line #L287 was not covered by tests
}
}

Expand All @@ -297,20 +294,20 @@

if shortage > 0 {
p.log.Debug("shortage of object copies detected",
zap.Stringer("object", ctx.object.Address),
zap.Stringer("object", plc.object.Address),

Check warning on line 297 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L297

Added line #L297 was not covered by tests
zap.Uint32("shortage", shortage),
)

var task replicator.Task
task.SetObjectAddress(ctx.object.Address)
task.SetObjectAddress(plc.object.Address)

Check warning on line 302 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L302

Added line #L302 was not covered by tests
task.SetNodes(nodes)
task.SetCopiesNumber(shortage)

p.replicator.HandleTask(ctx, task, ctx.checkedNodes)
p.replicator.HandleTask(ctx, task, plc.checkedNodes)

Check warning on line 306 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L306

Added line #L306 was not covered by tests
} else if uncheckedCopies > 0 {
// If we have more copies than needed, but some of them are from the maintenance nodes,
// save the local copy.
ctx.needLocalCopy = true
plc.needLocalCopy = true

Check warning on line 310 in pkg/services/policer/check.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/policer/check.go#L310

Added line #L310 was not covered by tests
p.log.Debug("some of the copies are stored on nodes under maintenance, save local copy",
zap.Int("count", uncheckedCopies))
}
Expand Down
Loading
Loading