diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b060a9daa..0ee76e11fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ Changelog for NeoFS Node - Embedded Neo contracts in `contracts` dir (#2391) - `dump-names` command for adm - `renew-domain` command for adm +- Stored payload metric per container (#2116) +- Stored payload metric per shard (#2023) ### Fixed - `neo-go` RPC connection loss handling (#1337) diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index f2f16c631e..13dcdfe020 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -21,6 +21,9 @@ type MetricRegister interface { AddToObjectCounter(shardID, objectType string, delta int) SetReadonly(shardID string, readonly bool) + + AddToContainerSize(cnrID string, size int64) + AddToPayloadCounter(shardID string, size int64) } func elapsed(addFunc func(d time.Duration)) func() { diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 5db0af6fe6..773dc61d14 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -49,6 +49,14 @@ func (m *metricsWithID) SetReadonly(readonly bool) { m.mw.SetReadonly(m.id, readonly) } +func (m *metricsWithID) AddToContainerSize(cnr string, size int64) { + m.mw.AddToContainerSize(cnr, size) +} + +func (m *metricsWithID) AddToPayloadSize(size int64) { + m.mw.AddToPayloadCounter(m.id, size) +} + // AddShard adds a new shard to the storage engine. // // Returns any error encountered that did not allow adding a shard. diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 772abb15ad..ce6a4c03f1 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -22,6 +22,7 @@ type DeletePrm struct { type DeleteRes struct { rawRemoved uint64 availableRemoved uint64 + sizes []uint64 } // AvailableObjectsRemoved returns the number of removed available @@ -35,6 +36,16 @@ func (d DeleteRes) RawObjectsRemoved() uint64 { return d.rawRemoved } +// RemovedObjectSizes returns the sizes of removed objects. +// The order of the sizes is the same as in addresses' +// slice that was provided via [DeletePrm.SetAddresses], +// meaning that i-th size equals the number of freed up bytes +// after removing an object by i-th address. A zero size is +// allowed, it claims a missing object. +func (d DeleteRes) RemovedObjectSizes() []uint64 { + return d.sizes +} + // SetAddresses is a Delete option to set the addresses of the objects to delete. // // Option is required. @@ -66,9 +77,11 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { var rawRemoved uint64 var availableRemoved uint64 var err error + var sizes = make([]uint64, len(prm.addrs)) err = db.boltDB.Update(func(tx *bbolt.Tx) error { - rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs) + // We need to clear slice because tx can try to execute multiple times. + rawRemoved, availableRemoved, err = db.deleteGroup(tx, prm.addrs, sizes) return err }) if err == nil { @@ -81,6 +94,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { return DeleteRes{ rawRemoved: rawRemoved, availableRemoved: availableRemoved, + sizes: sizes, }, err } @@ -90,7 +104,7 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { // objects that were stored. The second return value is a logical objects // removed number: objects that were available (without Tombstones, GCMarks // non-expired, etc.) -func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, error) { +func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address, sizes []uint64) (uint64, uint64, error) { refCounter := make(referenceCounter, len(addrs)) currEpoch := db.epochState.CurrentEpoch() @@ -98,13 +112,14 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er var availableDeleted uint64 for i := range addrs { - removed, available, err := db.delete(tx, addrs[i], refCounter, currEpoch) + removed, available, size, err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { return 0, 0, err // maybe log and continue? } if removed { rawDeleted++ + sizes[i] = size } if available { @@ -143,8 +158,8 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) (uint64, uint64, er // The first return value indicates if an object has been removed. (removing a // non-exist object is error-free). The second return value indicates if an // object was available before the removal (for calculating the logical object -// counter). -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, error) { +// counter). The third return value is removed object payload size. +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) (bool, bool, uint64, error) { key := make([]byte, addressKeySize) addrKey := addressKey(addr, key) garbageBKT := tx.Bucket(garbageBucketName) @@ -156,7 +171,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter if garbageBKT != nil { err := garbageBKT.Delete(addrKey) if err != nil { - return false, false, fmt.Errorf("could not remove from garbage bucket: %w", err) + return false, false, 0, fmt.Errorf("could not remove from garbage bucket: %w", err) } } @@ -167,10 +182,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter var notFoundErr apistatus.ObjectNotFound if errors.As(err, ¬FoundErr) || errors.As(err, &siErr) { - return false, false, nil + return false, false, 0, nil } - return false, false, err + return false, false, 0, err } // if object is an only link to a parent, then remove parent @@ -196,10 +211,10 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter // remove object err = db.deleteObject(tx, obj, false) if err != nil { - return false, false, fmt.Errorf("could not remove object: %w", err) + return false, false, 0, fmt.Errorf("could not remove object: %w", err) } - return true, removeAvailableObject, nil + return true, removeAvailableObject, obj.PayloadSize(), nil } func (db *DB) deleteObject( diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 7cd113433d..c144b66b70 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -132,7 +132,7 @@ func (s *Shard) Init() error { } } - s.updateObjectCounter() + s.initMetrics() s.gc = &gc{ gcCfg: &s.gcCfg, diff --git a/pkg/local_object_storage/shard/delete.go b/pkg/local_object_storage/shard/delete.go index b6f2170f99..921bdb4dce 100644 --- a/pkg/local_object_storage/shard/delete.go +++ b/pkg/local_object_storage/shard/delete.go @@ -81,6 +81,17 @@ func (s *Shard) delete(prm DeletePrm) (DeleteRes, error) { s.decObjectCounterBy(physical, res.RawObjectsRemoved()) s.decObjectCounterBy(logical, res.AvailableObjectsRemoved()) + var totalRemovedPayload uint64 + removedSizes := res.RemovedObjectSizes() + + for i := range prm.addr { + removedPayload := removedSizes[i] + totalRemovedPayload += removedPayload + s.addToContainerSize(prm.addr[i].Container().EncodeToString(), -int64(removedPayload)) + } + + s.addToPayloadCounter(-int64(totalRemovedPayload)) + for i := range prm.addr { var delPrm common.DeletePrm delPrm.Address = prm.addr[i] diff --git a/pkg/local_object_storage/shard/metrics_test.go b/pkg/local_object_storage/shard/metrics_test.go index a46e4dbb8d..0b188c8c3a 100644 --- a/pkg/local_object_storage/shard/metrics_test.go +++ b/pkg/local_object_storage/shard/metrics_test.go @@ -17,26 +17,29 @@ import ( ) type metricsStore struct { - s map[string]uint64 + objectCounters map[string]uint64 + containerSize map[string]int64 + payloadSize int64 + readOnly bool } func (m metricsStore) SetShardID(_ string) {} func (m metricsStore) SetObjectCounter(objectType string, v uint64) { - m.s[objectType] = v + m.objectCounters[objectType] = v } func (m metricsStore) AddToObjectCounter(objectType string, delta int) { switch { case delta > 0: - m.s[objectType] += uint64(delta) + m.objectCounters[objectType] += uint64(delta) case delta < 0: uDelta := uint64(-delta) - if m.s[objectType] >= uDelta { - m.s[objectType] -= uDelta + if m.objectCounters[objectType] >= uDelta { + m.objectCounters[objectType] -= uDelta } else { - m.s[objectType] = 0 + m.objectCounters[objectType] = 0 } case delta == 0: return @@ -44,19 +47,23 @@ func (m metricsStore) AddToObjectCounter(objectType string, delta int) { } func (m metricsStore) IncObjectCounter(objectType string) { - m.s[objectType] += 1 + m.objectCounters[objectType] += 1 } func (m metricsStore) DecObjectCounter(objectType string) { m.AddToObjectCounter(objectType, -1) } -func (m metricsStore) SetReadonly(r bool) { - if r { - m.s[readonly] = 1 - } else { - m.s[readonly] = 0 - } +func (m *metricsStore) SetReadonly(r bool) { + m.readOnly = r +} + +func (m metricsStore) AddToContainerSize(cnr string, size int64) { + m.containerSize[cnr] += size +} + +func (m *metricsStore) AddToPayloadSize(size int64) { + m.payloadSize += size } const physical = "phy" @@ -68,9 +75,9 @@ func TestCounters(t *testing.T) { sh, mm := shardWithMetrics(t, dir) sh.SetMode(mode.ReadOnly) - require.Equal(t, mm.s[readonly], uint64(1)) + require.True(t, mm.readOnly) sh.SetMode(mode.ReadWrite) - require.Equal(t, mm.s[readonly], uint64(0)) + require.False(t, mm.readOnly) const objNumber = 10 oo := make([]*object.Object, objNumber) @@ -79,10 +86,22 @@ func TestCounters(t *testing.T) { } t.Run("defaults", func(t *testing.T) { - require.Zero(t, mm.s[physical]) - require.Zero(t, mm.s[logical]) + require.Zero(t, mm.objectCounters[physical]) + require.Zero(t, mm.objectCounters[logical]) + require.Empty(t, mm.containerSize) + require.Zero(t, mm.payloadSize) }) + var totalPayload int64 + + expectedSizes := make(map[string]int64) + for i := range oo { + cnr, _ := oo[i].ContainerID() + oSize := int64(oo[i].PayloadSize()) + expectedSizes[cnr.EncodeToString()] += oSize + totalPayload += oSize + } + t.Run("put", func(t *testing.T) { var prm shard.PutPrm @@ -93,8 +112,10 @@ func TestCounters(t *testing.T) { require.NoError(t, err) } - require.Equal(t, uint64(objNumber), mm.s[physical]) - require.Equal(t, uint64(objNumber), mm.s[logical]) + require.Equal(t, uint64(objNumber), mm.objectCounters[physical]) + require.Equal(t, uint64(objNumber), mm.objectCounters[logical]) + require.Equal(t, expectedSizes, mm.containerSize) + require.Equal(t, totalPayload, mm.payloadSize) }) t.Run("inhume_GC", func(t *testing.T) { @@ -108,8 +129,10 @@ func TestCounters(t *testing.T) { require.NoError(t, err) } - require.Equal(t, uint64(objNumber), mm.s[physical]) - require.Equal(t, uint64(objNumber-inhumedNumber), mm.s[logical]) + require.Equal(t, uint64(objNumber), mm.objectCounters[physical]) + require.Equal(t, uint64(objNumber-inhumedNumber), mm.objectCounters[logical]) + require.Equal(t, expectedSizes, mm.containerSize) + require.Equal(t, totalPayload, mm.payloadSize) oo = oo[inhumedNumber:] }) @@ -118,8 +141,8 @@ func TestCounters(t *testing.T) { var prm shard.InhumePrm ts := objectcore.AddressOf(generateObject(t)) - phy := mm.s[physical] - logic := mm.s[logical] + phy := mm.objectCounters[physical] + logic := mm.objectCounters[logical] inhumedNumber := int(phy / 4) prm.SetTarget(ts, addrFromObjs(oo[:inhumedNumber])...) @@ -127,8 +150,10 @@ func TestCounters(t *testing.T) { _, err := sh.Inhume(prm) require.NoError(t, err) - require.Equal(t, phy, mm.s[physical]) - require.Equal(t, logic-uint64(inhumedNumber), mm.s[logical]) + require.Equal(t, phy, mm.objectCounters[physical]) + require.Equal(t, logic-uint64(inhumedNumber), mm.objectCounters[logical]) + require.Equal(t, expectedSizes, mm.containerSize) + require.Equal(t, totalPayload, mm.payloadSize) oo = oo[inhumedNumber:] }) @@ -136,8 +161,8 @@ func TestCounters(t *testing.T) { t.Run("Delete", func(t *testing.T) { var prm shard.DeletePrm - phy := mm.s[physical] - logic := mm.s[logical] + phy := mm.objectCounters[physical] + logic := mm.objectCounters[logical] deletedNumber := int(phy / 4) prm.SetAddresses(addrFromObjs(oo[:deletedNumber])...) @@ -145,8 +170,18 @@ func TestCounters(t *testing.T) { _, err := sh.Delete(prm) require.NoError(t, err) - require.Equal(t, phy-uint64(deletedNumber), mm.s[physical]) - require.Equal(t, logic-uint64(deletedNumber), mm.s[logical]) + require.Equal(t, phy-uint64(deletedNumber), mm.objectCounters[physical]) + require.Equal(t, logic-uint64(deletedNumber), mm.objectCounters[logical]) + var totalRemovedpayload uint64 + for i := range oo[:deletedNumber] { + removedPayload := oo[i].PayloadSize() + totalRemovedpayload += removedPayload + + cnr, _ := oo[i].ContainerID() + expectedSizes[cnr.EncodeToString()] -= int64(removedPayload) + } + require.Equal(t, expectedSizes, mm.containerSize) + require.Equal(t, totalPayload-int64(totalRemovedpayload), mm.payloadSize) }) } @@ -163,11 +198,11 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) { } mm := &metricsStore{ - s: map[string]uint64{ - "phy": 0, - "logic": 0, - "readonly": 1, + objectCounters: map[string]uint64{ + "phy": 0, + "logic": 0, }, + containerSize: make(map[string]int64), } sh := shard.New( diff --git a/pkg/local_object_storage/shard/put.go b/pkg/local_object_storage/shard/put.go index 46df7a5936..dfbd4de97e 100644 --- a/pkg/local_object_storage/shard/put.go +++ b/pkg/local_object_storage/shard/put.go @@ -79,6 +79,7 @@ func (s *Shard) Put(prm PutPrm) (PutRes, error) { } s.incObjectCounter() + s.addToContainerSize(putPrm.Address.Container().EncodeToString(), int64(prm.obj.PayloadSize())) } return PutRes{}, nil diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index bb48a6b453..398b080324 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -53,6 +53,12 @@ type MetricsWriter interface { // type. // Negative parameter must decrease the counter. AddToObjectCounter(objectType string, delta int) + // AddToContainerSize must add a value to the container size. + // Value can be negative. + AddToContainerSize(cnr string, value int64) + // AddToPayloadSize must add a value to the payload size. + // Value can be negative. + AddToPayloadSize(value int64) // IncObjectCounter must increment shard's object counter taking into account // object type. IncObjectCounter(objectType string) @@ -323,7 +329,7 @@ const ( logical = "logic" ) -func (s *Shard) updateObjectCounter() { +func (s *Shard) initMetrics() { if s.cfg.metricsWriter != nil && !s.GetMode().NoMetabase() { cc, err := s.metaBase.ObjectCounters() if err != nil { @@ -336,6 +342,28 @@ func (s *Shard) updateObjectCounter() { s.cfg.metricsWriter.SetObjectCounter(physical, cc.Phy()) s.cfg.metricsWriter.SetObjectCounter(logical, cc.Logic()) + + cnrList, err := s.metaBase.Containers() + if err != nil { + s.log.Warn("meta: can't read container list", zap.Error(err)) + return + } + + var totalPayload uint64 + + for i := range cnrList { + size, err := s.metaBase.ContainerSize(cnrList[i]) + if err != nil { + s.log.Warn("meta: can't read container size", + zap.String("cid", cnrList[i].EncodeToString()), + zap.Error(err)) + continue + } + s.metricsWriter.AddToContainerSize(cnrList[i].EncodeToString(), int64(size)) + totalPayload += size + } + + s.metricsWriter.AddToPayloadSize(int64(totalPayload)) } } @@ -353,3 +381,15 @@ func (s *Shard) decObjectCounterBy(typ string, v uint64) { s.cfg.metricsWriter.AddToObjectCounter(typ, -int(v)) } } + +func (s *Shard) addToContainerSize(cnr string, size int64) { + if s.cfg.metricsWriter != nil { + s.cfg.metricsWriter.AddToContainerSize(cnr, size) + } +} + +func (s *Shard) addToPayloadCounter(size int64) { + if s.cfg.metricsWriter != nil { + s.cfg.metricsWriter.AddToPayloadSize(size) + } +} diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 3f91d780b8..533a2b98cb 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -19,6 +19,8 @@ type ( rangeDuration prometheus.Counter searchDuration prometheus.Counter listObjectsDuration prometheus.Counter + containerSize prometheus.GaugeVec + payloadSize prometheus.GaugeVec } ) @@ -102,6 +104,20 @@ func newEngineMetrics() engineMetrics { Name: "list_objects_duration", Help: "Accumulated duration of engine list objects operations", }) + + containerSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "container_size", + Help: "Accumulated size of all objects in a container", + }, []string{containerIDLabelKey}) + + payloadSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "payload_size", + Help: "Accumulated size of all objects in a shard", + }, []string{shardIDLabelKey}) ) return engineMetrics{ @@ -116,6 +132,8 @@ func newEngineMetrics() engineMetrics { rangeDuration: rangeDuration, searchDuration: searchDuration, listObjectsDuration: listObjectsDuration, + containerSize: *containerSize, + payloadSize: *payloadSize, } } @@ -131,6 +149,8 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.rangeDuration) prometheus.MustRegister(m.searchDuration) prometheus.MustRegister(m.listObjectsDuration) + prometheus.MustRegister(m.containerSize) + prometheus.MustRegister(m.payloadSize) } func (m engineMetrics) AddListContainersDuration(d time.Duration) { @@ -176,3 +196,15 @@ func (m engineMetrics) AddSearchDuration(d time.Duration) { func (m engineMetrics) AddListObjectsDuration(d time.Duration) { m.listObjectsDuration.Add(float64(d)) } + +func (m engineMetrics) AddToContainerSize(cnrID string, size int64) { + m.containerSize.With( + prometheus.Labels{ + containerIDLabelKey: cnrID, + }, + ).Add(float64(size)) +} + +func (m engineMetrics) AddToPayloadCounter(shardID string, size int64) { + m.payloadSize.With(prometheus.Labels{shardIDLabelKey: shardID}).Add(float64(size)) +} diff --git a/pkg/metrics/object.go b/pkg/metrics/object.go index a1a9ca863a..3ea7ac2e41 100644 --- a/pkg/metrics/object.go +++ b/pkg/metrics/object.go @@ -43,6 +43,7 @@ type ( const ( shardIDLabelKey = "shard" counterTypeLabelKey = "type" + containerIDLabelKey = "cid" ) func newMethodCallCounter(name string) methodCount {