Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shard: drop more of context from local stores #2946

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package engine

import (
"context"
"errors"

meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
Expand Down Expand Up @@ -324,7 +323,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) {
return locked, outErr
}

func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Address) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh, i am not sure it should never be used there. expired object handling can be interrupted, and an expired now object will still be expired in every next epoch, so if we are about to stop working, such operation is safe to stop immediately. however, none of the current "iteration" storage engine operations supports context so ok

func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) {
var prm InhumePrm
prm.MarkAsGarbage(addrs...)

Expand All @@ -334,17 +333,10 @@ func (e *StorageEngine) processExpiredObjects(_ context.Context, addrs []oid.Add
}
}

func (e *StorageEngine) processExpiredLocks(ctx context.Context, lockers []oid.Address) {
func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
sh.HandleExpiredLocks(lockers)

select {
case <-ctx.Done():
e.log.Info("interrupt processing the expired locks by context")
return true
default:
return false
}
return false
})
}

Expand Down
1 change: 0 additions & 1 deletion pkg/local_object_storage/shard/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (s *Shard) Init() error {
eventChan: make(chan Event),
mEventHandler: map[eventType]*eventHandlers{
eventNewEpoch: {
cancelFunc: func() {},
handlers: []eventHandler{
s.collectExpiredObjects,
s.collectExpiredTombstones,
Expand Down
40 changes: 14 additions & 26 deletions pkg/local_object_storage/shard/gc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -40,13 +39,11 @@ func EventNewEpoch(e uint64) Event {
}
}

type eventHandler func(context.Context, Event)
type eventHandler func(Event)

type eventHandlers struct {
prevGroup sync.WaitGroup

cancelFunc context.CancelFunc

handlers []eventHandler
}

Expand Down Expand Up @@ -114,19 +111,15 @@ func (gc *gc) listenEvents() {
continue
}

v.cancelFunc()
v.prevGroup.Wait()

var ctx context.Context
ctx, v.cancelFunc = context.WithCancel(context.Background())

v.prevGroup.Add(len(v.handlers))

for i := range v.handlers {
h := v.handlers[i]

err := gc.workerPool.Submit(func() {
h(ctx, event)
h(event)
v.prevGroup.Done()
})
if err != nil {
Expand Down Expand Up @@ -220,13 +213,13 @@ func (s *Shard) removeGarbage() {
}
}

func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {
func (s *Shard) collectExpiredObjects(e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

log.Debug("started expired objects handling")

expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
expired, err := s.getExpiredObjects(e.(newEpoch).epoch, func(typ object.Type) bool {
return typ != object.TypeLock
})
if err != nil || len(expired) == 0 {
Expand All @@ -238,12 +231,12 @@ func (s *Shard) collectExpiredObjects(ctx context.Context, e Event) {

log.Debug("collected expired objects", zap.Int("num", len(expired)))

s.expiredObjectsCallback(ctx, expired)
s.expiredObjectsCallback(expired)

log.Debug("finished expired objects handling")
}

func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) {
func (s *Shard) collectExpiredTombstones(e Event) {
epoch := e.(newEpoch).epoch
log := s.log.With(zap.Uint64("epoch", epoch))

Expand All @@ -258,8 +251,8 @@ func (s *Shard) collectExpiredTombstones(_ context.Context, e Event) {
log.Debug("finished expired tombstones handling", zap.Int("dropped marks", dropped))
}

func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
expired, err := s.getExpiredObjects(ctx, e.(newEpoch).epoch, func(typ object.Type) bool {
func (s *Shard) collectExpiredLocks(e Event) {
expired, err := s.getExpiredObjects(e.(newEpoch).epoch, func(typ object.Type) bool {
return typ == object.TypeLock
})
if err != nil || len(expired) == 0 {
Expand All @@ -269,10 +262,10 @@ func (s *Shard) collectExpiredLocks(ctx context.Context, e Event) {
return
}

s.expiredLocksCallback(ctx, expired)
s.expiredLocksCallback(expired)
}

func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
func (s *Shard) getExpiredObjects(epoch uint64, typeCond func(object.Type) bool) ([]oid.Address, error) {
s.m.RLock()
defer s.m.RUnlock()

Expand All @@ -283,20 +276,15 @@ func (s *Shard) getExpiredObjects(ctx context.Context, epoch uint64, typeCond fu
var expired []oid.Address

err := s.metaBase.IterateExpired(epoch, func(expiredObject *meta.ExpiredObject) error {
select {
case <-ctx.Done():
return meta.ErrInterruptIterator
default:
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
if typeCond(expiredObject.Type()) {
expired = append(expired, expiredObject.Address())
}
return nil
})
if err != nil {
return nil, err
}
return expired, ctx.Err()
return expired, nil
}

// HandleExpiredLocks unlocks all objects which were locked by lockers.
Expand Down
5 changes: 2 additions & 3 deletions pkg/local_object_storage/shard/gc_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard_test

import (
"context"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -64,7 +63,7 @@ func TestGC_ExpiredObjectWithExpiredLock(t *testing.T) {
shard.WithDeletedLockCallback(func(aa []oid.Address) {
sh.HandleDeletedLocks(aa)
}),
shard.WithExpiredLocksCallback(func(_ context.Context, aa []oid.Address) {
shard.WithExpiredLocksCallback(func(aa []oid.Address) {
sh.HandleExpiredLocks(aa)
}),
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
Expand Down Expand Up @@ -205,7 +204,7 @@ func TestExpiration(t *testing.T) {
meta.WithEpochState(epochState{Value: math.MaxUint64 / 2}),
),
shard.WithExpiredObjectsCallback(
func(_ context.Context, addresses []oid.Address) {
func(addresses []oid.Address) {
var p shard.InhumePrm
p.MarkAsGarbage(addresses...)
_, err := sh.Inhume(p)
Expand Down
5 changes: 2 additions & 3 deletions pkg/local_object_storage/shard/shard.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package shard

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -34,10 +33,10 @@ type Shard struct {
type Option func(*cfg)

// ExpiredTombstonesCallback is a callback handling list of expired tombstones.
type ExpiredTombstonesCallback func(context.Context, []meta.TombstonedObject)
type ExpiredTombstonesCallback func([]meta.TombstonedObject)

// ExpiredObjectsCallback is a callback handling list of expired objects.
type ExpiredObjectsCallback func(context.Context, []oid.Address)
type ExpiredObjectsCallback func([]oid.Address)

// DeletedLockCallback is a callback handling list of deleted LOCK objects.
type DeletedLockCallback func([]oid.Address)
Expand Down
Loading