Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): add the catch up process in the migration #19454

Merged
merged 20 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion store/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Batch interface {
Write() error

// Reset resets the batch.
Reset()
Reset() error
}

// RawBatch represents a group of writes. They may or may not be written atomically depending on the
Expand Down
103 changes: 103 additions & 0 deletions store/changeset.go
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package store

import (
"bytes"

"cosmossdk.io/store/v2/internal/encoding"
)

// KVPair defines a key-value pair with additional metadata that is used to
// track writes. Deletion can be denoted by a nil value or explicitly by the
// Delete field.
Expand Down Expand Up @@ -61,3 +67,100 @@ func (cs *Changeset) Merge(other *Changeset) {
cs.Pairs[storeKey] = append(cs.Pairs[storeKey], pairs...)
}
}

// encodedSize returns the size of the encoded Changeset.
func (cs *Changeset) encodedSize() int {
size := encoding.EncodeUvarintSize(uint64(len(cs.Pairs)))
for storeKey, pairs := range cs.Pairs {
size += encoding.EncodeBytesSize([]byte(storeKey))
size += encoding.EncodeUvarintSize(uint64(len(pairs)))
for _, pair := range pairs {
size += encoding.EncodeBytesSize(pair.Key)
size += encoding.EncodeBytesSize(pair.Value)
}
}
Fixed Show fixed Hide fixed
return size
}

// Marshal returns the encoded byte representation of Changeset.
// NOTE: The Changeset is encoded as follows:
// - number of store keys (uvarint)
// - for each store key:
// -- store key (bytes)
// -- number of pairs (uvarint)
// -- for each pair:
// --- key (bytes)
// --- value (bytes)
func (cs *Changeset) Marshal() ([]byte, error) {
var buf bytes.Buffer
buf.Grow(cs.encodedSize())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this isn't a hot code path it's probably OK, but it'd be better to grow in chunks rather than iterate cs.Pairs twice, once to calculate the size and once to marshal.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's what IAVL is doing, I have no benchmarking result but it seems like it is introduced to improve the performance.


if err := encoding.EncodeUvarint(&buf, uint64(len(cs.Pairs))); err != nil {
return nil, err
}
for storeKey, pairs := range cs.Pairs {
if err := encoding.EncodeBytes(&buf, []byte(storeKey)); err != nil {
return nil, err
}
if err := encoding.EncodeUvarint(&buf, uint64(len(pairs))); err != nil {
return nil, err
}
for _, pair := range pairs {
if err := encoding.EncodeBytes(&buf, pair.Key); err != nil {
return nil, err
}
if err := encoding.EncodeBytes(&buf, pair.Value); err != nil {
return nil, err
}
}
}
Fixed Show fixed Hide fixed

return buf.Bytes(), nil
}

// Unmarshal decodes the Changeset from the given byte slice.
func (cs *Changeset) Unmarshal(buf []byte) error {
storeCount, n, err := encoding.DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]

for i := uint64(0); i < storeCount; i++ {
storeKey, n, err := encoding.DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]

pairCount, n, err := encoding.DecodeUvarint(buf)
if err != nil {
return err
}
buf = buf[n:]

pairs := make(KVPairs, pairCount)
for j := uint64(0); j < pairCount; j++ {
key, n, err := encoding.DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]

value, n, err := encoding.DecodeBytes(buf)
if err != nil {
return err
}
buf = buf[n:]

pairs[j] = KVPair{
Key: key,
Value: value,
StoreKey: string(storeKey),
}
}
cs.Pairs[string(storeKey)] = pairs
}

return nil
}
58 changes: 58 additions & 0 deletions store/changeset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package store

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestChangesetMarshal(t *testing.T) {
testcases := []struct {
name string
changeset *Changeset
encodedSize int
encodedBytes []byte
}{
{
name: "empty",
changeset: NewChangeset(),
encodedSize: 1,
encodedBytes: []byte{0x0},
},
{
name: "one store",
changeset: &Changeset{Pairs: map[string]KVPairs{"storekey": {{Key: []byte("key"), Value: []byte("value"), StoreKey: "storekey"}}}},
encodedSize: 1 + 1 + 8 + 1 + 1 + 3 + 1 + 5,
encodedBytes: []byte{0x1, 0x8, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x6b, 0x65, 0x79, 0x1, 0x3, 0x6b, 0x65, 0x79, 0x5, 0x76, 0x61, 0x6c, 0x75, 0x65},
},
{
name: "two stores",
changeset: &Changeset{Pairs: map[string]KVPairs{"storekey1": {{Key: []byte("key1"), Value: []byte("value1"), StoreKey: "storekey1"}}, "storekey2": {{Key: []byte("key2"), Value: []byte("value2"), StoreKey: "storekey2"}, {Key: []byte("key3"), Value: []byte("value3"), StoreKey: "storekey2"}}}},
encodedSize: 1 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 9 + 1 + 1 + 4 + 1 + 6 + 1 + 4 + 1 + 6,
// encodedBytes: it is not deterministic,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
// check the encoded size
require.Equal(t, tc.changeset.encodedSize(), tc.encodedSize, "encoded size mismatch")
// check the encoded bytes
encodedBytes, err := tc.changeset.Marshal()
require.NoError(t, err, "marshal error")
if len(tc.encodedBytes) != 0 {
require.Equal(t, encodedBytes, tc.encodedBytes, "encoded bytes mismatch")
}
// check the unmarshaled changeset
cs := NewChangeset()
require.NoError(t, cs.Unmarshal(encodedBytes), "unmarshal error")
require.Equal(t, len(tc.changeset.Pairs), len(cs.Pairs), "unmarshaled changeset store size mismatch")
for storeKey, pairs := range tc.changeset.Pairs {
require.Equal(t, len(pairs), len(cs.Pairs[storeKey]), "unmarshaled changeset pairs size mismatch")
for i, pair := range pairs {
require.Equal(t, pair, cs.Pairs[storeKey][i], "unmarshaled changeset pair mismatch")
}
}
})
}
}
164 changes: 154 additions & 10 deletions store/migration/manager.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,88 @@
package migration

import (
"encoding/binary"
"fmt"
"sync"
"time"

"golang.org/x/sync/errgroup"

"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
"cosmossdk.io/store/v2/snapshots"
"cosmossdk.io/store/v2/storage"
)

const (
// defaultChannelBufferSize is the default buffer size for the migration stream.
defaultChannelBufferSize = 1024
// defaultStorageBufferSize is the default buffer size for the storage snapshotter.
defaultStorageBufferSize = 1024

migrateChangesetKeyFmt = "m/cs_%x" // m/cs_<version>
)

// VersionedChangeset is a pair of version and Changeset.
type VersionedChangeset struct {
Version uint64
Changeset *store.Changeset
}

// Manager manages the migration of the whole state from store/v1 to store/v2.
type Manager struct {
logger log.Logger
snapshotsManager *snapshots.Manager

storageSnapshotter snapshots.StorageSnapshotter
commitSnapshotter snapshots.CommitSnapshotter
stateStorage *storage.StorageStore
stateCommitment *commitment.CommitStore

db store.RawDB
mtx sync.Mutex // mutex for migratedVersion
migratedVersion uint64

chChangeset <-chan *VersionedChangeset
chDone <-chan struct{}
}

// NewManager returns a new Manager.
func NewManager(sm *snapshots.Manager, ss snapshots.StorageSnapshotter, cs snapshots.CommitSnapshotter, logger log.Logger) *Manager {
func NewManager(db store.RawDB, sm *snapshots.Manager, ss *storage.StorageStore, sc *commitment.CommitStore, logger log.Logger) *Manager {
return &Manager{
logger: logger,
snapshotsManager: sm,
storageSnapshotter: ss,
commitSnapshotter: cs,
logger: logger,
snapshotsManager: sm,
stateStorage: ss,
stateCommitment: sc,
db: db,
}
}

// Start starts the whole migration process.
// It migrates the whole state at the given version to the new store/v2 (both SC and SS).
// It also catches up the Changesets which are committed while the migration is in progress.
// `chChangeset` is the channel to receive the committed Changesets from the RootStore.
// `chDone` is the channel to receive the done signal from the RootStore.
// NOTE: It should be called by the RootStore, running in the background.
func (m *Manager) Start(version uint64, chChangeset <-chan *VersionedChangeset, chDone <-chan struct{}) error {
m.chChangeset = chChangeset
m.chDone = chDone

go func() {
if err := m.writeChangeset(); err != nil {
m.logger.Error("failed to write changeset", "err", err)
}
}()
Dismissed Show dismissed Hide dismissed

if err := m.Migrate(version); err != nil {
return fmt.Errorf("failed to migrate state: %w", err)
}

return m.Sync()
}
cool-develope marked this conversation as resolved.
Show resolved Hide resolved

// GetStateCommitment returns the state commitment.
func (m *Manager) GetStateCommitment() *commitment.CommitStore {
return m.stateCommitment
}

// Migrate migrates the whole state at the given height to the new store/v2.
Expand All @@ -49,13 +100,106 @@ func (m *Manager) Migrate(height uint64) error {

eg := new(errgroup.Group)
eg.Go(func() error {
return m.storageSnapshotter.Restore(height, chStorage)
return m.stateStorage.Restore(height, chStorage)
})
eg.Go(func() error {
defer close(chStorage)
_, err := m.commitSnapshotter.Restore(height, 0, ms, chStorage)
_, err := m.stateCommitment.Restore(height, 0, ms, chStorage)
return err
})

return eg.Wait()
if err := eg.Wait(); err != nil {
return err
}

m.mtx.Lock()
m.migratedVersion = height
m.mtx.Unlock()

return nil
}

// writeChangeset writes the Changeset to the db.
func (m *Manager) writeChangeset() error {
for vc := range m.chChangeset {
cs := vc.Changeset
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, vc.Version)
csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf))
csBytes, err := cs.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal changeset: %w", err)
}

batch := m.db.NewBatch()
defer batch.Close()
Dismissed Show dismissed Hide dismissed

if err := batch.Set(csKey, csBytes); err != nil {
return fmt.Errorf("failed to write changeset to db.Batch: %w", err)
}
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write changeset to db: %w", err)
}
}

return nil
}
cool-develope marked this conversation as resolved.
Show resolved Hide resolved

// GetMigratedVersion returns the migrated version.
// It is used to check the migrated version in the RootStore.
func (m *Manager) GetMigratedVersion() uint64 {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.migratedVersion
}

// Sync catches up the Changesets which are committed while the migration is in progress.
// It should be called after the migration is done.
func (m *Manager) Sync() error {
version := m.GetMigratedVersion()
if version == 0 {
return fmt.Errorf("migration is not done yet")
}
version += 1

for {
select {
case <-m.chDone:
return nil
default:
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, version)
csKey := []byte(fmt.Sprintf(migrateChangesetKeyFmt, buf))
csBytes, err := m.db.Get(csKey)
if err != nil {
return fmt.Errorf("failed to get changeset from db: %w", err)
}
if csBytes == nil {
// wait for the next changeset
time.Sleep(100 * time.Millisecond)
continue
}

cs := store.NewChangeset()
if err := cs.Unmarshal(csBytes); err != nil {
return fmt.Errorf("failed to unmarshal changeset: %w", err)
}

if err := m.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write changeset to commitment: %w", err)
}
if _, err := m.stateCommitment.Commit(version); err != nil {
return fmt.Errorf("failed to commit changeset to commitment: %w", err)
}
if err := m.stateStorage.ApplyChangeset(version, cs); err != nil {
return fmt.Errorf("failed to write changeset to storage: %w", err)
}

m.mtx.Lock()
m.migratedVersion = version
m.mtx.Unlock()

version += 1
}
}
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading