Skip to content

Commit

Permalink
fix(store/v2): clean up resources after the migration is completed (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Mar 21, 2024
1 parent 9933a44 commit 8b4081f
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 17 deletions.
2 changes: 1 addition & 1 deletion store/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,5 @@ func (t *IavlTree) Import(version uint64) (commitment.Importer, error) {

// Close closes the iavl tree.
func (t *IavlTree) Close() error {
return nil
return t.tree.Close()
}
11 changes: 11 additions & 0 deletions store/migration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,14 @@ func (m *Manager) Sync() error {
}
}
}

// Close closes the manager. It should be called after the migration is done.
// It will close the db and notify the snapshotsManager that the migration is done.
func (m *Manager) Close() error {
if err := m.db.Close(); err != nil {
return fmt.Errorf("failed to close db: %w", err)
}
m.snapshotsManager.EndMigration(m.stateCommitment)

return nil
}
37 changes: 22 additions & 15 deletions store/root/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,21 +256,6 @@ func (s *Store) WorkingHash(cs *corestore.Changeset) ([]byte, error) {
}

if s.workingHash == nil {
// if migration is in progress, send the changeset to the migration manager
if s.isMigrating {
// if the migration manager has already migrated to the version, close the
// channels and replace the state commitment
if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version {
close(s.chDone)
close(s.chChangeset)
s.isMigrating = false
s.stateCommitment = s.migrationManager.GetStateCommitment()
s.logger.Info("migration completed", "version", s.lastCommitInfo.Version)
} else {
s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs}
}
}

if err := s.writeSC(cs); err != nil {
return nil, err
}
Expand Down Expand Up @@ -397,7 +382,29 @@ func (s *Store) StartMigration() error {
// tree, which allows us to retrieve the working hash of the SC tree. Finally,
// we construct a *CommitInfo and set that as lastCommitInfo. Note, this should
// only be called once per block!
// If migration is in progress, the changeset is sent to the migration manager.
func (s *Store) writeSC(cs *corestore.Changeset) error {
if s.isMigrating {
// if the migration manager has already migrated to the version, close the
// channels and replace the state commitment
if s.migrationManager.GetMigratedVersion() == s.lastCommitInfo.Version {
close(s.chDone)
close(s.chChangeset)
s.isMigrating = false
// close the old state commitment and replace it with the new one
if err := s.stateCommitment.Close(); err != nil {
return fmt.Errorf("failed to close the old SC store: %w", err)
}
s.stateCommitment = s.migrationManager.GetStateCommitment()
if err := s.migrationManager.Close(); err != nil {
return fmt.Errorf("failed to close migration manager: %w", err)
}
s.logger.Info("migration completed", "version", s.lastCommitInfo.Version)
} else {
s.chChangeset <- &migration.VersionedChangeset{Version: s.lastCommitInfo.Version + 1, Changeset: cs}
}
}

if err := s.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write batch to SC store: %w", err)
}
Expand Down
9 changes: 8 additions & 1 deletion store/snapshots/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error
if err != nil {
return err
}
defer m.end()
// m.end() will be called by the migration manager with EndMigration().

go func() {
if err := m.commitSnapshotter.Snapshot(height, protoWriter); err != nil {
Expand All @@ -258,6 +258,13 @@ func (m *Manager) CreateMigration(height uint64, protoWriter WriteCloser) error
return nil
}

// EndMigration ends the migration operation.
// It will replace the current commitSnapshotter with the new one.
func (m *Manager) EndMigration(commitSnapshotter CommitSnapshotter) {
defer m.end()
m.commitSnapshotter = commitSnapshotter
}

// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
func (m *Manager) List() ([]*types.Snapshot, error) {
return m.store.List()
Expand Down

0 comments on commit 8b4081f

Please sign in to comment.