Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add debug tool for remote run #19833

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/pb/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (m *Message) SetID(id uint64) {
m.Id = id
}

func (m *Message) SetDebugMsg(msg string) {
m.DebugMsg = msg
}

func (m *Message) SetSid(sid Status) {
m.Sid = sid
}
Expand Down
770 changes: 411 additions & 359 deletions pkg/pb/pipeline/pipeline.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/sql/compile/compile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (w *Ws) Readonly() bool {
return false
}

func (w *Ws) Snapshot() bool {
return false
}

func (w *Ws) IncrStatementID(ctx context.Context, commit bool) error {
return nil
}
Expand Down Expand Up @@ -225,6 +229,7 @@ func newTestTxnClientAndOp(ctrl *gomock.Controller) (client.TxnClient, client.Tx
txnOperator.EXPECT().NextSequence().Return(uint64(0)).AnyTimes()
txnOperator.EXPECT().EnterRunSql().Return().AnyTimes()
txnOperator.EXPECT().ExitRunSql().Return().AnyTimes()
txnOperator.EXPECT().Snapshot().Return(txn.CNTxnSnapshot{}, nil).AnyTimes()
txnOperator.EXPECT().Status().Return(txn.TxnStatus_Active).AnyTimes()
txnClient := mock_frontend.NewMockTxnClient(ctrl)
txnClient.EXPECT().New(gomock.Any(), gomock.Any()).Return(txnOperator, nil).AnyTimes()
Expand Down
15 changes: 13 additions & 2 deletions pkg/sql/compile/remoterunClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/sql/colexec/dispatch"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/sql/models"
"github.com/matrixorigin/matrixone/pkg/util/fault"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
Expand Down Expand Up @@ -82,7 +84,14 @@ func (s *Scope) remoteRun(c *Compile) (sender *messageSenderOnClient, err error)
return nil, err
}

if err = sender.sendPipeline(scopeEncodeData, processEncodeData, withoutOutput, maxMessageSizeToMoRpc); err != nil {
debugMsg := ""
_, sub_sql, exist := fault.TriggerFault("inject_send_pipeline")
if exist {
if strings.Contains(c.sql, sub_sql) {
debugMsg = fmt.Sprintf("inject_send_pipeline: client2server,compile = %p", c)
}
}
if err = sender.sendPipeline(scopeEncodeData, processEncodeData, withoutOutput, maxMessageSizeToMoRpc, debugMsg); err != nil {
return sender, err
}

Expand Down Expand Up @@ -368,10 +377,11 @@ func newMessageSenderOnClient(
}

func (sender *messageSenderOnClient) sendPipeline(
scopeData, procData []byte, noDataBack bool, eachMessageSizeLimitation int) error {
scopeData, procData []byte, noDataBack bool, eachMessageSizeLimitation int, debugMsg string) error {
sdLen := len(scopeData)
if sdLen <= eachMessageSizeLimitation {
message := cnclient.AcquireMessage()
message.SetDebugMsg(debugMsg)
message.SetID(sender.streamSender.ID())
message.SetMessageType(pipeline.Method_PipelineMessage)
message.SetData(scopeData)
Expand All @@ -386,6 +396,7 @@ func (sender *messageSenderOnClient) sendPipeline(
end := start + eachMessageSizeLimitation

message := cnclient.AcquireMessage()
message.SetDebugMsg(debugMsg)
message.SetID(sender.streamSender.ID())
message.SetMessageType(pipeline.Method_PipelineMessage)
if end >= sdLen {
Expand Down
61 changes: 61 additions & 0 deletions pkg/sql/compile/remoterunClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@ package compile
import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/cnservice/cnclient"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/morpc/mock_morpc"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/testutil/testengine"
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

var _ cnclient.PipelineClient = new(testPipelineClient)
Expand Down Expand Up @@ -83,3 +92,55 @@ func Test_newMessageSenderOnClient(t *testing.T) {

client.waitingTheStopResponse()
}

func TestRemoteRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx = defines.AttachAccountId(ctx, catalog.System_Account)

proc := testutil.NewProcess()
proc.Ctx = context.WithValue(proc.Ctx, defines.TenantIDKey{}, uint32(0))

tPCli := &testPipelineClient{
genStream: func(s string) morpc.Stream {
stream := mock_morpc.NewMockStream(ctrl)
stream.EXPECT().Receive().Return(nil, nil).AnyTimes()
stream.EXPECT().ID().Return(uint64(3)).AnyTimes()
stream.EXPECT().Send(gomock.Any(), gomock.Any()).Return(moerr.NewInternalErrorNoCtx("send error")).AnyTimes()
return stream
},
}

runtime.ServiceRuntime("").SetGlobalVariables(runtime.PipelineClient, tPCli)

fault.Enable()
fault.AddFaultPoint(ctx, "inject_send_pipeline", ":::", "echo", 0, "test_tbl")

txnCli, txnOp := newTestTxnClientAndOp(ctrl)
proc.Base.TxnClient = txnCli
proc.Base.TxnOperator = txnOp

sql := "insert into test_tbl values (1,1)"
e, _, _ := testengine.New(defines.AttachAccountId(context.Background(), catalog.System_Account))
c := NewCompile("test", "test", sql, "", "", e, proc, nil, false, nil, time.Now())
c.anal = &AnalyzeModule{qry: &plan.Query{}}

// if the root operator is connector.
s1 := &Scope{
Proc: proc,
RootOp: connector.NewArgument(),
ScopeAnalyzer: &ScopeAnalyzer{isStoped: true},
}
s1.RootOp.(*connector.Connector).Reg = &process.WaitRegister{
Ch2: make(chan process.PipelineSignal, 1),
}
// ch, err1 := sender.streamSender.Receive()
// require.Nil(t, err1)
// sender.receiveCh = ch

_, err := s1.remoteRun(c)
assert.Error(t, err)
}
4 changes: 4 additions & 0 deletions pkg/sql/compile/remoterunServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/udf"
"github.com/matrixorigin/matrixone/pkg/util/debug/goroutine"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/process"
Expand Down Expand Up @@ -91,6 +92,9 @@ func CnServerMessageHandler(
logutil.Errorf("cn server should receive *pipeline.Message, but get %v", message)
panic("cn server receive a message with unexpected type")
}
if msg.DebugMsg != "" {
logutil.Infof("%s, goRoutineId=%d", msg.GetDebugMsg(), goroutine.GetRoutineId())
}

// prepare the receiver structure, just for easy using the `send` method.
receiver := newMessageReceiverOnServer(ctx, serverAddress, msg,
Expand Down
11 changes: 6 additions & 5 deletions pkg/sql/compile/remoterun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ package compile

import (
"context"
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/pb/txn"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/connector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/value_scan"
"github.com/matrixorigin/matrixone/pkg/testutil"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"testing"
"time"

"github.com/matrixorigin/matrixone/pkg/sql/colexec"

Expand Down Expand Up @@ -561,7 +562,7 @@ func Test_MessageSenderSendPipeline(t *testing.T) {
sender.streamSender.(*fakeStreamSender).sentCnt = 0
sender.streamSender.(*fakeStreamSender).nextSendError = nil

err := sender.sendPipeline(make([]byte, 10), make([]byte, 10), true, 100)
err := sender.sendPipeline(make([]byte, 10), make([]byte, 10), true, 100, "")
require.Nil(t, err)

require.Equal(t, 1, sender.streamSender.(*fakeStreamSender).sentCnt)
Expand All @@ -572,7 +573,7 @@ func Test_MessageSenderSendPipeline(t *testing.T) {
sender.streamSender.(*fakeStreamSender).sentCnt = 0
sender.streamSender.(*fakeStreamSender).nextSendError = nil

err := sender.sendPipeline(make([]byte, 10), make([]byte, 10), true, 5)
err := sender.sendPipeline(make([]byte, 10), make([]byte, 10), true, 5, "")
require.Nil(t, err)

require.True(t, sender.streamSender.(*fakeStreamSender).sentCnt > 1)
Expand All @@ -583,7 +584,7 @@ func Test_MessageSenderSendPipeline(t *testing.T) {
sender.streamSender.(*fakeStreamSender).sentCnt = 0
sender.streamSender.(*fakeStreamSender).nextSendError = moerr.NewInternalErrorNoCtx("timeout")

err := sender.sendPipeline(make([]byte, 10), make([]byte, 10), true, 100)
err := sender.sendPipeline(make([]byte, 10), make([]byte, 10), true, 100, "")
require.NotNil(t, err)
}
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/util/debug/goroutine/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package goroutine

import (
"bytes"
"runtime"
"strconv"
)

// GetRoutineId gets the routine id
func GetRoutineId() uint64 {
data := make([]byte, 64)
data = data[:runtime.Stack(data, false)]
data = bytes.TrimPrefix(data, []byte("goroutine "))
data = data[:bytes.IndexByte(data, ' ')]
id, _ := strconv.ParseUint(string(data), 10, 64)
return id
}
21 changes: 21 additions & 0 deletions pkg/util/debug/goroutine/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package goroutine

import "testing"

func TestGetRoutineId(t *testing.T) {
_ = GetRoutineId()
}
1 change: 1 addition & 0 deletions proto/pipeline.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ message Message {
uint64 id = 7;
bytes uuid = 8;
bool needNotReply = 9;
string debugMsg = 10;
}

message Connector {
Expand Down
12 changes: 12 additions & 0 deletions test/distributed/cases/dml/insert/not_null_check.result
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,16 @@ constraint violation: Column 'a' cannot be null
drop table if exists t1;
create table t1 (a int primary key, b int, c int, unique key(b,c));
INSERT INTO t1 SELECT result,result,null FROM generate_series(1,1000000) g;
drop table t1;
create table t1 (a int primary key, b int);
select enable_fault_injection();
enable_fault_injection()
true
select add_fault_point('inject_send_pipeline', ':::', 'echo', 1, 't1');
add_fault_point(inject_send_pipeline, :::, echo, 1, t1)
true
INSERT INTO t1 SELECT result,result FROM generate_series(1,3000000) g;
select disable_fault_injection();
disable_fault_injection()
true
drop database if exists test;
6 changes: 6 additions & 0 deletions test/distributed/cases/dml/insert/not_null_check.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,10 @@ update t set a=null;
drop table if exists t1;
create table t1 (a int primary key, b int, c int, unique key(b,c));
INSERT INTO t1 SELECT result,result,null FROM generate_series(1,1000000) g;
drop table t1;
create table t1 (a int primary key, b int);
select enable_fault_injection();
select add_fault_point('inject_send_pipeline', ':::', 'echo', 1, 't1');
INSERT INTO t1 SELECT result,result FROM generate_series(1,3000000) g;
select disable_fault_injection();
drop database if exists test;
Loading