From 48c7e1698023b4321434bed194887e796213d836 Mon Sep 17 00:00:00 2001 From: GreatRiver <14086886+LeftHandCold@users.noreply.github.com> Date: Sun, 3 Nov 2024 10:48:39 +0800 Subject: [PATCH] Fix test global checkpoint5 2.0 (#19742) Fix test global checkpoint5 2.0 Approved by: @XuPeng-SH, @sukki37 --- pkg/vm/engine/tae/db/checkpoint/runner.go | 18 ++++- pkg/vm/engine/tae/db/checkpoint/testutils.go | 27 +++++-- pkg/vm/engine/tae/db/gc/v3/checkpoint.go | 79 -------------------- pkg/vm/engine/tae/db/test/db_test.go | 30 ++++---- pkg/vm/engine/tae/db/testutil/engine.go | 22 ------ 5 files changed, 48 insertions(+), 128 deletions(-) diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index c9f9d8dea755..d9e588520f5b 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -339,9 +339,20 @@ func (r *runner) DebugUpdateOptions(opts ...Option) { } func (r *runner) onGlobalCheckpointEntries(items ...any) { + maxEnd := types.TS{} for _, item := range items { ctx := item.(*globalCheckpointContext) doCheckpoint := false + maxCkp := r.MaxGlobalCheckpoint() + if maxCkp != nil { + maxEnd = maxCkp.end + } + if ctx.end.LE(&maxEnd) { + logutil.Warn( + "OnGlobalCheckpointEntries-Skip", + zap.String("checkpoint", ctx.end.ToString())) + continue + } if ctx.force { doCheckpoint = true } else { @@ -636,14 +647,15 @@ func (r *runner) doGlobalCheckpoint( entry.truncateLSN = truncateLSN logutil.Info( - "Checkpoint-Start", + "GCKP-Start", zap.String("entry", entry.String()), + zap.String("ts", end.ToString()), ) defer func() { if err != nil { logutil.Error( - "Checkpoint-Error", + "GCKP-Error", zap.String("entry", entry.String()), zap.String("phase", errPhase), zap.Error(err), @@ -653,7 +665,7 @@ func (r *runner) doGlobalCheckpoint( fields = append(fields, zap.Duration("cost", time.Since(now))) fields = append(fields, zap.String("entry", entry.String())) logutil.Info( - "Checkpoint-End", + "GCKP-End", fields..., ) } diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index 79d5a4891b57..0ec354399b45 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -86,29 +86,40 @@ func (r *runner) CleanPenddingCheckpoint() { } } -func (r *runner) ForceGlobalCheckpoint(end types.TS, versionInterval time.Duration) error { - if versionInterval == 0 { - versionInterval = r.options.globalVersionInterval +func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) error { + if interval == 0 { + interval = r.options.globalVersionInterval } if r.GetPenddingIncrementalCount() != 0 { end = r.MaxIncrementalCheckpoint().GetEnd() r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ force: true, end: end, - interval: versionInterval, + interval: interval, }) return nil } - timeout := time.After(versionInterval) + retryTime := 0 + timeout := time.After(interval) + var err error + defer func() { + if err != nil || retryTime > 0 { + logutil.Error("ForceGlobalCheckpoint-End", + zap.Error(err), + zap.Uint64("retryTime", uint64(retryTime))) + return + } + }() for { select { case <-timeout: return moerr.NewInternalError(r.ctx, "timeout") default: - err := r.ForceIncrementalCheckpoint(end, false) + err = r.ForceIncrementalCheckpoint(end, false) if err != nil { if dbutils.IsRetrieableCheckpoint(err) { - interval := versionInterval.Milliseconds() / 400 + retryTime++ + interval := interval.Milliseconds() / 400 time.Sleep(time.Duration(interval)) break } @@ -117,7 +128,7 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, versionInterval time.Durati r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{ force: true, end: end, - interval: versionInterval, + interval: interval, }) return nil } diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index f6614d0b76fd..faa8f38ca022 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -421,85 +421,6 @@ func (c *checkpointCleaner) Replay() (err error) { zap.Duration("duration", time.Since(start)), ) } - if acctFile == "" { - //No account table information, it may be a new cluster or an upgraded cluster, - //and the table information needs to be initialized from the checkpoint - scanWaterMark := c.GetScanWaterMark() - isConsumedGCkp := false - var checkpointEntries []*checkpoint.CheckpointEntry - if checkpointEntries, err = checkpoint.ListSnapshotCheckpoint( - c.ctx, c.sid, c.fs.Service, scanWaterMark.GetEnd(), c.checkpointCli.GetCheckpointMetaFiles(), - ); err != nil { - logutil.Error( - "GC-REPLAY-LIST-ERROR", - zap.String("task", c.TaskNameLocked()), - zap.Error(err), - ) - return - } - if len(checkpointEntries) == 0 { - return - } - for _, entry := range checkpointEntries { - logutil.Info( - "GC-REPLAY-TRACE-LOAD", - zap.String("task", c.TaskNameLocked()), - zap.String("checkpoint", entry.String()), - zap.String("scanWaterMark", scanWaterMark.String()), - ) - var ckpData *logtail.CheckpointData - if ckpData, err = c.collectCkpData(entry); err != nil { - logutil.Error( - "GC-REPLAY-COLLECT-ERROR", - zap.String("task", c.TaskNameLocked()), - zap.Error(err), - zap.String("checkpoint", entry.String()), - ) - return - } - if entry.GetType() == checkpoint.ET_Global { - isConsumedGCkp = true - } - c.mutation.snapshotMeta.InitTableInfo(c.ctx, c.fs.Service, ckpData, entry.GetStart(), entry.GetEnd()) - ckpData.Close() - } - if !isConsumedGCkp { - // The global checkpoint that Specified checkpoint depends on may have been GC, - // so we need to load a latest global checkpoint - entry := c.checkpointCli.MaxGlobalCheckpoint() - if entry == nil { - logutil.Warn( - "GC-REPLAY-NO-MAX-GLOBAL", - zap.String("task", c.TaskNameLocked()), - ) - return - } - logutil.Info( - "GC-REPLAY-TRACE-MAX-GLOBAL", - zap.String("task", c.TaskNameLocked()), - zap.String("max-gloabl", entry.String()), - zap.String("max-consumed", scanWaterMark.String()), - ) - var ckpData *logtail.CheckpointData - if ckpData, err = c.collectCkpData(entry); err != nil { - logutil.Error( - "GC-REPLAY-COLLECT-CHECKPOINT-ERROR", - zap.String("task", c.TaskNameLocked()), - zap.Error(err), - zap.String("checkpoint", entry.String()), - ) - return - } - c.mutation.snapshotMeta.InitTableInfo(c.ctx, c.fs.Service, ckpData, entry.GetStart(), entry.GetEnd()) - ckpData.Close() - } - - logutil.Info( - "GC-REPLAY-TRACE-INIT-TABLE-INFO", - zap.String("task", c.TaskNameLocked()), - zap.String("details", c.mutation.snapshotMeta.TableInfoString()), - ) - } return } diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 5da5a090627f..490a8fe06818 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -74,9 +74,10 @@ import ( ) const ( - ModuleName = "TAEDB" - smallCheckpointBlockRows = 10 - smallCheckpointSize = 1024 + ModuleName = "TAEDB" + smallCheckpointBlockRows = 10 + smallCheckpointSize = 1024 + defaultGlobalCheckpointTimeout = 10 * time.Second ) func TestAppend1(t *testing.T) { @@ -7292,7 +7293,7 @@ func TestGlobalCheckpoint2(t *testing.T) { txn, err := tae.StartTxn(nil) assert.NoError(t, err) tae.IncrementalCheckpoint(txn.GetStartTS(), false, true, true) - tae.GlobalCheckpoint(txn.GetStartTS(), 0, false) + tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, 0) assert.NoError(t, txn.Commit(context.Background())) tae.CreateRelAndAppend2(bat, false) @@ -7303,7 +7304,7 @@ func TestGlobalCheckpoint2(t *testing.T) { return tae.Runtime.Scheduler.GetPenddingLSNCnt() == 0 }) tae.IncrementalCheckpoint(currTs, false, true, true) - tae.GlobalCheckpoint(currTs, time.Duration(1), false) + tae.DB.ForceGlobalCheckpoint(ctx, currTs, defaultGlobalCheckpointTimeout, time.Duration(1)) p := &catalog.LoopProcessor{} tableExisted := false @@ -7413,8 +7414,8 @@ func TestGlobalCheckpoint4(t *testing.T) { assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) - err = tae.GlobalCheckpoint(txn.GetCommitTS(), globalCkpInterval, false) - assert.NoError(t, err) + err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetCommitTS(), defaultGlobalCheckpointTimeout, globalCkpInterval) + assert.Nil(t, err) tae.CreateRelAndAppend(bat, true) @@ -7434,7 +7435,7 @@ func TestGlobalCheckpoint4(t *testing.T) { assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) - err = tae.GlobalCheckpoint(txn.GetCommitTS(), globalCkpInterval, false) + err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetCommitTS(), defaultGlobalCheckpointTimeout, globalCkpInterval) assert.NoError(t, err) tae.CreateRelAndAppend(bat, false) @@ -7451,10 +7452,9 @@ func TestGlobalCheckpoint5(t *testing.T) { testutils.EnsureNoLeak(t) ctx := context.Background() - opts := config.WithQuickScanAndCKPOpts(nil) + opts := config.WithLongScanAndCKPOpts(nil) tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - tae.BGCheckpointRunner.DisableCheckpoint() tae.BGCheckpointRunner.CleanPenddingCheckpoint() globalCkpIntervalTimeout := 10 * time.Second @@ -7475,7 +7475,7 @@ func TestGlobalCheckpoint5(t *testing.T) { txn, err = tae.StartTxn(nil) assert.NoError(t, err) - err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpIntervalTimeout, false) + err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpIntervalTimeout) assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) @@ -7483,7 +7483,7 @@ func TestGlobalCheckpoint5(t *testing.T) { txn, err = tae.StartTxn(nil) assert.NoError(t, err) - err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpIntervalTimeout, false) + err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpIntervalTimeout) assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) @@ -7491,7 +7491,6 @@ func TestGlobalCheckpoint5(t *testing.T) { t.Log(tae.Catalog.SimplePPString(3)) tae.Restart(ctx) - tae.BGCheckpointRunner.DisableCheckpoint() tae.BGCheckpointRunner.CleanPenddingCheckpoint() t.Log(tae.Catalog.SimplePPString(3)) @@ -7502,8 +7501,7 @@ func TestGlobalCheckpoint5(t *testing.T) { tae.CheckRowsByScan(60, true) txn, err = tae.StartTxn(nil) assert.NoError(t, err) - err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpIntervalTimeout, false) - assert.NoError(t, err) + err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpIntervalTimeout) assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) } @@ -7541,7 +7539,7 @@ func TestGlobalCheckpoint6(t *testing.T) { tae.DoAppend(bats[i+1]) txn, err = tae.StartTxn(nil) assert.NoError(t, err) - err = tae.GlobalCheckpoint(txn.GetStartTS(), globalCkpInterval, false) + err = tae.DB.ForceGlobalCheckpoint(ctx, txn.GetStartTS(), defaultGlobalCheckpointTimeout, globalCkpInterval) assert.NoError(t, err) assert.NoError(t, txn.Commit(context.Background())) diff --git a/pkg/vm/engine/tae/db/testutil/engine.go b/pkg/vm/engine/tae/db/testutil/engine.go index 23516ac619e6..a890b5ecd7de 100644 --- a/pkg/vm/engine/tae/db/testutil/engine.go +++ b/pkg/vm/engine/tae/db/testutil/engine.go @@ -269,28 +269,6 @@ func (e *TestEngine) Truncate() { assert.NoError(e.T, err) assert.NoError(e.T, txn.Commit(context.Background())) } -func (e *TestEngine) GlobalCheckpoint( - endTs types.TS, - versionInterval time.Duration, - enableAndCleanBGCheckpoint bool, -) error { - if enableAndCleanBGCheckpoint { - e.DB.BGCheckpointRunner.DisableCheckpoint() - defer e.DB.BGCheckpointRunner.EnableCheckpoint() - e.DB.BGCheckpointRunner.CleanPenddingCheckpoint() - } - if e.DB.BGCheckpointRunner.GetPenddingIncrementalCount() == 0 { - testutils.WaitExpect(4000, func() bool { - flushed := e.DB.BGCheckpointRunner.IsAllChangesFlushed(types.TS{}, endTs, false) - return flushed - }) - flushed := e.DB.BGCheckpointRunner.IsAllChangesFlushed(types.TS{}, endTs, true) - assert.True(e.T, flushed) - } - err := e.DB.BGCheckpointRunner.ForceGlobalCheckpoint(endTs, versionInterval) - assert.NoError(e.T, err) - return nil -} func (e *TestEngine) IncrementalCheckpoint( end types.TS,