diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index 54576af569..5e4eba5e7c 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -359,17 +359,10 @@ func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.A
 	})
 }
 
-func (e *StorageEngine) processDeletedLocks(ctx context.Context, lockers []oid.Address) {
+func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) {
 	e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
 		sh.HandleDeletedLocks(lockers)
-
-		select {
-		case <-ctx.Done():
-			e.log.Info("interrupt processing the deleted locks by context")
-			return true
-		default:
-			return false
-		}
+		return false
 	})
 }
diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go
index 1892272404..798fb0bc63 100644
--- a/pkg/local_object_storage/shard/gc_test.go
+++ b/pkg/local_object_storage/shard/gc_test.go
@@ -58,7 +58,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
 			meta.WithPath(filepath.Join(rootPath, "meta")),
 			meta.WithEpochState(epoch),
 		),
-		shard.WithDeletedLockCallback(func(_ context.Context, aa []oid.Address) {
+		shard.WithDeletedLockCallback(func(aa []oid.Address) {
 			sh.HandleDeletedLocks(aa)
 		}),
 		shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) {
diff --git a/pkg/local_object_storage/shard/inhume.go b/pkg/local_object_storage/shard/inhume.go
index 47ab912690..1c272cc0e9 100644
--- a/pkg/local_object_storage/shard/inhume.go
+++ b/pkg/local_object_storage/shard/inhume.go
@@ -1,7 +1,6 @@
 package shard
 
 import (
-	"context"
 	"errors"
 	"fmt"
 
@@ -122,7 +121,7 @@ func (s *Shard) Inhume(prm InhumePrm) (InhumeRes, error) {
 	s.decObjectCounterBy(logical, res.AvailableInhumed())
 
 	if deletedLockObjs := res.DeletedLockObjects(); len(deletedLockObjs) != 0 {
-		s.deletedLockCallBack(context.Background(), deletedLockObjs)
+		s.deletedLockCallBack(deletedLockObjs)
 	}
 
 	return InhumeRes{}, nil
diff --git a/pkg/local_object_storage/shard/lock_test.go b/pkg/local_object_storage/shard/lock_test.go
index 9491a56e64..bc5826a89b 100644
--- a/pkg/local_object_storage/shard/lock_test.go
+++ b/pkg/local_object_storage/shard/lock_test.go
@@ -1,7 +1,6 @@
 package shard_test
 
 import (
-	"context"
 	"path/filepath"
 	"testing"
 	"time"
@@ -45,7 +44,7 @@ func TestShard_Lock(t *testing.T) {
 			meta.WithPath(filepath.Join(rootPath, "meta")),
 			meta.WithEpochState(epochState{}),
 		),
-		shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
+		shard.WithDeletedLockCallback(func(addresses []oid.Address) {
 			sh.HandleDeletedLocks(addresses)
 		}),
 	}
diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go
index 9d5e75dccf..ae6d27ab2a 100644
--- a/pkg/local_object_storage/shard/shard.go
+++ b/pkg/local_object_storage/shard/shard.go
@@ -42,7 +42,7 @@ type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
 type ExpiredObjectsCallback func(context.Context, []oid.Address)
 
 // DeletedLockCallback is a callback handling list of deleted LOCK objects.
-type DeletedLockCallback func(context.Context, []oid.Address)
+type DeletedLockCallback func([]oid.Address)
 
 // MetricsWriter is an interface that must store shard's metrics.
 type MetricsWriter interface {
diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go
index 43a1e9b6c1..ba7a27329b 100644
--- a/pkg/services/object/get/get.go
+++ b/pkg/services/object/get/get.go
@@ -88,7 +88,7 @@ func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) st
 		exec.setLogger(s.log)
 	}
 
-	exec.execute()
+	exec.execute() //nolint:contextcheck // It is in fact passed via execCtx
 
 	return exec.statusError
 }
diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go
index f719a8fab7..cead4fe789 100644
--- a/pkg/services/object/search/container.go
+++ b/pkg/services/object/search/container.go
@@ -10,7 +10,7 @@ import (
 	"go.uber.org/zap"
 )
 
-func (exec *execCtx) executeOnContainer() {
+func (exec *execCtx) executeOnContainer(ectx context.Context) {
 	if exec.isLocal() {
 		exec.log.Debug("return result directly")
 		return
@@ -18,7 +18,7 @@ func (exec *execCtx) executeOnContainer() {
 
 	exec.log.Debug("trying to execute in container...")
 
-	ctx, cancel := context.WithCancel(exec.context())
+	ctx, cancel := context.WithCancel(ectx)
 	defer cancel()
 
 	mProcessedNodes := make(map[string]struct{})
@@ -85,7 +85,7 @@ func (exec *execCtx) executeOnContainer() {
 			return
 		}
 
-		ids, err := c.searchObjects(exec, info)
+		ids, err := c.searchObjects(ctx, exec, info)
 		if err != nil {
 			lg.Debug("remote operation failed",
 				zap.String("error", err.Error()))
diff --git a/pkg/services/object/search/exec.go b/pkg/services/object/search/exec.go
index 91f46dc878..2669458655 100644
--- a/pkg/services/object/search/exec.go
+++ b/pkg/services/object/search/exec.go
@@ -1,8 +1,6 @@
 package searchsvc
 
 import (
-	"context"
-
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
 	"github.com/nspcc-dev/neofs-sdk-go/object"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
@@ -17,8 +15,6 @@ type statusError struct {
 type execCtx struct {
 	svc *Service
 
-	ctx context.Context
-
 	prm Prm
 
 	statusError
@@ -47,10 +43,6 @@ func (exec *execCtx) setLogger(l *zap.Logger) {
 	)
 }
 
-func (exec execCtx) context() context.Context {
-	return exec.ctx
-}
-
 func (exec execCtx) isLocal() bool {
 	return exec.prm.common.LocalOnly()
 }
diff --git a/pkg/services/object/search/search.go b/pkg/services/object/search/search.go
index 8990104401..51742e0fb8 100644
--- a/pkg/services/object/search/search.go
+++ b/pkg/services/object/search/search.go
@@ -25,7 +25,6 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {
 
 	exec := &execCtx{
 		svc: s,
-		ctx: ctx,
 		prm: prm,
 	}
 
@@ -33,21 +32,21 @@ func (s *Service) Search(ctx context.Context, prm Prm) error {
 
 	exec.setLogger(s.log)
 
-	exec.execute()
+	exec.execute(ctx)
 
 	return exec.statusError.err
 }
 
-func (exec *execCtx) execute() {
+func (exec *execCtx) execute(ctx context.Context) {
 	exec.log.Debug("serving request...")
 
 	// perform local operation
 	exec.executeLocal()
 
-	exec.analyzeStatus(true)
+	exec.analyzeStatus(ctx, true)
 }
 
-func (exec *execCtx) analyzeStatus(execCnr bool) {
+func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
 	// analyze local result
 	switch exec.status {
 	default:
@@ -59,8 +58,8 @@ func (exec *execCtx) analyzeStatus(execCnr bool) {
 	}
 
 	if execCnr {
-		exec.executeOnContainer()
-		exec.analyzeStatus(false)
+		exec.executeOnContainer(ctx)
+		exec.analyzeStatus(ctx, false)
 	}
 }
diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go
index 33dd74100c..14c42a19c3 100644
--- a/pkg/services/object/search/search_test.go
+++ b/pkg/services/object/search/search_test.go
@@ -115,7 +115,7 @@ func (s *testStorage) search(exec *execCtx) ([]oid.ID, error) {
 	return v.ids, v.err
 }
 
-func (c *testStorage) searchObjects(exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
+func (c *testStorage) searchObjects(_ context.Context, exec *execCtx, _ clientcore.NodeInfo) ([]oid.ID, error) {
 	v, ok := c.items[exec.containerID().EncodeToString()]
 	if !ok {
 		return nil, nil
diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go
index 6f4c5eb528..646bb70712 100644
--- a/pkg/services/object/search/service.go
+++ b/pkg/services/object/search/service.go
@@ -1,6 +1,8 @@
 package searchsvc
 
 import (
+	"context"
+
 	"github.com/nspcc-dev/neofs-node/pkg/core/client"
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
@@ -23,7 +25,7 @@ type Option func(*cfg)
 type searchClient interface {
 	// searchObjects searches objects on the specified node.
 	// MUST NOT modify execCtx as it can be accessed concurrently.
-	searchObjects(*execCtx, client.NodeInfo) ([]oid.ID, error)
+	searchObjects(context.Context, *execCtx, client.NodeInfo) ([]oid.ID, error)
 }
 
 // Containers provides information about NeoFS containers necessary for the
diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go
index f98570894c..b83526a846 100644
--- a/pkg/services/object/search/util.go
+++ b/pkg/services/object/search/util.go
@@ -1,6 +1,7 @@
 package searchsvc
 
 import (
+	"context"
 	"sync"
 
 	"github.com/nspcc-dev/neofs-node/pkg/core/client"
@@ -68,7 +69,7 @@ func (c *clientConstructorWrapper) get(info client.NodeInfo) (searchClient, erro
 	}, nil
 }
 
-func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
+func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info client.NodeInfo) ([]oid.ID, error) {
 	if exec.prm.forwarder != nil {
 		return exec.prm.forwarder(info, c.client)
 	}
@@ -89,7 +90,7 @@ func (c *clientWrapper) searchObjects(exec *execCtx, info client.NodeInfo) ([]oi
 
 	var prm internalclient.SearchObjectsPrm
 
-	prm.SetContext(exec.context())
+	prm.SetContext(ctx)
 	prm.SetClient(c.client)
 	prm.SetPrivateKey(key)
 	prm.SetSessionToken(exec.prm.common.SessionToken())
diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go
index 8d5f19d7af..e2eddc1845 100644
--- a/pkg/services/policer/check.go
+++ b/pkg/services/policer/check.go
@@ -117,7 +117,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
 	}
 
 	c := &processPlacementContext{
-		Context:      ctx,
 		object:       addrWithType,
 		checkedNodes: newNodeCache(),
 	}
@@ -129,7 +128,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
 		default:
 		}
 
-		p.processNodes(c, nn[i], policy.ReplicaNumberByIndex(i))
+		p.processNodes(ctx, c, nn[i], policy.ReplicaNumberByIndex(i))
 	}
 
 	// if context is done, needLocalCopy might not be able to calculate
@@ -182,8 +181,6 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
 }
 
 type processPlacementContext struct {
-	context.Context
-
 	// whether the local node is in the object container
 	localNodeInContainer bool
 
@@ -200,8 +197,8 @@ type processPlacementContext struct {
 	checkedNodes *nodeCache
 }
 
-func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
-	prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address)
+func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
+	prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(plc.object.Address)
 
 	p.cfg.RLock()
 	headTimeout := p.headTimeout
@@ -216,7 +213,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 			// prevent spam with new replicas.
 			// However, additional copies should not be removed in this case,
 			// because we can remove the only copy this way.
-			ctx.checkedNodes.submitReplicaHolder(node)
+			plc.checkedNodes.submitReplicaHolder(node)
 			shortage--
 			uncheckedCopies++
 
@@ -225,7 +222,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 		)
 	}
 
-	if ctx.object.Type == object.TypeLock || ctx.object.Type == object.TypeLink {
+	if plc.object.Type == object.TypeLock || plc.object.Type == object.TypeLink {
 		// all nodes of a container must store the `LOCK` and `LINK` objects
 		// for correct object relations handling:
 		// - `LINK` objects allows treating all children as root object;
@@ -234,7 +231,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 		shortage = uint32(len(nodes))
 	}
 
-	for i := 0; (!ctx.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {
+	for i := 0; (!plc.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {
 		select {
 		case <-ctx.Done():
 			return
@@ -243,20 +240,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 
 		isLocalNode := p.netmapKeys.IsLocalKey(nodes[i].PublicKey())
 
-		if !ctx.localNodeInContainer {
-			ctx.localNodeInContainer = isLocalNode
+		if !plc.localNodeInContainer {
+			plc.localNodeInContainer = isLocalNode
 		}
 
 		if shortage == 0 {
 			continue
 		} else if isLocalNode {
-			ctx.needLocalCopy = true
+			plc.needLocalCopy = true
 
 			shortage--
 		} else if nodes[i].IsMaintenance() {
 			handleMaintenance(nodes[i])
 		} else {
-			if status := ctx.checkedNodes.processStatus(nodes[i]); status >= 0 {
+			if status := plc.checkedNodes.processStatus(nodes[i]); status >= 0 {
 				if status == 0 {
 					// node already contains replica, no need to replicate
 					nodes = append(nodes[:i], nodes[i+1:]...)
@@ -274,7 +271,7 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 			cancel()
 
 			if errors.Is(err, apistatus.ErrObjectNotFound) {
-				ctx.checkedNodes.submitReplicaCandidate(nodes[i])
+				plc.checkedNodes.submitReplicaCandidate(nodes[i])
 				continue
 			}
 
@@ -282,12 +279,12 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 				handleMaintenance(nodes[i])
 			} else if err != nil {
 				p.log.Error("receive object header to check policy compliance",
-					zap.Stringer("object", ctx.object.Address),
+					zap.Stringer("object", plc.object.Address),
 					zap.String("error", err.Error()),
 				)
 			} else {
 				shortage--
-				ctx.checkedNodes.submitReplicaHolder(nodes[i])
+				plc.checkedNodes.submitReplicaHolder(nodes[i])
 			}
 		}
 
@@ -297,20 +294,20 @@ func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.Node
 
 	if shortage > 0 {
 		p.log.Debug("shortage of object copies detected",
-			zap.Stringer("object", ctx.object.Address),
+			zap.Stringer("object", plc.object.Address),
 			zap.Uint32("shortage", shortage),
 		)
 
 		var task replicator.Task
-		task.SetObjectAddress(ctx.object.Address)
+		task.SetObjectAddress(plc.object.Address)
 		task.SetNodes(nodes)
 		task.SetCopiesNumber(shortage)
 
-		p.replicator.HandleTask(ctx, task, ctx.checkedNodes)
+		p.replicator.HandleTask(ctx, task, plc.checkedNodes)
 	} else if uncheckedCopies > 0 {
 		// If we have more copies than needed, but some of them are from the maintenance nodes,
 		// save the local copy.
-		ctx.needLocalCopy = true
+		plc.needLocalCopy = true
 		p.log.Debug("some of the copies are stored on nodes under maintenance, save local copy",
 			zap.Int("count", uncheckedCopies))
 	}
diff --git a/pkg/services/tree/replicator.go b/pkg/services/tree/replicator.go
index 3a70413f09..2bfffaae72 100644
--- a/pkg/services/tree/replicator.go
+++ b/pkg/services/tree/replicator.go
@@ -52,11 +52,13 @@ func (s *Service) localReplicationWorker() {
 	}
 }
 
-func (s *Service) replicationWorker() {
+func (s *Service) replicationWorker(ctx context.Context) {
 	for {
 		select {
 		case <-s.closeCh:
 			return
+		case <-ctx.Done():
+			return
 		case task := <-s.replicationTasks:
 			var lastErr error
 			var lastAddr string
@@ -64,13 +66,13 @@ func (s *Service) replicationWorker() {
 			task.n.IterateNetworkEndpoints(func(addr string) bool {
 				lastAddr = addr
 
-				c, err := s.cache.get(context.Background(), addr)
+				c, err := s.cache.get(ctx, addr)
 				if err != nil {
 					lastErr = fmt.Errorf("can't create client: %w", err)
 					return false
 				}
 
-				ctx, cancel := context.WithTimeout(context.Background(), s.replicatorTimeout)
+				ctx, cancel := context.WithTimeout(ctx, s.replicatorTimeout)
 				_, lastErr = c.Apply(ctx, task.req)
 				cancel()
 
@@ -94,7 +96,7 @@ func (s *Service) replicationWorker() {
 
 func (s *Service) replicateLoop(ctx context.Context) {
 	for range s.replicatorWorkerCount {
-		go s.replicationWorker()
+		go s.replicationWorker(ctx)
 		go s.localReplicationWorker()
 	}
 	defer func() {
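
Note: the common thread in this diff is replacing contexts stored in structs (execCtx.ctx, the embedded processPlacementContext.Context) with an explicit context.Context argument, which is the standard Go guidance and what the contextcheck linter expects. Below is a minimal, self-contained sketch of that pattern; the names (task, process) are hypothetical and not code from this repository.

package main

import (
	"context"
	"fmt"
	"time"
)

// task holds request-scoped state but, following the pattern applied in this
// diff, deliberately has no context.Context field.
type task struct{ id string }

// process takes the context as an explicit first argument, so cancellation is
// visible to callers and to static analyzers such as contextcheck.
func process(ctx context.Context, t *task) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // stop early if the caller cancels or times out
	case <-time.After(10 * time.Millisecond): // stand-in for real work
		fmt.Println("processed", t.id)
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := process(ctx, &task{id: "example"}); err != nil {
		fmt.Println("error:", err)
	}
}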