Skip to content

Commit

Permalink
Merge branch '2.0-dev' into dynamicmetadatacachecap2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Nov 7, 2024
2 parents e8c0760 + 38ead4f commit 4ef0467
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 146 deletions.
1 change: 1 addition & 0 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func TestBackupData(t *testing.T) {
}

func TestBackupData2(t *testing.T) {
t.Skip("TestBackupData2")
defer testutils.AfterTest(t)()
testutils.EnsureNoLeak(t)
ctx := context.Background()
Expand Down
16 changes: 8 additions & 8 deletions pkg/bootstrap/versions/v2_0_1/tenant_upgrade_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ var tenantUpgEntries = []versions.UpgradeEntry{
var upg_mo_user_add_password_last_changed = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column password_last_changed timestamp default utc_timestamp",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "password_last_changed")
if err != nil {
return false, err
}

if colInfo.ColType == "TIMESTAMP" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand All @@ -48,15 +48,15 @@ var upg_mo_user_add_password_last_changed = versions.UpgradeEntry{
var upg_mo_user_add_password_history = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column password_history text default '[]'",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "password_history")
if err != nil {
return false, err
}

if colInfo.ColType == "TEXT" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand All @@ -66,15 +66,15 @@ var upg_mo_user_add_password_history = versions.UpgradeEntry{
var upg_mo_user_add_login_attempts = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column login_attempts int unsigned default 0",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "login_attempts")
if err != nil {
return false, err
}

if colInfo.ColType == "INT UNSIGNED" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand All @@ -84,14 +84,14 @@ var upg_mo_user_add_login_attempts = versions.UpgradeEntry{
var upg_mo_user_add_lock_time = versions.UpgradeEntry{
Schema: catalog.MO_CATALOG,
TableName: catalog.MO_USER,
UpgType: versions.MODIFY_COLUMN,
UpgType: versions.ADD_COLUMN,
UpgSql: "alter table mo_catalog.mo_user add column lock_time timestamp default utc_timestamp",
CheckFunc: func(txn executor.TxnExecutor, accountId uint32) (bool, error) {
colInfo, err := versions.CheckTableColumn(txn, accountId, catalog.MO_CATALOG, catalog.MO_USER, "lock_time")
if err != nil {
return false, err
}
if colInfo.ColType == "TIMESTAMP" {
if colInfo.IsExits {
return true, nil
}
return false, nil
Expand Down
46 changes: 29 additions & 17 deletions pkg/cdc/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,13 @@ func Test_mysqlSinker_Sink(t *testing.T) {

ar := NewCdcActiveRoutine()

sinker := NewMysqlSinker(sink, dbTblInfo, watermarkUpdater, tableDef, ar)
go sinker.Run(ctx, ar)
defer func() { sinker.Close() }()
s := NewMysqlSinker(sink, dbTblInfo, watermarkUpdater, tableDef, ar)
go s.Run(ctx, ar)
defer func() {
// call dummy to guarantee sqls has been sent, then close
s.SendDummy()
s.Close()
}()

packerPool := fileservice.NewPool(
128,
Expand All @@ -513,14 +517,14 @@ func Test_mysqlSinker_Sink(t *testing.T) {
ckpBat.Vecs[1] = testutil.MakeInt32Vector([]int32{1, 2, 3}, nil)
ckpBat.SetRowCount(3)

sinker.Sink(ctx, &DecoderOutput{
s.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeSnapshot,
fromTs: t0,
toTs: t1,
checkpointBat: ckpBat,
})
assert.NoError(t, err)
sinker.Sink(ctx, &DecoderOutput{
s.Sink(ctx, &DecoderOutput{
noMoreData: true,
fromTs: t0,
toTs: t1,
Expand All @@ -542,7 +546,7 @@ func Test_mysqlSinker_Sink(t *testing.T) {
deleteBat.SetRowCount(1)
deleteAtomicBat.Append(packer, deleteBat, 1, 0)

sinker.Sink(ctx, &DecoderOutput{
s.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeTail,
fromTs: t1,
toTs: t2,
Expand All @@ -551,7 +555,7 @@ func Test_mysqlSinker_Sink(t *testing.T) {
})
assert.NoError(t, err)

sinker.Sink(ctx, &DecoderOutput{
s.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeTail,
fromTs: t1,
toTs: t2,
Expand All @@ -560,7 +564,7 @@ func Test_mysqlSinker_Sink(t *testing.T) {
})
assert.NoError(t, err)

sinker.Sink(ctx, &DecoderOutput{
s.Sink(ctx, &DecoderOutput{
outputTyp: OutputTypeTail,
fromTs: t1,
toTs: t2,
Expand All @@ -569,7 +573,7 @@ func Test_mysqlSinker_Sink(t *testing.T) {
})
assert.NoError(t, err)

sinker.Sink(ctx, &DecoderOutput{
s.Sink(ctx, &DecoderOutput{
noMoreData: true,
fromTs: t1,
toTs: t2,
Expand Down Expand Up @@ -617,7 +621,11 @@ func Test_mysqlSinker_Sink_NoMoreData(t *testing.T) {
s.preSqlBufLen = 128
s.sqlBufSendCh = make(chan []byte)
go s.Run(ctx, ar)
defer func() { s.Close() }()
defer func() {
// call dummy to guarantee sqls has been sent, then close
s.SendDummy()
s.Close()
}()

s.Sink(ctx, &DecoderOutput{
noMoreData: true,
Expand Down Expand Up @@ -877,7 +885,7 @@ func Test_mysqlSinker_SendBeginCommitRollback(t *testing.T) {
mock.ExpectRollback()

ar := NewCdcActiveRoutine()
sinker := &mysqlSinker{
s := &mysqlSinker{
mysql: &mysqlSink{
retryTimes: 3,
retryDuration: 3 * time.Second,
Expand All @@ -886,17 +894,21 @@ func Test_mysqlSinker_SendBeginCommitRollback(t *testing.T) {
ar: ar,
sqlBufSendCh: make(chan []byte),
}
go sinker.Run(context.Background(), ar)
defer func() { sinker.Close() }()
go s.Run(context.Background(), ar)
defer func() {
// call dummy to guarantee sqls has been sent, then close
s.SendDummy()
s.Close()
}()

sinker.SendBegin()
s.SendBegin()
assert.NoError(t, err)
sinker.SendCommit()
s.SendCommit()
assert.NoError(t, err)

sinker.SendBegin()
s.SendBegin()
assert.NoError(t, err)
sinker.SendRollback()
s.SendRollback()
assert.NoError(t, err)
}

Expand Down
14 changes: 13 additions & 1 deletion pkg/frontend/authenticate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,7 +1185,11 @@ const (

deletePitrFromMoPitrFormat = `delete from mo_catalog.mo_pitr where create_account = %d;`

getPasswordOfUserFormat = `select user_id, authentication_string, default_role, password_last_changed, password_history, status, login_attempts, lock_time from mo_catalog.mo_user where user_name = "%s" order by user_id;`
getPasswordOfUserFormat = `select user_id, authentication_string, default_role from mo_catalog.mo_user where user_name = "%s" order by user_id;`

getLockInfoOfUserFormat = `select status, login_attempts, lock_time from mo_catalog.mo_user where user_name = "%s" order by user_id;`

getExpiredTimeOfUserFormat = `select password_last_changed from mo_catalog.mo_user where user_name = "%s" order by user_id;`

getPasswordHistotyOfUsrFormat = `select password_history from mo_catalog.mo_user where user_name = "%s";`

Expand Down Expand Up @@ -1647,6 +1651,14 @@ func getPasswordHistotyOfUserSql(user string) string {
return fmt.Sprintf(getPasswordHistotyOfUsrFormat, user)
}

func getLockInfoOfUserSql(user string) string {
return fmt.Sprintf(getLockInfoOfUserFormat, user)
}

func getExpiredTimeOfUserSql(user string) string {
return fmt.Sprintf(getExpiredTimeOfUserFormat, user)
}

func getSqlForUpdatePasswordHistoryOfUser(passwordHistory, user string) string {
return fmt.Sprintf(updatePasswordHistoryOfUserFormat, passwordHistory, user)
}
Expand Down
Loading

0 comments on commit 4ef0467

Please sign in to comment.