Skip to content

Commit

Permalink
print debug message when waiting message timeout (#18468)
Browse files Browse the repository at this point in the history
print debug message when waiting message timeout
return err instead of panic

Approved by: @fengttt, @aunjgr, @m-schen
  • Loading branch information
badboynt1 authored Sep 3, 2024
1 parent c03cbb0 commit 7e3e503
Show file tree
Hide file tree
Showing 18 changed files with 270 additions and 72 deletions.
13 changes: 10 additions & 3 deletions pkg/sql/colexec/anti/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func (antiJoin *AntiJoin) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
antiJoin.build(anal, proc)
err = antiJoin.build(anal, proc)
if err != nil {
return result, err
}
ctr.state = Probe

case Probe:
Expand Down Expand Up @@ -137,15 +140,19 @@ func (antiJoin *AntiJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (antiJoin *AntiJoin) build(anal process.Analyze, proc *process.Process) {
func (antiJoin *AntiJoin) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &antiJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(antiJoin.JoinMapTag, antiJoin.IsShuffle, antiJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(antiJoin.JoinMapTag, antiJoin.IsShuffle, antiJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
ctr.batchRowCount = ctr.mp.GetRowCount()
return nil
}

func (ctr *container) emptyProbe(ap *AntiJoin, inbat *batch.Batch, proc *process.Process, result *vm.CallResult) error {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/join/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func (innerJoin *InnerJoin) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
innerJoin.build(anal, proc)
err = innerJoin.build(anal, proc)
if err != nil {
return result, err
}

if ctr.mp == nil && !innerJoin.IsShuffle {
// for inner ,right and semi join, if hashmap is empty, we can finish this pipeline
Expand Down Expand Up @@ -138,15 +141,19 @@ func (innerJoin *InnerJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (innerJoin *InnerJoin) build(anal process.Analyze, proc *process.Process) {
func (innerJoin *InnerJoin) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &innerJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(innerJoin.JoinMapTag, innerJoin.IsShuffle, innerJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(innerJoin.JoinMapTag, innerJoin.IsShuffle, innerJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
ctr.batchRowCount = ctr.mp.GetRowCount()
return nil
}

func (ctr *container) probe(ap *InnerJoin, proc *process.Process, result *vm.CallResult) error {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/left/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func (leftJoin *LeftJoin) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
leftJoin.build(anal, proc)
err = leftJoin.build(anal, proc)
if err != nil {
return result, err
}
ctr.state = Probe

case Probe:
Expand Down Expand Up @@ -150,15 +153,19 @@ func (leftJoin *LeftJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (leftJoin *LeftJoin) build(anal process.Analyze, proc *process.Process) {
func (leftJoin *LeftJoin) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &leftJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(leftJoin.JoinMapTag, leftJoin.IsShuffle, leftJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(leftJoin.JoinMapTag, leftJoin.IsShuffle, leftJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
ctr.batchRowCount = ctr.mp.GetRowCount()
return nil
}

func (ctr *container) emptyProbe(ap *LeftJoin, proc *process.Process, result *vm.CallResult) error {
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colexec/loopjoin/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,11 @@ func (loopJoin *LoopJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (loopJoin *LoopJoin) build(proc *process.Process, anal process.Analyze) error {
func (loopJoin *LoopJoin) build(proc *process.Process, anal process.Analyze) (err error) {
start := time.Now()
defer anal.WaitStop(start)
loopJoin.ctr.mp = message.ReceiveJoinMap(loopJoin.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
return nil
loopJoin.ctr.mp, err = message.ReceiveJoinMap(loopJoin.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
return err
}

func (ctr *container) emptyProbe(ap *LoopJoin, proc *process.Process, result *vm.CallResult) error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/mark/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,15 @@ func (markJoin *MarkJoin) build(ap *MarkJoin, proc *process.Process, anal proces
ctr := &markJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
mp := message.ReceiveJoinMap(markJoin.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
mp, err := message.ReceiveJoinMap(markJoin.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if mp == nil {
return nil
}
batches := mp.GetBatches()
ctr.mp = mp
var err error
//maybe optimize this in the future
for i := range batches {
ctr.bat, err = ctr.bat.AppendWithCopy(proc.Ctx, proc.Mp(), batches[i])
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/product/product.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,15 @@ func (product *Product) build(proc *process.Process, anal process.Analyze) error
ctr := &product.ctr
start := time.Now()
defer anal.WaitStop(start)
mp := message.ReceiveJoinMap(product.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
mp, err := message.ReceiveJoinMap(product.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if mp == nil {
return nil
}
batches := mp.GetBatches()
var err error

//maybe optimize this in the future
for i := range batches {
ctr.bat, err = ctr.bat.AppendWithCopy(proc.Ctx, proc.Mp(), batches[i])
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/colexec/productl2/product_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,14 @@ func (productl2 *Productl2) build(proc *process.Process, anal process.Analyze) e
ctr := &productl2.ctr
start := time.Now()
defer anal.WaitStop(start)
mp := message.ReceiveJoinMap(productl2.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
mp, err := message.ReceiveJoinMap(productl2.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if mp == nil {
return nil
}
batches := mp.GetBatches()
var err error
//maybe optimize this in the future
for i := range batches {
ctr.bat, err = ctr.bat.AppendWithCopy(proc.Ctx, proc.Mp(), batches[i])
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/right/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,10 @@ func (rightJoin *RightJoin) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
rightJoin.build(analyze, proc)
err = rightJoin.build(analyze, proc)
if err != nil {
return result, err
}
if ctr.mp == nil && !rightJoin.IsShuffle {
// for inner ,right and semi join, if hashmap is empty, we can finish this pipeline
// shuffle join can't stop early for this moment
Expand Down Expand Up @@ -138,11 +141,14 @@ func (rightJoin *RightJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (rightJoin *RightJoin) build(anal process.Analyze, proc *process.Process) {
func (rightJoin *RightJoin) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &rightJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(rightJoin.JoinMapTag, rightJoin.IsShuffle, rightJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(rightJoin.JoinMapTag, rightJoin.IsShuffle, rightJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
Expand All @@ -152,6 +158,7 @@ func (rightJoin *RightJoin) build(anal process.Analyze, proc *process.Process) {
ctr.matched = &bitmap.Bitmap{}
ctr.matched.InitWithSize(ctr.batchRowCount)
}
return nil
}

func (ctr *container) sendLast(ap *RightJoin, proc *process.Process, analyze process.Analyze, _ bool, isLast bool, result *vm.CallResult) (bool, error) {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/rightanti/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ func (rightAnti *RightAnti) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
rightAnti.build(analyze, proc)
err = rightAnti.build(analyze, proc)
if err != nil {
return result, err
}
// for inner ,right and semi join, if hashmap is empty, we can finish this pipeline
// shuffle join can't stop early for this moment
if ctr.mp == nil && !rightAnti.IsShuffle {
Expand Down Expand Up @@ -139,11 +142,14 @@ func (rightAnti *RightAnti) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (rightAnti *RightAnti) build(anal process.Analyze, proc *process.Process) {
func (rightAnti *RightAnti) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &rightAnti.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(rightAnti.JoinMapTag, rightAnti.IsShuffle, rightAnti.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(rightAnti.JoinMapTag, rightAnti.IsShuffle, rightAnti.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
Expand All @@ -153,6 +159,7 @@ func (rightAnti *RightAnti) build(anal process.Analyze, proc *process.Process) {
ctr.matched = &bitmap.Bitmap{}
ctr.matched.InitWithSize(ctr.batchRowCount)
}
return nil
}

func (ctr *container) sendLast(ap *RightAnti, proc *process.Process, analyze process.Analyze, _ bool, isLast bool) (bool, error) {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/rightsemi/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ func (rightSemi *RightSemi) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
rightSemi.build(analyze, proc)
err = rightSemi.build(analyze, proc)
if err != nil {
return result, err
}
if ctr.mp == nil && !rightSemi.IsShuffle {
// for inner ,right and semi join, if hashmap is empty, we can finish this pipeline
// shuffle join can't stop early for this moment
Expand Down Expand Up @@ -137,11 +140,14 @@ func (rightSemi *RightSemi) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (rightSemi *RightSemi) build(anal process.Analyze, proc *process.Process) {
func (rightSemi *RightSemi) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &rightSemi.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(rightSemi.JoinMapTag, rightSemi.IsShuffle, rightSemi.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(rightSemi.JoinMapTag, rightSemi.IsShuffle, rightSemi.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
Expand All @@ -151,6 +157,7 @@ func (rightSemi *RightSemi) build(anal process.Analyze, proc *process.Process) {
ctr.matched = &bitmap.Bitmap{}
ctr.matched.InitWithSize(ctr.batchRowCount)
}
return nil
}

func (ctr *container) sendLast(ap *RightSemi, proc *process.Process, analyze process.Analyze, _ bool, isLast bool) (bool, error) {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/semi/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func (semiJoin *SemiJoin) Call(proc *process.Process) (vm.CallResult, error) {
for {
switch ctr.state {
case Build:
semiJoin.build(anal, proc)
err = semiJoin.build(anal, proc)
if err != nil {
return result, err
}
if ctr.mp == nil && !semiJoin.IsShuffle {
// for inner ,right and semi join, if hashmap is empty, we can finish this pipeline
// shuffle join can't stop early for this moment
Expand Down Expand Up @@ -159,14 +162,18 @@ func (semiJoin *SemiJoin) Call(proc *process.Process) (vm.CallResult, error) {
}
}

func (semiJoin *SemiJoin) build(anal process.Analyze, proc *process.Process) {
func (semiJoin *SemiJoin) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &semiJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(semiJoin.JoinMapTag, semiJoin.IsShuffle, semiJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(semiJoin.JoinMapTag, semiJoin.IsShuffle, semiJoin.ShuffleIdx, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
return nil
}

func (ctr *container) probe(bat *batch.Batch, ap *SemiJoin, proc *process.Process, result *vm.CallResult) error {
Expand Down
13 changes: 10 additions & 3 deletions pkg/sql/colexec/single/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func (singleJoin *SingleJoin) Call(proc *process.Process) (vm.CallResult, error)
for {
switch ctr.state {
case Build:
singleJoin.build(anal, proc)
err = singleJoin.build(anal, proc)
if err != nil {
return result, err
}
ctr.state = Probe

case Probe:
Expand Down Expand Up @@ -147,14 +150,18 @@ func (singleJoin *SingleJoin) Call(proc *process.Process) (vm.CallResult, error)
}
}
}
func (singleJoin *SingleJoin) build(anal process.Analyze, proc *process.Process) {
func (singleJoin *SingleJoin) build(anal process.Analyze, proc *process.Process) (err error) {
ctr := &singleJoin.ctr
start := time.Now()
defer anal.WaitStop(start)
ctr.mp = message.ReceiveJoinMap(singleJoin.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
ctr.mp, err = message.ReceiveJoinMap(singleJoin.JoinMapTag, false, 0, proc.GetMessageBoard(), proc.Ctx)
if err != nil {
return err
}
if ctr.mp != nil {
ctr.maxAllocSize = max(ctr.maxAllocSize, ctr.mp.Size())
}
return nil
}

func (ctr *container) emptyProbe(bat *batch.Batch, ap *SingleJoin, result *vm.CallResult) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/compile/compile2.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (c *Compile) Run(_ uint64) (queryResult *util2.RunResult, err error) {
for {
// build query context and pipeline contexts for the current run.
runC.InitPipelineContextToExecuteQuery()

runC.MessageBoard.BeforeRunonce()
if err = runC.runOnce(); err == nil {
break
}
Expand Down
Loading

0 comments on commit 7e3e503

Please sign in to comment.