Skip to content

Commit

Permalink
RSDB: metrics and performance improvements (#2184)
Browse files Browse the repository at this point in the history
- Reuses the existing eth/leveldb module to bring I/O-related metrics to the Roundstate DB.
- Increases default Level DB values related to the block size, cache capacity, write buffer size and compaction table size to reduce the amount of I/O operations and the number of non-level0 compactions of both State and Roundstate DBs.
  • Loading branch information
carterqw2 authored Sep 11, 2023
1 parent cd2e13a commit 0ed594e
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 34 deletions.
104 changes: 70 additions & 34 deletions consensus/istanbul/core/roundstate_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@ import (
"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/common/task"
"github.com/celo-org/celo-blockchain/consensus/istanbul"
"github.com/celo-org/celo-blockchain/ethdb/leveldb"
"github.com/celo-org/celo-blockchain/log"
"github.com/celo-org/celo-blockchain/metrics"
"github.com/celo-org/celo-blockchain/rlp"
"github.com/syndtr/goleveldb/leveldb"
lvlerrors "github.com/syndtr/goleveldb/leveldb/errors"
goleveldb "github.com/syndtr/goleveldb/leveldb"

"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)

Expand Down Expand Up @@ -63,10 +64,19 @@ type RoundStateDBOptions struct {
}

type roundStateDBImpl struct {
db *leveldb.DB
db *leveldb.Database

stopGarbageCollector task.StopFn
opts RoundStateDBOptions
logger log.Logger

rsRLPMeter metrics.Meter // Meter for measuring the size of rs RLP-encoded data
rsRLPEncTimer metrics.Timer // Timer measuring time required for rs RLP encoding
rsDbSaveTimer metrics.Timer // Timer measuring rs DB write latency
rcvdRLPMeter metrics.Meter // Meter for measuring the size of received consensus messages (rcvd) RLP-encoded data
rcvdRLPEncTimer metrics.Timer // Timer measuring time required for received consensus messages to be RLP encoded
rcvdDbSaveTimer metrics.Timer // Timer measuring DB write latency for received consensus messages

logger log.Logger // Contextual logger tracking the database path
}

var defaultRoundStateDBOptions = RoundStateDBOptions{
Expand All @@ -93,13 +103,14 @@ func coerceOptions(opts *RoundStateDBOptions) RoundStateDBOptions {
func newRoundStateDB(path string, opts *RoundStateDBOptions) (RoundStateDB, error) {
logger := log.New("func", "newRoundStateDB", "type", "roundStateDB", "rsdb_path", path)

namespace := "consensus/istanbul/roundstate/db/"
logger.Info("Open roundstate db")
var db *leveldb.DB
var db *leveldb.Database
var err error
if path == "" {
db, err = newMemoryDB()
} else {
db, err = newPersistentDB(path)
db, err = newPersistentDB(path, namespace)
}

if err != nil {
Expand All @@ -113,6 +124,13 @@ func newRoundStateDB(path string, opts *RoundStateDBOptions) (RoundStateDB, erro
logger: logger,
}

rsdb.rsRLPMeter = metrics.NewRegisteredMeter(namespace+"rs/rlp/encoding/size", nil)
rsdb.rsRLPEncTimer = metrics.NewRegisteredTimer(namespace+"rs/rlp/encoding/duration", nil)
rsdb.rsDbSaveTimer = metrics.NewRegisteredTimer(namespace+"rs/db/save/time", nil)
rsdb.rcvdRLPMeter = metrics.NewRegisteredMeter(namespace+"rcvd/rlp/encoding/size", nil)
rsdb.rcvdRLPEncTimer = metrics.NewRegisteredTimer(namespace+"rcvd/rlp/encoding/duration", nil)
rsdb.rcvdDbSaveTimer = metrics.NewRegisteredTimer(namespace+"rcvd/db/save/time", nil)

if rsdb.opts.withGarbageCollector {
rsdb.stopGarbageCollector = task.RunTaskRepeateadly(rsdb.garbageCollectEntries, task.NewDefaultTicker(rsdb.opts.garbageCollectorPeriod))
}
Expand All @@ -121,35 +139,36 @@ func newRoundStateDB(path string, opts *RoundStateDBOptions) (RoundStateDB, erro
}

// newMemoryDB creates a new in-memory node database without a persistent backend.
func newMemoryDB() (*leveldb.DB, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return db, nil
func newMemoryDB() (*leveldb.Database, error) {
return leveldb.NewInMemory()
}

// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(path string) (*leveldb.DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted {
db, err = leveldb.RecoverFile(path, nil)
}
func newPersistentDB(path string, namespace string) (*leveldb.Database, error) {
db, err := leveldb.NewCustom(path, namespace, func(options *opt.Options) {
// increasing default values by a factor of 32 to decrease the number of
// compactions and overall disk operations, making a trade-off between
// high I/O usage and higher memory requirements in favor of the latter
options.BlockSize = 128 * opt.KiB
options.BlockCacheCapacity = 256 * opt.MiB
options.WriteBuffer = 128 * opt.MiB
})

if err != nil {
return nil, err
}

// The nodes contained in the cache correspond to a certain protocol version.
// Flush all nodes if the version doesn't match.
currentVer := make([]byte, binary.MaxVarintLen64)
currentVer = currentVer[:binary.PutVarint(currentVer, int64(dbVersion))]

blob, err := db.Get([]byte(dbVersionKey), nil)
blob, err := db.Get([]byte(dbVersionKey))
switch err {
case leveldb.ErrNotFound:
case goleveldb.ErrNotFound:
// Version not found (i.e. empty cache), insert it
if err := db.Put([]byte(dbVersionKey), currentVer, nil); err != nil {
if err := db.Put([]byte(dbVersionKey), currentVer); err != nil {
db.Close()
return nil, err
}
Expand All @@ -161,7 +180,7 @@ func newPersistentDB(path string) (*leveldb.DB, error) {
if err = os.RemoveAll(path); err != nil {
return nil, err
}
return newPersistentDB(path)
return newPersistentDB(path, namespace)
}
}
return db, nil
Expand Down Expand Up @@ -260,17 +279,26 @@ func (rsdb *roundStateDBImpl) UpdateLastRcvd(rs RoundState) error {
if err != nil {
return err
}
// Encode and measure time
before := time.Now()
entryBytes, err := rlp.EncodeToBytes(&rRLP)
rsdb.rcvdRLPEncTimer.UpdateSince(before)

if err != nil {
logger.Error("Failed to save rcvd messages from roundState", "reason", "rlp encoding", "err", err)
return err
}
batch := new(leveldb.Batch)

rsdb.rcvdRLPMeter.Mark(int64(len(entryBytes)))

before = time.Now()
batch := rsdb.db.NewBatch()
batch.Put(rcvdViewKey, entryBytes)
err = rsdb.db.Write(batch, nil)
err = batch.Write()
if err != nil {
logger.Error("Failed to save rcvd messages from roundState", "reason", "levelDB write", "err", err, "func")
}
rsdb.rcvdDbSaveTimer.UpdateSince(before)

return err
}
Expand All @@ -283,17 +311,25 @@ func (rsdb *roundStateDBImpl) UpdateLastRoundState(rs RoundState) error {
logger := rsdb.logger.New("func", "UpdateLastRoundState")
viewKey := view2Key(rs.View())

before := time.Now()
entryBytes, err := rlp.EncodeToBytes(rs)
rsdb.rsRLPEncTimer.UpdateSince(before)

if err != nil {
logger.Error("Failed to save roundState", "reason", "rlp encoding", "err", err)
return err
}

batch := new(leveldb.Batch)
rsdb.rsRLPMeter.Mark(int64(len(entryBytes)))

before = time.Now()
batch := rsdb.db.NewBatch()
batch.Put([]byte(lastViewKey), viewKey)
batch.Put(viewKey, entryBytes)

err = rsdb.db.Write(batch, nil)
err = batch.Write()
rsdb.rsDbSaveTimer.UpdateSince(before)

if err != nil {
logger.Error("Failed to save roundState", "reason", "levelDB write", "err", err, "func")
}
Expand All @@ -302,7 +338,7 @@ func (rsdb *roundStateDBImpl) UpdateLastRoundState(rs RoundState) error {
}

func (rsdb *roundStateDBImpl) GetLastView() (*istanbul.View, error) {
rawEntry, err := rsdb.db.Get([]byte(lastViewKey), nil)
rawEntry, err := rsdb.db.Get([]byte(lastViewKey))
if err != nil {
return nil, err
}
Expand All @@ -313,7 +349,7 @@ func (rsdb *roundStateDBImpl) GetLastView() (*istanbul.View, error) {
func (rsdb *roundStateDBImpl) GetOldestValidView() (*istanbul.View, error) {
lastView, err := rsdb.GetLastView()
// If nothing stored all views are valid
if err == leveldb.ErrNotFound {
if err == goleveldb.ErrNotFound {
return &istanbul.View{Sequence: common.Big0, Round: common.Big0}, nil
} else if err != nil {
return nil, err
Expand All @@ -329,7 +365,7 @@ func (rsdb *roundStateDBImpl) GetOldestValidView() (*istanbul.View, error) {

func (rsdb *roundStateDBImpl) GetRoundStateFor(view *istanbul.View) (RoundState, error) {
viewKey := view2Key(view)
rawEntry, err := rsdb.db.Get(viewKey, nil)
rawEntry, err := rsdb.db.Get(viewKey)
if err != nil {
return nil, err
}
Expand All @@ -340,9 +376,9 @@ func (rsdb *roundStateDBImpl) GetRoundStateFor(view *istanbul.View) (RoundState,
}
// Check if rcvd is stored
rcvdViewKey := rcvdView2Key(view)
rawRcvd, err := rsdb.db.Get(rcvdViewKey, nil)
rawRcvd, err := rsdb.db.Get(rcvdViewKey)
// No rcvd, return the roundstate as found
if err == leveldb.ErrNotFound {
if err == goleveldb.ErrNotFound {
return &entry, nil
}
// Unknown error. Return the roundstate as found, but log the err
Expand Down Expand Up @@ -414,12 +450,12 @@ func (rsdb *roundStateDBImpl) deleteEntriesOlderThan(lastView *istanbul.View) (i
}

func (rsdb *roundStateDBImpl) deleteIteratorEntries(rang *util.Range) (int, error) {
iter := rsdb.db.NewIterator(rang, nil)
iter := rsdb.db.NewRangeIterator(rang)
defer iter.Release()
counter := 0
for iter.Next() {
rawKey := iter.Key()
err := rsdb.db.Delete(rawKey, nil)
err := rsdb.db.Delete(rawKey)
if err != nil {
return counter, err
}
Expand Down
25 changes: 25 additions & 0 deletions ethdb/leveldb/celo_leveldb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package leveldb

import (
"github.com/celo-org/celo-blockchain/ethdb"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)

// NewInMemory returns a wrapped LevelDB object with an in-memory storage.
func NewInMemory() (*Database, error) {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
if err != nil {
return nil, err
}
return &Database{
db: db,
}, nil
}

// NewRangeIterator creates an over a subset of database content starting at
// and ending at a particular key.
func (db *Database) NewRangeIterator(rang *util.Range) ethdb.Iterator {
return db.db.NewIterator(rang, nil)
}
1 change: 1 addition & 0 deletions ethdb/leveldb/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ func configureOptions(customizeFn func(*opt.Options)) *opt.Options {
options := &opt.Options{
Filter: filter.NewBloomFilter(10),
DisableSeeksCompaction: true,
CompactionTableSize: 10 * opt.MiB,
}
// Allow caller to make custom modifications to the options
if customizeFn != nil {
Expand Down

0 comments on commit 0ed594e

Please sign in to comment.