Skip to content

Commit

Permalink
Fix test global checkpoint5 2.0 (#19742)
Browse files Browse the repository at this point in the history
Fix test global checkpoint5 2.0

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
LeftHandCold authored Nov 3, 2024
1 parent 3d3bf86 commit 48c7e16
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 128 deletions.
18 changes: 15 additions & 3 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,20 @@ func (r *runner) DebugUpdateOptions(opts ...Option) {
}

func (r *runner) onGlobalCheckpointEntries(items ...any) {
maxEnd := types.TS{}
for _, item := range items {
ctx := item.(*globalCheckpointContext)
doCheckpoint := false
maxCkp := r.MaxGlobalCheckpoint()
if maxCkp != nil {
maxEnd = maxCkp.end
}
if ctx.end.LE(&maxEnd) {
logutil.Warn(
"OnGlobalCheckpointEntries-Skip",
zap.String("checkpoint", ctx.end.ToString()))
continue
}
if ctx.force {
doCheckpoint = true
} else {
Expand Down Expand Up @@ -636,14 +647,15 @@ func (r *runner) doGlobalCheckpoint(
entry.truncateLSN = truncateLSN

logutil.Info(
"Checkpoint-Start",
"GCKP-Start",
zap.String("entry", entry.String()),
zap.String("ts", end.ToString()),
)

defer func() {
if err != nil {
logutil.Error(
"Checkpoint-Error",
"GCKP-Error",
zap.String("entry", entry.String()),
zap.String("phase", errPhase),
zap.Error(err),
Expand All @@ -653,7 +665,7 @@ func (r *runner) doGlobalCheckpoint(
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End",
"GCKP-End",
fields...,
)
}
Expand Down
27 changes: 19 additions & 8 deletions pkg/vm/engine/tae/db/checkpoint/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,40 @@ func (r *runner) CleanPenddingCheckpoint() {
}
}

func (r *runner) ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error {
if versionInterval == 0 {
versionInterval = r.options.globalVersionInterval
func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) error {
if interval == 0 {
interval = r.options.globalVersionInterval
}
if r.GetPenddingIncrementalCount() != 0 {
end = r.MaxIncrementalCheckpoint().GetEnd()
r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
force: true,
end: end,
interval: versionInterval,
interval: interval,
})
return nil
}
timeout := time.After(versionInterval)
retryTime := 0
timeout := time.After(interval)
var err error
defer func() {
if err != nil || retryTime > 0 {
logutil.Error("ForceGlobalCheckpoint-End",
zap.Error(err),
zap.Uint64("retryTime", uint64(retryTime)))
return
}
}()
for {
select {
case <-timeout:
return moerr.NewInternalError(r.ctx, "timeout")
default:
err := r.ForceIncrementalCheckpoint(end, false)
err = r.ForceIncrementalCheckpoint(end, false)
if err != nil {
if dbutils.IsRetrieableCheckpoint(err) {
interval := versionInterval.Milliseconds() / 400
retryTime++
interval := interval.Milliseconds() / 400
time.Sleep(time.Duration(interval))
break
}
Expand All @@ -117,7 +128,7 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, versionInterval time.Durati
r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
force: true,
end: end,
interval: versionInterval,
interval: interval,
})
return nil
}
Expand Down
79 changes: 0 additions & 79 deletions pkg/vm/engine/tae/db/gc/v3/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,85 +421,6 @@ func (c *checkpointCleaner) Replay() (err error) {
zap.Duration("duration", time.Since(start)),
)
}
if acctFile == "" {
// No account table information was found: this may be a new cluster or an upgraded cluster,
// so the table information needs to be initialized from the checkpoint.
scanWaterMark := c.GetScanWaterMark()
isConsumedGCkp := false
var checkpointEntries []*checkpoint.CheckpointEntry
if checkpointEntries, err = checkpoint.ListSnapshotCheckpoint(
c.ctx, c.sid, c.fs.Service, scanWaterMark.GetEnd(), c.checkpointCli.GetCheckpointMetaFiles(),
); err != nil {
logutil.Error(
"GC-REPLAY-LIST-ERROR",
zap.String("task", c.TaskNameLocked()),
zap.Error(err),
)
return
}
if len(checkpointEntries) == 0 {
return
}
for _, entry := range checkpointEntries {
logutil.Info(
"GC-REPLAY-TRACE-LOAD",
zap.String("task", c.TaskNameLocked()),
zap.String("checkpoint", entry.String()),
zap.String("scanWaterMark", scanWaterMark.String()),
)
var ckpData *logtail.CheckpointData
if ckpData, err = c.collectCkpData(entry); err != nil {
logutil.Error(
"GC-REPLAY-COLLECT-ERROR",
zap.String("task", c.TaskNameLocked()),
zap.Error(err),
zap.String("checkpoint", entry.String()),
)
return
}
if entry.GetType() == checkpoint.ET_Global {
isConsumedGCkp = true
}
c.mutation.snapshotMeta.InitTableInfo(c.ctx, c.fs.Service, ckpData, entry.GetStart(), entry.GetEnd())
ckpData.Close()
}
if !isConsumedGCkp {
// The global checkpoint that the specified checkpoint depends on may already have been GC'ed,
// so we need to load the latest global checkpoint instead.
entry := c.checkpointCli.MaxGlobalCheckpoint()
if entry == nil {
logutil.Warn(
"GC-REPLAY-NO-MAX-GLOBAL",
zap.String("task", c.TaskNameLocked()),
)
return
}
logutil.Info(
"GC-REPLAY-TRACE-MAX-GLOBAL",
zap.String("task", c.TaskNameLocked()),
zap.String("max-gloabl", entry.String()),
zap.String("max-consumed", scanWaterMark.String()),
)
var ckpData *logtail.CheckpointData
if ckpData, err = c.collectCkpData(entry); err != nil {
logutil.Error(
"GC-REPLAY-COLLECT-CHECKPOINT-ERROR",
zap.String("task", c.TaskNameLocked()),
zap.Error(err),
zap.String("checkpoint", entry.String()),
)
return
}
c.mutation.snapshotMeta.InitTableInfo(c.ctx, c.fs.Service, ckpData, entry.GetStart(), entry.GetEnd())
ckpData.Close()
}

logutil.Info(
"GC-REPLAY-TRACE-INIT-TABLE-INFO",
zap.String("task", c.TaskNameLocked()),
zap.String("details", c.mutation.snapshotMeta.TableInfoString()),
)
}
return
}

Expand Down
30 changes: 14 additions & 16 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ import (
)

const (
ModuleName = "TAEDB"
smallCheckpointBlockRows = 10
smallCheckpointSize = 1024
ModuleName = "TAEDB"
smallCheckpointBlockRows = 10
smallCheckpointSize = 1024
defaultGlobalCheckpointTimeout = 10 * time.Second
)

func TestAppend1(t *testing.T) {
Expand Down Expand Up @@ -7292,7 +7293,7 @@ func TestGlobalCheckpoint2(t *testing.T) {
txn, err := tae.StartTxn(nil)
assert.NoError(t, err)
tae.IncrementalCheckpoint(txn.GetStartTS(), false, true, true)
tae.GlobalCheckpoint(txn.GetStartTS(), 0, false)
tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, 0)
assert.NoError(t, txn.Commit(context.Background()))

tae.CreateRelAndAppend2(bat, false)
Expand All @@ -7303,7 +7304,7 @@ func TestGlobalCheckpoint2(t *testing.T) {
return tae.Runtime.Scheduler.GetPenddingLSNCnt() == 0
})
tae.IncrementalCheckpoint(currTs, false, true, true)
tae.GlobalCheckpoint(currTs, time.Duration(1), false)
tae.DB.ForceGlobalCheckpoint(ctx, currTs, defaultGlobalCheckpointTimeout, time.Duration(1))

p := &catalog.LoopProcessor{}
tableExisted := false
Expand Down Expand Up @@ -7413,8 +7414,8 @@ func TestGlobalCheckpoint4(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))

err = tae.GlobalCheckpoint(txn.GetCommitTS(), globalCkpInterval, false)
assert.NoError(t, err)
err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetCommitTS(), defaultGlobalCheckpointTimeout, globalCkpInterval)
assert.Nil(t, err)

tae.CreateRelAndAppend(bat, true)

Expand All @@ -7434,7 +7435,7 @@ func TestGlobalCheckpoint4(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))

err = tae.GlobalCheckpoint(txn.GetCommitTS(), globalCkpInterval, false)
err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetCommitTS(), defaultGlobalCheckpointTimeout, globalCkpInterval)
assert.NoError(t, err)

tae.CreateRelAndAppend(bat, false)
Expand All @@ -7451,10 +7452,9 @@ func TestGlobalCheckpoint5(t *testing.T) {
testutils.EnsureNoLeak(t)
ctx := context.Background()

opts := config.WithQuickScanAndCKPOpts(nil)
opts := config.WithLongScanAndCKPOpts(nil)
tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
defer tae.Close()
tae.BGCheckpointRunner.DisableCheckpoint()
tae.BGCheckpointRunner.CleanPenddingCheckpoint()
globalCkpIntervalTimeout := 10 * time.Second

Expand All @@ -7475,23 +7475,22 @@ func TestGlobalCheckpoint5(t *testing.T) {

txn, err = tae.StartTxn(nil)
assert.NoError(t, err)
err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpIntervalTimeout, false)
err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpIntervalTimeout)
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))

tae.DoAppend(bats[1])

txn, err = tae.StartTxn(nil)
assert.NoError(t, err)
err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpIntervalTimeout, false)
err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpIntervalTimeout)
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))

tae.CheckRowsByScan(40, true)

t.Log(tae.Catalog.SimplePPString(3))
tae.Restart(ctx)
tae.BGCheckpointRunner.DisableCheckpoint()
tae.BGCheckpointRunner.CleanPenddingCheckpoint()
t.Log(tae.Catalog.SimplePPString(3))

Expand All @@ -7502,8 +7501,7 @@ func TestGlobalCheckpoint5(t *testing.T) {
tae.CheckRowsByScan(60, true)
txn, err = tae.StartTxn(nil)
assert.NoError(t, err)
err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpIntervalTimeout, false)
assert.NoError(t, err)
err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpIntervalTimeout)
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))
}
Expand Down Expand Up @@ -7541,7 +7539,7 @@ func TestGlobalCheckpoint6(t *testing.T) {
tae.DoAppend(bats[i+1])
txn, err = tae.StartTxn(nil)
assert.NoError(t, err)
err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpInterval, false)
err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpInterval)
assert.NoError(t, err)
assert.NoError(t, txn.Commit(context.Background()))

Expand Down
22 changes: 0 additions & 22 deletions pkg/vm/engine/tae/db/testutil/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,28 +269,6 @@ func (e *TestEngine) Truncate() {
assert.NoError(e.T, err)
assert.NoError(e.T, txn.Commit(context.Background()))
}
// GlobalCheckpoint asks the background checkpoint runner to force a global
// checkpoint ending at endTs and returns the error reported by
// ForceGlobalCheckpoint.
//
// Parameters:
//   - endTs: end timestamp handed to IsAllChangesFlushed and
//     ForceGlobalCheckpoint.
//   - versionInterval: passed through unchanged to ForceGlobalCheckpoint.
//   - enableAndCleanBGCheckpoint: when true, DisableCheckpoint and
//     CleanPenddingCheckpoint are called up front and EnableCheckpoint is
//     deferred until this method returns.
//
// Failures are recorded on e.T via assert and are also returned, so a caller
// that checks the returned error observes the real outcome instead of an
// unconditional nil.
func (e *TestEngine) GlobalCheckpoint(
	endTs types.TS,
	versionInterval time.Duration,
	enableAndCleanBGCheckpoint bool,
) error {
	if enableAndCleanBGCheckpoint {
		e.DB.BGCheckpointRunner.DisableCheckpoint()
		defer e.DB.BGCheckpointRunner.EnableCheckpoint()
		e.DB.BGCheckpointRunner.CleanPenddingCheckpoint()
	}
	if e.DB.BGCheckpointRunner.GetPenddingIncrementalCount() == 0 {
		// Poll until everything up to endTs is reported as flushed.
		// NOTE(review): 4000 is presumably a timeout in milliseconds — confirm
		// against testutils.WaitExpect.
		testutils.WaitExpect(4000, func() bool {
			flushed := e.DB.BGCheckpointRunner.IsAllChangesFlushed(types.TS{}, endTs, false)
			return flushed
		})
		// Re-check once after waiting and fail the test if changes are still
		// not flushed. NOTE(review): the final `true` argument presumably
		// enables diagnostic output — confirm against IsAllChangesFlushed.
		flushed := e.DB.BGCheckpointRunner.IsAllChangesFlushed(types.TS{}, endTs, true)
		assert.True(e.T, flushed)
	}
	err := e.DB.BGCheckpointRunner.ForceGlobalCheckpoint(endTs, versionInterval)
	assert.NoError(e.T, err)
	// Propagate the error: assert.NoError only marks the test as failed and
	// does not stop execution, so returning nil here would hide the failure
	// from callers that check the result.
	return err
}

func (e *TestEngine) IncrementalCheckpoint(
end types.TS,
Expand Down

0 comments on commit 48c7e16

Please sign in to comment.