Skip to content

Commit

Permalink
feat/Add metrics per container and per shard (#2437)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Jul 25, 2023
2 parents f751d71 + c97e432 commit 8b818db
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 45 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Changelog for NeoFS Node
- Embedded Neo contracts in `contracts` dir (#2391)
- `dump-names` command for adm
- `renew-domain` command for adm
- Stored payload metric per container (#2116)
- Stored payload metric per shard (#2023)

### Fixed
- `neo-go` RPC connection loss handling (#1337)
Expand Down
3 changes: 3 additions & 0 deletions pkg/local_object_storage/engine/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type MetricRegister interface {
AddToObjectCounter(shardID, objectType string, delta int)

SetReadonly(shardID string, readonly bool)

AddToContainerSize(cnrID string, size int64)
AddToPayloadCounter(shardID string, size int64)
}

func elapsed(addFunc func(d time.Duration)) func() {
Expand Down
8 changes: 8 additions & 0 deletions pkg/local_object_storage/engine/shards.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func (m *metricsWithID) SetReadonly(readonly bool) {
m.mw.SetReadonly(m.id, readonly)
}

func (m *metricsWithID) AddToContainerSize(cnr string, size int64) {
m.mw.AddToContainerSize(cnr, size)
}

func (m *metricsWithID) AddToPayloadSize(size int64) {
m.mw.AddToPayloadCounter(m.id, size)
}

// AddShard adds a new shard to the storage engine.
//
// Returns any error encountered that did not allow adding a shard.
Expand Down
35 changes: 25 additions & 10 deletions pkg/local_object_storage/metabase/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type DeletePrm struct {
type DeleteRes struct {
rawRemoved uint64
availableRemoved uint64
sizes []uint64
}

// AvailableObjectsRemoved returns the number of removed available
Expand All @@ -35,6 +36,16 @@ func (d DeleteRes) RawObjectsRemoved() uint64 {
return d.rawRemoved
}

// RemovedObjectSizes returns the sizes of removed objects.
// The order of the sizes is the same as in addresses'
// slice that was provided via [DeletePrm.SetAddresses],
// meaning that i-th size equals the number of freed up bytes
// after removing an object by i-th address. A zero size is
// allowed, it claims a missing object.
func (d DeleteRes) RemovedObjectSizes() []uint64 {
return d.sizes
}

// SetAddresses is a Delete option to set the addresses of the objects to delete.
//
// Option is required.
Expand Down Expand Up @@ -66,9 +77,11 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
var rawRemoved uint64
var availableRemoved uint64
var err error
var sizes = make([]uint64, len(prm.addrs))

err = db.boltDB.Update(func(tx *bbolt.Tx) error {
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs)
// We need to clear slice because tx can try to execute multiple times.
rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes)
return err
})
if err == nil {
Expand All @@ -81,6 +94,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
return DeleteRes{
rawRemoved: rawRemoved,
availableRemoved: availableRemoved,
sizes: sizes,
}, err
}

Expand All @@ -90,21 +104,22 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
// objects that were stored. The second return value is a logical objects
// removed number: objects that were available (without Tombstones, GCMarks
// non-expired, etc.)
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, error) {
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) {
refCounter := make(referenceCounter, len(addrs))
currEpoch := db.epochState.CurrentEpoch()

var rawDeleted uint64
var availableDeleted uint64

for i := range addrs {
removed, available, err := db.delete(tx, addrs[i], refCounter, currEpoch)
removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch)
if err != nil {
return 0, 0, err // maybe log and continue?
}

if removed {
rawDeleted++
sizes[i] = size
}

if available {
Expand Down Expand Up @@ -143,8 +158,8 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er
// The first return value indicates if an object has been removed. (removing a
// non-exist object is error-free). The second return value indicates if an
// object was available before the removal (for calculating the logical object
// counter).
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, error) {
// counter). The third return value is removed object payload size.
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) {
key := make([]byte, addressKeySize)
addrKey := addressKey(addr, key)
garbageBKT := tx.Bucket(garbageBucketName)
Expand All @@ -156,7 +171,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
if garbageBKT != nil {
err := garbageBKT.Delete(addrKey)
if err != nil {
return false, false, fmt.Errorf("could not remove from garbage bucket: %w", err)
return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err)
}
}

Expand All @@ -167,10 +182,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
var notFoundErr apistatus.ObjectNotFound

if errors.As(err, &notFoundErr) || errors.As(err, &siErr) {
return false, false, nil
return false, false, 0, nil
}

return false, false, err
return false, false, 0, err
}

// if object is an only link to a parent, then remove parent
Expand All @@ -196,10 +211,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
// remove object
err = db.deleteObject(tx, obj, false)
if err != nil {
return false, false, fmt.Errorf("could not remove object: %w", err)
return false, false, 0, fmt.Errorf("could not remove object: %w", err)
}

return true, removeAvailableObject, nil
return true, removeAvailableObject, obj.PayloadSize(), nil
}

func (db *DB) deleteObject(
Expand Down
2 changes: 1 addition & 1 deletion pkg/local_object_storage/shard/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *Shard) Init() error {
}
}

s.updateObjectCounter()
s.initMetrics()

s.gc = &gc{
gcCfg: &s.gcCfg,
Expand Down
11 changes: 11 additions & 0 deletions pkg/local_object_storage/shard/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) {
s.decObjectCounterBy(physical, res.RawObjectsRemoved())
s.decObjectCounterBy(logical, res.AvailableObjectsRemoved())

var totalRemovedPayload uint64
removedSizes := res.RemovedObjectSizes()

for i := range prm.addr {
removedPayload := removedSizes[i]
totalRemovedPayload += removedPayload
s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(removedPayload))
}

s.addToPayloadCounter(-int64(totalRemovedPayload))

for i := range prm.addr {
var delPrm common.DeletePrm
delPrm.Address = prm.addr[i]
Expand Down
101 changes: 68 additions & 33 deletions pkg/local_object_storage/shard/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,53 @@ import (
)

type metricsStore struct {
s map[string]uint64
objectCounters map[string]uint64
containerSize map[string]int64
payloadSize int64
readOnly bool
}

func (m metricsStore) SetShardID(_ string) {}

func (m metricsStore) SetObjectCounter(objectType string, v uint64) {
m.s[objectType] = v
m.objectCounters[objectType] = v
}

func (m metricsStore) AddToObjectCounter(objectType string, delta int) {
switch {
case delta > 0:
m.s[objectType] += uint64(delta)
m.objectCounters[objectType] += uint64(delta)
case delta < 0:
uDelta := uint64(-delta)

if m.s[objectType] >= uDelta {
m.s[objectType] -= uDelta
if m.objectCounters[objectType] >= uDelta {
m.objectCounters[objectType] -= uDelta
} else {
m.s[objectType] = 0
m.objectCounters[objectType] = 0
}
case delta == 0:
return
}
}

func (m metricsStore) IncObjectCounter(objectType string) {
m.s[objectType] += 1
m.objectCounters[objectType] += 1
}

func (m metricsStore) DecObjectCounter(objectType string) {
m.AddToObjectCounter(objectType, -1)
}

func (m metricsStore) SetReadonly(r bool) {
if r {
m.s[readonly] = 1
} else {
m.s[readonly] = 0
}
func (m *metricsStore) SetReadonly(r bool) {
m.readOnly = r
}

func (m metricsStore) AddToContainerSize(cnr string, size int64) {
m.containerSize[cnr] += size
}

func (m *metricsStore) AddToPayloadSize(size int64) {
m.payloadSize += size
}

const physical = "phy"
Expand All @@ -68,9 +75,9 @@ func TestCounters(t *testing.T) {
sh, mm := shardWithMetrics(t, dir)

sh.SetMode(mode.ReadOnly)
require.Equal(t, mm.s[readonly], uint64(1))
require.True(t, mm.readOnly)
sh.SetMode(mode.ReadWrite)
require.Equal(t, mm.s[readonly], uint64(0))
require.False(t, mm.readOnly)

const objNumber = 10
oo := make([]*object.Object, objNumber)
Expand All @@ -79,10 +86,22 @@ func TestCounters(t *testing.T) {
}

t.Run("defaults", func(t *testing.T) {
require.Zero(t, mm.s[physical])
require.Zero(t, mm.s[logical])
require.Zero(t, mm.objectCounters[physical])
require.Zero(t, mm.objectCounters[logical])
require.Empty(t, mm.containerSize)
require.Zero(t, mm.payloadSize)
})

var totalPayload int64

expectedSizes := make(map[string]int64)
for i := range oo {
cnr, _ := oo[i].ContainerID()
oSize := int64(oo[i].PayloadSize())
expectedSizes[cnr.EncodeToString()] += oSize
totalPayload += oSize
}

t.Run("put", func(t *testing.T) {
var prm shard.PutPrm

Expand All @@ -93,8 +112,10 @@ func TestCounters(t *testing.T) {
require.NoError(t, err)
}

require.Equal(t, uint64(objNumber), mm.s[physical])
require.Equal(t, uint64(objNumber), mm.s[logical])
require.Equal(t, uint64(objNumber), mm.objectCounters[physical])
require.Equal(t, uint64(objNumber), mm.objectCounters[logical])
require.Equal(t, expectedSizes, mm.containerSize)
require.Equal(t, totalPayload, mm.payloadSize)
})

t.Run("inhume_GC", func(t *testing.T) {
Expand All @@ -108,8 +129,10 @@ func TestCounters(t *testing.T) {
require.NoError(t, err)
}

require.Equal(t, uint64(objNumber), mm.s[physical])
require.Equal(t, uint64(objNumber-inhumedNumber), mm.s[logical])
require.Equal(t, uint64(objNumber), mm.objectCounters[physical])
require.Equal(t, uint64(objNumber-inhumedNumber), mm.objectCounters[logical])
require.Equal(t, expectedSizes, mm.containerSize)
require.Equal(t, totalPayload, mm.payloadSize)

oo = oo[inhumedNumber:]
})
Expand All @@ -118,35 +141,47 @@ func TestCounters(t *testing.T) {
var prm shard.InhumePrm
ts := objectcore.AddressOf(generateObject(t))

phy := mm.s[physical]
logic := mm.s[logical]
phy := mm.objectCounters[physical]
logic := mm.objectCounters[logical]

inhumedNumber := int(phy / 4)
prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...)

_, err := sh.Inhume(prm)
require.NoError(t, err)

require.Equal(t, phy, mm.s[physical])
require.Equal(t, logic-uint64(inhumedNumber), mm.s[logical])
require.Equal(t, phy, mm.objectCounters[physical])
require.Equal(t, logic-uint64(inhumedNumber), mm.objectCounters[logical])
require.Equal(t, expectedSizes, mm.containerSize)
require.Equal(t, totalPayload, mm.payloadSize)

oo = oo[inhumedNumber:]
})

t.Run("Delete", func(t *testing.T) {
var prm shard.DeletePrm

phy := mm.s[physical]
logic := mm.s[logical]
phy := mm.objectCounters[physical]
logic := mm.objectCounters[logical]

deletedNumber := int(phy / 4)
prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...)

_, err := sh.Delete(prm)
require.NoError(t, err)

require.Equal(t, phy-uint64(deletedNumber), mm.s[physical])
require.Equal(t, logic-uint64(deletedNumber), mm.s[logical])
require.Equal(t, phy-uint64(deletedNumber), mm.objectCounters[physical])
require.Equal(t, logic-uint64(deletedNumber), mm.objectCounters[logical])
var totalRemovedpayload uint64
for i := range oo[:deletedNumber] {
removedPayload := oo[i].PayloadSize()
totalRemovedpayload += removedPayload

cnr, _ := oo[i].ContainerID()
expectedSizes[cnr.EncodeToString()] -= int64(removedPayload)
}
require.Equal(t, expectedSizes, mm.containerSize)
require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize)
})
}

Expand All @@ -163,11 +198,11 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
}

mm := &metricsStore{
s: map[string]uint64{
"phy": 0,
"logic": 0,
"readonly": 1,
objectCounters: map[string]uint64{
"phy": 0,
"logic": 0,
},
containerSize: make(map[string]int64),
}

sh := shard.New(
Expand Down
1 change: 1 addition & 0 deletions pkg/local_object_storage/shard/put.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) {
}

s.incObjectCounter()
s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize()))
}

return PutRes{}, nil
Expand Down
Loading

0 comments on commit 8b818db

Please sign in to comment.