From 3635e947a04d61af076f5c44c56e45a0082c2cda Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Thu, 10 Oct 2024 15:51:55 +0800 Subject: [PATCH] executor: fix data inconsistency after `load data ... replace into ...` (#56521) close pingcap/tidb#56408 --- executor/insert_common.go | 5 ++++- executor/write_test.go | 21 +++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index aca30f8e1eab0..9ef2b470ab9fd 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -1201,10 +1201,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D if handle == nil { continue } - _, err = e.removeRow(ctx, txn, handle, r, true) + skip, err = e.removeRow(ctx, txn, handle, r, true) if err != nil { return err } + if skip { + break + } } else { // If duplicate keys were found in BatchGet, mark row = nil. e.ctx.GetSessionVars().StmtCtx.AppendWarning(uk.dupErr) diff --git a/executor/write_test.go b/executor/write_test.go index b3d9e9f63100b..d1ad824202c4d 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -4291,3 +4291,24 @@ func TestHandleColumnWithOnUpdateCurrentTimestamp(t *testing.T) { tk.MustExec("update t force index(primary) set b = 10 where a = '2023-06-11 10:00:00'") tk.MustExec("admin check table t") } + +func TestLoadDataReplaceConsistency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("USE test; DROP TABLE IF EXISTS a;") + tk.MustExec("create table a(id int, name varchar(20), addr varchar(100), primary key (id) nonclustered);") + tk.MustExec("LOAD DATA LOCAL INFILE '/tmp/nonexistence.csv' replace into table a fields terminated by '|' escaped by '' lines terminated by '\n'") + ctx := tk.Session().(sessionctx.Context) + ld, ok := ctx.Value(executor.LoadDataVarKey).(*executor.LoadDataInfo) + require.True(t, ok) + defer ctx.SetValue(executor.LoadDataVarKey, nil) + require.NotNil(t, ld) + tests := []testCase{ + {nil, []byte("1|aa|beijing\n1|aa|beijing\n1|aa|beijing\n1|aa|beijing\n2|bb|shanghai\n2|bb|shanghai\n2|bb|shanghai\n3|cc|guangzhou\n"), + []string{"1 aa beijing", "2 bb shanghai", "3 cc guangzhou"}, nil, "Records: 8 Deleted: 0 Skipped: 5 Warnings: 0"}, + } + deleteSQL := "delete from a" + selectSQL := "select * from a;" + checkCases(tests, ld, t, tk, ctx, selectSQL, deleteSQL) + tk.MustExec("admin check table a") +}