Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#54259
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
ti-chi-bot committed Nov 1, 2024
1 parent 7972ff1 commit 54c8ee3
Show file tree
Hide file tree
Showing 5 changed files with 440 additions and 116 deletions.
114 changes: 114 additions & 0 deletions ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -142,6 +143,119 @@ func newDefaultCallBack(do DomainReloader) Callback {

// ****************************** End of Default DDL Callback Instance *********************************************

// ****************************** Start of Test DDL Callback Instance ***********************************************

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
tc.onJobUpdated(job)
return
}

tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

tc.BaseCallback.OnWatched(ctx)
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

// Clone copies the callback and take its reference
func (tc *TestDDLCallback) Clone() *TestDDLCallback {
return &*tc
}

// ****************************** End of Test DDL Callback Instance ***********************************************

// ****************************** Start of CTC DDL Callback Instance ***********************************************

// ctcCallback is the customized callback that TiDB will use.
Expand Down
113 changes: 0 additions & 113 deletions ddl/callback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ package ddl

import (
"context"
"sync/atomic"
"testing"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

type TestInterceptor struct {
Expand All @@ -41,115 +37,6 @@ func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema
return ti.BaseInterceptor.OnGetInfoSchema(ctx, is)
}

// TestDDLCallback is used to customize user callback themselves.
type TestDDLCallback struct {
*BaseCallback
// We recommended to pass the domain parameter to the test ddl callback, it will ensure
// domain to reload schema before your ddl stepping into the next state change.
Do DomainReloader

onJobRunBefore func(*model.Job)
OnJobRunBeforeExported func(*model.Job)
onJobUpdated func(*model.Job)
OnJobUpdatedExported atomic.Pointer[func(*model.Job)]
onWatched func(ctx context.Context)
OnGetJobBeforeExported func(string)
OnGetJobAfterExported func(string, *model.Job)
OnJobSchemaStateChanged func(int64)
}

// OnChanged mock the same behavior with the main DDL hook.
func (tc *TestDDLCallback) OnChanged(err error) error {
if err != nil {
return err
}
logutil.BgLogger().Info("performing DDL change, must reload")
if tc.Do != nil {
err = tc.Do.Reload()
if err != nil {
logutil.BgLogger().Error("performing DDL change failed", zap.Error(err))
}
}
return nil
}

// OnSchemaStateChanged mock the same behavior with the main ddl hook.
func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) {
if tc.Do != nil {
if err := tc.Do.Reload(); err != nil {
logutil.BgLogger().Warn("reload failed on schema state changed", zap.Error(err))
}
}

if tc.OnJobSchemaStateChanged != nil {
tc.OnJobSchemaStateChanged(schemaVer)
return
}
}

// OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first.
func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) {
logutil.BgLogger().Info("on job run before", zap.String("job", job.String()))
if tc.OnJobRunBeforeExported != nil {
tc.OnJobRunBeforeExported(job)
return
}
if tc.onJobRunBefore != nil {
tc.onJobRunBefore(job)
return
}

tc.BaseCallback.OnJobRunBefore(job)
}

// OnJobUpdated is used to run the user customized logic of `OnJobUpdated` first.
func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) {
logutil.BgLogger().Info("on job updated", zap.String("job", job.String()))
if onJobUpdatedExportedFunc := tc.OnJobUpdatedExported.Load(); onJobUpdatedExportedFunc != nil {
(*onJobUpdatedExportedFunc)(job)
return
}
if tc.onJobUpdated != nil {
tc.onJobUpdated(job)
return
}

tc.BaseCallback.OnJobUpdated(job)
}

// OnWatched is used to run the user customized logic of `OnWatched` first.
func (tc *TestDDLCallback) OnWatched(ctx context.Context) {
if tc.onWatched != nil {
tc.onWatched(ctx)
return
}

tc.BaseCallback.OnWatched(ctx)
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (tc *TestDDLCallback) OnGetJobBefore(jobType string) {
if tc.OnGetJobBeforeExported != nil {
tc.OnGetJobBeforeExported(jobType)
return
}
tc.BaseCallback.OnGetJobBefore(jobType)
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (tc *TestDDLCallback) OnGetJobAfter(jobType string, job *model.Job) {
if tc.OnGetJobAfterExported != nil {
tc.OnGetJobAfterExported(jobType, job)
return
}
tc.BaseCallback.OnGetJobAfter(jobType, job)
}

// Clone copies the callback and take its reference
func (tc *TestDDLCallback) Clone() *TestDDLCallback {
return &*tc
}

func TestCallback(t *testing.T) {
cb := &BaseCallback{}
require.Nil(t, cb.OnChanged(nil))
Expand Down
9 changes: 7 additions & 2 deletions planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
Expand All @@ -94,7 +94,12 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isGeneral
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
Expand Down
Loading

0 comments on commit 54c8ee3

Please sign in to comment.