Skip to content

Commit

Permalink
optimize replay (#9431)
Browse files Browse the repository at this point in the history
1. remove logallocator
2. reuse memory when read logentry payload

Approved by: @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored May 14, 2023
1 parent 8efbec0 commit 09ebd31
Show file tree
Hide file tree
Showing 22 changed files with 117 additions and 91 deletions.
6 changes: 0 additions & 6 deletions pkg/vm/engine/tae/common/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
var DefaultAllocator *mpool.MPool
var MutMemAllocator *mpool.MPool
var CacheAllocator *mpool.MPool
var LogAllocator *mpool.MPool

// init with zero fixed pool, for test.
func init() {
Expand All @@ -52,11 +51,6 @@ func InitTAEMPool() {
if CacheAllocator, err = mpool.NewMPool("tae_cache", 0, mpool.NoFixed); err != nil {
panic(err)
}

mpool.DeleteMPool(LogAllocator)
if LogAllocator, err = mpool.NewMPool("tae_log", 1024*1024*5, 0); err != nil {
panic(err)
}
}
once.Do(onceBody)
}
21 changes: 18 additions & 3 deletions pkg/vm/engine/tae/db/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
package db

import (

//"fmt"
"bytes"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand All @@ -42,6 +41,21 @@ type Replayer struct {
ckpedTS types.TS
}

type replayAllocator struct {
replayBuf bytes.Buffer
}

func newReplayAllocator() *replayAllocator {
return &replayAllocator{
replayBuf: bytes.Buffer{},
}
}

func (a *replayAllocator) Alloc(n int) ([]byte, error) {
a.replayBuf.Grow(n)
return a.replayBuf.Bytes(), nil
}

func newReplayer(dataFactory *tables.DataFactory, db *DB, ckpedTS types.TS) *Replayer {
return &Replayer{
DataFactory: dataFactory,
Expand Down Expand Up @@ -76,7 +90,8 @@ func (replayer *Replayer) PreReplayWal() {
}

func (replayer *Replayer) Replay() {
if err := replayer.db.Wal.Replay(replayer.OnReplayEntry); err != nil {
allocator := newReplayAllocator()
if err := replayer.db.Wal.Replay(replayer.OnReplayEntry, allocator); err != nil {
panic(err)
}
if _, err := replayer.db.Wal.Checkpoint(replayer.staleIndexes); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ func restartStore(s *baseStore, t *testing.T) *baseStore {
panic(moerr.NewInternalErrorNoCtx("logic error %d<%d", e.Lsn, tempLsn))
}
tempLsn = e.Lsn
_, err = s.Read(e.Lsn)
_, err = s.Read(e.Lsn, nil)
assert.NoError(t, err)
// logutil.Infof("lsn is %d",e.Lsn)
})
}, nil)
assert.NoError(t, err)
assert.Equal(t, maxlsn, s.GetCurrSeqNum())
assert.Equal(t, maxlsn, s.synced)
Expand Down Expand Up @@ -98,7 +98,7 @@ func concurrentAppendReadCheckpoint(s *baseStore, t *testing.T) {
return func() {
e := entries[i]
assert.NoError(t, e.WaitDone())
e2, err := s.Read(e.Lsn)
e2, err := s.Read(e.Lsn, nil)
assert.NoError(t, err)
assert.Equal(t, e2.Entry.GetInfo().(*storeEntry.Info).GroupLSN, e.Info.GroupLSN)
e2.Entry.Free()
Expand Down
9 changes: 5 additions & 4 deletions pkg/vm/engine/tae/logstore/driver/batchstoredriver/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
)

var suffix = ".rot"
Expand Down Expand Up @@ -167,7 +168,7 @@ func (rf *rotateFile) getEntryFromUncommitted(id int) (e *vFile) {
}
return nil
}
func (rf *rotateFile) Replay(r *replayer) error {
func (rf *rotateFile) Replay(r *replayer, allocator logstoreEntry.Allocator) error {
entryIDs := rf.history.EntryIds()
for _, vf := range rf.uncommitted {
entryIDs = append(entryIDs, vf.Id())
Expand All @@ -182,7 +183,7 @@ func (rf *rotateFile) Replay(r *replayer) error {
entry = vf
}

err := entry.Replay(r)
err := entry.Replay(r, allocator)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -316,12 +317,12 @@ func (rf *rotateFile) Sync() error {
return lastFile.Sync()
}

func (rf *rotateFile) Load(ver int, groupId uint32, lsn uint64) (*entry.Entry, error) {
func (rf *rotateFile) Load(ver int, groupId uint32, lsn uint64, allocator logstoreEntry.Allocator) (*entry.Entry, error) {
vf, err := rf.GetEntryByVersion(ver)
if err != nil {
return nil, err
}
return vf.Load(lsn)
return vf.Load(lsn, allocator)
}

func (rf *rotateFile) GetEntryByVersion(version int) (VFile, error) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/vm/engine/tae/logstore/driver/batchstoredriver/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
)

// wal ckping
Expand Down Expand Up @@ -77,14 +78,14 @@ func (r *replayer) onReplayEntry(e *entry.Entry) error {
return nil
}

func (r *replayer) replayHandler(vfile *vFile) error {
func (r *replayer) replayHandler(vfile *vFile, allocator logstoreEntry.Allocator) error {
if vfile.version != r.version {
r.pos = 0
r.version = vfile.version
}
e := entry.NewEmptyEntry()
t0 := time.Now()
_, err := e.ReadAt(vfile.File, r.pos)
_, err := e.ReadAt(vfile.File, r.pos, allocator)
r.readDuration += time.Since(t0)
if err != nil {
return err
Expand Down
9 changes: 5 additions & 4 deletions pkg/vm/engine/tae/logstore/driver/batchstoredriver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
)

Expand Down Expand Up @@ -267,10 +268,10 @@ func (bs *baseStore) Append(e *entry.Entry) error {
return nil
}

func (bs *baseStore) Replay(h driver.ApplyHandle) error {
func (bs *baseStore) Replay(h driver.ApplyHandle, allocator logstoreEntry.Allocator) error {
r := newReplayer(h)
bs.addrs = r.addrs
err := bs.file.Replay(r)
err := bs.file.Replay(r, allocator)
if err != nil {
return err
}
Expand All @@ -283,7 +284,7 @@ func (bs *baseStore) Replay(h driver.ApplyHandle) error {
return nil
}

func (bs *baseStore) Read(lsn uint64) (*entry.Entry, error) {
func (bs *baseStore) Read(lsn uint64, allocator logstoreEntry.Allocator) (*entry.Entry, error) {
ver, err := bs.retryGetVersionByGLSN(lsn)
if err != nil {
return nil, err
Expand All @@ -292,7 +293,7 @@ func (bs *baseStore) Read(lsn uint64) (*entry.Entry, error) {
if err != nil {
return nil, err
}
e, err := vf.Load(lsn)
e, err := vf.Load(lsn, allocator)
return e, err
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/vm/engine/tae/logstore/driver/batchstoredriver/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
)

type RotateChecker interface {
Expand All @@ -36,11 +37,11 @@ type VFile interface {
Name() string
String() string

Replay(*replayer) error
Replay(*replayer, logstoreEntry.Allocator) error
OnReplayCommitted()

Load(lsn uint64) (*entry.Entry, error)
LoadByOffset(offset int) (*entry.Entry, error)
Load(lsn uint64, allocator logstoreEntry.Allocator) (*entry.Entry, error)
LoadByOffset(offset int, allocator logstoreEntry.Allocator) (*entry.Entry, error)
}

type FileAppender interface {
Expand Down Expand Up @@ -78,17 +79,17 @@ type File interface {
GetEntryByVersion(version int) (VFile, error)
Sync() error
GetAppender() FileAppender
Replay(*replayer) error
Replay(*replayer, logstoreEntry.Allocator) error
GetHistory() History
Load(ver int, groupId uint32, lsn uint64) (*entry.Entry, error)
Load(ver int, groupId uint32, lsn uint64, allocator logstoreEntry.Allocator) (*entry.Entry, error)
}

type Store interface {
io.Closer
Append(*entry.Entry) error
Truncate(lsn uint64) error
GetTruncated() (lsn uint64, err error)
Read(lsn uint64) (*entry.Entry, error)
Read(lsn uint64, allocator logstoreEntry.Allocator) (*entry.Entry, error)
Close() error
Replay(driver.ApplyHandle) error
GetSynced(uint32) uint64
Expand Down
17 changes: 9 additions & 8 deletions pkg/vm/engine/tae/logstore/driver/batchstoredriver/vfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
)

var Metasize = 2
Expand Down Expand Up @@ -245,9 +246,9 @@ func (vf *vFile) Destroy() error {
return err
}

func (vf *vFile) Replay(r *replayer) error {
func (vf *vFile) Replay(r *replayer, allocator logstoreEntry.Allocator) error {
for {
if err := r.replayHandler(vf); err != nil {
if err := r.replayHandler(vf, allocator); err != nil {
if errors.Is(err, io.EOF) {
break
}
Expand All @@ -264,7 +265,7 @@ func (vf *vFile) OnLogInfo(info any) {
panic(err)
}
}
func (vf *vFile) Load(lsn uint64) (*entry.Entry, error) {
func (vf *vFile) Load(lsn uint64, allocator logstoreEntry.Allocator) (*entry.Entry, error) {
offset, err := vf.GetOffsetByLSN(lsn)
if err == ErrVFileGroupNotExist || err == ErrVFileLsnNotExist {
for i := 0; i < 10; i++ {
Expand All @@ -289,14 +290,14 @@ func (vf *vFile) Load(lsn uint64) (*entry.Entry, error) {
if err != nil {
return nil, err
}
return vf.readEntryAt(offset)
return vf.readEntryAt(offset, allocator)
}
func (vf *vFile) LoadByOffset(offset int) (*entry.Entry, error) {
return vf.readEntryAt(offset)
func (vf *vFile) LoadByOffset(offset int, allocator logstoreEntry.Allocator) (*entry.Entry, error) {
return vf.readEntryAt(offset, allocator)
}
func (vf *vFile) readEntryAt(offset int) (*entry.Entry, error) {
func (vf *vFile) readEntryAt(offset int, allocator logstoreEntry.Allocator) (*entry.Entry, error) {
e := entry.NewEmptyEntry()
_, err := e.ReadAt(vf.File, offset)
_, err := e.ReadAt(vf.File, offset, allocator)
return e, err
}
func (vf *vFile) OnReplayCommitted() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/vm/engine/tae/logstore/driver/entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ func (e *Entry) SetInfo() {
e.Info = info.(*entry.Info)
}
}
func (e *Entry) ReadFrom(r io.Reader) (n int64, err error) {
func (e *Entry) ReadFromWithAllocator(r io.Reader, allocator entry.Allocator) (n int64, err error) {
if _, err = r.Read(types.EncodeUint64(&e.Lsn)); err != nil {
return
}
_, err = e.Entry.ReadFrom(r)
_, err = e.Entry.ReadFromWithAllocator(r, allocator)
if err != nil {
panic(err)
}
e.Info = e.Entry.GetInfo().(*entry.Info)
return
}

func (e *Entry) ReadAt(r *os.File, offset int) (int, error) {
func (e *Entry) ReadAt(r *os.File, offset int, allocator entry.Allocator) (int, error) {
lsnbuf := make([]byte, 8)
n, err := r.ReadAt(lsnbuf, int64(offset))
if err != nil {
Expand All @@ -80,7 +80,7 @@ func (e *Entry) ReadAt(r *os.File, offset int) (int, error) {
return n, err
}

n2, err := e.Entry.ReadAt(r, offset)
n2, err := e.Entry.ReadAt(r, offset, allocator)
return n2 + n, err
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm"
)

Expand Down Expand Up @@ -112,10 +113,10 @@ func (d *LogServiceDriver) Close() error {
return nil
}

func (d *LogServiceDriver) Replay(h driver.ApplyHandle) error {
func (d *LogServiceDriver) Replay(h driver.ApplyHandle, allocator logstoreEntry.Allocator) error {
d.PreReplay()
r := newReplayer(h, ReplayReadSize, d)
r.replay()
r.replay(allocator)
d.onReplay(r)
r.d.resetReadCache()
d.PostReplay()
Expand Down
9 changes: 5 additions & 4 deletions pkg/vm/engine/tae/logstore/driver/logservicedriver/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/driver/entry"
logstoreEntry "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/entry"
)

type MetaType uint8
Expand Down Expand Up @@ -281,13 +282,13 @@ func newEmptyRecordEntry(r logservice.LogRecord) *recordEntry {
mashalMu: sync.RWMutex{}}
}

func (r *recordEntry) replay(h driver.ApplyHandle) (addr *common.ClosedIntervals) {
func (r *recordEntry) replay(h driver.ApplyHandle, allocator logstoreEntry.Allocator) (addr *common.ClosedIntervals) {
bbuf := bytes.NewBuffer(r.baseEntry.payload)
lsns := make([]uint64, 0)
for lsn := range r.meta.addr {
lsns = append(lsns, lsn)
e := entry.NewEmptyEntry()
e.ReadFrom(bbuf)
e.ReadFromWithAllocator(bbuf, allocator)
h(e)
e.Entry.Free()
}
Expand Down Expand Up @@ -329,12 +330,12 @@ func (r *recordEntry) unmarshal() {
r.unmarshaled.Store(1)
}

func (r *recordEntry) readEntry(lsn uint64) *entry.Entry {
func (r *recordEntry) readEntry(lsn uint64, allocator logstoreEntry.Allocator) *entry.Entry {
r.unmarshal()
offset := r.meta.addr[lsn]
bbuf := bytes.NewBuffer(r.baseEntry.payload[offset:])
e := entry.NewEmptyEntry()
e.ReadFrom(bbuf)
e.ReadFromWithAllocator(bbuf, allocator)
e.Lsn = lsn
return e
}
Loading

0 comments on commit 09ebd31

Please sign in to comment.