Skip to content

Commit

Permalink
Merge branch 'main' into 1105-update-txnop-reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 5, 2024
2 parents a4d83da + 0236124 commit 17f5b21
Show file tree
Hide file tree
Showing 25 changed files with 701 additions and 637 deletions.
119 changes: 119 additions & 0 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,125 @@ func TestBackupData(t *testing.T) {

}

// TestBackupData2 exercises the backup flow end to end: it appends rows
// concurrently, deletes some of them, forces a checkpoint for backup, copies
// the checkpoint/GC artifacts into a local file service via execBackup, and
// finally restarts the engine on the backed-up file service to verify the
// surviving row count.
func TestBackupData2(t *testing.T) {
	defer testutils.AfterTest(t)()
	testutils.EnsureNoLeak(t)
	ctx := context.Background()

	// Quick scan/checkpoint options so background checkpoints happen during
	// the append phase; swapped for long intervals after the appends finish.
	opts := config.WithQuickScanAndCKPOpts(nil)
	db := testutil.NewTestEngine(ctx, ModuleName, t, opts)
	defer db.Close()
	defer opts.Fs.Close()

	// 13-column mock schema (presumably column 3 is the sort key, per the
	// MockSchemaAll convention — the filter deletes below rely on a sort key).
	// Small block/object limits force the data to span many objects.
	schema := catalog.MockSchemaAll(13, 3)
	schema.Extra.BlockMaxRows = 10
	schema.Extra.ObjectMaxBlocks = 10
	db.BindSchema(schema)
	{
		// Create database "db" and the test relation in one transaction.
		txn, err := db.DB.StartTxn(nil)
		require.NoError(t, err)
		dbH, err := testutil.CreateDatabase2(ctx, txn, "db")
		require.NoError(t, err)
		_, err = testutil.CreateRelation2(ctx, txn, dbH, schema)
		require.NoError(t, err)
		require.NoError(t, txn.Commit(ctx))
	}

	// 300 rows (BlockMaxRows * 30) split into 100 batches, appended
	// concurrently on a worker pool of 80 goroutines.
	totalRows := uint64(schema.Extra.BlockMaxRows * 30)
	bat := catalog.MockBatch(schema, int(totalRows))
	defer bat.Close()
	bats := bat.Split(100)

	var wg sync.WaitGroup
	pool, _ := ants.NewPool(80)
	defer pool.Release()

	start := time.Now()
	for _, data := range bats {
		wg.Add(1)
		err := pool.Submit(testutil.AppendClosure(t, data, schema.Name, db.DB, &wg))
		assert.Nil(t, err)
	}
	wg.Wait()
	// Wait until the disk cleaner has scanned at least once, then restart
	// with long scan/checkpoint intervals so background activity stays quiet
	// for the rest of the test.
	opts = config.WithLongScanAndCKPOpts(nil)
	testutils.WaitExpect(5000, func() bool {
		return db.DiskCleaner.GetCleaner().GetScanWaterMark() != nil
	})
	db.Restart(ctx, opts)
	t.Logf("Append %d rows takes: %s", totalRows, time.Since(start))
	deletedRows := 0
	{
		txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
		testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

		// Range-delete a single row from one object, then compact so the
		// delete is materialized into object files before backup.
		obj := testutil.GetOneObject(rel)
		id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
		err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
		require.NoError(t, err)
		deletedRows = 1
		testutil.CompactBlocks(t, 0, db.DB, "db", schema, false)

		assert.NoError(t, txn.Commit(context.Background()))
	}
	t.Log(db.Catalog.SimplePPString(common.PPL1))

	// Local DISK-backed file service acting as the backup destination.
	dir := path.Join(db.Dir, "/local")
	c := fileservice.Config{
		Name:    defines.LocalFileServiceName,
		Backend: "DISK",
		DataDir: dir,
	}
	service, err := fileservice.NewFileService(ctx, c, nil)
	assert.Nil(t, err)
	defer service.Close()
	// Delete one row per batch by its sort-key value — 100 deletes total,
	// matching the totalRows-100 assertion at the end (assumes the sampled
	// key is unique per batch).
	for _, data := range bats {
		txn, rel := db.GetRelation()
		v := testutil.GetSingleSortKeyValue(data, schema, 2)
		filter := handle.NewEQFilter(v)
		err := rel.DeleteByFilter(context.Background(), filter)
		assert.NoError(t, err)
		assert.NoError(t, txn.Commit(context.Background()))
	}
	// Build the location list consumed by execBackup: the backup timestamp,
	// the forced-checkpoint location, and one "<location>:<version>" entry
	// per distinct checkpoint meta file.
	backupTime := time.Now().UTC()
	currTs := types.BuildTS(backupTime.UnixNano(), 0)
	locations := make([]string, 0)
	locations = append(locations, backupTime.Format(time.DateTime))
	location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
	assert.Nil(t, err)
	db.BGCheckpointRunner.DisableCheckpoint()
	locations = append(locations, location)
	compacted := db.BGCheckpointRunner.GetCompacted()
	checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
	files := make(map[string]string, 0)
	for _, candidate := range checkpoints {
		// Deduplicate by checkpoint file name; the first location seen wins.
		if files[candidate.GetLocation().Name().String()] == "" {
			var loc string
			loc = candidate.GetLocation().String()
			loc += ":"
			loc += fmt.Sprintf("%d", candidate.GetVersion())
			files[candidate.GetLocation().Name().String()] = loc
		}
	}
	for _, location := range files {
		locations = append(locations, location)
	}
	fileList := make([]*taeFile, 0)
	err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList)
	assert.Nil(t, err)
	// The backup must not report the same file path twice.
	fileMap := make(map[string]struct{})
	for _, file := range fileList {
		_, ok := fileMap[file.path]
		assert.True(t, !ok)
		fileMap[file.path] = struct{}{}
	}
	// Restart the engine on the backed-up file service: 300 appended rows
	// minus 100 filter deletes minus 1 range delete must survive.
	db.Opts.Fs = service
	db.Restart(ctx)
	txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
	testutil.CheckAllColRowsByScan(t, rel, int(totalRows-100)-deletedRows, true)
	assert.NoError(t, txn.Commit(context.Background()))

}

func Test_saveTaeFilesList(t *testing.T) {
type args struct {
ctx context.Context
Expand Down
38 changes: 37 additions & 1 deletion pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
"io"
"os"
"path"
Expand Down Expand Up @@ -495,7 +496,42 @@ func CopyGCDir(
if err != nil {
return nil, err
}
for i, metaFile := range metaFiles {

copyFiles := make([]*checkpoint.MetaFile, 0)

for _, metaFile := range metaFiles {
name := metaFile.GetName()
window := gc.NewGCWindow(common.DebugAllocator, srcFs)
err = window.ReadTable(ctx, gc.GCMetaDir+name, srcFs)
if err != nil {
return nil, err
}
defer window.Close()
objects := window.GetObjectStats()
filesList := make([]*taeFile, 0)
needCopy := true
for _, object := range objects {
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, object.ObjectName().String(), "")
if err != nil {
logutil.Warnf("[Backup] copy file %v failed", object.ObjectName().String())
needCopy = false
break
}
filesList = append(filesList, &taeFile{
path: object.ObjectName().String(),
size: files[metaFile.GetIndex()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
})
}
if needCopy {
copyFiles = append(copyFiles, metaFile)
taeFileList = append(taeFileList, filesList...)
}
}

for i, metaFile := range copyFiles {
name := metaFile.GetName()
if i == len(metaFiles)-1 {
end := metaFile.GetEnd()
Expand Down
20 changes: 16 additions & 4 deletions pkg/sql/plan/base_binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1639,20 +1639,32 @@ func BindFuncExprImplByPlanExpr(ctx context.Context, name string, args []*Expr)
//expand the in list to col=a or col=b or ......
if name == "in" {
for _, expr := range orExprList {
tmpExpr, _ := BindFuncExprImplByPlanExpr(ctx, "=", []*Expr{DeepCopyExpr(args[0]), expr})
tmpExpr, err := BindFuncExprImplByPlanExpr(ctx, "=", []*Expr{DeepCopyExpr(args[0]), expr})
if err != nil {
return nil, err
}
if newExpr == nil {
newExpr = tmpExpr
} else {
newExpr, _ = BindFuncExprImplByPlanExpr(ctx, "or", []*Expr{newExpr, tmpExpr})
newExpr, err = BindFuncExprImplByPlanExpr(ctx, "or", []*Expr{newExpr, tmpExpr})
if err != nil {
return nil, err
}
}
}
} else {
for _, expr := range orExprList {
tmpExpr, _ := BindFuncExprImplByPlanExpr(ctx, "!=", []*Expr{DeepCopyExpr(args[0]), expr})
tmpExpr, err := BindFuncExprImplByPlanExpr(ctx, "!=", []*Expr{DeepCopyExpr(args[0]), expr})
if err != nil {
return nil, err
}
if newExpr == nil {
newExpr = tmpExpr
} else {
newExpr, _ = BindFuncExprImplByPlanExpr(ctx, "and", []*Expr{newExpr, tmpExpr})
newExpr, err = BindFuncExprImplByPlanExpr(ctx, "and", []*Expr{newExpr, tmpExpr})
if err != nil {
return nil, err
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/plan/function/func_cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -3401,7 +3401,7 @@ func decimal64ToTimestamp(
return err
}
} else {
ts := types.Timestamp(int64(v))
ts := types.UnixToTimestamp(int64(v))
if err := to.Append(ts, false); err != nil {
return err
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/txn/storage/tae/storage_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ func (s *taeStorage) Debug(ctx context.Context,
resp, _ := handleRead(ctx, txnMeta, data, s.taeHandler.HandleStorageUsage)
return resp.Read()
case uint32(api.OpCode_OpSnapshotRead):
resp, _ := handleRead(ctx, txnMeta, data, s.taeHandler.HandleSnapshotRead)
resp, err := handleRead(ctx, txnMeta, data, s.taeHandler.HandleSnapshotRead)
if err != nil {
return types.Encode(&cmd_util.SnapshotReadResp{
Succeed: false,
})
}
return resp.Read()
case uint32(api.OpCode_OpInterceptCommit):
resp, err := handleRead(ctx, txnMeta, data, s.taeHandler.HandleInterceptCommit)
Expand Down
36 changes: 36 additions & 0 deletions pkg/vm/engine/tae/db/checkpoint/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,42 @@ func (r *runner) GetAllCheckpoints() []*CheckpointEntry {
return ckps
}

// GetAllCheckpointsForBackup collects the checkpoint entries a backup needs:
// the given compacted entry (if any), the last finished global checkpoint,
// and every finished incremental checkpoint strictly after the timestamp
// already covered by those entries.
func (r *runner) GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry {
	entries := make([]*CheckpointEntry, 0)
	var watermark types.TS
	if compact != nil {
		watermark = compact.GetEnd()
		entries = append(entries, compact)
	}

	// Snapshot the global checkpoint and a copy of the incremental tree
	// under the storage lock, then release it before iterating.
	r.storage.Lock()
	global := r.getLastFinishedGlobalCheckpointLocked()
	incrementals := r.storage.incrementals.Copy()
	r.storage.Unlock()

	if global != nil {
		if watermark.IsEmpty() {
			// No compacted entry: the global checkpoint defines coverage.
			watermark = global.GetEnd()
		}
		entries = append(entries, global)
	}

	// Walk incrementals starting just past the watermark; stop at the first
	// entry that has not finished yet (later ones cannot be finished either).
	pivot := NewCheckpointEntry(r.rt.SID(), watermark.Next(), watermark.Next(), ET_Incremental)
	iter := incrementals.Iter()
	defer iter.Release()
	for found := iter.Seek(pivot); found; found = iter.Next() {
		entry := iter.Item()
		if !entry.IsFinished() {
			break
		}
		entries = append(entries, entry)
	}
	return entries
}

func (r *runner) GCByTS(ctx context.Context, ts types.TS) error {
prev := r.gcTS.Load()
if prev == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/tae/db/checkpoint/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Runner interface {
// for test, delete in next phase
DebugUpdateOptions(opts ...Option)
GetAllCheckpoints() []*CheckpointEntry
GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry
}

type DirtyCtx struct {
Expand Down
4 changes: 2 additions & 2 deletions pkg/vm/engine/tae/db/gc/v3/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (c *checkpointCleaner) Replay() (err error) {
for _, dir := range gcMetaDirs {
start := time.Now()
window := NewGCWindow(c.mp, c.fs.Service)
err = window.ReadTable(c.ctx, GCMetaDir+dir.Name, c.fs)
err = window.ReadTable(c.ctx, GCMetaDir+dir.Name, c.fs.Service)
if err != nil {
logger = logutil.Error
}
Expand Down Expand Up @@ -625,7 +625,7 @@ func (c *checkpointCleaner) deleteStaleCKPMetaFileLocked() (err error) {
}
gcWindow := NewGCWindow(c.mp, c.fs.Service)
defer gcWindow.Close()
if err = gcWindow.ReadTable(c.ctx, GCMetaDir+metaFile.Name(), c.fs); err != nil {
if err = gcWindow.ReadTable(c.ctx, GCMetaDir+metaFile.Name(), c.fs.Service); err != nil {
logutil.Error(
"GC-WINDOW-READ-ERROR",
zap.Error(err),
Expand Down
8 changes: 6 additions & 2 deletions pkg/vm/engine/tae/db/gc/v3/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ type GCWindow struct {
}
}

// GetObjectStats returns the object stats tracked by this GC window.
// NOTE(review): it returns the internal slice without copying, so callers
// must treat the result as read-only while the window is alive.
func (w *GCWindow) GetObjectStats() []objectio.ObjectStats {
	return w.files
}

func (w *GCWindow) Clone() GCWindow {
w2 := *w
w2.files = make([]objectio.ObjectStats, len(w.files))
Expand Down Expand Up @@ -455,7 +459,7 @@ func (w *GCWindow) replayData(
}

// ReadTable reads an s3 file and replays a GCWindow in memory
func (w *GCWindow) ReadTable(ctx context.Context, name string, fs *objectio.ObjectFS) error {
func (w *GCWindow) ReadTable(ctx context.Context, name string, fs fileservice.FileService) error {
var release1 func()
var buffer *batch.Batch
defer func() {
Expand All @@ -466,7 +470,7 @@ func (w *GCWindow) ReadTable(ctx context.Context, name string, fs *objectio.Obje
start, end, _ := blockio.DecodeGCMetadataFileName(name)
w.tsRange.start = start
w.tsRange.end = end
reader, err := blockio.NewFileReaderNoCache(fs.Service, name)
reader, err := blockio.NewFileReaderNoCache(fs, name)
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/vm/engine/tae/db/testutil/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ func (e *TestEngine) BindSchema(schema *catalog.Schema) { e.schema = schema }

func (e *TestEngine) BindTenantID(tenantID uint32) { e.tenantID = tenantID }

func (e *TestEngine) Restart(ctx context.Context) {
func (e *TestEngine) Restart(ctx context.Context, opts ...*options.Options) {
_ = e.DB.Close()
var err error
if len(opts) > 0 {
e.Opts = opts[0]
}
e.DB, err = db.Open(ctx, e.Dir, e.Opts)
// only ut executes this checker
e.DB.DiskCleaner.GetCleaner().AddChecker(
Expand All @@ -103,6 +106,7 @@ func (e *TestEngine) Restart(ctx context.Context) {
}, cmd_util.CheckerKeyMinTS)
assert.NoError(e.T, err)
}

func (e *TestEngine) RestartDisableGC(ctx context.Context) {
_ = e.DB.Close()
var err error
Expand Down
Loading

0 comments on commit 17f5b21

Please sign in to comment.