Skip to content

Commit

Permalink
improve deadlock check (#10772)
Browse files Browse the repository at this point in the history
The deadlock channel could become blocked.

Approved by: @w-zr
  • Loading branch information
zhangxu19830126 authored Jul 20, 2023
1 parent 216bf2a commit 44eef4b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 12 deletions.
32 changes: 26 additions & 6 deletions pkg/lockservice/deadlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
pb "github.com/matrixorigin/matrixone/pkg/pb/lock"
)

var (
maxWaitingCheckCount = 10240
)

type detector struct {
serviceID string
c chan deadlockTxn
Expand All @@ -33,8 +37,9 @@ type detector struct {
ignoreTxns sync.Map // txnID -> any
stopper *stopper.Stopper
mu struct {
sync.RWMutex
closed bool
sync.Mutex
closed bool
activeCheckTxn map[string]struct{}
}
}

Expand All @@ -48,12 +53,13 @@ func newDeadlockDetector(
waitTxnAbortFunc func(pb.WaitTxn)) *detector {
d := &detector{
serviceID: serviceID,
c: make(chan deadlockTxn, 1024),
c: make(chan deadlockTxn, maxWaitingCheckCount),
waitTxnsFetchFunc: waitTxnsFetchFunc,
waitTxnAbortFunc: waitTxnAbortFunc,
stopper: stopper.NewStopper("deadlock-detector",
stopper.WithLogger(getLogger().RawLogger())),
}
d.mu.activeCheckTxn = make(map[string]struct{}, maxWaitingCheckCount)
err := d.stopper.RunTask(d.doCheck)
if err != nil {
panic("impossible")
Expand All @@ -77,15 +83,26 @@ func (d *detector) txnClosed(txnID []byte) {
func (d *detector) check(
holdTxnID []byte,
txn pb.WaitTxn) error {
d.mu.RLock()
defer d.mu.RUnlock()
d.mu.Lock()
defer d.mu.Unlock()
if d.mu.closed {
return ErrDeadlockDetectorClosed
}

d.c <- deadlockTxn{
key := util.UnsafeBytesToString(txn.TxnID)
if _, ok := d.mu.activeCheckTxn[key]; ok {
return nil
}
d.mu.activeCheckTxn[key] = struct{}{}

select {
case d.c <- deadlockTxn{
holdTxnID: holdTxnID,
waitTxn: txn,
}:
default:
// too many txns waiting for deadlock check, just return error
return ErrDeadlockDetectorClosed
}
return nil
}
Expand All @@ -108,6 +125,9 @@ func (d *detector) doCheck(ctx context.Context) {
d.ignoreTxns.Store(v, struct{}{})
d.waitTxnAbortFunc(txn.waitTxn)
}
d.mu.Lock()
delete(d.mu.activeCheckTxn, util.UnsafeBytesToString(txn.waitTxn.TxnID))
d.mu.Unlock()
}
}
}
Expand Down
35 changes: 29 additions & 6 deletions pkg/lockservice/lock_table_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,20 @@ func (l *localLockTable) acquireRowLockLocked(c lockContext) lockContext {
// txn2/op2 added into txn2/op1's same txn list
// txn1 unlock, notify txn2/op1
// txn2/op3 get lock before txn2/op1 get notify
if len(c.w.sameTxnWaiters) > 0 {
c.w.notifySameTxn(l.bind.ServiceID, notifyValue{})
}
str := c.w.String()
if v := c.w.close(l.bind.ServiceID, notifyValue{}); v != nil {
panic("BUG: waiters should be empty, " + str + "," + v.String() + ", " + fmt.Sprintf("table(%d) %+v", l.bind.Table, key))
// TODO: add more test
for _, w := range c.w.waiters.all() {
if bytes.Equal(lock.txnID, w.txnID) {
w.close(l.bind.ServiceID, notifyValue{})
continue
}

lock.waiter.add(l.bind.ServiceID, false, w)
if err := l.detector.check(c.w.txnID, w.belongTo); err != nil {
panic("BUG: active dead lock check can not fail")
}
}
c.w.waiters.reset()
c.w.close(l.bind.ServiceID, notifyValue{})
c.w = nil
}
continue
Expand Down Expand Up @@ -347,6 +354,9 @@ func (l *localLockTable) handleLockConflictLocked(
w *waiter,
key []byte,
conflictWith Lock) {
childWaiters := w.waiters.all()
w.waiters.reset()

// find conflict, and wait prev txn completed, and a new
// waiter added, we need to active deadlock check.
txn.setBlocked(w.txnID, w)
Expand All @@ -358,6 +368,19 @@ func (l *localLockTable) handleLockConflictLocked(
true)); err != nil {
panic("BUG: active dead lock check can not fail")
}

// add child waiters into current locks waiting list
for _, v := range childWaiters {
if bytes.Equal(v.txnID, conflictWith.txnID) {
v.notify(l.bind.ServiceID, notifyValue{})
continue
}

conflictWith.waiter.add(l.bind.ServiceID, false, v)
if err := l.detector.check(conflictWith.txnID, v.belongTo); err != nil {
panic("BUG: active dead lock check can not fail")
}
}
logLocalLockWaitOn(l.bind.ServiceID, txn, l.bind.Table, w, key, conflictWith)
}

Expand Down

0 comments on commit 44eef4b

Please sign in to comment.