Skip to content

Commit

Permalink
*: fix a bug that update statement uses point get and update plan wit…
Browse files Browse the repository at this point in the history
…h different tblInfo (#54183) (#54255)

close #53634
  • Loading branch information
ti-chi-bot authored Jun 27, 2024
1 parent 50d888e commit 9f6dedb
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 3 deletions.
9 changes: 7 additions & 2 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
// step 3: add metadata lock and check each table's schema version
schemaNotMatch := false
for i := 0; i < len(stmt.dbName); i++ {
_, ok := is.TableByID(stmt.tbls[i].Meta().ID)
tbl, ok := is.TableByID(stmt.tbls[i].Meta().ID)
if !ok {
tblByName, err := is.TableByName(stmt.dbName[i], stmt.tbls[i].Meta().Name)
if err != nil {
Expand All @@ -116,7 +116,12 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
schemaNotMatch = true
continue
}
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision {
// The revision of tbl and newTbl may not be the same.
// Example:
// The version of stmt.tbls[i] is taken from the prepare statement and is revision v1.
// When stmt.tbls[i] is locked in MDL, the revision of newTbl is also v1.
// The revision of tbl is v2. The reason may have other statements trigger "tryLockMDLAndUpdateSchemaIfNecessary" before, leading to tbl revision update.
if stmt.tbls[i].Meta().Revision != newTbl.Meta().Revision || (tbl != nil && tbl.Meta().Revision != newTbl.Meta().Revision) {
schemaNotMatch = true
}
stmt.tbls[i] = newTbl
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/internal/testserverclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/server/internal/testserverclient",
visibility = ["//pkg/server:__subpackages__"],
deps = [
"//pkg/ddl/util/callback",
"//pkg/domain",
"//pkg/errno",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/server",
"//pkg/testkit",
Expand Down
181 changes: 181 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,12 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
tmysql "github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -2788,4 +2791,182 @@ func (cli *TestServerClient) RunTestTypeAndCharsetOfSendLongData(t *testing.T) {
})
}

func (cli *TestServerClient) getNewDB(t *testing.T, overrider configOverrider) *testkit.DBTestKit {
db, err := sql.Open("mysql", cli.GetDSN(overrider))
require.NoError(t, err)

return testkit.NewDBTestKit(t, db)
}

func MustExec(ctx context.Context, t *testing.T, conn *sql.Conn, sql string) {
_, err := conn.QueryContext(ctx, sql)
require.NoError(t, err)
}

func MustQuery(ctx context.Context, t *testing.T, cli *TestServerClient, conn *sql.Conn, sql string) {
rs, err := conn.QueryContext(ctx, sql)
require.NoError(t, err)
if rs != nil {
cli.Rows(t, rs)
rs.Close()
}
}

type sqlWithErr struct {
stmt *sql.Stmt
sql string
}

type expectQuery struct {
sql string
rows []string
}

func (cli *TestServerClient) RunTestIssue53634(t *testing.T, dom *domain.Domain) {
cli.RunTests(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
}, func(dbt *testkit.DBTestKit) {
ctx := context.Background()

conn, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
MustExec(ctx, t, conn, "create database test_db_state default charset utf8 default collate utf8_bin")
MustExec(ctx, t, conn, "use test_db_state")
MustExec(ctx, t, conn, `CREATE TABLE stock (
a int NOT NULL,
b char(30) NOT NULL,
c int,
d char(64),
PRIMARY KEY(a,b)
) ENGINE=InnoDB DEFAULT CHARSET=latin1 COLLATE=latin1_bin COMMENT='…comment';
`)
MustExec(ctx, t, conn, "insert into stock values(1, 'a', 11, 'x'), (2, 'b', 22, 'y')")
MustExec(ctx, t, conn, "alter table stock add column cct_1 int default 10")
MustExec(ctx, t, conn, "alter table stock modify cct_1 json")
MustExec(ctx, t, conn, "alter table stock add column adc_1 smallint")
defer MustExec(ctx, t, conn, "drop database test_db_state")

sqls := make([]sqlWithErr, 5)
sqls[0] = sqlWithErr{nil, "begin"}
sqls[1] = sqlWithErr{nil, "SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE"}
sqls[2] = sqlWithErr{nil, "UPDATE stock SET c = ? WHERE a= ? AND b = 'a'"}
sqls[3] = sqlWithErr{nil, "UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'"}
sqls[4] = sqlWithErr{nil, "commit"}
dropColumnSQL := "alter table stock drop column cct_1"
query := &expectQuery{sql: "select * from stock;", rows: []string{"1 a 101 x <nil>\n2 b 102 z <nil>"}}
runTestInSchemaState(t, conn, cli, dom, model.StateWriteReorganization, true, dropColumnSQL, sqls, query)
})
}

func runTestInSchemaState(
t *testing.T,
conn *sql.Conn,
cli *TestServerClient,
dom *domain.Domain,
state model.SchemaState,
isOnJobUpdated bool,
dropColumnSQL string,
sqlWithErrs []sqlWithErr,
expectQuery *expectQuery,
) {
ctx := context.Background()
MustExec(ctx, t, conn, "use test_db_state")

callback := &callback.TestDDLCallback{Do: dom}
prevState := model.StateNone
var checkErr error
dbt := cli.getNewDB(t, func(config *mysql.Config) {
config.MaxAllowedPacket = 1024
})
conn1, err := dbt.GetDB().Conn(ctx)
require.NoError(t, err)
defer func() {
err := dbt.GetDB().Close()
require.NoError(t, err)
}()
MustExec(ctx, t, conn1, "use test_db_state")

for i, sqlWithErr := range sqlWithErrs {
// Start the test txn.
// Step 1: begin(when i = 0).
if i == 0 || i == len(sqlWithErrs)-1 {
sqlWithErr := sqlWithErrs[i]
MustExec(ctx, t, conn1, sqlWithErr.sql)
} else {
// Step 2: prepare stmts.
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a'
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b'
stmt, err := conn1.PrepareContext(ctx, sqlWithErr.sql)
require.NoError(t, err)
sqlWithErr.stmt = stmt
sqlWithErrs[i] = sqlWithErr
}
}

// Step 3: begin.
sqlWithErr := sqlWithErrs[0]
MustExec(ctx, t, conn1, sqlWithErr.sql)

prevState = model.StateNone
state = model.StateWriteOnly
cbFunc1 := func(job *model.Job) {
if jobStateOrLastSubJobState(job) == prevState || checkErr != nil {
return
}
prevState = jobStateOrLastSubJobState(job)
if prevState != state {
return
}
// Step 4: exec stmts in write-only state (dropping a colum).
// SELECT a, c, d from stock where (a, b) IN ((?, ?),(?, ?)) FOR UPDATE, args:(1,"a"),(2,"b")
// UPDATE stock SET c = ? WHERE a= ? AND b = 'a', args:(100+1, 1)
// UPDATE stock SET c = ?, d = 'z' WHERE a= ? AND b = 'b', args:(100+2, 2)
// commit.
sqls := sqlWithErrs[1:]
for i, sqlWithErr := range sqls {
if i == 0 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 1, "a", 2, "b")
require.NoError(t, err)
} else if i == 1 || i == 2 {
_, err = sqlWithErr.stmt.ExecContext(ctx, 100+i, i)
require.NoError(t, err)
} else {
MustQuery(ctx, t, cli, conn1, sqlWithErr.sql)
}
}
}
if isOnJobUpdated {
callback.OnJobUpdatedExported.Store(&cbFunc1)
} else {
callback.OnJobRunBeforeExported = cbFunc1
}
d := dom.DDL()
originalCallback := d.GetHook()
d.SetHook(callback)
MustExec(ctx, t, conn, dropColumnSQL)
require.NoError(t, checkErr)

// Check the result.
// select * from stock
if expectQuery != nil {
rs, err := conn.QueryContext(ctx, expectQuery.sql)
require.NoError(t, err)
if expectQuery.rows == nil {
require.Nil(t, rs)
} else {
cli.CheckRows(t, rs, expectQuery.rows[0])
}
}
d.SetHook(originalCallback)
}

func jobStateOrLastSubJobState(job *model.Job) model.SchemaState {
if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil {
subs := job.MultiSchemaInfo.SubJobs
return subs[len(subs)-1].SchemaState
}
return job.SchemaState
}

//revive:enable:exported
5 changes: 5 additions & 0 deletions pkg/server/tests/commontest/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3089,6 +3089,11 @@ func TestTypeAndCharsetOfSendLongData(t *testing.T) {
ts.RunTestTypeAndCharsetOfSendLongData(t)
}

func TestIssue53634(t *testing.T) {
ts := servertestkit.CreateTidbTestSuiteWithDDLLease(t, "20s")
ts.RunTestIssue53634(t, ts.Domain)
}

func TestAuthSocket(t *testing.T) {
defer server2.ClearOSUserForAuthSocket()

Expand Down
2 changes: 2 additions & 0 deletions pkg/server/tests/servertestkit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ go_library(
"//pkg/util/cpuprofile",
"//pkg/util/topsql/collector/mock",
"//pkg/util/topsql/state",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
"@io_opencensus_go//stats/view",
"@org_uber_go_zap//:zap",
],
)
32 changes: 31 additions & 1 deletion pkg/server/tests/servertestkit/testkit.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import (
"database/sql"
"sync"
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
Expand All @@ -35,6 +37,7 @@ import (
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
)

// TidbTestSuite is a test suite for tidb
Expand All @@ -48,13 +51,37 @@ type TidbTestSuite struct {

// CreateTidbTestSuite creates a test suite for tidb
func CreateTidbTestSuite(t *testing.T) *TidbTestSuite {
cfg := newTestConfig()
return CreateTidbTestSuiteWithCfg(t, cfg)
}

// CreateTidbTestSuiteWithDDLLease creates a test suite with DDL lease for tidb.
func CreateTidbTestSuiteWithDDLLease(t *testing.T, ddlLease string) *TidbTestSuite {
cfg := newTestConfig()
cfg.Lease = ddlLease
return CreateTidbTestSuiteWithCfg(t, cfg)
}

func newTestConfig() *config.Config {
cfg := util.NewTestConfig()
cfg.Port = 0
cfg.Status.ReportStatus = true
cfg.Status.StatusPort = 0
cfg.Status.RecordDBLabel = true
cfg.Performance.TCPKeepAlive = true
return CreateTidbTestSuiteWithCfg(t, cfg)
return cfg
}

// parseDuration parses lease argument string.
func parseDuration(lease string) (time.Duration, error) {
dur, err := time.ParseDuration(lease)
if err != nil {
dur, err = time.ParseDuration(lease + "s")
}
if err != nil || dur < 0 {
return 0, errors.Newf("invalid lease duration", zap.String("lease", lease))
}
return dur, nil
}

// CreateTidbTestSuiteWithCfg creates a test suite for tidb with config
Expand All @@ -66,6 +93,9 @@ func CreateTidbTestSuiteWithCfg(t *testing.T, cfg *config.Config) *TidbTestSuite
ts.Store, err = mockstore.NewMockStore()
session.DisableStats4Test()
require.NoError(t, err)
ddlLeaseDuration, err := parseDuration(cfg.Lease)
require.NoError(t, err)
session.SetSchemaLease(ddlLeaseDuration)
ts.Domain, err = session.BootstrapSession(ts.Store)
require.NoError(t, err)
ts.Tidbdrv = srv.NewTiDBDriver(ts.Store)
Expand Down

0 comments on commit 9f6dedb

Please sign in to comment.