Skip to content

Commit

Permalink
convert some pipelines to run locally, as they will cause hung due to…
Browse files Browse the repository at this point in the history
… receiver lost. (#19467)

由于某些pipeline需要往其他pipeline发送数据(例如insert到唯一约束的列,需要发送一份数据到去重pipeline),无法发送到远程单独执行,这会导致找不到dispatch算子的数据接收者而卡死的问题。

对于这些pipeline在记录info级别的log后转换为本地执行。

Approved by: @ouyuanning
  • Loading branch information
m-schen authored Oct 19, 2024
1 parent 09afaa9 commit e1ce288
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 1 deletion.
1 change: 0 additions & 1 deletion pkg/sql/compile/remoterun.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,6 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc
PipelineId: ctx0.id,
}
}

if len(t.RemoteRegs) > 0 {
in.Dispatch.RemoteConnector = make([]*pipeline.WrapNode, len(t.RemoteRegs))
for i := range t.RemoteRegs {
Expand Down
70 changes: 70 additions & 0 deletions pkg/sql/compile/remoterunClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,76 @@ func (s *Scope) remoteRun(c *Compile) (sender *messageSenderOnClient, err error)
return sender, err
}

// checkPipelineStandaloneExecutableAtRemote is responsible for checking the standalone excitability of the pipeline
// once it was sent to other remote node.
//
// it returns true if the pipeline has only the root operator capable of sending data to other outer pipeline.
func checkPipelineStandaloneExecutableAtRemote(s *Scope) bool {
var regs = make(map[*process.WaitRegister]struct{})
var toScan []*Scope
// record which mergeReceivers this scope tree holds.
{
toScan = append(toScan, s)
for len(toScan) > 0 {
node := toScan[len(toScan)-1]
toScan = toScan[:len(toScan)-1]

if len(node.PreScopes) > 0 {
toScan = append(toScan, node.PreScopes...)
}

for i := range node.Proc.Reg.MergeReceivers {
regs[node.Proc.Reg.MergeReceivers[i]] = struct{}{}
}
}
}

// check if there are target channels from other trees.
{
if len(s.PreScopes) > 0 {
toScan = append(toScan, s.PreScopes...)
}

for len(toScan) > 0 {
node := toScan[len(toScan)-1]
toScan = toScan[:len(toScan)-1]

if len(node.PreScopes) > 0 {
toScan = append(toScan, node.PreScopes...)
}

if node.RootOp.OpType() == vm.Dispatch {
t := node.RootOp.(*dispatch.Dispatch)
for i := range t.LocalRegs {
if _, ok := regs[t.LocalRegs[i]]; !ok {
s.Proc.Infof(
s.Proc.Ctx,
"txn id : %s, the pipeline %p convert to execute locally because it holds a dispatch operator will send data to other local pipeline tree.",
s.Proc.GetTxnOperator().Txn().ID, s)

return false
}
}
continue
}
if node.RootOp.OpType() == vm.Connector {
t := node.RootOp.(*connector.Connector)
if _, ok := regs[t.Reg]; !ok {
s.Proc.Infof(
s.Proc.Ctx,
"txn id : %s, the pipeline %p convert to execute locally because it holds a connector operator will send data to other local pipeline tree.",
s.Proc.GetTxnOperator().Txn().ID, s)

return false
}
continue
}
}
}

return true
}

func prepareRemoteRunSendingData(sqlStr string, s *Scope) (scopeData []byte, withoutOutput bool, processData []byte, err error) {
// if simpleRun is true, it indicates that this pipeline will not produce any output.
withoutOutput = true
Expand Down
105 changes: 105 additions & 0 deletions pkg/sql/compile/remoterun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ type fakeTxnOperator struct {
client.TxnOperator
}

func (f fakeTxnOperator) Txn() txn.TxnMeta {
return txn.TxnMeta{
ID: []byte("test"),
}
}

func (f fakeTxnOperator) Snapshot() (txn.CNTxnSnapshot, error) {
return txn.CNTxnSnapshot{}, nil
}
Expand Down Expand Up @@ -652,3 +658,102 @@ func Test_ReceiveMessageFromCnServer(t *testing.T) {
require.NotNil(t, receiveMessageFromCnServer(s4, false, &sender))
}
}

func Test_checkPipelineStandaloneExecutableAtRemote(t *testing.T) {
proc := testutil.NewProcess()
proc.Base.TxnOperator = fakeTxnOperator{}
// a standalone pipeline tree should return true.
{
// s0, pre: s1, s2
s0 := &Scope{
Proc: proc.NewContextChildProc(2),
RootOp: dispatch.NewArgument(),
}

s1 := &Scope{
Proc: proc.NewContextChildProc(0),
}
op1 := connector.NewArgument()
op1.Reg = s0.Proc.Reg.MergeReceivers[0]
s1.RootOp = op1

s2 := &Scope{
Proc: proc.NewContextChildProc(0),
}
op2 := dispatch.NewArgument()
op2.LocalRegs = []*process.WaitRegister{s0.Proc.Reg.MergeReceivers[1]}
s2.RootOp = op2

s0.PreScopes = append(s0.PreScopes, s1, s2)

require.True(t, checkPipelineStandaloneExecutableAtRemote(s0))
}

// a pipeline holds an invalid dispatch should return false.
{
// s0, pre: s1
s0 := &Scope{
Proc: proc.NewContextChildProc(1),
RootOp: dispatch.NewArgument(),
}

s1 := &Scope{
Proc: proc.NewContextChildProc(0),
}
op1 := dispatch.NewArgument()
op1.LocalRegs = []*process.WaitRegister{{}}
s1.RootOp = op1

s0.PreScopes = append(s0.PreScopes, s1)

require.False(t, checkPipelineStandaloneExecutableAtRemote(s0))
}

// a pipeline holds an invalid connector should return false.
{
// s0, pre: s1
s0 := &Scope{
Proc: proc.NewContextChildProc(1),
RootOp: dispatch.NewArgument(),
}

s1 := &Scope{
Proc: proc.NewContextChildProc(0),
}
op1 := connector.NewArgument()
op1.Reg = &process.WaitRegister{}
s1.RootOp = op1

s0.PreScopes = append(s0.PreScopes, s1)

require.False(t, checkPipelineStandaloneExecutableAtRemote(s0))
}

// depth more than 2.
{
// s0, pre: s1, pre: s2.
s0 := &Scope{
Proc: proc.NewContextChildProc(1),
RootOp: dispatch.NewArgument(),
}

s1 := &Scope{
Proc: proc.NewContextChildProc(1),
}
op1 := connector.NewArgument()
op1.Reg = s0.Proc.Reg.MergeReceivers[0]
s1.RootOp = op1

s2 := &Scope{
Proc: proc.NewContextChildProc(0),
}
op2 := connector.NewArgument()
op2.Reg = &process.WaitRegister{}
s2.RootOp = op2

s0.PreScopes = append(s0.PreScopes, s1)
s1.PreScopes = append(s1.PreScopes, s2)

require.False(t, checkPipelineStandaloneExecutableAtRemote(s0))
}
}
3 changes: 3 additions & 0 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ func (s *Scope) RemoteRun(c *Compile) error {
if !s.canRemote(c, true) {
return s.MergeRun(c)
}
if !checkPipelineStandaloneExecutableAtRemote(s) {
return s.MergeRun(c)
}

runtime.ServiceRuntime(s.Proc.GetService()).Logger().
Debug("remote run pipeline",
Expand Down

0 comments on commit e1ce288

Please sign in to comment.