Skip to content

Commit

Permalink
Merge branch '2.0-dev' into perstmtperfcounter2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 7, 2024
2 parents 01b2bec + 38ead4f commit 093c8ef
Show file tree
Hide file tree
Showing 43 changed files with 1,483 additions and 571 deletions.
120 changes: 120 additions & 0 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,126 @@ func TestBackupData(t *testing.T) {

}

// TestBackupData2 exercises a full backup/restore cycle: it appends rows
// concurrently, performs deletes (one range delete plus one filtered delete
// per split batch), forces a checkpoint at the backup timestamp, runs a full
// backup into a local file service, verifies the backup emits no duplicate
// files, and finally restarts the engine on the backed-up data and checks
// the surviving row count.
func TestBackupData2(t *testing.T) {
	t.Skip("TestBackupData2")
	defer testutils.AfterTest(t)()
	testutils.EnsureNoLeak(t)
	ctx := context.Background()

	opts := config.WithQuickScanAndCKPOpts(nil)
	db := testutil.NewTestEngine(ctx, ModuleName, t, opts)
	defer db.Close()
	defer opts.Fs.Close()

	schema := catalog.MockSchemaAll(13, 3)
	schema.Extra.BlockMaxRows = 10
	schema.Extra.ObjectMaxBlocks = 10
	db.BindSchema(schema)

	// Create the test database and relation in a single transaction.
	{
		txn, err := db.DB.StartTxn(nil)
		require.NoError(t, err)
		dbH, err := testutil.CreateDatabase2(ctx, txn, "db")
		require.NoError(t, err)
		_, err = testutil.CreateRelation2(ctx, txn, dbH, schema)
		require.NoError(t, err)
		require.NoError(t, txn.Commit(ctx))
	}

	totalRows := uint64(schema.Extra.BlockMaxRows * 30)
	bat := catalog.MockBatch(schema, int(totalRows))
	defer bat.Close()
	bats := bat.Split(100)

	// Append the split batches concurrently through a bounded worker pool.
	var wg sync.WaitGroup
	pool, err := ants.NewPool(80)
	// FIX: the pool-creation error was silently discarded with `pool, _ :=`.
	require.NoError(t, err)
	defer pool.Release()

	start := time.Now()
	for _, data := range bats {
		wg.Add(1)
		err := pool.Submit(testutil.AppendClosure(t, data, schema.Name, db.DB, &wg))
		assert.Nil(t, err)
	}
	wg.Wait()

	// Wait until the disk cleaner has scanned at least once, then restart with
	// long-scan options so background checkpointing does not race the backup.
	opts = config.WithLongScanAndCKPOpts(nil)
	testutils.WaitExpect(5000, func() bool {
		return db.DiskCleaner.GetCleaner().GetScanWaterMark() != nil
	})
	db.Restart(ctx, opts)
	t.Logf("Append %d rows takes: %s", totalRows, time.Since(start))

	// Range-delete a single row and compact, so the backup also has
	// tombstone/compaction artifacts to copy.
	deletedRows := 0
	{
		txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
		testutil.CheckAllColRowsByScan(t, rel, int(totalRows), false)

		obj := testutil.GetOneObject(rel)
		id := obj.GetMeta().(*catalog.ObjectEntry).AsCommonID()
		err := rel.RangeDelete(id, 0, 0, handle.DT_Normal)
		require.NoError(t, err)
		deletedRows = 1
		testutil.CompactBlocks(t, 0, db.DB, "db", schema, false)

		assert.NoError(t, txn.Commit(context.Background()))
	}
	t.Log(db.Catalog.SimplePPString(common.PPL1))

	// A local DISK-backed file service is the backup destination.
	dir := path.Join(db.Dir, "/local")
	c := fileservice.Config{
		Name:    defines.LocalFileServiceName,
		Backend: "DISK",
		DataDir: dir,
	}
	service, err := fileservice.NewFileService(ctx, c, nil)
	assert.Nil(t, err)
	defer service.Close()

	// Delete one row per split batch (100 rows total) before taking the backup.
	for _, data := range bats {
		txn, rel := db.GetRelation()
		v := testutil.GetSingleSortKeyValue(data, schema, 2)
		filter := handle.NewEQFilter(v)
		err := rel.DeleteByFilter(context.Background(), filter)
		assert.NoError(t, err)
		assert.NoError(t, txn.Commit(context.Background()))
	}

	// Force a checkpoint at the backup timestamp and collect every checkpoint
	// location the backup must copy, deduplicated by checkpoint file name.
	backupTime := time.Now().UTC()
	currTs := types.BuildTS(backupTime.UnixNano(), 0)
	locations := make([]string, 0)
	locations = append(locations, backupTime.Format(time.DateTime))
	location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
	assert.Nil(t, err)
	db.BGCheckpointRunner.DisableCheckpoint()
	locations = append(locations, location)
	compacted := db.BGCheckpointRunner.GetCompacted()
	checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
	// FIX: dropped the pointless zero size hint on the map literal.
	files := make(map[string]string)
	for _, candidate := range checkpoints {
		name := candidate.GetLocation().Name().String()
		if files[name] == "" {
			// FIX: build "<location>:<version>" in one formatting call instead
			// of a var declaration plus two string concatenations.
			files[name] = fmt.Sprintf("%s:%d", candidate.GetLocation().String(), candidate.GetVersion())
		}
	}
	for _, location := range files {
		locations = append(locations, location)
	}

	// Run a full backup and verify it never reports the same file twice.
	fileList := make([]*taeFile, 0)
	err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList)
	assert.Nil(t, err)
	fileMap := make(map[string]struct{})
	for _, file := range fileList {
		_, ok := fileMap[file.path]
		// FIX: assert.False is clearer than assert.True(t, !ok) and now names
		// the offending path on failure.
		assert.False(t, ok, "duplicated file in backup: %s", file.path)
		fileMap[file.path] = struct{}{}
	}

	// Restart on top of the backed-up data and verify the surviving rows:
	// total minus the 100 per-batch deletes minus the single range delete.
	db.Opts.Fs = service
	db.Restart(ctx)
	txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name)
	testutil.CheckAllColRowsByScan(t, rel, int(totalRows-100)-deletedRows, true)
	assert.NoError(t, txn.Commit(context.Background()))
}

func Test_saveTaeFilesList(t *testing.T) {
type args struct {
ctx context.Context
Expand Down
38 changes: 37 additions & 1 deletion pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
"io"
"os"
"path"
Expand Down Expand Up @@ -495,7 +496,42 @@ func CopyGCDir(
if err != nil {
return nil, err
}
for i, metaFile := range metaFiles {

copyFiles := make([]*checkpoint.MetaFile, 0)

for _, metaFile := range metaFiles {
name := metaFile.GetName()
window := gc.NewGCWindow(common.DebugAllocator, srcFs)
err = window.ReadTable(ctx, gc.GCMetaDir+name, srcFs)
if err != nil {
return nil, err
}
defer window.Close()
objects := window.GetObjectStats()
filesList := make([]*taeFile, 0)
needCopy := true
for _, object := range objects {
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, object.ObjectName().String(), "")
if err != nil {
logutil.Warnf("[Backup] copy file %v failed", object.ObjectName().String())
needCopy = false
break
}
filesList = append(filesList, &taeFile{
path: object.ObjectName().String(),
size: files[metaFile.GetIndex()].Size,
checksum: checksum,
needCopy: true,
ts: backup,
})
}
if needCopy {
copyFiles = append(copyFiles, metaFile)
taeFileList = append(taeFileList, filesList...)
}
}

for i, metaFile := range copyFiles {
name := metaFile.GetName()
if i == len(metaFiles)-1 {
end := metaFile.GetEnd()
Expand Down
16 changes: 8 additions & 8 deletions pkg/bootstrap/versions/v2_0_1/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ var tenantUpgEntries = []versions.UpgradeEntry{
var upg_mo_user_add_password_last_changed = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column password_last_changed timestamp default utc_timestamp",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "password_last_changed")
if err != nil {
return false, err
}

if colInfo.ColType == "TIMESTAMP" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand All @@ -48,15 +48,15 @@ var upg_mo_user_add_password_last_changed = versions.UpgradeEntry{
var upg_mo_user_add_password_history = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column password_history text default '[]'",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "password_history")
if err != nil {
return false, err
}

if colInfo.ColType == "TEXT" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand All @@ -66,15 +66,15 @@ var upg_mo_user_add_password_history = versions.UpgradeEntry{
var upg_mo_user_add_login_attempts = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column login_attempts int unsigned default 0",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "login_attempts")
if err != nil {
return false, err
}

if colInfo.ColType == "INT UNSIGNED" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand All @@ -84,14 +84,14 @@ var upg_mo_user_add_login_attempts = versions.UpgradeEntry{
var upg_mo_user_add_lock_time = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column lock_time timestamp default utc_timestamp",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "lock_time")
if err != nil {
return false, err
}
if colInfo.ColType == "TIMESTAMP" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand Down
Loading

0 comments on commit 093c8ef

Please sign in to comment.