From 44eef4b885de2ca0da1539052f18c7c6d905b3e4 Mon Sep 17 00:00:00 2001
From: fagongzi
Date: Thu, 20 Jul 2023 10:22:38 +0800
Subject: [PATCH] improve deadlock check (#10772)

The deadlock check channel could block when full; make the enqueue
non-blocking and skip transactions that already have a check in flight.

Approved by: @w-zr
---
 pkg/lockservice/deadlock.go         | 32 +++++++++++++++++++++-----
 pkg/lockservice/lock_table_local.go | 35 ++++++++++++++++++++++++-----
 2 files changed, 55 insertions(+), 12 deletions(-)

diff --git a/pkg/lockservice/deadlock.go b/pkg/lockservice/deadlock.go
index 5e1a10ccb5ec..1adbf3dfadae 100644
--- a/pkg/lockservice/deadlock.go
+++ b/pkg/lockservice/deadlock.go
@@ -25,6 +25,10 @@ import (
 	pb "github.com/matrixorigin/matrixone/pkg/pb/lock"
 )
 
+var (
+	maxWaitingCheckCount = 10240
+)
+
 type detector struct {
 	serviceID         string
 	c                 chan deadlockTxn
@@ -33,8 +37,9 @@ type detector struct {
 	ignoreTxns        sync.Map // txnID -> any
 	stopper           *stopper.Stopper
 	mu                struct {
-		sync.RWMutex
-		closed bool
+		sync.Mutex
+		closed         bool
+		activeCheckTxn map[string]struct{}
 	}
 }
 
@@ -48,12 +53,13 @@ func newDeadlockDetector(
 	waitTxnAbortFunc func(pb.WaitTxn)) *detector {
 	d := &detector{
 		serviceID:         serviceID,
-		c:                 make(chan deadlockTxn, 1024),
+		c:                 make(chan deadlockTxn, maxWaitingCheckCount),
 		waitTxnsFetchFunc: waitTxnsFetchFunc,
 		waitTxnAbortFunc:  waitTxnAbortFunc,
 		stopper: stopper.NewStopper("deadlock-detector",
 			stopper.WithLogger(getLogger().RawLogger())),
 	}
+	d.mu.activeCheckTxn = make(map[string]struct{}, maxWaitingCheckCount)
 	err := d.stopper.RunTask(d.doCheck)
 	if err != nil {
 		panic("impossible")
@@ -77,15 +83,26 @@ func (d *detector) txnClosed(txnID []byte) {
 func (d *detector) check(
 	holdTxnID []byte,
 	txn pb.WaitTxn) error {
-	d.mu.RLock()
-	defer d.mu.RUnlock()
+	d.mu.Lock()
+	defer d.mu.Unlock()
 	if d.mu.closed {
 		return ErrDeadlockDetectorClosed
 	}
 
-	d.c <- deadlockTxn{
+	key := util.UnsafeBytesToString(txn.TxnID)
+	if _, ok := d.mu.activeCheckTxn[key]; ok {
+		return nil
+	}
+	d.mu.activeCheckTxn[key] = struct{}{}
+
+	select {
+	case d.c <- deadlockTxn{
 		holdTxnID: holdTxnID,
 		waitTxn:   txn,
+	}:
+	default:
+		// too many txns waiting for deadlock check, just return error
+		return ErrDeadlockDetectorClosed
 	}
 	return nil
 }
@@ -108,6 +125,9 @@ func (d *detector) doCheck(ctx context.Context) {
 				d.ignoreTxns.Store(v, struct{}{})
 				d.waitTxnAbortFunc(txn.waitTxn)
 			}
+			d.mu.Lock()
+			delete(d.mu.activeCheckTxn, util.UnsafeBytesToString(txn.waitTxn.TxnID))
+			d.mu.Unlock()
 		}
 	}
 }
diff --git a/pkg/lockservice/lock_table_local.go b/pkg/lockservice/lock_table_local.go
index d70c74e18009..d9fa6685e52f 100644
--- a/pkg/lockservice/lock_table_local.go
+++ b/pkg/lockservice/lock_table_local.go
@@ -264,13 +264,20 @@ func (l *localLockTable) acquireRowLockLocked(c lockContext) lockContext {
 				// txn2/op2 added into txn2/op1's same txn list
 				// txn1 unlock, notify txn2/op1
 				// txn2/op3 get lock before txn2/op1 get notify
-				if len(c.w.sameTxnWaiters) > 0 {
-					c.w.notifySameTxn(l.bind.ServiceID, notifyValue{})
-				}
-				str := c.w.String()
-				if v := c.w.close(l.bind.ServiceID, notifyValue{}); v != nil {
-					panic("BUG: waiters should be empty, " + str + "," + v.String() + ", " + fmt.Sprintf("table(%d) %+v", l.bind.Table, key))
+				// TODO: add more test
+				for _, w := range c.w.waiters.all() {
+					if bytes.Equal(lock.txnID, w.txnID) {
+						w.close(l.bind.ServiceID, notifyValue{})
+						continue
+					}
+
+					lock.waiter.add(l.bind.ServiceID, false, w)
+					if err := l.detector.check(c.w.txnID, w.belongTo); err != nil {
+						panic("BUG: active dead lock check can not fail")
+					}
 				}
+				c.w.waiters.reset()
+				c.w.close(l.bind.ServiceID, notifyValue{})
 				c.w = nil
 			}
 			continue
@@ -347,6 +354,9 @@ func (l *localLockTable) handleLockConflictLocked(
 	w *waiter,
 	key []byte,
 	conflictWith Lock) {
+	childWaiters := w.waiters.all()
+	w.waiters.reset()
+
 	// find conflict, and wait prev txn completed, and a new
 	// waiter added, we need to active deadlock check.
 	txn.setBlocked(w.txnID, w)
@@ -358,6 +368,19 @@ func (l *localLockTable) handleLockConflictLocked(
 		true)); err != nil {
 		panic("BUG: active dead lock check can not fail")
 	}
+
+	// add child waiters into current locks waiting list
+	for _, v := range childWaiters {
+		if bytes.Equal(v.txnID, conflictWith.txnID) {
+			v.notify(l.bind.ServiceID, notifyValue{})
+			continue
+		}
+
+		conflictWith.waiter.add(l.bind.ServiceID, false, v)
+		if err := l.detector.check(conflictWith.txnID, v.belongTo); err != nil {
+			panic("BUG: active dead lock check can not fail")
+		}
+	}
 	logLocalLockWaitOn(l.bind.ServiceID,
 		txn, l.bind.Table, w, key, conflictWith)
 }
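
The core pattern in the deadlock.go change is a bounded, deduplicated work queue: a check request is skipped when one is already in flight for the same txn, and enqueueing never blocks because a full channel fails fast with an error. Below is a minimal self-contained sketch of that pattern, not the lockservice implementation itself; the detector here is a simplified stand-in, and unlike the patch it also releases the in-flight reservation on the full-queue path.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errDetectorBusy plays the role of the patch's ErrDeadlockDetectorClosed
// on the full-queue path.
var errDetectorBusy = errors.New("deadlock check queue is full")

// detector is a simplified stand-in: a bounded channel of pending checks
// plus a set of txns that already have a check in flight.
type detector struct {
	c  chan string
	mu struct {
		sync.Mutex
		active map[string]struct{}
	}
}

func newDetector(capacity int) *detector {
	d := &detector{c: make(chan string, capacity)}
	d.mu.active = make(map[string]struct{}, capacity)
	return d
}

// check enqueues a deadlock check for txnID without ever blocking:
// duplicate requests are ignored and a full queue fails fast.
func (d *detector) check(txnID string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.mu.active[txnID]; ok {
		return nil // a check for this txn is already queued
	}
	d.mu.active[txnID] = struct{}{}
	select {
	case d.c <- txnID:
		return nil
	default:
		// Queue full: fail fast instead of blocking. This sketch also
		// releases the reservation here so a later check can retry.
		delete(d.mu.active, txnID)
		return errDetectorBusy
	}
}

// doCheck drains the queue; once a check completes, the txn leaves the
// active set so a later check for it can be queued again.
func (d *detector) doCheck() {
	for txnID := range d.c {
		fmt.Println("checking", txnID) // the real detector walks the wait-for graph

		d.mu.Lock()
		delete(d.mu.active, txnID)
		d.mu.Unlock()
	}
}

func main() {
	d := newDetector(2)
	done := make(chan struct{})
	go func() { d.doCheck(); close(done) }()

	_ = d.check("txn-1")
	_ = d.check("txn-1") // deduplicated: second call is a no-op
	_ = d.check("txn-2")

	close(d.c) // the real detector is stopped via its stopper
	<-done
}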
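The lock_table_local.go changes maintain the wait queue when a waiter with child waiters queued behind it either acquires the lock (acquireRowLockLocked) or blocks on another lock (handleLockConflictLocked): children belonging to the lock-holding txn are woken immediately, and the rest are re-parented onto the holder's wait queue, with each re-registration triggering a fresh deadlock check. A toy sketch of that re-parenting step, using hypothetical waiter/lockHolder types rather than the real lockservice structures:

package main

import (
	"bytes"
	"fmt"
)

// waiter is a hypothetical stand-in for a blocked lock request together
// with the child waiters queued behind it.
type waiter struct {
	txnID    []byte
	children []*waiter
}

// lockHolder is a hypothetical stand-in for a held lock and its wait queue.
type lockHolder struct {
	txnID []byte
	queue []*waiter
}

// reparent mirrors the patch's loops: w's children are detached
// (w.waiters.reset() in the patch); a child that belongs to the holding
// txn is woken immediately, every other child joins the holder's wait
// queue and gets a fresh deadlock check.
func reparent(holder *lockHolder, w *waiter, check func(holdTxn, waitTxn []byte) error) {
	children := w.children
	w.children = nil
	for _, c := range children {
		if bytes.Equal(c.txnID, holder.txnID) {
			fmt.Printf("wake %s: its txn now holds the lock\n", c.txnID)
			continue
		}
		holder.queue = append(holder.queue, c)
		if err := check(holder.txnID, c.txnID); err != nil {
			panic("BUG: active deadlock check can not fail")
		}
	}
}

func main() {
	holder := &lockHolder{txnID: []byte("txn-1")}
	w := &waiter{
		txnID: []byte("txn-2"),
		children: []*waiter{
			{txnID: []byte("txn-1")}, // same txn as holder: woken immediately
			{txnID: []byte("txn-3")}, // re-parented behind the holder
		},
	}
	check := func(hold, wait []byte) error {
		fmt.Printf("deadlock check: %s waits for %s\n", wait, hold)
		return nil
	}
	reparent(holder, w, check)
	fmt.Println("holder queue length:", len(holder.queue))
}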