From e1ce288fe92b5f5e6c69ee42026ae1df7a815780 Mon Sep 17 00:00:00 2001 From: chenmingsong <59043531+m-schen@users.noreply.github.com> Date: Sat, 19 Oct 2024 22:36:50 +0800 Subject: [PATCH] convert some pipelines to run locally, as they will cause hung due to receiver lost. (#19467) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 由于某些pipeline需要往其他pipeline发送数据(例如insert到唯一约束的列,需要发送一份数据到去重pipeline),无法发送到远程单独执行,这会导致找不到dispatch算子的数据接收者而卡死的问题。 对于这些pipeline在记录info级别的log后转换为本地执行。 Approved by: @ouyuanning --- pkg/sql/compile/remoterun.go | 1 - pkg/sql/compile/remoterunClient.go | 70 +++++++++++++++++++ pkg/sql/compile/remoterun_test.go | 105 +++++++++++++++++++++++++++++ pkg/sql/compile/scope.go | 3 + 4 files changed, 178 insertions(+), 1 deletion(-) diff --git a/pkg/sql/compile/remoterun.go b/pkg/sql/compile/remoterun.go index 481f2d9f8097..59882a84d687 100644 --- a/pkg/sql/compile/remoterun.go +++ b/pkg/sql/compile/remoterun.go @@ -511,7 +511,6 @@ func convertToPipelineInstruction(op vm.Operator, proc *process.Process, ctx *sc PipelineId: ctx0.id, } } - if len(t.RemoteRegs) > 0 { in.Dispatch.RemoteConnector = make([]*pipeline.WrapNode, len(t.RemoteRegs)) for i := range t.RemoteRegs { diff --git a/pkg/sql/compile/remoterunClient.go b/pkg/sql/compile/remoterunClient.go index 553a398c8fc5..03a3f9b6a072 100644 --- a/pkg/sql/compile/remoterunClient.go +++ b/pkg/sql/compile/remoterunClient.go @@ -92,6 +92,76 @@ func (s *Scope) remoteRun(c *Compile) (sender *messageSenderOnClient, err error) return sender, err } +// checkPipelineStandaloneExecutableAtRemote is responsible for checking the standalone executability of the pipeline +// once it is sent to another remote node. +// +// it returns true if the pipeline has only the root operator capable of sending data to an outer pipeline.
+func checkPipelineStandaloneExecutableAtRemote(s *Scope) bool { + var regs = make(map[*process.WaitRegister]struct{}) + var toScan []*Scope + // record which mergeReceivers this scope tree holds. + { + toScan = append(toScan, s) + for len(toScan) > 0 { + node := toScan[len(toScan)-1] + toScan = toScan[:len(toScan)-1] + + if len(node.PreScopes) > 0 { + toScan = append(toScan, node.PreScopes...) + } + + for i := range node.Proc.Reg.MergeReceivers { + regs[node.Proc.Reg.MergeReceivers[i]] = struct{}{} + } + } + } + + // check if there are target channels from other trees. + { + if len(s.PreScopes) > 0 { + toScan = append(toScan, s.PreScopes...) + } + + for len(toScan) > 0 { + node := toScan[len(toScan)-1] + toScan = toScan[:len(toScan)-1] + + if len(node.PreScopes) > 0 { + toScan = append(toScan, node.PreScopes...) + } + + if node.RootOp.OpType() == vm.Dispatch { + t := node.RootOp.(*dispatch.Dispatch) + for i := range t.LocalRegs { + if _, ok := regs[t.LocalRegs[i]]; !ok { + s.Proc.Infof( + s.Proc.Ctx, + "txn id : %s, the pipeline %p convert to execute locally because it holds a dispatch operator will send data to other local pipeline tree.", + s.Proc.GetTxnOperator().Txn().ID, s) + + return false + } + } + continue + } + if node.RootOp.OpType() == vm.Connector { + t := node.RootOp.(*connector.Connector) + if _, ok := regs[t.Reg]; !ok { + s.Proc.Infof( + s.Proc.Ctx, + "txn id : %s, the pipeline %p convert to execute locally because it holds a connector operator will send data to other local pipeline tree.", + s.Proc.GetTxnOperator().Txn().ID, s) + + return false + } + continue + } + } + } + + return true +} + func prepareRemoteRunSendingData(sqlStr string, s *Scope) (scopeData []byte, withoutOutput bool, processData []byte, err error) { // if simpleRun is true, it indicates that this pipeline will not produce any output. 
withoutOutput = true diff --git a/pkg/sql/compile/remoterun_test.go b/pkg/sql/compile/remoterun_test.go index 49262265bdd2..3118ea08154c 100644 --- a/pkg/sql/compile/remoterun_test.go +++ b/pkg/sql/compile/remoterun_test.go @@ -502,6 +502,12 @@ type fakeTxnOperator struct { client.TxnOperator } +func (f fakeTxnOperator) Txn() txn.TxnMeta { + return txn.TxnMeta{ + ID: []byte("test"), + } +} + func (f fakeTxnOperator) Snapshot() (txn.CNTxnSnapshot, error) { return txn.CNTxnSnapshot{}, nil } @@ -652,3 +658,102 @@ func Test_ReceiveMessageFromCnServer(t *testing.T) { require.NotNil(t, receiveMessageFromCnServer(s4, false, &sender)) } } + +func Test_checkPipelineStandaloneExecutableAtRemote(t *testing.T) { + proc := testutil.NewProcess() + proc.Base.TxnOperator = fakeTxnOperator{} + // a standalone pipeline tree should return true. + { + // s0, pre: s1, s2 + s0 := &Scope{ + Proc: proc.NewContextChildProc(2), + RootOp: dispatch.NewArgument(), + } + + s1 := &Scope{ + Proc: proc.NewContextChildProc(0), + } + op1 := connector.NewArgument() + op1.Reg = s0.Proc.Reg.MergeReceivers[0] + s1.RootOp = op1 + + s2 := &Scope{ + Proc: proc.NewContextChildProc(0), + } + op2 := dispatch.NewArgument() + op2.LocalRegs = []*process.WaitRegister{s0.Proc.Reg.MergeReceivers[1]} + s2.RootOp = op2 + + s0.PreScopes = append(s0.PreScopes, s1, s2) + + require.True(t, checkPipelineStandaloneExecutableAtRemote(s0)) + } + + // a pipeline holds an invalid dispatch should return false. + { + // s0, pre: s1 + s0 := &Scope{ + Proc: proc.NewContextChildProc(1), + RootOp: dispatch.NewArgument(), + } + + s1 := &Scope{ + Proc: proc.NewContextChildProc(0), + } + op1 := dispatch.NewArgument() + op1.LocalRegs = []*process.WaitRegister{{}} + s1.RootOp = op1 + + s0.PreScopes = append(s0.PreScopes, s1) + + require.False(t, checkPipelineStandaloneExecutableAtRemote(s0)) + } + + // a pipeline holds an invalid connector should return false. 
+ { + // s0, pre: s1 + s0 := &Scope{ + Proc: proc.NewContextChildProc(1), + RootOp: dispatch.NewArgument(), + } + + s1 := &Scope{ + Proc: proc.NewContextChildProc(0), + } + op1 := connector.NewArgument() + op1.Reg = &process.WaitRegister{} + s1.RootOp = op1 + + s0.PreScopes = append(s0.PreScopes, s1) + + require.False(t, checkPipelineStandaloneExecutableAtRemote(s0)) + } + + // depth more than 2. + { + // s0, pre: s1, pre: s2. + s0 := &Scope{ + Proc: proc.NewContextChildProc(1), + RootOp: dispatch.NewArgument(), + } + + s1 := &Scope{ + Proc: proc.NewContextChildProc(1), + } + op1 := connector.NewArgument() + op1.Reg = s0.Proc.Reg.MergeReceivers[0] + s1.RootOp = op1 + + s2 := &Scope{ + Proc: proc.NewContextChildProc(0), + } + op2 := connector.NewArgument() + op2.Reg = &process.WaitRegister{} + s2.RootOp = op2 + + s0.PreScopes = append(s0.PreScopes, s1) + s1.PreScopes = append(s1.PreScopes, s2) + + require.False(t, checkPipelineStandaloneExecutableAtRemote(s0)) + } +} diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index bd642eb4e6cf..144f38e1d019 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -341,6 +341,9 @@ func (s *Scope) RemoteRun(c *Compile) error { if !s.canRemote(c, true) { return s.MergeRun(c) } + if !checkPipelineStandaloneExecutableAtRemote(s) { + return s.MergeRun(c) + } runtime.ServiceRuntime(s.Proc.GetService()).Logger(). Debug("remote run pipeline",