Skip to content

Commit

Permalink
abort all active txn on invalid service and make new txn can work to …
Browse files Browse the repository at this point in the history
…2.0 (#19749)

Abort all active txns on an invalid service and allow new txns to work

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
zhangxu19830126 authored Nov 2, 2024
1 parent fd77e50 commit 3d3bf86
Show file tree
Hide file tree
Showing 31 changed files with 1,145 additions and 351 deletions.
21 changes: 14 additions & 7 deletions pkg/common/moerr/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ const (
ErrLockConflict uint16 = 20706
// ErrLockNeedUpgrade row level lock is too large and needs to be upgraded to a table level lock
ErrLockNeedUpgrade uint16 = 20707
// ErrCannotCommitOnInvalidCN cannot commit transaction on invalid CN
ErrCannotCommitOnInvalidCN uint16 = 20708

// Group 8: partition
ErrPartitionFunctionIsNotAllowed uint16 = 20801
Expand Down Expand Up @@ -466,13 +468,14 @@ var errorMsgRefer = map[uint16]moErrorMsgItem{
ErrCantDelGCChecker: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "can't delete gc checker"},

// Group 7: lock service
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
ErrLockTableBindChanged: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock table bind changed"},
ErrLockTableNotFound: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock table not found on remote lock service"},
ErrDeadlockCheckBusy: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock check is busy"},
ErrCannotCommitOrphan: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "cannot commit a orphan transaction"},
ErrLockConflict: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock options conflict, wait policy is fast fail"},
ErrLockNeedUpgrade: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "row level lock is too large that need upgrade to table level lock"},
ErrDeadLockDetected: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock detected"},
ErrLockTableBindChanged: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock table bind changed"},
ErrLockTableNotFound: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock table not found on remote lock service"},
ErrDeadlockCheckBusy: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "deadlock check is busy"},
ErrCannotCommitOrphan: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "cannot commit a orphan transaction"},
ErrLockConflict: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "lock options conflict, wait policy is fast fail"},
ErrLockNeedUpgrade: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "row level lock is too large that need upgrade to table level lock"},
ErrCannotCommitOnInvalidCN: {ER_UNKNOWN_ERROR, []string{MySQLDefaultSqlState}, "cannot commit a orphan transaction on invalid cn"},

// Group 8: partition
ErrPartitionFunctionIsNotAllowed: {ER_PARTITION_FUNCTION_IS_NOT_ALLOWED, []string{MySQLDefaultSqlState}, "This partition function is not allowed"},
Expand Down Expand Up @@ -1266,6 +1269,10 @@ func NewCannotCommitOrphan(ctx context.Context) *Error {
return newError(ctx, ErrCannotCommitOrphan)
}

// NewCannotCommitOnInvalidCN returns the *Error produced by newError for the
// ErrCannotCommitOnInvalidCN code, using ctx as the error's context.
func NewCannotCommitOnInvalidCN(ctx context.Context) *Error {
return newError(ctx, ErrCannotCommitOnInvalidCN)
}

// NewLockTableBindChanged returns the *Error produced by newError for the
// ErrLockTableBindChanged code, using ctx as the error's context.
func NewLockTableBindChanged(ctx context.Context) *Error {
return newError(ctx, ErrLockTableBindChanged)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/moerr/error_no_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,10 @@ func NewCannotCommitOrphanNoCtx() *Error {
return NewCannotCommitOrphan(Context())
}

// NewCannotCommitOnInvalidCNNoCtx is the context-free variant of
// NewCannotCommitOnInvalidCN: it calls that constructor with the value
// returned by Context().
func NewCannotCommitOnInvalidCNNoCtx() *Error {
return NewCannotCommitOnInvalidCN(Context())
}

// NewLockTableBindChangedNoCtx returns the *Error for the
// ErrLockTableBindChanged code without requiring a caller-supplied context;
// the value returned by Context() is passed to newError instead.
//
// NOTE(review): NewCannotCommitOrphanNoCtx in this file delegates to its
// context-taking constructor, whereas this helper calls newError directly.
func NewLockTableBindChangedNoCtx() *Error {
return newError(Context(), ErrLockTableBindChanged)
}
Expand Down
65 changes: 46 additions & 19 deletions pkg/lockservice/lock_table_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/common/log"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/stopper"
"github.com/matrixorigin/matrixone/pkg/common/util"
pb "github.com/matrixorigin/matrixone/pkg/pb/lock"
"go.uber.org/zap"
)

type lockTableAllocator struct {
Expand Down Expand Up @@ -113,7 +112,7 @@ func (l *lockTableAllocator) Get(
sharding pb.Sharding) pb.LockTable {
binds := l.getServiceBinds(serviceID)
if binds == nil {
binds = l.registerService(serviceID, tableID)
binds = l.registerService(serviceID)
}
return l.registerBind(binds, group, tableID, originTableID, sharding)
}
Expand All @@ -140,6 +139,15 @@ func (l *lockTableAllocator) AddCannotCommit(values []pb.OrphanTxn) [][]byte {
return committing
}

// AddInvalidService records serviceID in l.inactiveService. The value stored
// alongside the ID is the time at which this call was made.
func (l *lockTableAllocator) AddInvalidService(serviceID string) {
	markedAt := time.Now()
	l.inactiveService.Store(serviceID, markedAt)
}

// HasInvalidService reports whether serviceID currently has an entry in
// l.inactiveService.
func (l *lockTableAllocator) HasInvalidService(serviceID string) bool {
	// Only presence matters here; the stored value is ignored.
	_, marked := l.inactiveService.Load(serviceID)
	return marked
}

func (l *lockTableAllocator) Valid(
serviceID string,
txnID []byte,
Expand Down Expand Up @@ -170,8 +178,9 @@ func (l *lockTableAllocator) Valid(

if _, ok := l.inactiveService.Load(serviceID); ok {
l.logger.Info("inactive service",
zap.String("serviceID", serviceID))
return nil, moerr.NewCannotCommitOrphanNoCtx()
zap.String("serviceID", serviceID),
)
return nil, moerr.NewCannotCommitOnInvalidCNNoCtx()
}

c := l.getCtl(serviceID)
Expand Down Expand Up @@ -374,7 +383,6 @@ func (l *lockTableAllocator) getTimeoutBinds(now time.Time) []*serviceBinds {

func (l *lockTableAllocator) registerService(
serviceID string,
tableID uint64,
) *serviceBinds {
l.mu.Lock()
defer l.mu.Unlock()
Expand Down Expand Up @@ -763,7 +771,8 @@ func (l *lockTableAllocator) initServer(cfg morpc.Config) {
func (l *lockTableAllocator) initHandler() {
l.server.RegisterMethodHandler(
pb.Method_GetBind,
l.handleGetBind)
l.handleGetBind,
)

l.server.RegisterMethodHandler(
pb.Method_KeepLockTableBind,
Expand All @@ -787,11 +796,18 @@ func (l *lockTableAllocator) initHandler() {

l.server.RegisterMethodHandler(
pb.Method_CannotCommit,
l.handleCannotCommit)
l.handleCannotCommit,
)

l.server.RegisterMethodHandler(
pb.Method_CheckOrphan,
l.handleCheckOrphan)
l.handleCheckOrphan,
)

l.server.RegisterMethodHandler(
pb.Method_ResumeInvalidCN,
l.handleResumeInvalidCN,
)
}

func (l *lockTableAllocator) handleGetBind(
Expand All @@ -801,7 +817,7 @@ func (l *lockTableAllocator) handleGetBind(
resp *pb.Response,
cs morpc.ClientSession) {
if !l.canGetBind(req.GetBind.ServiceID) {
writeResponse(ctx, l.logger, cancel, resp, moerr.NewNewTxnInCNRollingRestart(), cs)
writeResponse(l.logger, cancel, resp, moerr.NewNewTxnInCNRollingRestart(), cs)
return
}
resp.GetBind.LockTable = l.Get(
Expand All @@ -810,7 +826,7 @@ func (l *lockTableAllocator) handleGetBind(
req.GetBind.Table,
req.GetBind.OriginTable,
req.GetBind.Sharding)
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) handleKeepLockTableBind(
Expand All @@ -822,7 +838,7 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
resp.KeepLockTableBind.OK = l.KeepLockTableBind(req.KeepLockTableBind.ServiceID)
if !resp.KeepLockTableBind.OK {
// resp.KeepLockTableBind.Status = pb.Status_ServiceCanRestart
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
return
}
b := l.getServiceBinds(req.KeepLockTableBind.ServiceID)
Expand All @@ -832,7 +848,7 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
zap.String("serviceID", b.serviceID),
zap.String("status", req.KeepLockTableBind.Status.String()))
} else {
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
return
}
}
Expand All @@ -852,7 +868,7 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
resp.KeepLockTableBind.Status = req.KeepLockTableBind.Status
}
l.disableGroupTables(req.KeepLockTableBind.LockTables, b)
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) handleSetRestartService(
Expand All @@ -863,7 +879,7 @@ func (l *lockTableAllocator) handleSetRestartService(
cs morpc.ClientSession) {
l.setRestartService(req.SetRestartService.ServiceID)
resp.SetRestartService.OK = true
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) handleCanRestartService(
Expand All @@ -873,7 +889,7 @@ func (l *lockTableAllocator) handleCanRestartService(
resp *pb.Response,
cs morpc.ClientSession) {
resp.CanRestartService.OK = l.canRestartService(req.CanRestartService.ServiceID)
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) handleRemainTxnInService(
Expand All @@ -883,7 +899,7 @@ func (l *lockTableAllocator) handleRemainTxnInService(
resp *pb.Response,
cs morpc.ClientSession) {
resp.RemainTxnInService.RemainTxn = l.remainTxnInService(req.RemainTxnInService.ServiceID)
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) getLockTablesLocked(group uint32) map[uint64]pb.LockTable {
Expand All @@ -904,7 +920,7 @@ func (l *lockTableAllocator) handleCannotCommit(
cs morpc.ClientSession) {
committingTxn := l.AddCannotCommit(req.CannotCommit.OrphanTxnList)
resp.CannotCommit.CommittingTxn = committingTxn
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) handleCheckOrphan(
Expand All @@ -916,7 +932,18 @@ func (l *lockTableAllocator) handleCheckOrphan(
c := l.getCtl(req.CheckOrphan.ServiceID)
state, ok := c.getCtlState(util.UnsafeBytesToString(req.CheckOrphan.Txn))
resp.CheckOrphan.Orphan = ok && state == cannotCommitState
writeResponse(ctx, l.logger, cancel, resp, nil, cs)
writeResponse(l.logger, cancel, resp, nil, cs)
}

// handleResumeInvalidCN is the handler registered for
// pb.Method_ResumeInvalidCN. It removes the service named in
// req.ResumeInvalidCN.ServiceID from l.inactiveService and then replies
// through writeResponse with a nil error.
//
// ctx is part of the handler signature but is not used here.
func (l *lockTableAllocator) handleResumeInvalidCN(
	ctx context.Context,
	cancel context.CancelFunc,
	req *pb.Request,
	resp *pb.Response,
	cs morpc.ClientSession,
) {
	serviceID := req.ResumeInvalidCN.ServiceID
	l.inactiveService.Delete(serviceID)

	writeResponse(l.logger, cancel, resp, nil, cs)
}

func (l *lockTableAllocator) getCtl(serviceID string) *commitCtl {
Expand Down
8 changes: 4 additions & 4 deletions pkg/lockservice/lock_table_keeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestKeeper(t *testing.T) {
if n1.Add(1) == 10 {
close(c1)
}
writeResponse(ctx, getLogger(""), cancel, resp, nil, cs)
writeResponse(getLogger(""), cancel, resp, nil, cs)
})
s.RegisterMethodHandler(
pb.Method_KeepRemoteLock,
Expand All @@ -60,7 +60,7 @@ func TestKeeper(t *testing.T) {
if n2.Add(1) == 10 {
close(c2)
}
writeResponse(ctx, getLogger(""), cancel, resp, nil, cs)
writeResponse(getLogger(""), cancel, resp, nil, cs)
})
m := &lockTableHolders{service: "s1", holders: map[uint32]*lockTableHolder{}}
m.set(
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestKeepBindFailedWillRemoveAllLocalLockTable(t *testing.T) {
resp *pb.Response,
cs morpc.ClientSession) {
resp.KeepLockTableBind.OK = false
writeResponse(ctx, getLogger(""), cancel, resp, nil, cs)
writeResponse(getLogger(""), cancel, resp, nil, cs)
})

s.RegisterMethodHandler(
Expand All @@ -130,7 +130,7 @@ func TestKeepBindFailedWillRemoveAllLocalLockTable(t *testing.T) {
req *pb.Request,
resp *pb.Response,
cs morpc.ClientSession) {
writeResponse(ctx, getLogger(""), cancel, resp, nil, cs)
writeResponse(getLogger(""), cancel, resp, nil, cs)
})

m := &lockTableHolders{service: "s1", holders: map[uint32]*lockTableHolder{}}
Expand Down
11 changes: 5 additions & 6 deletions pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ func (l *localLockTable) unlock(

logUnlockTableOnLocal(
l.logger,
l.bind.ServiceID,
txn,
l.bind,
)
Expand Down Expand Up @@ -586,7 +585,7 @@ func (l *localLockTable) addRangeLockLocked(
prevStartKey,
key, keyLock,
mc,
c.txn)
)
prevStartKey = nil
rangeStartEncountered = false
return bytes.Compare(key, end) < 0
Expand Down Expand Up @@ -632,7 +631,7 @@ func (l *localLockTable) addRangeLockLocked(
prevStartKey,
key, keyLock,
mc,
c.txn)
)
}
break
}
Expand Down Expand Up @@ -662,7 +661,7 @@ func (l *localLockTable) mergeRangeLocked(
seekKey []byte,
seekLock Lock,
mc *mergeContext,
txn *activeTxn) ([]byte, []byte) {
) ([]byte, []byte) {
// range lock encountered a row lock
if seekLock.isLockRow() {
// 5 + [1, 4] => [1, 4] + [5]
Expand Down Expand Up @@ -766,10 +765,10 @@ func (c *mergeContext) commit(
s.Delete(util.UnsafeStringToBytes(k))
}
txn.lockRemoved(
bind.ServiceID,
bind.Group,
bind.Table,
c.mergedLocks)
c.mergedLocks,
)

for _, q := range c.mergedWaiters {
// release ref in merged waiters. The ref is moved to c.to.
Expand Down
Loading

0 comments on commit 3d3bf86

Please sign in to comment.