Skip to content

Commit

Permalink
add print debug (#8762)
Browse files Browse the repository at this point in the history
1, print clients info every 30 minutes if the `printDebug` is enabled in group [cn.frontend] in cn.toml.
print debug is disabled default.
print debug interval  can be changed in cn.toml.
2, add some debug tag at code with potential problem.

Approved by: @iamlinjunhong
  • Loading branch information
daviszhen authored Mar 29, 2023
1 parent def9896 commit 557c8ca
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 30 deletions.
13 changes: 12 additions & 1 deletion pkg/config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ var (
// defaultPathBuilder, val in [DBTable, AccountDate]
defaultPathBuilder = "AccountDate"

// defaultSessionTimeout default: 10 minutes
// defaultSessionTimeout default: 24 hour
defaultSessionTimeout = 24 * time.Hour

// defaultLogsExtension default: tae. Support val in [csv, tae]
Expand All @@ -149,6 +149,9 @@ var (

//defaultOBBufferSize, 10 << 20 = 10485760
defaultOBBufferSize int64 = 10485760

// defaultPrintDebugInterval default: 30 minutes
defaultPrintDebugInterval = 30
)

// FrontendParameters of the frontend
Expand Down Expand Up @@ -287,6 +290,10 @@ type FrontendParameters struct {
AutoIncrCacheSize uint64 `toml:"autoIncrCacheSize"`

LowerCaseTableNames string `toml:"lowerCaseTableNames"`

PrintDebug bool `toml:"printDebug"`

PrintDebugInterval int `toml:"printDebugInterval"`
}

func (fp *FrontendParameters) SetDefaultValues() {
Expand Down Expand Up @@ -409,6 +416,10 @@ func (fp *FrontendParameters) SetDefaultValues() {
if fp.LowerCaseTableNames == "" {
fp.LowerCaseTableNames = "1"
}

if fp.PrintDebugInterval == 0 {
fp.PrintDebugInterval = defaultPrintDebugInterval
}
}

func (fp *FrontendParameters) SetMaxMessageSize(size uint64) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/frontend/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,9 @@ func (ip *internalProtocol) GetStats() string { return "internal unknown stats"
func (ip *internalProtocol) sendLocalInfileRequest(filename string) error {
return nil
}

func (ip *internalProtocol) incDebugCount(int) {}

func (ip *internalProtocol) resetDebugCount() []uint64 {
return nil
}
15 changes: 12 additions & 3 deletions pkg/frontend/mysql_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ func (mp *MysqlProtocolImpl) authenticateUser(ctx context.Context, authResponse
}
}
}

mp.incDebugCount(1)
return nil
}

Expand Down Expand Up @@ -1173,6 +1173,7 @@ func (mp *MysqlProtocolImpl) HandleHandshake(ctx context.Context, payload []byte
}

logDebugf(mp.getDebugStringUnsafe(), "authenticate user")
mp.incDebugCount(0)
if err = mp.authenticateUser(ctx, authResponse); err != nil {
logutil.Errorf("authenticate user failed.error:%v", err)
fail := moerr.MysqlErrorMsgRefer[moerr.ER_ACCESS_DENIED_ERROR]
Expand All @@ -1186,8 +1187,11 @@ func (mp *MysqlProtocolImpl) HandleHandshake(ctx context.Context, payload []byte
return false, err
}

logDebugf(mp.getDebugStringUnsafe(), "handle handshake end")
mp.incDebugCount(2)
logInfof(mp.getDebugStringUnsafe(), "handle handshake end")
err = mp.sendOKPacket(0, 0, 0, 0, "")
mp.incDebugCount(3)
logInfof(mp.getDebugStringUnsafe(), "handle handshake response ok")
if err != nil {
return false, err
}
Expand Down Expand Up @@ -2221,7 +2225,9 @@ func (mp *MysqlProtocolImpl) flushOutBuffer() error {
mp.flushCount++
mp.writeBytes += uint64(mp.bytesInOutBuffer)
// FIXME: use a suitable timeout value
mp.incDebugCount(8)
err := mp.tcpConn.Flush(0)
mp.incDebugCount(9)
if err != nil {
return err
}
Expand Down Expand Up @@ -2505,7 +2511,9 @@ func (mp *MysqlProtocolImpl) writePackets(payload []byte) error {
//send packet
var packet = append(header[:], payload[i:i+curLen]...)

mp.incDebugCount(4)
err := mp.tcpConn.Write(packet, goetty.WriteOptions{Flush: true})
mp.incDebugCount(5)
if err != nil {
return err
}
Expand All @@ -2517,9 +2525,10 @@ func (mp *MysqlProtocolImpl) writePackets(payload []byte) error {
header[1] = 0
header[2] = 0
header[3] = mp.GetSequenceId()

mp.incDebugCount(6)
//send header / zero-sized packet
err := mp.tcpConn.Write(header[:], goetty.WriteOptions{Flush: true})
mp.incDebugCount(7)
if err != nil {
return err
}
Expand Down
27 changes: 27 additions & 0 deletions pkg/frontend/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ type Protocol interface {
SendPrepareResponse(ctx context.Context, stmt *PrepareStmt) error

Quit()

incDebugCount(int)

resetDebugCount() []uint64
}

type ProtocolImpl struct {
Expand All @@ -208,6 +212,23 @@ type ProtocolImpl struct {
//The sequence-id is incremented with each packet and may wrap around.
//It starts at 0 and is reset to 0 when a new command begins in the Command Phase.
sequenceId atomic.Uint32

//for debug
debugCount [16]uint64
}

func (pi *ProtocolImpl) incDebugCount(i int) {
if i >= 0 && i < len(pi.debugCount) {
atomic.AddUint64(&pi.debugCount[i], 1)
}
}

func (pi *ProtocolImpl) resetDebugCount() []uint64 {
ret := make([]uint64, len(pi.debugCount))
for i := 0; i < len(pi.debugCount); i++ {
ret[i] = atomic.LoadUint64(&pi.debugCount[i])
}
return ret
}

func (pi *ProtocolImpl) setQuit(b bool) bool {
Expand Down Expand Up @@ -507,3 +528,9 @@ func (fp *FakeProtocol) Quit() {}
func (fp *FakeProtocol) sendLocalInfileRequest(filename string) error {
return nil
}

func (fp *FakeProtocol) incDebugCount(int) {}

func (fp *FakeProtocol) resetDebugCount() []uint64 {
return nil
}
89 changes: 65 additions & 24 deletions pkg/frontend/routine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package frontend

import (
"bytes"
"container/list"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"sync"
"sync/atomic"
"time"

"github.com/matrixorigin/matrixone/pkg/util/metric"
Expand All @@ -34,42 +37,32 @@ import (
)

type RoutineManager struct {
mu sync.Mutex
mu sync.RWMutex
ctx context.Context
clients map[goetty.IOSession]*Routine
pu *config.ParameterUnit
skipCheckUser bool
skipCheckUser atomic.Bool
tlsConfig *tls.Config
autoIncrCaches defines.AutoIncrCaches
}

func (rm *RoutineManager) GetAutoIncrCache() defines.AutoIncrCaches {
rm.mu.Lock()
defer rm.mu.Unlock()
return rm.autoIncrCaches
}

func (rm *RoutineManager) SetSkipCheckUser(b bool) {
rm.mu.Lock()
defer rm.mu.Unlock()
rm.skipCheckUser = b
rm.skipCheckUser.Store(b)
}

func (rm *RoutineManager) GetSkipCheckUser() bool {
rm.mu.Lock()
defer rm.mu.Unlock()
return rm.skipCheckUser
return rm.skipCheckUser.Load()
}

func (rm *RoutineManager) getParameterUnit() *config.ParameterUnit {
rm.mu.Lock()
defer rm.mu.Unlock()
return rm.pu
}

func (rm *RoutineManager) getCtx() context.Context {
rm.mu.Lock()
defer rm.mu.Unlock()
return rm.ctx
}

Expand All @@ -80,14 +73,12 @@ func (rm *RoutineManager) setRoutine(rs goetty.IOSession, r *Routine) {
}

func (rm *RoutineManager) getRoutine(rs goetty.IOSession) *Routine {
rm.mu.Lock()
defer rm.mu.Unlock()
rm.mu.RLock()
defer rm.mu.RUnlock()
return rm.clients[rs]
}

func (rm *RoutineManager) getTlsConfig() *tls.Config {
rm.mu.Lock()
defer rm.mu.Unlock()
return rm.tlsConfig
}

Expand Down Expand Up @@ -173,14 +164,14 @@ if killConnection is false, only the query will be canceled. the connection keep
*/
func (rm *RoutineManager) kill(ctx context.Context, killConnection bool, idThatKill, id uint64, statementId string) error {
var rt *Routine = nil
rm.mu.Lock()
rm.mu.RLock()
for _, value := range rm.clients {
if uint64(value.getConnectionID()) == id {
rt = value
break
}
}
rm.mu.Unlock()
rm.mu.RUnlock()

killMyself := idThatKill == id
if rt != nil {
Expand Down Expand Up @@ -314,7 +305,7 @@ func (rm *RoutineManager) Handler(rs goetty.IOSession, msg interface{}, received
return nil
}

req := routine.getProtocol().GetRequest(payload)
req := protocol.GetRequest(payload)
req.seq = seq

//handle request
Expand All @@ -330,12 +321,47 @@ func (rm *RoutineManager) Handler(rs goetty.IOSession, msg interface{}, received
// clientCount returns the count of the clients
func (rm *RoutineManager) clientCount() int {
var count int
rm.mu.Lock()
rm.mu.RLock()
count = len(rm.clients)
rm.mu.Unlock()
rm.mu.RUnlock()
return count
}

func (rm *RoutineManager) printDebug() {
type info struct {
id uint32
peer string
count []uint64
}
infos := list.New()
rm.mu.RLock()
for _, routine := range rm.clients {
proto := routine.getProtocol()
infos.PushBack(&info{
proto.ConnectionID(),
proto.Peer(),
proto.resetDebugCount(),
})
}
rm.mu.RUnlock()

bb := bytes.Buffer{}
bb.WriteString("Clients:")
bb.WriteString(fmt.Sprintf("(%d)\n", infos.Len()))
for e := infos.Front(); e != nil; e = e.Next() {
d := e.Value.(*info)
if d == nil {
continue
}
bb.WriteString(fmt.Sprintf("%d|%s|", d.id, d.peer))
for i, u := range d.count {
bb.WriteString(fmt.Sprintf("%d:0x%x ", i, u))
}
bb.WriteByte('\n')
}
logutil.Info(bb.String())
}

func NewRoutineManager(ctx context.Context, pu *config.ParameterUnit) (*RoutineManager, error) {
rm := &RoutineManager{
ctx: ctx,
Expand All @@ -345,14 +371,29 @@ func NewRoutineManager(ctx context.Context, pu *config.ParameterUnit) (*RoutineM

// Initialize auto incre cache.
rm.autoIncrCaches.AutoIncrCaches = make(map[string]defines.AutoIncrCache)
rm.autoIncrCaches.Mu = &rm.mu
rm.autoIncrCaches.Mu = &sync.Mutex{}

if pu.SV.EnableTls {
err := initTlsConfig(rm, pu.SV)
if err != nil {
return nil, err
}
}

//add debug routine
if pu.SV.PrintDebug {
go func() {
for {
select {
case <-rm.ctx.Done():
return
default:
}
rm.printDebug()
time.Sleep(time.Duration(pu.SV.PrintDebugInterval) * time.Minute)
}
}()
}
return rm, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/parsers/dialect/mysql/mysql_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var (
input string
output string
}{
input: "select rank() over(partition by a order by b desc) from t1",
output: "select rank() over (partition by a order by b desc) from t1",
input: "select 1",
output: "select 1",
}
)

Expand Down

0 comments on commit 557c8ca

Please sign in to comment.