Skip to content

Commit

Permalink
refactor: make batcher configurable (#815)
Browse files Browse the repository at this point in the history
Co-authored-by: Marko Baricevic <[email protected]>
Co-authored-by: Cool Developer <[email protected]>
Co-authored-by: Julien Robert <[email protected]>
(cherry picked from commit 0420895)

# Conflicts:
#	CHANGELOG.md
  • Loading branch information
tac0turtle authored and mergify[bot] committed Aug 11, 2023
1 parent ec9839f commit 67ae83e
Show file tree
Hide file tree
Showing 15 changed files with 118 additions and 83 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Improvements

- [#695](https://github.com/cosmos/iavl/pull/695) Add API `SaveChangeSet` to save the changeset as a new version.
- [#703](https://github.com/cosmos/iavl/pull/703) New APIs `NewCompressExporter`/`NewCompressImporter` to support more compact snapshot format.
- [#729](https://github.com/cosmos/iavl/pull/729) Speedup Genesis writes for IAVL, by writing in small batches.
- [#726](https://github.com/cosmos/iavl/pull/726) Make `KVPair` and `ChangeSet` serializable with protobuf.
Expand All @@ -19,12 +20,17 @@

- [#735](https://github.com/cosmos/iavl/pull/735) Pass logger to `NodeDB`, `MutableTree` and `ImmutableTree`
- [#646](https://github.com/cosmos/iavl/pull/646) Remove the `orphans` from the storage
<<<<<<< HEAD
- [#777](https://github.com/cosmos/iavl/pull/777) Don't return errors from ImmutableTree.Hash, NewImmutableTree, NewImmutableTreeWIthOpts

### API Changes

=======
- [#777](https://github.com/cosmos/iavl/pull/777) Don't return errors from ImmutableTree.Hash, NewImmutableTree
- [#815](https://github.com/cosmos/iavl/pull/815) `NewMutableTreeWithOpts` was removed in favour of accepting options via a variadic in `NewMutableTree`
- [#815](https://github.com/cosmos/iavl/pull/815) `NewImmutableTreeWithOpts` is removed in favour of accepting options via a variadic in `NewImmutableTree`
>>>>>>> 0420895 (refactor: make batcher configurable (#815))
- [#646](https://github.com/cosmos/iavl/pull/646) Remove the `DeleteVersion`, `DeleteVersions`, `DeleteVersionsRange` and introduce a new endpoint of `DeleteVersionsTo` instead
- [#695](https://github.com/cosmos/iavl/pull/695) Add API `SaveChangeSet` to save the changeset as a new version.

## 0.20.0 (March 14, 2023)

Expand Down
20 changes: 10 additions & 10 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (
type BatchWithFlusher struct {
db dbm.DB // This is only used to create new batch
batch dbm.Batch // Batched writing buffer.

flushThreshold int // The threshold to flush the batch to disk.
}

var _ dbm.Batch = &BatchWithFlusher{}

// Ethereum has found that commit of 100KB is optimal, ref ethereum/go-ethereum#15115
var flushThreshold = 100000

// NewBatchWithFlusher returns new BatchWithFlusher wrapping the passed in batch
func NewBatchWithFlusher(db dbm.DB) *BatchWithFlusher {
func NewBatchWithFlusher(db dbm.DB, flushThreshold int) *BatchWithFlusher {
return &BatchWithFlusher{
db: db,
batch: db.NewBatchWithSize(flushThreshold),
db: db,
batch: db.NewBatchWithSize(flushThreshold),
flushThreshold: flushThreshold,
}
}

Expand Down Expand Up @@ -50,7 +50,7 @@ func (b *BatchWithFlusher) Set(key []byte, value []byte) error {
if err != nil {
return err
}
if batchSizeAfter > flushThreshold {
if batchSizeAfter > b.flushThreshold {
err = b.batch.Write()
if err != nil {
return err
Expand All @@ -59,7 +59,7 @@ func (b *BatchWithFlusher) Set(key []byte, value []byte) error {
if err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(flushThreshold)
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
}
err = b.batch.Set(key, value)
if err != nil {
Expand All @@ -77,7 +77,7 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
if err != nil {
return err
}
if batchSizeAfter > flushThreshold {
if batchSizeAfter > b.flushThreshold {
err = b.batch.Write()
if err != nil {
return err
Expand All @@ -86,7 +86,7 @@ func (b *BatchWithFlusher) Delete(key []byte) error {
if err != nil {
return err
}
b.batch = b.db.NewBatchWithSize(flushThreshold)
b.batch = b.db.NewBatchWithSize(b.flushThreshold)
}
err = b.batch.Delete(key)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func testBatchWithFlusher(t *testing.T, backend dbm.BackendType) {
require.NoError(t, err)
defer cleanupDBDir(dir, name)

batchWithFlusher := NewBatchWithFlusher(db)
batchWithFlusher := NewBatchWithFlusher(db, DefaultOptions().FlushThreshold)

// we'll try to to commit 10MBs (1000 * 10KBs each entries) of data into the db
for keyNonce := uint16(0); keyNonce < 1000; keyNonce++ {
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func randBytes(length int) []byte {
}

func prepareTree(b *testing.B, db db.DB, size, keyLen, dataLen int) (*iavl.MutableTree, [][]byte) {
t := iavl.NewMutableTreeWithOpts(db, size, nil, false, log.NewNopLogger())
t := iavl.NewMutableTree(db, size, false, log.NewNopLogger())
keys := make([][]byte, size)

for i := 0; i < size; i++ {
Expand Down
12 changes: 6 additions & 6 deletions immutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@ type ImmutableTree struct {
}

// NewImmutableTree creates both in-memory and persistent instances
func NewImmutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool, lg log.Logger) *ImmutableTree {
func NewImmutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool, lg log.Logger, options ...Option) *ImmutableTree {
opts := DefaultOptions()
for _, opt := range options {
opt(&opts)
}

if db == nil {
// In-memory Tree.
return &ImmutableTree{}
}

return NewImmutableTreeWithOpts(db, cacheSize, nil, skipFastStorageUpgrade, lg)
}

// NewImmutableTreeWithOpts creates an ImmutableTree with the given options.
func NewImmutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastStorageUpgrade bool, lg log.Logger) *ImmutableTree {
return &ImmutableTree{
logger: lg,
// NodeDB-backed Tree.
Expand Down
4 changes: 2 additions & 2 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ func TestNodeIterator_Success(t *testing.T) {
}

func TestNodeIterator_WithEmptyRoot(t *testing.T) {
itr, err := NewNodeIterator(nil, newNodeDB(dbm.NewMemDB(), 0, nil, log.NewNopLogger()))
itr, err := NewNodeIterator(nil, newNodeDB(dbm.NewMemDB(), 0, DefaultOptions(), log.NewNopLogger()))
require.NoError(t, err)
require.False(t, itr.Valid())

itr, err = NewNodeIterator([]byte{}, newNodeDB(dbm.NewMemDB(), 0, nil, log.NewNopLogger()))
itr, err = NewNodeIterator([]byte{}, newNodeDB(dbm.NewMemDB(), 0, DefaultOptions(), log.NewNopLogger()))
require.NoError(t, err)
require.False(t, itr.Valid())
}
28 changes: 16 additions & 12 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@ import (
ibytes "github.com/cosmos/iavl/internal/bytes"
)

// commitGap after upgrade/delete commitGap FastNodes when commit the batch
var commitGap uint64 = 5000000
var (
// commitGap after upgrade/delete commitGap FastNodes when commit the batch
commitGap uint64 = 5000000

// ErrVersionDoesNotExist is returned if a requested version does not exist.
var ErrVersionDoesNotExist = errors.New("version does not exist")
// ErrVersionDoesNotExist is returned if a requested version does not exist.
ErrVersionDoesNotExist = errors.New("version does not exist")

// ErrKeyDoesNotExist is returned if a key does not exist.
var ErrKeyDoesNotExist = errors.New("key does not exist")
// ErrKeyDoesNotExist is returned if a key does not exist.
ErrKeyDoesNotExist = errors.New("key does not exist")
)

type Option func(*Options)

// MutableTree is a persistent tree which keeps track of versions. It is not safe for concurrent
// use, and should be guarded by a Mutex or RWLock as appropriate. An immutable tree at a given
Expand All @@ -44,13 +48,13 @@ type MutableTree struct {
mtx sync.Mutex
}

// NewMutableTree returns a new tree with the specified cache size and datastore.
func NewMutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool, lg log.Logger) *MutableTree {
return NewMutableTreeWithOpts(db, cacheSize, nil, skipFastStorageUpgrade, lg)
}
// NewMutableTree returns a new tree with the specified optional options.
func NewMutableTree(db dbm.DB, cacheSize int, skipFastStorageUpgrade bool, lg log.Logger, options ...Option) *MutableTree {
opts := DefaultOptions()
for _, opt := range options {
opt(&opts)
}

// NewMutableTreeWithOpts returns a new tree with the specified options.
func NewMutableTreeWithOpts(db dbm.DB, cacheSize int, opts *Options, skipFastStorageUpgrade bool, lg log.Logger) *MutableTree {
ndb := newNodeDB(db, cacheSize, opts, lg)
head := &ImmutableTree{ndb: ndb, skipFastStorageUpgrade: skipFastStorageUpgrade}

Expand Down
14 changes: 7 additions & 7 deletions mutable_tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestIteratorConcurrency(t *testing.T) {
}
itr, _ := tree.Iterator(nil, nil, true)
for ; itr.Valid(); itr.Next() {
// do nothing
}
}
wg.Wait()
Expand All @@ -107,6 +108,7 @@ func TestNewIteratorConcurrency(t *testing.T) {
}(i, j)
}
for ; it.Valid(); it.Next() {
// do nothing
}
wg.Wait()
}
Expand Down Expand Up @@ -257,7 +259,7 @@ func TestMutableTree_LoadVersion_Empty(t *testing.T) {

func TestMutableTree_InitialVersion(t *testing.T) {
memDB := db.NewMemDB()
tree := NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9}, false, log.NewNopLogger())
tree := NewMutableTree(memDB, 0, false, log.NewNopLogger(), InitialVersionOption(9))

_, err := tree.Set([]byte("a"), []byte{0x01})
require.NoError(t, err)
Expand All @@ -272,18 +274,18 @@ func TestMutableTree_InitialVersion(t *testing.T) {
assert.EqualValues(t, 10, version)

// Reloading the tree with the same initial version is fine
tree = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 9}, false, log.NewNopLogger())
tree = NewMutableTree(memDB, 0, false, log.NewNopLogger(), InitialVersionOption(9))
version, err = tree.Load()
require.NoError(t, err)
assert.EqualValues(t, 10, version)

// Reloading the tree with an initial version beyond the lowest should error
tree = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 10}, false, log.NewNopLogger())
tree = NewMutableTree(memDB, 0, false, log.NewNopLogger(), InitialVersionOption(10))
_, err = tree.Load()
require.Error(t, err)

// Reloading the tree with a lower initial version is fine, and new versions can be produced
tree = NewMutableTreeWithOpts(memDB, 0, &Options{InitialVersion: 3}, false, log.NewNopLogger())
tree = NewMutableTree(memDB, 0, false, log.NewNopLogger(), InitialVersionOption(3))
version, err = tree.Load()
require.NoError(t, err)
assert.EqualValues(t, 10, version)
Expand Down Expand Up @@ -1418,9 +1420,7 @@ func TestMutableTree_InitialVersion_FirstVersion(t *testing.T) {
db := db.NewMemDB()

initialVersion := int64(1000)
tree := NewMutableTreeWithOpts(db, 0, &Options{
InitialVersion: uint64(initialVersion),
}, true, log.NewNopLogger())
tree := NewMutableTree(db, 0, true, log.NewNopLogger(), InitialVersionOption(uint64(initialVersion)))

_, err := tree.Set([]byte("hello"), []byte("world"))
require.NoError(t, err)
Expand Down
15 changes: 5 additions & 10 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,7 @@ type nodeDB struct {
fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version.
}

func newNodeDB(db dbm.DB, cacheSize int, opts *Options, lg log.Logger) *nodeDB {
if opts == nil {
o := DefaultOptions()
opts = &o
}

func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg log.Logger) *nodeDB {
storeVersion, err := db.Get(metadataKeyFormat.Key([]byte(storageVersionKey)))

if err != nil || storeVersion == nil {
Expand All @@ -100,8 +95,8 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options, lg log.Logger) *nodeDB {
return &nodeDB{
logger: lg,
db: db,
batch: NewBatchWithFlusher(db),
opts: *opts,
batch: NewBatchWithFlusher(db, opts.FlushThreshold),
opts: opts,
firstVersion: 0,
latestVersion: 0, // initially invalid
legacyLatestVersion: 0,
Expand Down Expand Up @@ -382,7 +377,7 @@ func (ndb *nodeDB) writeBatch() error {
return err
}

ndb.batch = NewBatchWithFlusher(ndb.db)
ndb.batch = NewBatchWithFlusher(ndb.db, ndb.opts.FlushThreshold)

return nil
}
Expand Down Expand Up @@ -877,7 +872,7 @@ func (ndb *nodeDB) Commit() error {
}

ndb.batch.Close()
ndb.batch = NewBatchWithFlusher(ndb.db)
ndb.batch = NewBatchWithFlusher(ndb.db, ndb.opts.FlushThreshold)

return nil
}
Expand Down
Loading

0 comments on commit 67ae83e

Please sign in to comment.