diff --git a/store/batch.go b/store/batch.go index 752a7147f52b..36178a6a6327 100644 --- a/store/batch.go +++ b/store/batch.go @@ -13,7 +13,7 @@ type Batch interface { Write() error // Reset resets the batch. - Reset() + Reset() error } // RawBatch represents a group of writes. They may or may not be written atomically depending on the diff --git a/store/internal/encoding/changeset.go b/store/internal/encoding/changeset.go new file mode 100644 index 000000000000..8aefed75c327 --- /dev/null +++ b/store/internal/encoding/changeset.go @@ -0,0 +1,125 @@ +package encoding + +import ( + "bytes" + "fmt" + + corestore "cosmossdk.io/core/store" +) + +// encodedSize returns the size of the encoded Changeset. +func encodedSize(cs *corestore.Changeset) int { + size := EncodeUvarintSize(uint64(len(cs.Changes))) + for _, changes := range cs.Changes { + size += EncodeBytesSize(changes.Actor) + size += EncodeUvarintSize(uint64(len(changes.StateChanges))) + for _, pair := range changes.StateChanges { + size += EncodeBytesSize(pair.Key) + size += EncodeUvarintSize(1) // pair.Remove + if !pair.Remove { + size += EncodeBytesSize(pair.Value) + } + } + } + return size +} + +// MarshalChangeset returns the encoded byte representation of Changeset. +// NOTE: The Changeset is encoded as follows: +// - number of store keys (uvarint) +// - for each store key: +// -- store key (bytes) +// -- number of pairs (uvarint) +// -- for each pair: +// --- key (bytes) +// --- remove (1 byte) +// --- value (bytes) +func MarshalChangeset(cs *corestore.Changeset) ([]byte, error) { + var buf bytes.Buffer + buf.Grow(encodedSize(cs)) + + if err := EncodeUvarint(&buf, uint64(len(cs.Changes))); err != nil { + return nil, err + } + for _, changes := range cs.Changes { + if err := EncodeBytes(&buf, changes.Actor); err != nil { + return nil, err + } + if err := EncodeUvarint(&buf, uint64(len(changes.StateChanges))); err != nil { + return nil, err + } + for _, pair := range changes.StateChanges { + if err := EncodeBytes(&buf, pair.Key); err != nil { + return nil, err + } + if pair.Remove { + if err := EncodeUvarint(&buf, 1); err != nil { + return nil, err + } + } else { + if err := EncodeUvarint(&buf, 0); err != nil { + return nil, err + } + if err := EncodeBytes(&buf, pair.Value); err != nil { + return nil, err + } + } + } + } + + return buf.Bytes(), nil +} + +// UnmarshalChangeset decodes the Changeset from the given byte slice. +func UnmarshalChangeset(cs *corestore.Changeset, buf []byte) error { + storeCount, n, err := DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + changes := make([]corestore.StateChanges, storeCount) + for i := uint64(0); i < storeCount; i++ { + storeKey, n, err := DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairCount, n, err := DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + + pairs := make([]corestore.KVPair, pairCount) + for j := uint64(0); j < pairCount; j++ { + pairs[j].Key, n, err = DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + + remove, n, err := DecodeUvarint(buf) + if err != nil { + return err + } + buf = buf[n:] + if remove == 0 { + pairs[j].Remove = false + pairs[j].Value, n, err = DecodeBytes(buf) + if err != nil { + return err + } + buf = buf[n:] + } else if remove == 1 { + pairs[j].Remove = true + } else { + return fmt.Errorf("invalid remove flag: %d", remove) + } + } + changes[i] = corestore.StateChanges{Actor: storeKey, StateChanges: pairs} + } + cs.Changes = changes + + return nil +} diff --git a/store/internal/encoding/changeset_test.go b/store/internal/encoding/changeset_test.go new file mode 100644 index 000000000000..ae6cb4e2d4a4 --- /dev/null +++ b/store/internal/encoding/changeset_test.go @@ -0,0 +1,94 @@ +package encoding + +import ( + "testing" + + corestore "cosmossdk.io/core/store" + "github.com/stretchr/testify/require" +) + +func TestChangesetMarshal(t *testing.T) { + testcases := []struct { + name string + changeset *corestore.Changeset + encodedSize int + encodedBytes []byte + }{ + { + name: "empty", + changeset: corestore.NewChangeset(), + encodedSize: 1, + encodedBytes: []byte{0x0}, + }, + { + name: "one store", + changeset: &corestore.Changeset{Changes: []corestore.StateChanges{ + { + Actor: []byte("storekey"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key"), Value: []byte("value"), Remove: false}, + }, + }, + }}, + encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1 + 1 + 5, + encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x0, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65}, + }, + { + name: "one remove store", + changeset: &corestore.Changeset{Changes: []corestore.StateChanges{ + { + Actor: []byte("storekey"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key"), Remove: true}, + }, + }, + }}, + encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1, + encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x1}, + }, + { + name: "two stores", + changeset: &corestore.Changeset{Changes: []corestore.StateChanges{ + { + Actor: []byte("storekey1"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key1"), Value: []byte("value1"), Remove: false}, + }, + }, + { + Actor: []byte("storekey2"), + StateChanges: corestore.KVPairs{ + {Key: []byte("key2"), Value: []byte("value2"), Remove: false}, + {Key: []byte("key1"), Remove: true}, + }, + }, + }}, + encodedSize: 2 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 1 + 6 + 1 + 4 + 1, + // encodedBytes: it is not deterministic, + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + // check the encoded size + require.Equal(t, encodedSize(tc.changeset), tc.encodedSize, "encoded size mismatch") + // check the encoded bytes + encodedBytes, err := MarshalChangeset(tc.changeset) + require.NoError(t, err, "marshal error") + if len(tc.encodedBytes) != 0 { + require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes mismatch") + } + // check the unmarshaled changeset + cs := corestore.NewChangeset() + require.NoError(t, UnmarshalChangeset(cs, encodedBytes), "unmarshal error") + require.Equal(t, len(tc.changeset.Changes), len(cs.Changes), "unmarshaled changeset store size mismatch") + for i, changes := range tc.changeset.Changes { + require.Equal(t, changes.Actor, cs.Changes[i].Actor, "unmarshaled changeset store key mismatch") + require.Equal(t, len(changes.StateChanges), len(cs.Changes[i].StateChanges), "unmarshaled changeset StateChanges size mismatch") + for j, pair := range changes.StateChanges { + require.Equal(t, pair, cs.Changes[i].StateChanges[j], "unmarshaled changeset pair mismatch") + } + } + }) + } +} diff --git a/store/migration/manager.go b/store/migration/manager.go index 112996042a60..285709351c59 100644 --- a/store/migration/manager.go +++ b/store/migration/manager.go @@ -1,11 +1,20 @@ package migration import ( + "encoding/binary" + "fmt" + "sync" + "time" + "golang.org/x/sync/errgroup" corestore "cosmossdk.io/core/store" "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/internal/encoding" "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" ) const ( @@ -13,25 +22,69 @@ const ( defaultChannelBufferSize = 1024 // defaultStorageBufferSize is the default buffer size for the storage snapshotter. defaultStorageBufferSize = 1024 + + migrateChangesetKeyFmt = "m/cs_%x" // m/cs_ ) +// VersionedChangeset is a pair of version and Changeset. +type VersionedChangeset struct { + Version uint64 + Changeset *corestore.Changeset +} + // Manager manages the migration of the whole state from store/v1 to store/v2. type Manager struct { logger log.Logger snapshotsManager *snapshots.Manager - storageSnapshotter snapshots.StorageSnapshotter - commitSnapshotter snapshots.CommitSnapshotter + stateStorage *storage.StorageStore + stateCommitment *commitment.CommitStore + + db store.RawDB + mtx sync.Mutex // mutex for migratedVersion + migratedVersion uint64 + + chChangeset <-chan *VersionedChangeset + chDone <-chan struct{} } // NewManager returns a new Manager. -func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager { +func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager { return &Manager{ - logger: logger, - snapshotsManager: sm, - storageSnapshotter: ss, - commitSnapshotter: cs, + logger: logger, + snapshotsManager: sm, + stateStorage: ss, + stateCommitment: sc, + db: db, + } +} + +// Start starts the whole migration process. +// It migrates the whole state at the given version to the new store/v2 (both SC and SS). +// It also catches up the Changesets which are committed while the migration is in progress. +// `chChangeset` is the channel to receive the committed Changesets from the RootStore. +// `chDone` is the channel to receive the done signal from the RootStore. +// NOTE: It should be called by the RootStore, running in the background. +func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone <-chan struct{}) error { + m.chChangeset = chChangeset + m.chDone = chDone + + go func() { + if err := m.writeChangeset(); err != nil { + m.logger.Error("failed to write changeset", "err", err) + } + }() + + if err := m.Migrate(version); err != nil { + return fmt.Errorf("failed to migrate state: %w", err) } + + return m.Sync() +} + +// GetStateCommitment returns the state commitment. +func (m *Manager) GetStateCommitment() *commitment.CommitStore { + return m.stateCommitment } // Migrate migrates the whole state at the given height to the new store/v2. @@ -49,13 +102,106 @@ func (m *Manager) Migrate(height uint64) error { eg := new(errgroup.Group) eg.Go(func() error { - return m.storageSnapshotter.Restore(height, chStorage) + return m.stateStorage.Restore(height, chStorage) }) eg.Go(func() error { defer close(chStorage) - _, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage) + _, err := m.stateCommitment.Restore(height, 0, ms, chStorage) return err }) - return eg.Wait() + if err := eg.Wait(); err != nil { + return err + } + + m.mtx.Lock() + m.migratedVersion = height + m.mtx.Unlock() + + return nil +} + +// writeChangeset writes the Changeset to the db. +func (m *Manager) writeChangeset() error { + for vc := range m.chChangeset { + cs := vc.Changeset + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, vc.Version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csBytes, err := encoding.MarshalChangeset(cs) + if err != nil { + return fmt.Errorf("failed to marshal changeset: %w", err) + } + + batch := m.db.NewBatch() + defer batch.Close() + + if err := batch.Set(csKey, csBytes); err != nil { + return fmt.Errorf("failed to write changeset to db.Batch: %w", err) + } + if err := batch.Write(); err != nil { + return fmt.Errorf("failed to write changeset to db: %w", err) + } + } + + return nil +} + +// GetMigratedVersion returns the migrated version. +// It is used to check the migrated version in the RootStore. +func (m *Manager) GetMigratedVersion() uint64 { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.migratedVersion +} + +// Sync catches up the Changesets which are committed while the migration is in progress. +// It should be called after the migration is done. +func (m *Manager) Sync() error { + version := m.GetMigratedVersion() + if version == 0 { + return fmt.Errorf("migration is not done yet") + } + version += 1 + + for { + select { + case <-m.chDone: + return nil + default: + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, version) + csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf)) + csBytes, err := m.db.Get(csKey) + if err != nil { + return fmt.Errorf("failed to get changeset from db: %w", err) + } + if csBytes == nil { + // wait for the next changeset + time.Sleep(100 * time.Millisecond) + continue + } + + cs := corestore.NewChangeset() + if err := encoding.UnmarshalChangeset(cs, csBytes); err != nil { + return fmt.Errorf("failed to unmarshal changeset: %w", err) + } + + if err := m.stateCommitment.WriteBatch(cs); err != nil { + return fmt.Errorf("failed to write changeset to commitment: %w", err) + } + if _, err := m.stateCommitment.Commit(version); err != nil { + return fmt.Errorf("failed to commit changeset to commitment: %w", err) + } + if err := m.stateStorage.ApplyChangeset(version, cs); err != nil { + return fmt.Errorf("failed to write changeset to storage: %w", err) + } + + m.mtx.Lock() + m.migratedVersion = version + m.mtx.Unlock() + + version += 1 + } + } } diff --git a/store/migration/manager_test.go b/store/migration/manager_test.go index 69cf3b923863..0985dd561550 100644 --- a/store/migration/manager_test.go +++ b/store/migration/manager_test.go @@ -50,7 +50,7 @@ func setupMigrationManager(t *testing.T) (*Manager, *commitment.CommitStore) { newCommitStore, err := commitment.NewCommitStore(multiTrees1, db1, nil, log.NewNopLogger()) // for store/v2 require.NoError(t, err) - return NewManager(snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore + return NewManager(db, snapshotsManager, newStorageStore, newCommitStore, log.NewNopLogger()), commitStore } func TestMigrateState(t *testing.T) { @@ -78,17 +78,17 @@ func TestMigrateState(t *testing.T) { for version := uint64(1); version < toVersion; version++ { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - val, err := m.commitSnapshotter.(*commitment.CommitStore).Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + val, err := m.stateCommitment.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) require.NoError(t, err) require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) } } } // check the latest state - val, err := m.commitSnapshotter.(*commitment.CommitStore).Get([]byte("store1"), toVersion-1, []byte("key-100-1")) + val, err := m.stateCommitment.Get([]byte("store1"), toVersion-1, []byte("key-100-1")) require.NoError(t, err) require.Nil(t, val) - val, err = m.commitSnapshotter.(*commitment.CommitStore).Get([]byte("store2"), toVersion-1, []byte("key-100-0")) + val, err = m.stateCommitment.Get([]byte("store2"), toVersion-1, []byte("key-100-0")) require.NoError(t, err) require.Nil(t, val) @@ -96,7 +96,7 @@ func TestMigrateState(t *testing.T) { for version := uint64(1); version < toVersion; version++ { for _, storeKey := range storeKeys { for i := 0; i < keyCount; i++ { - val, err := m.storageSnapshotter.(*storage.StorageStore).Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) + val, err := m.stateStorage.Get([]byte(storeKey), toVersion-1, []byte(fmt.Sprintf("key-%d-%d", version, i))) require.NoError(t, err) require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", version, i)), val) } diff --git a/store/root/migrate_test.go b/store/root/migrate_test.go new file mode 100644 index 000000000000..63919d667f96 --- /dev/null +++ b/store/root/migrate_test.go @@ -0,0 +1,162 @@ +package root + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + corestore "cosmossdk.io/core/store" + "cosmossdk.io/log" + "cosmossdk.io/store/v2" + "cosmossdk.io/store/v2/commitment" + "cosmossdk.io/store/v2/commitment/iavl" + dbm "cosmossdk.io/store/v2/db" + "cosmossdk.io/store/v2/migration" + "cosmossdk.io/store/v2/snapshots" + "cosmossdk.io/store/v2/storage" + "cosmossdk.io/store/v2/storage/sqlite" +) + +var ( + storeKeys = []string{"store1", "store2", "store3"} +) + +type MigrateStoreTestSuite struct { + suite.Suite + + rootStore store.RootStore +} + +func TestMigrateStoreTestSuite(t *testing.T) { + suite.Run(t, &MigrateStoreTestSuite{}) +} + +func (s *MigrateStoreTestSuite) SetupTest() { + testLog := log.NewTestLogger(s.T()) + nopLog := log.NewNopLogger() + + mdb := dbm.NewMemDB() + multiTrees := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + prefixDB := dbm.NewPrefixDB(mdb, []byte(storeKey)) + multiTrees[storeKey] = iavl.NewIavlTree(prefixDB, nopLog, iavl.DefaultConfig()) + } + orgSC, err := commitment.NewCommitStore(multiTrees, mdb, nil, testLog) + s.Require().NoError(err) + + // apply changeset against the original store + toVersion := uint64(200) + keyCount := 10 + for version := uint64(1); version <= toVersion; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + s.Require().NoError(orgSC.WriteBatch(cs)) + _, err = orgSC.Commit(version) + s.Require().NoError(err) + } + + // create a new storage and commitment stores + sqliteDB, err := sqlite.New(s.T().TempDir()) + s.Require().NoError(err) + ss := storage.NewStorageStore(sqliteDB, nil, testLog) + + multiTrees1 := make(map[string]commitment.Tree) + for _, storeKey := range storeKeys { + multiTrees1[storeKey] = iavl.NewIavlTree(dbm.NewMemDB(), nopLog, iavl.DefaultConfig()) + } + sc, err := commitment.NewCommitStore(multiTrees1, dbm.NewMemDB(), nil, testLog) + s.Require().NoError(err) + + snapshotsStore, err := snapshots.NewStore(s.T().TempDir()) + s.Require().NoError(err) + snapshotManager := snapshots.NewManager(snapshotsStore, snapshots.NewSnapshotOptions(1500, 2), orgSC, nil, nil, testLog) + migrationManager := migration.NewManager(dbm.NewMemDB(), snapshotManager, ss, sc, testLog) + + // assume no storage store, simulate the migration process + s.rootStore, err = New(testLog, ss, orgSC, migrationManager, nil) + s.Require().NoError(err) +} + +func (s *MigrateStoreTestSuite) TestMigrateState() { + err := s.rootStore.LoadLatestVersion() + s.Require().NoError(err) + originalLatestVersion, err := s.rootStore.GetLatestVersion() + s.Require().NoError(err) + + // start the migration process + s.rootStore.StartMigration() + + // continue to apply changeset against the original store + latestVersion := originalLatestVersion + 1 + keyCount := 10 + for ; latestVersion < 2*originalLatestVersion; latestVersion++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", latestVersion, i)), []byte(fmt.Sprintf("value-%d-%d", latestVersion, i)), false) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + + // check if the migration is completed + ver, err := s.rootStore.GetStateStorage().GetLatestVersion() + s.Require().NoError(err) + if ver == latestVersion { + break + } + + // add some delay to simulate the consensus process + time.Sleep(100 * time.Millisecond) + } + + // check if the migration is successful + version, err := s.rootStore.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(latestVersion, version) + + // query against the migrated store + for version := uint64(1); version <= latestVersion; version++ { + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + targetVersion := version + if version < originalLatestVersion { + targetVersion = originalLatestVersion + } + res, err := s.rootStore.Query([]byte(storeKey), targetVersion, []byte(fmt.Sprintf("key-%d-%d", version, i)), true) + s.Require().NoError(err) + s.Require().Equal([]byte(fmt.Sprintf("value-%d-%d", version, i)), res.Value) + } + } + } + + // prune the old versions + err = s.rootStore.Prune(latestVersion - 1) + s.Require().NoError(err) + + // apply changeset against the migrated store + for version := latestVersion + 1; version <= latestVersion+10; version++ { + cs := corestore.NewChangeset() + for _, storeKey := range storeKeys { + for i := 0; i < keyCount; i++ { + cs.Add([]byte(storeKey), []byte(fmt.Sprintf("key-%d-%d", version, i)), []byte(fmt.Sprintf("value-%d-%d", version, i)), false) + } + } + _, err := s.rootStore.WorkingHash(cs) + s.Require().NoError(err) + _, err = s.rootStore.Commit(cs) + s.Require().NoError(err) + } + + version, err = s.rootStore.GetLatestVersion() + s.Require().NoError(err) + s.Require().Equal(latestVersion+10, version) +} diff --git a/store/root/store.go b/store/root/store.go index bd751f76bca9..68944c209d24 100644 --- a/store/root/store.go +++ b/store/root/store.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "slices" + "sync" "time" "github.com/cockroachdb/errors" @@ -14,6 +15,7 @@ import ( "cosmossdk.io/log" "cosmossdk.io/store/v2" "cosmossdk.io/store/v2/metrics" + "cosmossdk.io/store/v2/migration" "cosmossdk.io/store/v2/proof" ) @@ -27,8 +29,8 @@ type Store struct { logger log.Logger initialVersion uint64 - // stateStore reflects the state storage backend - stateStore store.VersionedDatabase + // stateStorage reflects the state storage backend + stateStorage store.VersionedDatabase // stateCommitment reflects the state commitment (SC) backend stateCommitment store.Committer @@ -44,30 +46,43 @@ type Store struct { // telemetry reflects a telemetry agent responsible for emitting metrics (if any) telemetry metrics.StoreMetrics + + // Migration related fields + // migrationManager reflects the migration manager used to migrate state from v1 to v2 + migrationManager *migration.Manager + // chChangeset reflects the channel used to send the changeset to the migration manager + chChangeset chan *migration.VersionedChangeset + // chDone reflects the channel used to signal the migration manager that the migration + // is done + chDone chan struct{} + // isMigrating reflects whether the store is currently migrating + isMigrating bool } func New( logger log.Logger, ss store.VersionedDatabase, sc store.Committer, + mm *migration.Manager, m metrics.StoreMetrics, ) (store.RootStore, error) { return &Store{ - logger: logger.With("module", "root_store"), - initialVersion: 1, - stateStore: ss, - stateCommitment: sc, - telemetry: m, + logger: logger.With("module", "root_store"), + initialVersion: 1, + stateStorage: ss, + stateCommitment: sc, + migrationManager: mm, + telemetry: m, }, nil } // Close closes the store and resets all internal fields. Note, Close() is NOT // idempotent and should only be called once. func (s *Store) Close() (err error) { - err = errors.Join(err, s.stateStore.Close()) + err = errors.Join(err, s.stateStorage.Close()) err = errors.Join(err, s.stateCommitment.Close()) - s.stateStore = nil + s.stateStorage = nil s.stateCommitment = nil s.lastCommitInfo = nil s.commitHeader = nil @@ -107,7 +122,7 @@ func (s *Store) StateAt(v uint64) (corestore.ReaderMap, error) { } func (s *Store) GetStateStorage() store.VersionedDatabase { - return s.stateStore + return s.stateStorage } func (s *Store) GetStateCommitment() store.Committer { @@ -116,32 +131,17 @@ func (s *Store) GetStateCommitment() store.Committer { // LastCommitID returns a CommitID based off of the latest internal CommitInfo. // If an internal CommitInfo is not set, a new one will be returned with only the -// latest version set, which is based off of the SS view. +// latest version set, which is based off of the SC view. func (s *Store) LastCommitID() (proof.CommitID, error) { if s.lastCommitInfo != nil { return s.lastCommitInfo.CommitID(), nil } - // XXX/TODO: We cannot use SS to get the latest version when lastCommitInfo - // is nil if SS is flushed asynchronously. This is because the latest version - // in SS might not be the latest version in the SC stores. - // - // Ref: https://github.com/cosmos/cosmos-sdk/issues/17314 - latestVersion, err := s.stateStore.GetLatestVersion() - if err != nil { - return proof.CommitID{}, err - } - - // sanity check: ensure integrity of latest version against SC - scVersion, err := s.stateCommitment.GetLatestVersion() + latestVersion, err := s.stateCommitment.GetLatestVersion() if err != nil { return proof.CommitID{}, err } - if scVersion != latestVersion { - return proof.CommitID{}, fmt.Errorf("SC and SS version mismatch; got: %d, expected: %d", scVersion, latestVersion) - } - return proof.CommitID{Version: latestVersion}, nil } @@ -163,7 +163,7 @@ func (s *Store) Query(storeKey []byte, version uint64, key []byte, prove bool) ( defer s.telemetry.MeasureSince(now, "root_store", "query") } - val, err := s.stateStore.Get(storeKey, version, key) + val, err := s.stateStorage.Get(storeKey, version, key) if err != nil || val == nil { // fallback to querying SC backend if not found in SS backend // @@ -256,6 +256,21 @@ func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) { } if s.workingHash == nil { + // if migration is in progress, send the changeset to the migration manager + if s.isMigrating { + // if the migration manager has already migrated to the version, close the + // channels and replace the state commitment + if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version { + close(s.chDone) + close(s.chChangeset) + s.isMigrating = false + s.stateCommitment = s.migrationManager.GetStateCommitment() + s.logger.Info("migration completed", "version", s.lastCommitInfo.Version) + } else { + s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs} + } + } + if err := s.writeSC(cs); err != nil { return nil, err } @@ -291,7 +306,13 @@ func (s *Store) Commit(cs *corestore.Changeset) ([]byte, error) { // commit SS async eg.Go(func() error { - if err := s.stateStore.ApplyChangeset(version, cs); err != nil { + // if we're migrating, we don't want to commit to the state storage + // to avoid parallel writes + if s.isMigrating { + return nil + } + + if err := s.stateStorage.ApplyChangeset(version, cs); err != nil { return fmt.Errorf("failed to commit SS: %w", err) } @@ -327,7 +348,7 @@ func (s *Store) Prune(version uint64) error { defer s.telemetry.MeasureSince(now, "root_store", "prune") } - if err := s.stateStore.Prune(version); err != nil { + if err := s.stateStorage.Prune(version); err != nil { return fmt.Errorf("failed to prune SS store: %w", err) } @@ -338,6 +359,40 @@ func (s *Store) Prune(version uint64) error { return nil } +// StartMigration starts the migration process and initializes the channels. +// An error is returned if migration is already in progress. +// NOTE: This method should only be called once after loadVersion. +func (s *Store) StartMigration() error { + if s.isMigrating { + return fmt.Errorf("migration already in progress") + } + + // buffer at most 1 changeset, if the receiver is behind attempting to buffer + // more than 1 will block. + s.chChangeset = make(chan *migration.VersionedChangeset, 1) + // it is used to signal the migration manager that the migration is done + s.chDone = make(chan struct{}) + + s.isMigrating = true + + mtx := sync.Mutex{} + mtx.Lock() + go func() { + version := s.lastCommitInfo.Version + s.logger.Info("starting migration", "version", version) + mtx.Unlock() + if err := s.migrationManager.Start(version, s.chChangeset, s.chDone); err != nil { + s.logger.Error("failed to start migration", "err", err) + } + }() + + // wait for the migration manager to start + mtx.Lock() + defer mtx.Unlock() + + return nil +} + // writeSC accepts a Changeset and writes that as a batch to the underlying SC // tree, which allows us to retrieve the working hash of the SC tree. Finally, // we construct a *CommitInfo and set that as lastCommitInfo. Note, this should diff --git a/store/root/store_test.go b/store/root/store_test.go index 6773d496c7b0..d6c52367a265 100644 --- a/store/root/store_test.go +++ b/store/root/store_test.go @@ -52,7 +52,7 @@ func (s *RootStoreTestSuite) SetupTest() { sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree, testStoreKey2: tree2, testStoreKey3: tree3}, dbm.NewMemDB(), nil, noopLog) s.Require().NoError(err) - rs, err := New(noopLog, ss, sc, nil) + rs, err := New(noopLog, ss, sc, nil, nil) s.Require().NoError(err) s.rootStore = rs @@ -68,7 +68,7 @@ func (s *RootStoreTestSuite) TestGetStateCommitment() { } func (s *RootStoreTestSuite) TestGetStateStorage() { - s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStore) + s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStorage) } func (s *RootStoreTestSuite) TestSetInitialVersion() { diff --git a/store/snapshots/manager_test.go b/store/snapshots/manager_test.go index b06c28216320..8295bdf5f665 100644 --- a/store/snapshots/manager_test.go +++ b/store/snapshots/manager_test.go @@ -232,6 +232,17 @@ func TestManager_Restore(t *testing.T) { Metadata: types.Metadata{ChunkHashes: checksums(chunks)}, }) require.NoError(t, err) + + // Feeding the chunks should work + for i, chunk := range chunks { + done, err := manager.RestoreChunk(chunk) + require.NoError(t, err) + if i == len(chunks)-1 { + assert.True(t, done) + } else { + assert.False(t, done) + } + } } func TestManager_TakeError(t *testing.T) { diff --git a/store/storage/pebbledb/batch.go b/store/storage/pebbledb/batch.go index ae542b94bfc2..fed8c8a1b34c 100644 --- a/store/storage/pebbledb/batch.go +++ b/store/storage/pebbledb/batch.go @@ -41,8 +41,9 @@ func (b *Batch) Size() int { return b.batch.Len() } -func (b *Batch) Reset() { +func (b *Batch) Reset() error { b.batch.Reset() + return nil } func (b *Batch) set(storeKey []byte, tombstone uint64, key, value []byte) error { diff --git a/store/storage/rocksdb/batch.go b/store/storage/rocksdb/batch.go index 65954f1fa237..98d7d33f152e 100644 --- a/store/storage/rocksdb/batch.go +++ b/store/storage/rocksdb/batch.go @@ -44,8 +44,9 @@ func (b Batch) Size() int { return len(b.batch.Data()) } -func (b Batch) Reset() { +func (b Batch) Reset() error { b.batch.Clear() + return nil } func (b Batch) Set(storeKey []byte, key, value []byte) error { diff --git a/store/storage/sqlite/batch.go b/store/storage/sqlite/batch.go index fe04f1c265d2..ceb5e29ee625 100644 --- a/store/storage/sqlite/batch.go +++ b/store/storage/sqlite/batch.go @@ -23,19 +23,21 @@ type batchOp struct { } type Batch struct { + db *sql.DB tx *sql.Tx ops []batchOp size int version uint64 } -func NewBatch(storage *sql.DB, version uint64) (*Batch, error) { - tx, err := storage.Begin() +func NewBatch(db *sql.DB, version uint64) (*Batch, error) { + tx, err := db.Begin() if err != nil { return nil, fmt.Errorf("failed to create SQL transaction: %w", err) } return &Batch{ + db: db, tx: tx, ops: make([]batchOp, 0), version: version, @@ -46,10 +48,18 @@ func (b *Batch) Size() int { return b.size } -func (b *Batch) Reset() { +func (b *Batch) Reset() error { b.ops = nil b.ops = make([]batchOp, 0) b.size = 0 + + tx, err := b.db.Begin() + if err != nil { + return err + } + + b.tx = tx + return nil } func (b *Batch) Set(storeKey []byte, key, value []byte) error { diff --git a/store/storage/store.go b/store/storage/store.go index 8179836619f6..436682c7f500 100644 --- a/store/storage/store.go +++ b/store/storage/store.go @@ -130,11 +130,13 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.Stat if err := b.Set(kvPair.Actor, kv.Key, kv.Value); err != nil { return err } - if b.Size() > defaultBatchBufferSize { if err := b.Write(); err != nil { return err } + if err := b.Reset(); err != nil { + return err + } } } } @@ -145,7 +147,7 @@ func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.Stat } } - return ss.db.SetLatestVersion(version) + return nil } // Close closes the store. diff --git a/store/store.go b/store/store.go index e8437eee756e..bffede6ec3a4 100644 --- a/store/store.go +++ b/store/store.go @@ -72,6 +72,12 @@ type RootStore interface { // old versions of the RootStore by the CLI. Prune(version uint64) error + // StartMigration starts a migration process to migrate the RootStore/v1 to the + // SS and SC backends of store/v2. + // It runs in a separate goroutine and replaces the current RootStore with the + // migrated new backends once the migration is complete. + StartMigration() error + // SetMetrics sets the telemetry handler on the RootStore. SetMetrics(m metrics.Metrics)