Skip to content

Commit

Permalink
Merge branch '2.0-dev' into dynamicmetadatacachecap2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 6, 2024
2 parents 56e7ea9 + 9577256 commit d27087a
Show file tree
Hide file tree
Showing 31 changed files with 1,253 additions and 440 deletions.
119 changes: 119 additions & 0 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,125 @@ func TestBackupData(t *testing.T) {

}

// TestBackupData2 exercises the full backup round-trip: it ingests rows
// concurrently, performs deletes, forces a checkpoint for backup, copies the
// checkpoint/GC artifacts via execBackup into a local file service, verifies
// that no file is backed up twice, then restarts the engine on top of the
// backup and checks the surviving row count.
func TestBackupData2(t *testing.T) {
	defer testutils.AfterTest(t)()
	testutils.EnsureNoLeak(t)
	ctx := context.Background()

	opts := config.WithQuickScanAndCKPOpts(nil)
	db := testutil.NewTestEngine(ctx, ModuleName, t, opts)
	defer db.Close()
	defer opts.Fs.Close()

	schema := catalog.MockSchemaAll(13, 3)
	schema.Extra.BlockMaxRows = 10
	schema.Extra.ObjectMaxBlocks = 10
	db.BindSchema(schema)
	{
		txn, err := db.DB.StartTxn(nil)
		require.NoError(t, err)
		dbH, err := testutil.CreateDatabase2(ctx, txn, "db")
		require.NoError(t, err)
		_, err = testutil.CreateRelation2(ctx, txn, dbH, schema)
		require.NoError(t, err)
		require.NoError(t, txn.Commit(ctx))
	}

	totalRows := uint64(schema.Extra.BlockMaxRows * 30)
	bat := catalog.MockBatch(schema, int(totalRows))
	defer bat.Close()
	bats := bat.Split(100)

	var wg sync.WaitGroup
	// A nil pool from an ignored error would panic on Submit; fail fast instead.
	pool, err := ants.NewPool(80)
	require.NoError(t, err)
	defer pool.Release()

	start := time.Now()
	for _, data := range bats {
		wg.Add(1)
		// require (not assert): a failed Submit never calls wg.Done, so
		// continuing would deadlock on wg.Wait below.
		err := pool.Submit(testutil.AppendClosure(t, data, schema.Name, db.DB, &wg))
		require.NoError(t, err)
	}
	wg.Wait()
	opts = config.WithLongScanAndCKPOpts(nil)
	testutils.WaitExpect(5000, func() bool {
		return db.DiskCleaner.GetCleaner().GetScanWaterMark() != nil
	})
	db.Restart(ctx, opts)
	t.Logf("Append %d rows takes: %s", totalRows, time.Since(start))
	deletedRows := 0
	{
		txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
		testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

		obj := testutil.GetOneObject(rel)
		id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
		err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
		require.NoError(t, err)
		deletedRows = 1
		testutil.CompactBlocks(t, 0, db.DB, "db", schema, false)

		assert.NoError(t, txn.Commit(context.Background()))
	}
	t.Log(db.Catalog.SimplePPString(common.PPL1))

	// Local file service acting as the backup destination.
	dir := path.Join(db.Dir, "/local")
	c := fileservice.Config{
		Name:    defines.LocalFileServiceName,
		Backend: "DISK",
		DataDir: dir,
	}
	service, err := fileservice.NewFileService(ctx, c, nil)
	// require (not assert): deferring Close on a nil service would panic.
	require.NoError(t, err)
	defer service.Close()
	// Delete one row per split batch (100 in total) before the backup point.
	for _, data := range bats {
		txn, rel := db.GetRelation()
		v := testutil.GetSingleSortKeyValue(data, schema, 2)
		filter := handle.NewEQFilter(v)
		err := rel.DeleteByFilter(context.Background(), filter)
		assert.NoError(t, err)
		assert.NoError(t, txn.Commit(context.Background()))
	}
	backupTime := time.Now().UTC()
	currTs := types.BuildTS(backupTime.UnixNano(), 0)
	locations := make([]string, 0)
	locations = append(locations, backupTime.Format(time.DateTime))
	location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
	// The backup location is used below; abort on failure.
	require.NoError(t, err)
	db.BGCheckpointRunner.DisableCheckpoint()
	locations = append(locations, location)
	compacted := db.BGCheckpointRunner.GetCompacted()
	checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
	// Deduplicate checkpoints by file name, keeping "<location>:<version>".
	files := make(map[string]string)
	for _, candidate := range checkpoints {
		if files[candidate.GetLocation().Name().String()] == "" {
			files[candidate.GetLocation().Name().String()] = fmt.Sprintf(
				"%s:%d", candidate.GetLocation().String(), candidate.GetVersion())
		}
	}
	// loc, not location: avoid shadowing the checkpoint location above.
	for _, loc := range files {
		locations = append(locations, loc)
	}
	fileList := make([]*taeFile, 0)
	err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList)
	// Everything below validates the backup's contents; abort on failure.
	require.NoError(t, err)
	// Every backed-up path must be unique.
	fileMap := make(map[string]struct{})
	for _, file := range fileList {
		_, ok := fileMap[file.path]
		assert.False(t, ok, "duplicate backup file: %s", file.path)
		fileMap[file.path] = struct{}{}
	}
	// Restart on top of the backup file service and verify the row count:
	// total minus 100 filter-deletes minus the single range-delete.
	db.Opts.Fs = service
	db.Restart(ctx)
	txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
	testutil.CheckAllColRowsByScan(t, rel, int(totalRows-100)-deletedRows, true)
	assert.NoError(t, txn.Commit(context.Background()))

}

func Test_saveTaeFilesList(t *testing.T) {
type args struct {
ctx context.Context
Expand Down
38 changes: 37 additions & 1 deletion pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
"io"
"os"
"path"
Expand Down Expand Up @@ -495,7 +496,42 @@ func CopyGCDir(
if err != nil {
return nil, err
}
for i, metaFile := range metaFiles {

copyFiles := make([]*checkpoint.MetaFile, 0)

for _, metaFile := range metaFiles {
name := metaFile.GetName()
window := gc.NewGCWindow(common.DebugAllocator, srcFs)
err = window.ReadTable(ctx, gc.GCMetaDir+name, srcFs)
if err != nil {
return nil, err
}
defer window.Close()
objects := window.GetObjectStats()
filesList := make([]*taeFile, 0)
needCopy := true
for _, object := range objects {
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, object.ObjectName().String(), "")
if err != nil {
logutil.Warnf("[Backup] copy file %v failed", object.ObjectName().String())
needCopy = false
break
}
filesList = append(filesList, &taeFile{
path: object.ObjectName().String(),
size: files[metaFile.GetIndex()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
})
}
if needCopy {
copyFiles = append(copyFiles, metaFile)
taeFileList = append(taeFileList, filesList...)
}
}

for i, metaFile := range copyFiles {
name := metaFile.GetName()
if i == len(metaFiles)-1 {
end := metaFile.GetEnd()
Expand Down
57 changes: 33 additions & 24 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type tableReader struct {
sinker Sinker
wMarkUpdater *WatermarkUpdater
tick *time.Ticker
restartFunc func(*DbTableInfo) error
resetWatermarkFunc func(*DbTableInfo) error
initSnapshotSplitTxn bool

tableDef *plan.TableDef
Expand All @@ -58,7 +58,7 @@ func NewTableReader(
sinker Sinker,
wMarkUpdater *WatermarkUpdater,
tableDef *plan.TableDef,
restartFunc func(*DbTableInfo) error,
resetWatermarkFunc func(*DbTableInfo) error,
initSnapshotSplitTxn bool,
) Reader {
reader := &tableReader{
Expand All @@ -70,7 +70,7 @@ func NewTableReader(
sinker: sinker,
wMarkUpdater: wMarkUpdater,
tick: time.NewTicker(200 * time.Millisecond),
restartFunc: restartFunc,
resetWatermarkFunc: resetWatermarkFunc,
initSnapshotSplitTxn: initSnapshotSplitTxn,
tableDef: tableDef,
}
Expand Down Expand Up @@ -112,19 +112,22 @@ func (reader *tableReader) Run(
}

if err := reader.readTable(ctx, ar); err != nil {
logutil.Errorf("cdc tableReader(%v) failed, err: %v\n", reader.info, err)
logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err)

// if stale read, try to restart reader
if moerr.IsMoErrCode(err, moerr.ErrStaleRead) {
if err = reader.restartFunc(reader.info); err != nil {
logutil.Errorf("cdc tableReader(%v) restart failed, err: %v\n", reader.info, err)
// reset sinker
reader.sinker.Reset()
// reset watermark
if err = reader.resetWatermarkFunc(reader.info); err != nil {
logutil.Errorf("cdc tableReader(%v) restart failed, err: %v", reader.info, err)
return
}
logutil.Errorf("cdc tableReader(%v) restart successfully\n", reader.info)
logutil.Errorf("cdc tableReader(%v) restart successfully", reader.info)
continue
}

logutil.Errorf("cdc tableReader(%v) err is not stale read, quit\n", reader.info)
logutil.Errorf("cdc tableReader(%v) err is not stale read, quit", reader.info)
return
}
}
Expand Down Expand Up @@ -172,8 +175,6 @@ func (reader *tableReader) readTableWithTxn(
txnOp client.TxnOperator,
packer *types.Packer,
ar *ActiveRoutine) (err error) {
v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load()))

var rel engine.Relation
var changes engine.ChangesHandle
//step1 : get relation
Expand Down Expand Up @@ -249,9 +250,15 @@ func (reader *tableReader) readTableWithTxn(
defer func() {
if hasBegin {
if err == nil {
_ = reader.sinker.SendCommit(ctx)
} else {
_ = reader.sinker.SendRollback(ctx)
// an error may not surface immediately, but it must surface on a subsequent call
reader.sinker.SendCommit()
// so send a dummy SQL statement to guarantee the previous commit was sent successfully
reader.sinker.SendDummy()
err = reader.sinker.Error()
}

if err != nil {
reader.sinker.SendRollback()
}
}

Expand All @@ -271,7 +278,12 @@ func (reader *tableReader) readTableWithTxn(
return
default:
}
// check sinker error of last round
if err = reader.sinker.Error(); err != nil {
return
}

v2.CdcMpoolInUseBytesGauge.Set(float64(reader.mp.Stats().NumCurrBytes.Load()))
start = time.Now()
insertData, deleteData, curHint, err = changes.Next(ctx, reader.mp)
v2.CdcReadDurationHistogram.Observe(time.Since(start).Seconds())
Expand All @@ -282,11 +294,12 @@ func (reader *tableReader) readTableWithTxn(
// both nil denote no more data (end of this tail)
if insertData == nil && deleteData == nil {
// heartbeat
err = reader.sinker.Sink(ctx, &DecoderOutput{
reader.sinker.Sink(ctx, &DecoderOutput{
noMoreData: true,
fromTs: fromTs,
toTs: toTs,
})
err = reader.sinker.Error()
return
}

Expand All @@ -296,22 +309,20 @@ func (reader *tableReader) readTableWithTxn(
case engine.ChangesHandle_Snapshot:
// output sql in a txn
if !hasBegin && !reader.initSnapshotSplitTxn {
if err = reader.sinker.SendBegin(ctx); err != nil {
return err
}
reader.sinker.SendBegin()
hasBegin = true
}

// transform into insert instantly
err = reader.sinker.Sink(ctx, &DecoderOutput{
reader.sinker.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeSnapshot,
checkpointBat: insertData,
fromTs: fromTs,
toTs: toTs,
})
addSnapshotEndMetrics()
insertData.Clean(reader.mp)
if err != nil {
if err = reader.sinker.Error(); err != nil {
return
}
case engine.ChangesHandle_Tail_wip:
Expand All @@ -334,13 +345,11 @@ func (reader *tableReader) readTableWithTxn(

// output sql in a txn
if !hasBegin {
if err = reader.sinker.SendBegin(ctx); err != nil {
return err
}
reader.sinker.SendBegin()
hasBegin = true
}

err = reader.sinker.Sink(ctx, &DecoderOutput{
reader.sinker.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeTail,
insertAtmBatch: insertAtmBatch,
deleteAtmBatch: deleteAtmBatch,
Expand All @@ -351,7 +360,7 @@ func (reader *tableReader) readTableWithTxn(
addTailEndMetrics(deleteAtmBatch)
insertAtmBatch.Close()
deleteAtmBatch.Close()
if err != nil {
if err = reader.sinker.Error(); err != nil {
return
}

Expand Down
Loading

0 comments on commit d27087a

Please sign in to comment.