From 927bab0164afb8615dd00c00f02f9ca0f68d1a25 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Fri, 16 Feb 2024 11:51:19 -0500 Subject: [PATCH 01/10] add the catch up process in the migration --- store/changeset.go | 104 ++++++++++++++++++++ store/migration/manager.go | 163 ++++++++++++++++++++++++++++++-- store/migration/manager_test.go | 10 +- store/root/migrate_test.go | 154 ++++++++++++++++++++++++++++++ store/root/store.go | 86 ++++++++++++++--- store/root/store_test.go | 4 +- store/store.go | 6 ++ 7 files changed, 496 insertions(+), 31 deletions(-) create mode 100644 store/root/migrate_test.go diff --git a/store/changeset.go b/store/changeset.go index 7c5d88a864a3..b7d431086223 100644 --- a/store/changeset.go +++ b/store/changeset.go @@ -1,5 +1,11 @@ package store +import ( + "bytes" + + "cosmossdk.io/store/v2/internal/encoding" +) + // KVPair defines a key-value pair with additional metadata that is used to // track writes. Deletion can be denoted by a nil value or explicitly by the // Delete field. @@ -61,3 +67,101 @@ func (cs *Changeset) Merge(other *Changeset) { cs.Pairs[storeKey] = append(cs.Pairs[storeKey], pairs...) } } + +// encodedSize returns the size of the encoded Changeset. +func (cs *Changeset) encodedSize() int { + size := encoding.EncodeUvarintSize(uint64(len(cs.Pairs))) + for storeKey, pairs := range cs.Pairs { + size += encoding.EncodeBytesSize([]byte(storeKey)) + size += encoding.EncodeUvarintSize(uint64(len(pairs))) + for _, pair := range pairs { + size += encoding.EncodeBytesSize(pair.Key) + size += encoding.EncodeBytesSize(pair.Value) + } + } + return size +} + +// Marshal returns the encoded byte representation of Changeset. +// NOTE: The Changeset is encoded as follows: +// - number of store keys (uvarint) +// - for each store key: +// -- store key (bytes) +// -- number of pairs (uvarint) +// -- for each pair: +// --- key (bytes) +// --- value (bytes) +func (cs *Changeset) Marshal() ([]byte, error) { + var buf bytes.Buffer + buf.Grow(cs.encodedSize()) + + if err := encoding.EncodeUvarint(&buf, uint64(len(cs.Pairs))); err != nil { + return nil, err + } + for storeKey, pairs := range cs.Pairs { + if err := encoding.EncodeBytes(&buf, []byte(storeKey)); err != nil { + return nil, err + } + if err := encoding.EncodeUvarint(&buf, uint64(len(pairs))); err != nil { + return nil, err + } + for _, pair := range pairs { + if err := encoding.EncodeBytes(&buf, pair.Key); err != nil { + return nil, err + } + if err := encoding.EncodeBytes(&buf, pair.Value); err != nil { + return nil, err + } + } + } + + return buf.Bytes(), nil +} + +// Unmarshal decodes the Changeset from the given byte slice. 
+func (cs *Changeset) Unmarshal(buf []byte) error { + storeCount, n, err := encoding.DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + + cs.Pairs = make(map[string]KVPairs, storeCount) + for i := uint64(0); i < storeCount; i++ { + storeKey, n, err := encoding.DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairCount, n, err := encoding.DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairs := make(KVPairs, pairCount) + for j := uint64(0); j < pairCount; j++ { + key, n, err := encoding.DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + value, n, err := encoding.DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairs[j] = KVPair{ + Key: key, + Value: value, + StoreKey: string(storeKey), + } + } + cs.Pairs[string(storeKey)] = pairs + } + + return nil +} diff --git a/store/migration/manager.go b/store/migration/manager.go index 48537664a3f4..21bd21f085d8 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -1,11 +1,18 @@ package migration import ( + "encoding/binary" + "fmt" + "sync" + "time" + "golang.org/x/sync/errgroup" "cosmossdk.io/log" "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" ) const ( @@ -13,25 +20,68 @@ const ( defaultChannelBufferSize = 1024 // defaultStorageBufferSize is the default buffer size for the storage snapshotter. defaultStorageBufferSize = 1024 + + migrateChangesetKeyFmt = "m/cs_%x" // m/ ) +// VersionedChangeset is a pair of version and Changeset. +type VersionedChangeset struct { + Version uint64 + Changeset *store.Changeset +} + // Manager manages the migration of the whole state from store/v1 to store/v2. type Manager struct { logger log.Logger snapshotsManager *snapshots.Manager - storageSnapshotter snapshots.StorageSnapshotter - commitSnapshotter snapshots.CommitSnapshotter + stateStorage *storage.StorageStore + stateCommitment *commitment.CommitStore + + db store.RawDB + migratedVersion uint64 + mtx sync.Mutex + chChangeset <-chan *VersionedChangeset + chDone chan struct{} } // NewManager returns a new Manager. -func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager { +func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager { return &Manager{ - logger: logger, - snapshotsManager: sm, - storageSnapshotter: ss, - commitSnapshotter: cs, + logger: logger, + snapshotsManager: sm, + stateStorage: ss, + stateCommitment: sc, + db: db, + } +} + +// Start starts the whole migration process. +func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone chan struct{}) error { + m.chChangeset = chChangeset + m.chDone = chDone + + go func() { + if err := m.writeChangeset(); err != nil { + m.logger.Error("failed to write changeset", "err", err) + } + }() + + if err := m.Migrate(version); err != nil { + return fmt.Errorf("failed to migrate state: %w", err) } + + return m.Sync() +} + +// GetStateStorage returns the state storage. +func (m *Manager) GetStateStorage() *storage.StorageStore { + return m.stateStorage +} + +// GetStateCommitment returns the state commitment. +func (m *Manager) GetStateCommitment() *commitment.CommitStore { + return m.stateCommitment } // Migrate migrates the whole state at the given height to the new store/v2. 
@@ -49,13 +99,106 @@ func (m *Manager) Migrate(height uint64) error { eg := new(errgroup.Group) eg.Go(func() error { - return m.storageSnapshotter.Restore(height, chStorage) + return m.stateStorage.Restore(height, chStorage) }) eg.Go(func() error { defer close(chStorage) - _, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage) + _, err := m.stateCommitment.Restore(height, 0, ms, chStorage) return err }) - return eg.Wait() + if err := eg.Wait(); err != nil { + return err + } + + m.mtx.Lock() + m.migratedVersion = height + m.mtx.Unlock() + + return nil +} + +// writeChangeset writes the Changeset to the db. +func (m *Manager) writeChangeset() error { + for vc := range m.chChangeset { + cs := vc.Changeset + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, vc.Version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csBytes, err := cs.Marshal() + if err != nil { + return fmt.Errorf("failed to marshal changeset: %w", err) + } + + batch := m.db.NewBatch() + defer batch.Close() + + if err := batch.Set(csKey, csBytes); err != nil { + return fmt.Errorf("failed to write changeset to db.Batch: %w", err) + } + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write changeset to db: %w", err) + } + } + + return nil +} + +// GetMigratedVersion returns the migrated version. +// It is used to check the migrated version in the RootStore. +func (m *Manager) GetMigratedVersion() uint64 { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.migratedVersion +} + +// Sync catches up the Changesets which are committed while the migration is in progress. +// It should be called after the migration is done. +func (m *Manager) Sync() error { + version := m.GetMigratedVersion() + if version == 0 { + return fmt.Errorf("migration is not done yet") + } + version += 1 + + for { + select { + case <-m.chDone: + return nil + default: + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csBytes, err := m.db.Get(csKey) + if err != nil { + return fmt.Errorf("failed to get changeset from db: %w", err) + } + if csBytes == nil { + // wait for the next changeset + time.Sleep(100 * time.Millisecond) + continue + } + + cs := store.NewChangeset() + if err := cs.Unmarshal(csBytes); err != nil { + return fmt.Errorf("failed to unmarshal changeset: %w", err) + } + + if err := m.stateCommitment.WriteBatch(cs); err != nil { + return fmt.Errorf("failed to write changeset to commitment: %w", err) + } + if _, err := m.stateCommitment.Commit(version); err != nil { + return fmt.Errorf("failed to commit changeset to commitment: %w", err) + } + if err := m.stateStorage.ApplyChangeset(version, cs); err != nil { + return fmt.Errorf("failed to write changeset to storage: %w", err) + } + + m.mtx.Lock() + m.migratedVersion = version + m.mtx.Unlock() + + version += 1 + } + } } diff --git a/store/migration/manager_test.go b/store/migration/manager_test.go index b02ac5db94a9..3f08d320ae2a 100644 --- a/store/migration/manager_test.go +++ b/store/migration/manager_test.go @@ -50,7 +50,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2 require.NoError(t, err) - return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore + return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore } func TestMigrateState(t 
*testing.T) { @@ -78,17 +78,17 @@ func TestMigrateState(t *testing.T) { for version := uint64(1); version < toVersion; version++ { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - val, err := m.commitSnapshotter.(*commitment.CommitStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + val, err := m.stateCommitment.Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) require.NoError(t, err) require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) } } } // check the latest state - val, err := m.commitSnapshotter.(*commitment.CommitStore).Get("store1", toVersion-1, []byte("key-100-1")) + val, err := m.stateCommitment.Get("store1", toVersion-1, []byte("key-100-1")) require.NoError(t, err) require.Nil(t, val) - val, err = m.commitSnapshotter.(*commitment.CommitStore).Get("store2", toVersion-1, []byte("key-100-0")) + val, err = m.stateCommitment.Get("store2", toVersion-1, []byte("key-100-0")) require.NoError(t, err) require.Nil(t, val) @@ -96,7 +96,7 @@ func TestMigrateState(t *testing.T) { for version := uint64(1); version < toVersion; version++ { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - val, err := m.storageSnapshotter.(*storage.StorageStore).Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + val, err := m.stateStorage.Get(storeKey, toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) require.NoError(t, err) require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) } diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go new file mode 100644 index 000000000000..4809a80f28c5 --- /dev/null +++ b/store/root/migrate_test.go @@ -0,0 +1,154 @@ +package root + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" + "cosmossdk.io/store/v2/migration" + "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" + "cosmossdk.io/store/v2/storage/sqlite" +) + +var ( + storeKeys = []string{"store1", "store2", "store3"} +) + +type MigrateStoreTestSuite struct { + suite.Suite + + rootStore store.RootStore +} + +func TestMigrateStoreTestSuite(t *testing.T) { + suite.Run(t, &MigrateStoreTestSuite{}) +} + +func (s *MigrateStoreTestSuite) SetupTest() { + noopLog := log.NewNopLogger() + + mdb := dbm.NewMemDB() + multiTrees := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, noopLog, iavl.DefaultConfig()) + } + commitStore, err := commitment.NewCommitStore(multiTrees, mdb, nil, noopLog) + s.Require().NoError(err) + + // create a new storage and commitment stores + sqliteDB, err := sqlite.New(s.T().TempDir()) + s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB, nil, noopLog) + + multiTrees1 := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) + } + sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, noopLog) + s.Require().NoError(err) + + snapshotsStore, err := snapshots.NewStore(dbm.NewMemDB(), s.T().TempDir()) + s.Require().NoError(err) + snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, noopLog) + migrationManager := 
migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, noopLog) + + // assume no storage store, simulate the migration process + s.rootStore, err = New(noopLog, nil, commitStore, migrationManager, nil) + s.Require().NoError(err) +} + +func (s *MigrateStoreTestSuite) TestMigrateState() { + // apply changeset against the original store + toVersion := uint64(100) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + } + + // start the migration process + s.rootStore.StartMigration() + + // continue to apply changeset against the original store + latestVersion := uint64(0) + for version := toVersion + 1; ; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + + // check if the migration is complete + if s.rootStore.GetStateStorage() != nil { + latestVersion = version + break + } + + // add some delay to simulate the consensus process + time.Sleep(100 * time.Millisecond) + } + + // check if the migration is successful + version, err := s.rootStore.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(latestVersion, version) + + // query against the migrated store + for version := uint64(1); version <= latestVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + targetVersion := version + if version < toVersion { + targetVersion = toVersion + } + value, err := s.rootStore.Query(storeKey, targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + s.Require().NoError(err) + s.Require().NotNil(value) + } + } + } + + // prune the old versions + err = s.rootStore.Prune(latestVersion - 1) + s.Require().NoError(err) + + // apply changeset against the migrated store + for version := latestVersion + 1; version <= latestVersion+10; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + } + + version, err = s.rootStore.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(latestVersion+10, version) +} diff --git a/store/root/store.go b/store/root/store.go index 766d3d8e5f7d..36b04e105fd4 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -13,6 +13,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/metrics" + "cosmossdk.io/store/v2/migration" "cosmossdk.io/store/v2/proof" ) @@ -26,8 +27,8 @@ type Store struct { logger log.Logger initialVersion uint64 - // stateStore reflects the state storage backend - stateStore store.VersionedDatabase + // stateStorage reflects the state storage backend + stateStorage store.VersionedDatabase // stateCommitment reflects the state commitment (SC) backend stateCommitment 
store.Committer @@ -43,30 +44,42 @@ type Store struct { // telemetry reflects a telemetry agent responsible for emitting metrics (if any) telemetry metrics.StoreMetrics + + // Migration related fields + // migrationManager reflects the migration manager used to migrate state from v1 to v2 + migrationManager *migration.Manager + // chChangeset reflects the channel used to send the changeset to the migration manager + chChangeset chan *migration.VersionedChangeset + // chDone reflects the channel used to signal the migration manager that the migration is done + chDone chan struct{} + // isMigrating reflects whether the store is currently migrating + isMigrating bool } func New( logger log.Logger, ss store.VersionedDatabase, sc store.Committer, + mm *migration.Manager, m metrics.StoreMetrics, ) (store.RootStore, error) { return &Store{ - logger: logger.With("module", "root_store"), - initialVersion: 1, - stateStore: ss, - stateCommitment: sc, - telemetry: m, + logger: logger.With("module", "root_store"), + initialVersion: 1, + stateStorage: ss, + stateCommitment: sc, + migrationManager: mm, + telemetry: m, }, nil } // Close closes the store and resets all internal fields. Note, Close() is NOT // idempotent and should only be called once. func (s *Store) Close() (err error) { - err = errors.Join(err, s.stateStore.Close()) + err = errors.Join(err, s.stateStorage.Close()) err = errors.Join(err, s.stateCommitment.Close()) - s.stateStore = nil + s.stateStorage = nil s.stateCommitment = nil s.lastCommitInfo = nil s.commitHeader = nil @@ -106,7 +119,7 @@ func (s *Store) StateAt(v uint64) (store.ReadOnlyRootStore, error) { } func (s *Store) GetStateStorage() store.VersionedDatabase { - return s.stateStore + return s.stateStorage } func (s *Store) GetStateCommitment() store.Committer { @@ -126,7 +139,7 @@ func (s *Store) LastCommitID() (proof.CommitID, error) { // in SS might not be the latest version in the SC stores. 
// // Ref: https://github.com/cosmos/cosmos-sdk/issues/17314 - latestVersion, err := s.stateStore.GetLatestVersion() + latestVersion, err := s.stateStorage.GetLatestVersion() if err != nil { return proof.CommitID{}, err } @@ -162,7 +175,7 @@ func (s *Store) Query(storeKey string, version uint64, key []byte, prove bool) ( defer s.telemetry.MeasureSince(now, "root_store", "query") } - val, err := s.stateStore.Get(storeKey, version, key) + val, err := s.stateStorage.Get(storeKey, version, key) if err != nil || val == nil { // fallback to querying SC backend if not found in SS backend // @@ -255,6 +268,21 @@ func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) { } if s.workingHash == nil { + // if migration is in progress, send the changeset to the migration manager + if s.isMigrating { + // if the migration manager has already migrated to the version, close the + // channels and replace the state storage and commitment + if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { + close(s.chDone) + close(s.chChangeset) + s.isMigrating = false + s.stateStorage = s.migrationManager.GetStateStorage() + s.stateCommitment = s.migrationManager.GetStateCommitment() + } else { + s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} + } + } + if err := s.writeSC(cs); err != nil { return nil, err } @@ -290,7 +318,12 @@ func (s *Store) Commit(cs *store.Changeset) ([]byte, error) { // commit SS async eg.Go(func() error { - if err := s.stateStore.ApplyChangeset(version, cs); err != nil { + // if we're migrating, we don't want to commit to the state storage + if s.stateStorage == nil { + return nil + } + + if err := s.stateStorage.ApplyChangeset(version, cs); err != nil { return fmt.Errorf("failed to commit SS: %w", err) } @@ -326,7 +359,7 @@ func (s *Store) Prune(version uint64) error { defer s.telemetry.MeasureSince(now, "root_store", "prune") } - if err := s.stateStore.Prune(version); err != nil { + if err := s.stateStorage.Prune(version); err != nil { return fmt.Errorf("failed to prune SS store: %w", err) } @@ -337,6 +370,31 @@ func (s *Store) Prune(version uint64) error { return nil } +// StartMigration starts the migration process. It sets the migration manager +// and initializes the channels. An error is returned if migration is already in +// progress. +// NOTE: This method should only be called once after loadVersion. +func (s *Store) StartMigration() error { + if s.isMigrating { + return fmt.Errorf("migration already in progress") + } + + // buffer the changeset channel to avoid blocking + s.chChangeset = make(chan *migration.VersionedChangeset, 1) + s.chDone = make(chan struct{}) + + s.isMigrating = true + + go func() { + version := s.lastCommitInfo.Version + if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { + s.logger.Error("failed to start migration", "err", err) + } + }() + + return nil +} + // writeSC accepts a Changeset and writes that as a batch to the underlying SC // tree, which allows us to retrieve the working hash of the SC tree. Finally, // we construct a *CommitInfo and set that as lastCommitInfo. 
Note, this should diff --git a/store/root/store_test.go b/store/root/store_test.go index 89d319129f71..a717313c135b 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -45,7 +45,7 @@ func (s *RootStoreTestSuite) SetupTest() { sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog) s.Require().NoError(err) - rs, err := New(noopLog, ss, sc, nil) + rs, err := New(noopLog, ss, sc, nil, nil) s.Require().NoError(err) s.rootStore = rs @@ -61,7 +61,7 @@ func (s *RootStoreTestSuite) TestGetStateCommitment() { } func (s *RootStoreTestSuite) TestGetStateStorage() { - s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStore) + s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStorage) } func (s *RootStoreTestSuite) TestSetInitialVersion() { diff --git a/store/store.go b/store/store.go index a26afb4298a0..ec8d1f81ea62 100644 --- a/store/store.go +++ b/store/store.go @@ -72,6 +72,12 @@ type RootStore interface { // old versions of the RootStore by the CLI. Prune(version uint64) error + // StartMigration starts a migration process to migrate the RootStore/v1 to the + // SS and SC backends of store/v2. + // It runs in a separate goroutine and replaces the current RootStore with the + // migrated new backends once the migration is complete. + StartMigration() error + // SetMetrics sets the telemetry handler on the RootStore. SetMetrics(m metrics.Metrics) From 460cedbfd0ceea99c51e7e26bc47080212a66d99 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 20 Feb 2024 12:34:31 -0500 Subject: [PATCH 02/10] comments --- store/changeset.go | 1 - store/changeset_test.go | 47 ++++++++++++++++++++++++++++++++++++++ store/migration/manager.go | 16 +++++++++---- store/root/migrate_test.go | 4 ++-- 4 files changed, 60 insertions(+), 8 deletions(-) create mode 100644 store/changeset_test.go diff --git a/store/changeset.go b/store/changeset.go index b7d431086223..c51dda9eeda5 100644 --- a/store/changeset.go +++ b/store/changeset.go @@ -126,7 +126,6 @@ func (cs *Changeset) Unmarshal(buf []byte) error { } buf = buf[n:] - cs.Pairs = make(map[string]KVPairs, storeCount) for i := uint64(0); i < storeCount; i++ { storeKey, n, err := encoding.DecodeBytes(buf) if err != nil { diff --git a/store/changeset_test.go b/store/changeset_test.go new file mode 100644 index 000000000000..d0158fdd1be5 --- /dev/null +++ b/store/changeset_test.go @@ -0,0 +1,47 @@ +package store + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestChangesetMarshal(t *testing.T) { + testcases := []struct { + name string + changeset *Changeset + encodedSize int + encodedBytes []byte + }{ + { + name: "empty", + changeset: NewChangeset(), + encodedSize: 1, + encodedBytes: []byte{0x0}, + }, + { + name: "one store", + changeset: &Changeset{Pairs: map[string]KVPairs{"storekey": {{Key: []byte("key"), Value: []byte("value"), StoreKey: "storekey"}}}}, + encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1 + 5, + encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65}, + }, + { + name: "two stores", + changeset: &Changeset{Pairs: map[string]KVPairs{"storekey1": {{Key: []byte("key1"), Value: []byte("value1"), StoreKey: "storekey1"}}, "storekey2": {{Key: []byte("key2"), Value: []byte("value2"), StoreKey: "storekey2"}, {Key: []byte("key3"), Value: []byte("value3"), StoreKey: "storekey2"}}}}, + 
encodedSize: 1 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 4 + 1 + 6, + encodedBytes: []byte{0x2, 0x9, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x31, 0x1, 0x4, 0x6b, 0x65, 0x79, 0x31, 0x6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x31, 0x9, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x32, 0x2, 0x4, 0x6b, 0x65, 0x79, 0x32, 0x6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x4, 0x6b, 0x65, 0x79, 0x33, 0x6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x33}, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.changeset.encodedSize(), tc.encodedSize, "encoded size mismatch") + encodedBytes, err := tc.changeset.Marshal() + require.NoError(t, err, "marshal error") + require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes mismatch") + cs := NewChangeset() + require.NoError(t, cs.Unmarshal(tc.encodedBytes), "unmarshal error") + require.Equal(t, cs, tc.changeset, "unmarshaled changeset mismatch") + }) + } +} diff --git a/store/migration/manager.go b/store/migration/manager.go index 21bd21f085d8..fd0d2d932816 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -21,7 +21,7 @@ const ( // defaultStorageBufferSize is the default buffer size for the storage snapshotter. defaultStorageBufferSize = 1024 - migrateChangesetKeyFmt = "m/cs_%x" // m/ + migrateChangesetKeyFmt = "m/cs_%x" // m/cs_ ) // VersionedChangeset is a pair of version and Changeset. @@ -39,10 +39,11 @@ type Manager struct { stateCommitment *commitment.CommitStore db store.RawDB + mtx sync.Mutex // mutex for migratedVersion migratedVersion uint64 - mtx sync.Mutex - chChangeset <-chan *VersionedChangeset - chDone chan struct{} + + chChangeset <-chan *VersionedChangeset + chDone <-chan struct{} } // NewManager returns a new Manager. @@ -57,7 +58,12 @@ func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, } // Start starts the whole migration process. -func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone chan struct{}) error { +// It migrates the whole state at the given version to the new store/v2 (both SC and SS). +// It also catches up the Changesets which are committed while the migration is in progress. +// `chChangeset` is the channel to receive the committed Changesets from the RootStore. +// `chDone` is the channel to receive the done signal from the RootStore. +// NOTE: It should be called by the RootStore, running in the background. 
+func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone <-chan struct{}) error { m.chChangeset = chChangeset m.chDone = chDone diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go index 4809a80f28c5..5ab497136dd5 100644 --- a/store/root/migrate_test.go +++ b/store/root/migrate_test.go @@ -123,9 +123,9 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { if version < toVersion { targetVersion = toVersion } - value, err := s.rootStore.Query(storeKey, targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + res, err := s.rootStore.Query(storeKey, targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) s.Require().NoError(err) - s.Require().NotNil(value) + s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value) } } } From 93666bfc5a397c472a8e86363882c2f95824bae3 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 20 Feb 2024 12:45:17 -0500 Subject: [PATCH 03/10] test --- store/changeset_test.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/store/changeset_test.go b/store/changeset_test.go index d0158fdd1be5..a313df2f9af9 100644 --- a/store/changeset_test.go +++ b/store/changeset_test.go @@ -26,22 +26,33 @@ func TestChangesetMarshal(t *testing.T) { encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65}, }, { - name: "two stores", - changeset: &Changeset{Pairs: map[string]KVPairs{"storekey1": {{Key: []byte("key1"), Value: []byte("value1"), StoreKey: "storekey1"}}, "storekey2": {{Key: []byte("key2"), Value: []byte("value2"), StoreKey: "storekey2"}, {Key: []byte("key3"), Value: []byte("value3"), StoreKey: "storekey2"}}}}, - encodedSize: 1 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 4 + 1 + 6, - encodedBytes: []byte{0x2, 0x9, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x31, 0x1, 0x4, 0x6b, 0x65, 0x79, 0x31, 0x6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x31, 0x9, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x32, 0x2, 0x4, 0x6b, 0x65, 0x79, 0x32, 0x6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x4, 0x6b, 0x65, 0x79, 0x33, 0x6, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x33}, + name: "two stores", + changeset: &Changeset{Pairs: map[string]KVPairs{"storekey1": {{Key: []byte("key1"), Value: []byte("value1"), StoreKey: "storekey1"}}, "storekey2": {{Key: []byte("key2"), Value: []byte("value2"), StoreKey: "storekey2"}, {Key: []byte("key3"), Value: []byte("value3"), StoreKey: "storekey2"}}}}, + encodedSize: 1 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 4 + 1 + 6, + // encodedBytes: it is not deterministic, }, } for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { + // check the encoded size require.Equal(t, tc.changeset.encodedSize(), tc.encodedSize, "encoded size mismatch") + // check the encoded bytes encodedBytes, err := tc.changeset.Marshal() require.NoError(t, err, "marshal error") - require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes mismatch") + if len(tc.encodedBytes) != 0 { + require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes mismatch") + } + // check the unmarshaled changeset cs := NewChangeset() - require.NoError(t, cs.Unmarshal(tc.encodedBytes), "unmarshal error") - require.Equal(t, cs, tc.changeset, "unmarshaled changeset mismatch") + require.NoError(t, cs.Unmarshal(encodedBytes), "unmarshal error") + require.Equal(t, len(tc.changeset.Pairs), len(cs.Pairs), "unmarshaled changeset store size mismatch") + for 
storeKey, pairs := range tc.changeset.Pairs { + require.Equal(t, len(pairs), len(cs.Pairs[storeKey]), "unmarshaled changeset pairs size mismatch") + for i, pair := range pairs { + require.Equal(t, pair, cs.Pairs[storeKey][i], "unmarshaled changeset pair mismatch") + } + } }) } } From 8a6c48b82317e7386e5ac29ed2029df196e59045 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 20 Feb 2024 16:40:56 -0500 Subject: [PATCH 04/10] state storage load --- store/migration/manager.go | 5 ---- store/root/migrate_test.go | 55 +++++++++++++++++++++----------------- store/root/store.go | 37 +++++++++---------------- store/storage/store.go | 2 +- 4 files changed, 44 insertions(+), 55 deletions(-) diff --git a/store/migration/manager.go b/store/migration/manager.go index fd0d2d932816..5be1826570b3 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -80,11 +80,6 @@ func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, return m.Sync() } -// GetStateStorage returns the state storage. -func (m *Manager) GetStateStorage() *storage.StorageStore { - return m.stateStorage -} - // GetStateCommitment returns the state commitment. func (m *Manager) GetStateCommitment() *commitment.CommitStore { return m.stateCommitment diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go index 5ab497136dd5..fcf916f7b5a2 100644 --- a/store/root/migrate_test.go +++ b/store/root/migrate_test.go @@ -41,9 +41,24 @@ func (s *MigrateStoreTestSuite) SetupTest() { prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, noopLog, iavl.DefaultConfig()) } - commitStore, err := commitment.NewCommitStore(multiTrees, mdb, nil, noopLog) + orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, noopLog) s.Require().NoError(err) + // apply changeset against the original store + toVersion := uint64(100) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := store.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + } + } + s.Require().NoError(orgSC.WriteBatch(cs)) + _, err = orgSC.Commit(version) + s.Require().NoError(err) + } + // create a new storage and commitment stores sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) @@ -58,37 +73,27 @@ func (s *MigrateStoreTestSuite) SetupTest() { snapshotsStore, err := snapshots.NewStore(dbm.NewMemDB(), s.T().TempDir()) s.Require().NoError(err) - snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), commitStore, nil, nil, noopLog) + snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, noopLog) migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, noopLog) // assume no storage store, simulate the migration process - s.rootStore, err = New(noopLog, nil, commitStore, migrationManager, nil) + s.rootStore, err = New(noopLog, ss, orgSC, migrationManager, nil) s.Require().NoError(err) } func (s *MigrateStoreTestSuite) TestMigrateState() { - // apply changeset against the original store - toVersion := uint64(100) - keyCount := 10 - for version := uint64(1); version <= toVersion; version++ { - cs := store.NewChangeset() - for _, storeKey := range storeKeys { - for i := 0; i < keyCount; i++ { - cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), 
[]byte(fmt.Sprintf("value-%d-%d", version, i))) - } - } - _, err := s.rootStore.WorkingHash(cs) - s.Require().NoError(err) - _, err = s.rootStore.Commit(cs) - s.Require().NoError(err) - } + err := s.rootStore.LoadLatestVersion() + s.Require().NoError(err) + originalLatestVersion, err := s.rootStore.GetLatestVersion() + s.Require().NoError(err) // start the migration process s.rootStore.StartMigration() // continue to apply changeset against the original store latestVersion := uint64(0) - for version := toVersion + 1; ; version++ { + keyCount := 10 + for version := originalLatestVersion + 1; ; version++ { cs := store.NewChangeset() for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { @@ -100,9 +105,11 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { _, err = s.rootStore.Commit(cs) s.Require().NoError(err) - // check if the migration is complete - if s.rootStore.GetStateStorage() != nil { - latestVersion = version + // check if the migration is completed + ver, err := s.rootStore.GetStateStorage().GetLatestVersion() + s.Require().NoError(err) + if ver == version { + latestVersion = ver break } @@ -120,8 +127,8 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { targetVersion := version - if version < toVersion { - targetVersion = toVersion + if version < originalLatestVersion { + targetVersion = originalLatestVersion } res, err := s.rootStore.Query(storeKey, targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) s.Require().NoError(err) diff --git a/store/root/store.go b/store/root/store.go index 36b04e105fd4..758abe320b57 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -50,7 +50,8 @@ type Store struct { migrationManager *migration.Manager // chChangeset reflects the channel used to send the changeset to the migration manager chChangeset chan *migration.VersionedChangeset - // chDone reflects the channel used to signal the migration manager that the migration is done + // chDone reflects the channel used to signal the migration manager that the migration + // is done chDone chan struct{} // isMigrating reflects whether the store is currently migrating isMigrating bool @@ -128,32 +129,17 @@ func (s *Store) GetStateCommitment() store.Committer { // LastCommitID returns a CommitID based off of the latest internal CommitInfo. // If an internal CommitInfo is not set, a new one will be returned with only the -// latest version set, which is based off of the SS view. +// latest version set, which is based off of the SC view. func (s *Store) LastCommitID() (proof.CommitID, error) { if s.lastCommitInfo != nil { return s.lastCommitInfo.CommitID(), nil } - // XXX/TODO: We cannot use SS to get the latest version when lastCommitInfo - // is nil if SS is flushed asynchronously. This is because the latest version - // in SS might not be the latest version in the SC stores. 
- // - // Ref: https://github.com/cosmos/cosmos-sdk/issues/17314 - latestVersion, err := s.stateStorage.GetLatestVersion() - if err != nil { - return proof.CommitID{}, err - } - - // sanity check: ensure integrity of latest version against SC - scVersion, err := s.stateCommitment.GetLatestVersion() + latestVersion, err := s.stateCommitment.GetLatestVersion() if err != nil { return proof.CommitID{}, err } - if scVersion != latestVersion { - return proof.CommitID{}, fmt.Errorf("SC and SS version mismatch; got: %d, expected: %d", scVersion, latestVersion) - } - return proof.CommitID{Version: latestVersion}, nil } @@ -271,12 +257,11 @@ func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) { // if migration is in progress, send the changeset to the migration manager if s.isMigrating { // if the migration manager has already migrated to the version, close the - // channels and replace the state storage and commitment + // channels and replace the state commitment if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { close(s.chDone) close(s.chChangeset) s.isMigrating = false - s.stateStorage = s.migrationManager.GetStateStorage() s.stateCommitment = s.migrationManager.GetStateCommitment() } else { s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} @@ -319,7 +304,8 @@ func (s *Store) Commit(cs *store.Changeset) ([]byte, error) { // commit SS async eg.Go(func() error { // if we're migrating, we don't want to commit to the state storage - if s.stateStorage == nil { + // to avoid parallel writes + if s.isMigrating { return nil } @@ -370,17 +356,18 @@ func (s *Store) Prune(version uint64) error { return nil } -// StartMigration starts the migration process. It sets the migration manager -// and initializes the channels. An error is returned if migration is already in -// progress. +// StartMigration starts the migration process and initializes the channels. +// An error is returned if migration is already in progress. // NOTE: This method should only be called once after loadVersion. func (s *Store) StartMigration() error { if s.isMigrating { return fmt.Errorf("migration already in progress") } - // buffer the changeset channel to avoid blocking + // buffer at most 1 changeset, if the receiver is behind attempting to buffer + // more than 1 will block. s.chChangeset = make(chan *migration.VersionedChangeset, 1) + // it is used to signal the migration manager that the migration is done s.chDone = make(chan struct{}) s.isMigrating = true diff --git a/store/storage/store.go b/store/storage/store.go index 28ed550be6e7..40bb4148ec0f 100644 --- a/store/storage/store.go +++ b/store/storage/store.go @@ -82,7 +82,7 @@ func (ss *StorageStore) ApplyChangeset(version uint64, cs *store.Changeset) erro } } - return nil + return ss.db.SetLatestVersion(version) } // GetLatestVersion returns the latest version of the store. 
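At this point in the series the catch-up flow reads end to end: StartMigration spins up Manager.Start in the background, WorkingHash forwards each committed Changeset to the manager over chChangeset while the snapshot restore and Sync replay run, and once GetMigratedVersion reaches the last committed version the RootStore closes the channels and switches to the migrated state commitment. A rough sketch of how a caller wired up like the updated migrate_test.go might drive that loop follows; it is illustrative only, and nextChangeset, the package name, and the polling interval are assumptions rather than anything defined in these patches.

package example

import (
	"time"

	"cosmossdk.io/store/v2"
)

// driveMigration keeps committing blocks to the RootStore while the migration
// manager restores the old state and replays the buffered changesets into the
// new SS/SC backends; it returns once the new state storage has caught up.
func driveMigration(rs store.RootStore, nextChangeset func(version uint64) *store.Changeset) error {
	if err := rs.StartMigration(); err != nil { // Manager.Start runs in a goroutine
		return err
	}

	version, err := rs.GetLatestVersion()
	if err != nil {
		return err
	}

	for {
		version++
		cs := nextChangeset(version)

		// While migrating, WorkingHash sends cs to the migration manager and,
		// once the manager reports the committed version, swaps in the new SC.
		if _, err := rs.WorkingHash(cs); err != nil {
			return err
		}
		if _, err := rs.Commit(cs); err != nil {
			return err
		}

		// During migration the RootStore skips SS writes, so the new SS backend
		// advances only through the manager's replay; its latest version is the
		// catch-up progress indicator, as in the test above.
		ssVersion, err := rs.GetStateStorage().GetLatestVersion()
		if err != nil {
			return err
		}
		if ssVersion == version {
			return nil
		}

		time.Sleep(100 * time.Millisecond) // simulate block time, as the test does
	}
}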
From c85bc5a0ab1c268521390883772e8327722c4ea4 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Tue, 20 Feb 2024 18:53:25 -0500 Subject: [PATCH 05/10] storage batch --- store/batch.go | 2 +- store/root/migrate_test.go | 20 ++++++++++---------- store/storage/pebbledb/batch.go | 3 ++- store/storage/rocksdb/batch.go | 3 ++- store/storage/sqlite/batch.go | 16 +++++++++++++--- store/storage/store.go | 7 +++++-- 6 files changed, 33 insertions(+), 18 deletions(-) diff --git a/store/batch.go b/store/batch.go index 752a7147f52b..36178a6a6327 100644 --- a/store/batch.go +++ b/store/batch.go @@ -13,7 +13,7 @@ type Batch interface { Write() error // Reset resets the batch. - Reset() + Reset() error } // RawBatch represents a group of writes. They may or may not be written atomically depending on the diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go index fcf916f7b5a2..8f94c8411cc0 100644 --- a/store/root/migrate_test.go +++ b/store/root/migrate_test.go @@ -33,19 +33,19 @@ func TestMigrateStoreTestSuite(t *testing.T) { } func (s *MigrateStoreTestSuite) SetupTest() { - noopLog := log.NewNopLogger() + testLog := log.NewTestLogger(s.T()) mdb := dbm.NewMemDB() multiTrees := make(map[string]commitment.Tree) for _, storeKey := range storeKeys { prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) - multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, noopLog, iavl.DefaultConfig()) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, testLog, iavl.DefaultConfig()) } - orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, noopLog) + orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, testLog) s.Require().NoError(err) // apply changeset against the original store - toVersion := uint64(100) + toVersion := uint64(200) keyCount := 10 for version := uint64(1); version <= toVersion; version++ { cs := store.NewChangeset() @@ -62,22 +62,22 @@ func (s *MigrateStoreTestSuite) SetupTest() { // create a new storage and commitment stores sqliteDB, err := sqlite.New(s.T().TempDir()) s.Require().NoError(err) - ss := storage.NewStorageStore(sqliteDB, nil, noopLog) + ss := storage.NewStorageStore(sqliteDB, nil, testLog) multiTrees1 := make(map[string]commitment.Tree) for _, storeKey := range storeKeys { - multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig()) + multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), testLog, iavl.DefaultConfig()) } - sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, noopLog) + sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog) s.Require().NoError(err) snapshotsStore, err := snapshots.NewStore(dbm.NewMemDB(), s.T().TempDir()) s.Require().NoError(err) - snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, noopLog) - migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, noopLog) + snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, testLog) + migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, testLog) // assume no storage store, simulate the migration process - s.rootStore, err = New(noopLog, ss, orgSC, migrationManager, nil) + s.rootStore, err = New(testLog, ss, orgSC, migrationManager, nil) s.Require().NoError(err) } diff --git a/store/storage/pebbledb/batch.go b/store/storage/pebbledb/batch.go index 5e9548d97c25..d026532e9094 100644 --- a/store/storage/pebbledb/batch.go +++ 
b/store/storage/pebbledb/batch.go @@ -41,8 +41,9 @@ func (b *Batch) Size() int { return b.batch.Len() } -func (b *Batch) Reset() { +func (b *Batch) Reset() error { b.batch.Reset() + return nil } func (b *Batch) set(storeKey string, tombstone uint64, key, value []byte) error { diff --git a/store/storage/rocksdb/batch.go b/store/storage/rocksdb/batch.go index e780b8059c78..db758d13d3c8 100644 --- a/store/storage/rocksdb/batch.go +++ b/store/storage/rocksdb/batch.go @@ -44,8 +44,9 @@ func (b Batch) Size() int { return len(b.batch.Data()) } -func (b Batch) Reset() { +func (b Batch) Reset() error { b.batch.Clear() + return nil } func (b Batch) Set(storeKey string, key, value []byte) error { diff --git a/store/storage/sqlite/batch.go b/store/storage/sqlite/batch.go index 82e8f3e5b306..1abdc220e531 100644 --- a/store/storage/sqlite/batch.go +++ b/store/storage/sqlite/batch.go @@ -23,19 +23,21 @@ type batchOp struct { } type Batch struct { + db *sql.DB tx *sql.Tx ops []batchOp size int version uint64 } -func NewBatch(storage *sql.DB, version uint64) (*Batch, error) { - tx, err := storage.Begin() +func NewBatch(db *sql.DB, version uint64) (*Batch, error) { + tx, err := db.Begin() if err != nil { return nil, fmt.Errorf("failed to create SQL transaction: %w", err) } return &Batch{ + db: db, tx: tx, ops: make([]batchOp, 0), version: version, @@ -46,10 +48,18 @@ func (b *Batch) Size() int { return b.size } -func (b *Batch) Reset() { +func (b *Batch) Reset() error { b.ops = nil b.ops = make([]batchOp, 0) b.size = 0 + + tx, err := b.db.Begin() + if err != nil { + return err + } + + b.tx = tx + return nil } func (b *Batch) Set(storeKey string, key, value []byte) error { diff --git a/store/storage/store.go b/store/storage/store.go index 40bb4148ec0f..fa7349cac50c 100644 --- a/store/storage/store.go +++ b/store/storage/store.go @@ -82,7 +82,7 @@ func (ss *StorageStore) ApplyChangeset(version uint64, cs *store.Changeset) erro } } - return ss.db.SetLatestVersion(version) + return nil } // GetLatestVersion returns the latest version of the store. @@ -134,6 +134,9 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *store.KVPair) if err := b.Write(); err != nil { return err } + if err := b.Reset(); err != nil { + return err + } } } @@ -143,7 +146,7 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *store.KVPair) } } - return ss.db.SetLatestVersion(version) + return nil } // Close closes the store. 
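Patch 05 changes Batch.Reset to return an error so that a batch can be reused across flushes: the sqlite backend now keeps the *sql.DB handle and begins a fresh transaction on Reset, and StorageStore.Restore resets its batch after every Write. A minimal sketch of that write/flush/reset pattern follows; it is not taken from the patches, flushEvery and the function itself are illustrative, and it assumes the store.Batch interface exposes Set(storeKey, key, value) as the sqlite and rocksdb backends above do.

package example

import "cosmossdk.io/store/v2"

const flushEvery = 10_000 // illustrative flush threshold

// writeInBatches streams key-value pairs through a single reusable batch,
// flushing and resetting it periodically instead of allocating a new batch
// for every flush.
func writeInBatches(b store.Batch, storeKey string, pairs store.KVPairs) error {
	for i, p := range pairs {
		if err := b.Set(storeKey, p.Key, p.Value); err != nil {
			return err
		}
		if (i+1)%flushEvery == 0 || i == len(pairs)-1 {
			if err := b.Write(); err != nil {
				return err
			}
			// Reset now reports failures, e.g. the sqlite backend starting a
			// new SQL transaction for the next round of writes.
			if err := b.Reset(); err != nil {
				return err
			}
		}
	}
	return nil
}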
From 41a8b5db207901f871db5d148094cb5915a3e0ef Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 21 Feb 2024 18:08:24 -0500 Subject: [PATCH 06/10] logger --- store/root/migrate_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go index 8f94c8411cc0..3f1edf233722 100644 --- a/store/root/migrate_test.go +++ b/store/root/migrate_test.go @@ -34,12 +34,13 @@ func TestMigrateStoreTestSuite(t *testing.T) { func (s *MigrateStoreTestSuite) SetupTest() { testLog := log.NewTestLogger(s.T()) + nopLog := log.NewNopLogger() mdb := dbm.NewMemDB() multiTrees := make(map[string]commitment.Tree) for _, storeKey := range storeKeys { prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) - multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, testLog, iavl.DefaultConfig()) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) } orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, testLog) s.Require().NoError(err) @@ -66,7 +67,7 @@ func (s *MigrateStoreTestSuite) SetupTest() { multiTrees1 := make(map[string]commitment.Tree) for _, storeKey := range storeKeys { - multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), testLog, iavl.DefaultConfig()) + multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), nopLog, iavl.DefaultConfig()) } sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog) s.Require().NoError(err) From 2fbc4672f46c9f24576d0d35617c326c5da11a35 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Thu, 22 Feb 2024 09:04:06 -0500 Subject: [PATCH 07/10] add log --- store/root/migrate_test.go | 9 ++++----- store/root/store.go | 2 ++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go index 3f1edf233722..4797654da459 100644 --- a/store/root/migrate_test.go +++ b/store/root/migrate_test.go @@ -92,13 +92,13 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { s.rootStore.StartMigration() // continue to apply changeset against the original store - latestVersion := uint64(0) + latestVersion := originalLatestVersion + 1 keyCount := 10 - for version := originalLatestVersion + 1; ; version++ { + for ; latestVersion < 2*originalLatestVersion; latestVersion++ { cs := store.NewChangeset() for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", latestVersion, i)), []byte(fmt.Sprintf("value-%d-%d", latestVersion, i))) } } _, err := s.rootStore.WorkingHash(cs) @@ -109,8 +109,7 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { // check if the migration is completed ver, err := s.rootStore.GetStateStorage().GetLatestVersion() s.Require().NoError(err) - if ver == version { - latestVersion = ver + if ver == latestVersion { break } diff --git a/store/root/store.go b/store/root/store.go index 758abe320b57..e601b0ee5cbb 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -263,6 +263,7 @@ func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) { close(s.chChangeset) s.isMigrating = false s.stateCommitment = s.migrationManager.GetStateCommitment() + s.logger.Info("migration completed", "version", s.lastCommitInfo.Version) } else { s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} } @@ -374,6 +375,7 @@ func (s *Store) StartMigration() error { go func() { version := 
s.lastCommitInfo.Version + s.logger.Info("starting migration", "version", version) if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { s.logger.Error("failed to start migration", "err", err) } From f7ff2c119d52f375ff4ee1b992490789dd3c9302 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 20 Mar 2024 17:06:12 -0400 Subject: [PATCH 08/10] fix --- store/changeset.go | 166 ---------------------- store/changeset_test.go | 58 -------- store/db/db.go | 33 +++++ store/internal/encoding/changeset.go | 125 ++++++++++++++++ store/internal/encoding/changeset_test.go | 94 ++++++++++++ store/migration/manager.go | 9 +- store/root/migrate_test.go | 17 +-- store/storage/store.go | 4 +- 8 files changed, 269 insertions(+), 237 deletions(-) delete mode 100644 store/changeset.go delete mode 100644 store/changeset_test.go create mode 100644 store/db/db.go create mode 100644 store/internal/encoding/changeset.go create mode 100644 store/internal/encoding/changeset_test.go diff --git a/store/changeset.go b/store/changeset.go deleted file mode 100644 index c51dda9eeda5..000000000000 --- a/store/changeset.go +++ /dev/null @@ -1,166 +0,0 @@ -package store - -import ( - "bytes" - - "cosmossdk.io/store/v2/internal/encoding" -) - -// KVPair defines a key-value pair with additional metadata that is used to -// track writes. Deletion can be denoted by a nil value or explicitly by the -// Delete field. -type KVPair struct { - Key []byte - Value []byte - StoreKey string // Optional for snapshot restore -} - -type KVPairs []KVPair - -// Changeset defines a set of KVPair entries by maintaining a map from store key -// to a slice of KVPair objects. -type Changeset struct { - Pairs map[string]KVPairs -} - -func NewChangeset() *Changeset { - return &Changeset{ - Pairs: make(map[string]KVPairs), - } -} - -func NewChangesetWithPairs(pairs map[string]KVPairs) *Changeset { - return &Changeset{ - Pairs: pairs, - } -} - -// Size returns the number of key-value pairs in the batch. -func (cs *Changeset) Size() int { - cnt := 0 - for _, pairs := range cs.Pairs { - cnt += len(pairs) - } - - return cnt -} - -// Add adds a key-value pair to the ChangeSet. -func (cs *Changeset) Add(storeKey string, key, value []byte) { - cs.Pairs[storeKey] = append(cs.Pairs[storeKey], KVPair{ - Key: key, - Value: value, - StoreKey: storeKey, - }) -} - -// AddKVPair adds a KVPair to the ChangeSet. -func (cs *Changeset) AddKVPair(storeKey string, pair KVPair) { - cs.Pairs[storeKey] = append(cs.Pairs[storeKey], pair) -} - -// Merge merges the provided Changeset argument into the receiver. This may be -// useful when you have a Changeset that only pertains to a single store key, -// i.e. a map of size one, and you want to merge it into another. -func (cs *Changeset) Merge(other *Changeset) { - for storeKey, pairs := range other.Pairs { - cs.Pairs[storeKey] = append(cs.Pairs[storeKey], pairs...) - } -} - -// encodedSize returns the size of the encoded Changeset. -func (cs *Changeset) encodedSize() int { - size := encoding.EncodeUvarintSize(uint64(len(cs.Pairs))) - for storeKey, pairs := range cs.Pairs { - size += encoding.EncodeBytesSize([]byte(storeKey)) - size += encoding.EncodeUvarintSize(uint64(len(pairs))) - for _, pair := range pairs { - size += encoding.EncodeBytesSize(pair.Key) - size += encoding.EncodeBytesSize(pair.Value) - } - } - return size -} - -// Marshal returns the encoded byte representation of Changeset. 
-// NOTE: The Changeset is encoded as follows: -// - number of store keys (uvarint) -// - for each store key: -// -- store key (bytes) -// -- number of pairs (uvarint) -// -- for each pair: -// --- key (bytes) -// --- value (bytes) -func (cs *Changeset) Marshal() ([]byte, error) { - var buf bytes.Buffer - buf.Grow(cs.encodedSize()) - - if err := encoding.EncodeUvarint(&buf, uint64(len(cs.Pairs))); err != nil { - return nil, err - } - for storeKey, pairs := range cs.Pairs { - if err := encoding.EncodeBytes(&buf, []byte(storeKey)); err != nil { - return nil, err - } - if err := encoding.EncodeUvarint(&buf, uint64(len(pairs))); err != nil { - return nil, err - } - for _, pair := range pairs { - if err := encoding.EncodeBytes(&buf, pair.Key); err != nil { - return nil, err - } - if err := encoding.EncodeBytes(&buf, pair.Value); err != nil { - return nil, err - } - } - } - - return buf.Bytes(), nil -} - -// Unmarshal decodes the Changeset from the given byte slice. -func (cs *Changeset) Unmarshal(buf []byte) error { - storeCount, n, err := encoding.DecodeUvarint(buf) - if err != nil { - return err - } - buf = buf[n:] - - for i := uint64(0); i < storeCount; i++ { - storeKey, n, err := encoding.DecodeBytes(buf) - if err != nil { - return err - } - buf = buf[n:] - - pairCount, n, err := encoding.DecodeUvarint(buf) - if err != nil { - return err - } - buf = buf[n:] - - pairs := make(KVPairs, pairCount) - for j := uint64(0); j < pairCount; j++ { - key, n, err := encoding.DecodeBytes(buf) - if err != nil { - return err - } - buf = buf[n:] - - value, n, err := encoding.DecodeBytes(buf) - if err != nil { - return err - } - buf = buf[n:] - - pairs[j] = KVPair{ - Key: key, - Value: value, - StoreKey: string(storeKey), - } - } - cs.Pairs[string(storeKey)] = pairs - } - - return nil -} diff --git a/store/changeset_test.go b/store/changeset_test.go deleted file mode 100644 index a313df2f9af9..000000000000 --- a/store/changeset_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package store - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestChangesetMarshal(t *testing.T) { - testcases := []struct { - name string - changeset *Changeset - encodedSize int - encodedBytes []byte - }{ - { - name: "empty", - changeset: NewChangeset(), - encodedSize: 1, - encodedBytes: []byte{0x0}, - }, - { - name: "one store", - changeset: &Changeset{Pairs: map[string]KVPairs{"storekey": {{Key: []byte("key"), Value: []byte("value"), StoreKey: "storekey"}}}}, - encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1 + 5, - encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65}, - }, - { - name: "two stores", - changeset: &Changeset{Pairs: map[string]KVPairs{"storekey1": {{Key: []byte("key1"), Value: []byte("value1"), StoreKey: "storekey1"}}, "storekey2": {{Key: []byte("key2"), Value: []byte("value2"), StoreKey: "storekey2"}, {Key: []byte("key3"), Value: []byte("value3"), StoreKey: "storekey2"}}}}, - encodedSize: 1 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 4 + 1 + 6, - // encodedBytes: it is not deterministic, - }, - } - - for _, tc := range testcases { - t.Run(tc.name, func(t *testing.T) { - // check the encoded size - require.Equal(t, tc.changeset.encodedSize(), tc.encodedSize, "encoded size mismatch") - // check the encoded bytes - encodedBytes, err := tc.changeset.Marshal() - require.NoError(t, err, "marshal error") - if len(tc.encodedBytes) != 0 { - require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes 
mismatch") - } - // check the unmarshaled changeset - cs := NewChangeset() - require.NoError(t, cs.Unmarshal(encodedBytes), "unmarshal error") - require.Equal(t, len(tc.changeset.Pairs), len(cs.Pairs), "unmarshaled changeset store size mismatch") - for storeKey, pairs := range tc.changeset.Pairs { - require.Equal(t, len(pairs), len(cs.Pairs[storeKey]), "unmarshaled changeset pairs size mismatch") - for i, pair := range pairs { - require.Equal(t, pair, cs.Pairs[storeKey][i], "unmarshaled changeset pair mismatch") - } - } - }) - } -} diff --git a/store/db/db.go b/store/db/db.go new file mode 100644 index 000000000000..dbbe25068ff4 --- /dev/null +++ b/store/db/db.go @@ -0,0 +1,33 @@ +package db + +import ( + "fmt" + + "cosmossdk.io/store/v2" +) + +type RawDBType string + +const ( + DBTypeGoLevelDB RawDBType = "goleveldb" + DBTypeRocksDB = "rocksdb" + DBTypePebbleDB = "pebbledb" + DBTypePrefixDB = "prefixdb" + + DBFileSuffix string = ".db" +) + +func NewRawDB(dbType RawDBType, name, dataDir string, opts store.DBOptions) (store.RawDB, error) { + switch dbType { + case DBTypeGoLevelDB: + return NewGoLevelDB(name, dataDir, opts) + + case DBTypeRocksDB: + return NewRocksDB(name, dataDir) + + case DBTypePebbleDB: + return NewPebbleDB(name, dataDir) + } + + return nil, fmt.Errorf("unsupported db type: %s", dbType) +} diff --git a/store/internal/encoding/changeset.go b/store/internal/encoding/changeset.go new file mode 100644 index 000000000000..8aefed75c327 --- /dev/null +++ b/store/internal/encoding/changeset.go @@ -0,0 +1,125 @@ +package encoding + +import ( + "bytes" + "fmt" + + corestore "cosmossdk.io/core/store" +) + +// encodedSize returns the size of the encoded Changeset. +func encodedSize(cs *corestore.Changeset) int { + size := EncodeUvarintSize(uint64(len(cs.Changes))) + for _, changes := range cs.Changes { + size += EncodeBytesSize(changes.Actor) + size += EncodeUvarintSize(uint64(len(changes.StateChanges))) + for _, pair := range changes.StateChanges { + size += EncodeBytesSize(pair.Key) + size += EncodeUvarintSize(1) // pair.Remove + if !pair.Remove { + size += EncodeBytesSize(pair.Value) + } + } + } + return size +} + +// MarshalChangeset returns the encoded byte representation of Changeset. +// NOTE: The Changeset is encoded as follows: +// - number of store keys (uvarint) +// - for each store key: +// -- store key (bytes) +// -- number of pairs (uvarint) +// -- for each pair: +// --- key (bytes) +// --- remove (1 byte) +// --- value (bytes) +func MarshalChangeset(cs *corestore.Changeset) ([]byte, error) { + var buf bytes.Buffer + buf.Grow(encodedSize(cs)) + + if err := EncodeUvarint(&buf, uint64(len(cs.Changes))); err != nil { + return nil, err + } + for _, changes := range cs.Changes { + if err := EncodeBytes(&buf, changes.Actor); err != nil { + return nil, err + } + if err := EncodeUvarint(&buf, uint64(len(changes.StateChanges))); err != nil { + return nil, err + } + for _, pair := range changes.StateChanges { + if err := EncodeBytes(&buf, pair.Key); err != nil { + return nil, err + } + if pair.Remove { + if err := EncodeUvarint(&buf, 1); err != nil { + return nil, err + } + } else { + if err := EncodeUvarint(&buf, 0); err != nil { + return nil, err + } + if err := EncodeBytes(&buf, pair.Value); err != nil { + return nil, err + } + } + } + } + + return buf.Bytes(), nil +} + +// UnmarshalChangeset decodes the Changeset from the given byte slice. 
+func UnmarshalChangeset(cs *corestore.Changeset, buf []byte) error { + storeCount, n, err := DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + changes := make([]corestore.StateChanges, storeCount) + for i := uint64(0); i < storeCount; i++ { + storeKey, n, err := DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairCount, n, err := DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairs := make([]corestore.KVPair, pairCount) + for j := uint64(0); j < pairCount; j++ { + pairs[j].Key, n, err = DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + remove, n, err := DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + if remove == 0 { + pairs[j].Remove = false + pairs[j].Value, n, err = DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + } else if remove == 1 { + pairs[j].Remove = true + } else { + return fmt.Errorf("invalid remove flag: %d", remove) + } + } + changes[i] = corestore.StateChanges{Actor: storeKey, StateChanges: pairs} + } + cs.Changes = changes + + return nil +} diff --git a/store/internal/encoding/changeset_test.go b/store/internal/encoding/changeset_test.go new file mode 100644 index 000000000000..ae6cb4e2d4a4 --- /dev/null +++ b/store/internal/encoding/changeset_test.go @@ -0,0 +1,94 @@ +package encoding + +import ( + "testing" + + corestore "cosmossdk.io/core/store" + "github.com/stretchr/testify/require" +) + +func TestChangesetMarshal(t *testing.T) { + testcases := []struct { + name string + changeset *corestore.Changeset + encodedSize int + encodedBytes []byte + }{ + { + name: "empty", + changeset: corestore.NewChangeset(), + encodedSize: 1, + encodedBytes: []byte{0x0}, + }, + { + name: "one store", + changeset: &corestore.Changeset{Changes: []corestore.StateChanges{ + { + Actor: []byte("storekey"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key"), Value: []byte("value"), Remove: false}, + }, + }, + }}, + encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1 + 1 + 5, + encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x0, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65}, + }, + { + name: "one remove store", + changeset: &corestore.Changeset{Changes: []corestore.StateChanges{ + { + Actor: []byte("storekey"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key"), Remove: true}, + }, + }, + }}, + encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1, + encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x1}, + }, + { + name: "two stores", + changeset: &corestore.Changeset{Changes: []corestore.StateChanges{ + { + Actor: []byte("storekey1"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key1"), Value: []byte("value1"), Remove: false}, + }, + }, + { + Actor: []byte("storekey2"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key2"), Value: []byte("value2"), Remove: false}, + {Key: []byte("key1"), Remove: true}, + }, + }, + }}, + encodedSize: 2 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 1 + 6 + 1 + 4 + 1, + // encodedBytes: it is not deterministic, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // check the encoded size + require.Equal(t, encodedSize(tc.changeset), tc.encodedSize, "encoded size mismatch") + // check the encoded bytes + encodedBytes, err := MarshalChangeset(tc.changeset) + require.NoError(t, err, "marshal error") + if len(tc.encodedBytes) != 0 { + require.Equal(t, encodedBytes, tc.encodedBytes, "encoded 
bytes mismatch") + } + // check the unmarshaled changeset + cs := corestore.NewChangeset() + require.NoError(t, UnmarshalChangeset(cs, encodedBytes), "unmarshal error") + require.Equal(t, len(tc.changeset.Changes), len(cs.Changes), "unmarshaled changeset store size mismatch") + for i, changes := range tc.changeset.Changes { + require.Equal(t, changes.Actor, cs.Changes[i].Actor, "unmarshaled changeset store key mismatch") + require.Equal(t, len(changes.StateChanges), len(cs.Changes[i].StateChanges), "unmarshaled changeset StateChanges size mismatch") + for j, pair := range changes.StateChanges { + require.Equal(t, pair, cs.Changes[i].StateChanges[j], "unmarshaled changeset pair mismatch") + } + } + }) + } +} diff --git a/store/migration/manager.go b/store/migration/manager.go index fdc84455a67c..285709351c59 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -12,6 +12,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/internal/encoding" "cosmossdk.io/store/v2/snapshots" "cosmossdk.io/store/v2/storage" ) @@ -28,7 +29,7 @@ const ( // VersionedChangeset is a pair of version and Changeset. type VersionedChangeset struct { Version uint64 - Changeset *store.Changeset + Changeset *corestore.Changeset } // Manager manages the migration of the whole state from store/v1 to store/v2. @@ -127,7 +128,7 @@ func (m *Manager) writeChangeset() error { buf := make([]byte, 8) binary.BigEndian.PutUint64(buf, vc.Version) csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) - csBytes, err := cs.Marshal() + csBytes, err := encoding.MarshalChangeset(cs) if err != nil { return fmt.Errorf("failed to marshal changeset: %w", err) } @@ -181,8 +182,8 @@ func (m *Manager) Sync() error { continue } - cs := store.NewChangeset() - if err := cs.Unmarshal(csBytes); err != nil { + cs := corestore.NewChangeset() + if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil { return fmt.Errorf("failed to unmarshal changeset: %w", err) } diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go index 4797654da459..63919d667f96 100644 --- a/store/root/migrate_test.go +++ b/store/root/migrate_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/suite" + corestore "cosmossdk.io/core/store" "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/commitment" @@ -49,10 +50,10 @@ func (s *MigrateStoreTestSuite) SetupTest() { toVersion := uint64(200) keyCount := 10 for version := uint64(1); version <= toVersion; version++ { - cs := store.NewChangeset() + cs := corestore.NewChangeset() for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) } } s.Require().NoError(orgSC.WriteBatch(cs)) @@ -72,7 +73,7 @@ func (s *MigrateStoreTestSuite) SetupTest() { sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog) s.Require().NoError(err) - snapshotsStore, err := snapshots.NewStore(dbm.NewMemDB(), s.T().TempDir()) + snapshotsStore, err := snapshots.NewStore(s.T().TempDir()) s.Require().NoError(err) snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, testLog) migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, testLog) @@ -95,10 +96,10 @@ func (s 
*MigrateStoreTestSuite) TestMigrateState() { latestVersion := originalLatestVersion + 1 keyCount := 10 for ; latestVersion < 2*originalLatestVersion; latestVersion++ { - cs := store.NewChangeset() + cs := corestore.NewChangeset() for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", latestVersion, i)), []byte(fmt.Sprintf("value-%d-%d", latestVersion, i))) + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", latestVersion, i)), []byte(fmt.Sprintf("value-%d-%d", latestVersion, i)), false) } } _, err := s.rootStore.WorkingHash(cs) @@ -130,7 +131,7 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { if version < originalLatestVersion { targetVersion = originalLatestVersion } - res, err := s.rootStore.Query(storeKey, targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + res, err := s.rootStore.Query([]byte(storeKey), targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) s.Require().NoError(err) s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value) } @@ -143,10 +144,10 @@ func (s *MigrateStoreTestSuite) TestMigrateState() { // apply changeset against the migrated store for version := latestVersion + 1; version <= latestVersion+10; version++ { - cs := store.NewChangeset() + cs := corestore.NewChangeset() for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - cs.Add(storeKey, []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i))) + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) } } _, err := s.rootStore.WorkingHash(cs) diff --git a/store/storage/store.go b/store/storage/store.go index 11e1a4f8deef..436682c7f500 100644 --- a/store/storage/store.go +++ b/store/storage/store.go @@ -130,11 +130,13 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.Stat if err := b.Set(kvPair.Actor, kv.Key, kv.Value); err != nil { return err } - if b.Size() > defaultBatchBufferSize { if err := b.Write(); err != nil { return err } + if err := b.Reset(); err != nil { + return err + } } } } From 654f5a19ce2f3fcd4679acfbb9d037e5df2fbdb1 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 20 Mar 2024 17:16:02 -0400 Subject: [PATCH 09/10] fix flaky test --- store/root/store.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/store/root/store.go b/store/root/store.go index ba28827123e4..68944c209d24 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "slices" + "sync" "time" "github.com/cockroachdb/errors" @@ -374,14 +375,21 @@ func (s *Store) StartMigration() error { s.isMigrating = true + mtx := sync.Mutex{} + mtx.Lock() go func() { version := s.lastCommitInfo.Version s.logger.Info("starting migration", "version", version) + mtx.Unlock() if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { s.logger.Error("failed to start migration", "err", err) } }() + // wait for the migration manager to start + mtx.Lock() + defer mtx.Unlock() + return nil } From 09c35e538800848e7d3066324c30ab8d1053a7ac Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 20 Mar 2024 17:50:23 -0400 Subject: [PATCH 10/10] flaky tests --- store/snapshots/manager_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index b06c28216320..8295bdf5f665 100644 --- a/store/snapshots/manager_test.go +++ 
b/store/snapshots/manager_test.go @@ -232,6 +232,17 @@ func TestManager_Restore(t *testing.T) { Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, }) require.NoError(t, err) + + // Feeding the chunks should work + for i, chunk := range chunks { + done, err := manager.RestoreChunk(chunk) + require.NoError(t, err) + if i == len(chunks)-1 { + assert.True(t, done) + } else { + assert.False(t, done) + } + } } func TestManager_TakeError(t *testing.T) {
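
PATCH 09 deserves a closer look: StartMigration now locks a local sync.Mutex before spawning the migration goroutine and only unlocks it from inside the goroutine, after s.lastCommitInfo.Version has been captured and logged, so the caller blocks until the goroutine has actually started (presumably the race behind the flaky test). A standalone sketch of that handoff pattern, with illustrative names that are not from the repository:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // startWorker launches work in the background but only returns once the
    // goroutine has observed the starting version, mirroring the locked-mutex
    // handoff added to (*Store).StartMigration in PATCH 09.
    func startWorker(version uint64) {
        mtx := sync.Mutex{}
        mtx.Lock()

        go func() {
            fmt.Println("starting migration at version", version)
            mtx.Unlock() // signal: the goroutine is running and has captured version

            time.Sleep(10 * time.Millisecond) // stand-in for the actual migration work
        }()

        // Block until the goroutine has started; otherwise the caller could
        // move on before the goroutine reads the version it is meant to pin.
        mtx.Lock()
        defer mtx.Unlock()
    }

    func main() {
        startWorker(200)
        fmt.Println("migration goroutine is running")
    }

A channel or sync.WaitGroup would express the same one-shot signal; the local mutex keeps the diff small since no new fields need to be added to Store.
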