Skip to content

Commit

Permalink
refactor: valuescan
Browse files Browse the repository at this point in the history
  • Loading branch information
huby2358 committed Nov 7, 2024
1 parent 0ec2b30 commit 19bbe15
Show file tree
Hide file tree
Showing 23 changed files with 934 additions and 935 deletions.
11 changes: 0 additions & 11 deletions pkg/frontend/computation_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,17 +371,6 @@ func initExecuteStmtParam(reqCtx context.Context, ses *Session, cwft *TxnComputa
}
}

// The default count is 1. Setting it to 2 ensures that memory will not be reclaimed.
// Convenient to reuse memory next time
if prepareStmt.InsertBat != nil {
cwft.proc.SetPrepareBatch(prepareStmt.InsertBat)
for i := 0; i < len(prepareStmt.exprList); i++ {
for j := range prepareStmt.exprList[i] {
prepareStmt.exprList[i][j].ResetForNextQuery()
}
}
cwft.proc.SetPrepareExprList(prepareStmt.exprList)
}
numParams := len(preparePlan.ParamTypes)
if prepareStmt.params != nil && prepareStmt.params.Length() > 0 { // use binary protocol
if prepareStmt.params.Length() != numParams {
Expand Down
2 changes: 0 additions & 2 deletions pkg/frontend/mysql_cmd_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,6 @@ func createPrepareStmt(
PrepareStmt: saveStmt,
getFromSendLongData: make(map[int]struct{}),
}
prepareStmt.InsertBat = ses.GetTxnCompileCtx().GetProcess().GetPrepareBatch()

dcPrepare, ok := preparePlan.GetDcl().Control.(*plan.DataControl_Prepare)
if ok {
Expand Down Expand Up @@ -2852,7 +2851,6 @@ func doComQuery(ses *Session, execCtx *ExecCtx, input *UserInput) (retErr error)
proc.ReplaceTopCtx(execCtx.reqCtx)

pu := getPu(ses.GetService())
proc.CopyValueScanBatch(ses.proc)
proc.Base.Id = ses.getNextProcessId()
proc.Base.Lim.Size = pu.SV.ProcessLimitationSize
proc.Base.Lim.BatchRows = pu.SV.ProcessLimitationBatchRows
Expand Down
37 changes: 1 addition & 36 deletions pkg/frontend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/query"
"github.com/matrixorigin/matrixone/pkg/pb/status"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/tree"
plan2 "github.com/matrixorigin/matrixone/pkg/sql/plan"
Expand Down Expand Up @@ -956,14 +955,7 @@ func (ses *Session) SetPrepareStmt(ctx context.Context, name string, prepareStmt
} else {
stmt.Close()
}
if prepareStmt != nil && prepareStmt.PreparePlan != nil {
isInsertValues, exprList := checkPlanIsInsertValues(ses.proc,
prepareStmt.PreparePlan.GetDcl().GetPrepare().GetPlan())
if isInsertValues {
prepareStmt.proc = ses.proc
prepareStmt.exprList = exprList
}
}

ses.prepareStmts[name] = prepareStmt

return nil
Expand Down Expand Up @@ -1686,33 +1678,6 @@ func (ses *Session) reset(prev *Session) error {
return nil
}

func checkPlanIsInsertValues(proc *process.Process,
p *plan.Plan) (bool, [][]colexec.ExpressionExecutor) {
qry := p.GetQuery()
if qry != nil {
for _, node := range qry.Nodes {
if node.NodeType == plan.Node_VALUE_SCAN && node.RowsetData != nil {
exprList := make([][]colexec.ExpressionExecutor, len(node.RowsetData.Cols))
for i, col := range node.RowsetData.Cols {
exprList[i] = make([]colexec.ExpressionExecutor, 0, len(col.Data))
for _, data := range col.Data {
if data.Pos >= 0 {
continue
}
expr, err := colexec.NewExpressionExecutor(proc, data.Expr)
if err != nil {
return false, nil
}
exprList[i] = append(exprList[i], expr)
}
}
return true, exprList
}
}
}
return false, nil
}

func commitAfterMigrate(ses *Session, err error) error {
//if ses == nil {
// logutil.Error("session is nil")
Expand Down
17 changes: 1 addition & 16 deletions pkg/frontend/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/pb/timestamp"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/compile"
"github.com/matrixorigin/matrixone/pkg/sql/models"
"github.com/matrixorigin/matrixone/pkg/sql/parsers/dialect/mysql"
Expand Down Expand Up @@ -264,12 +263,8 @@ type PrepareStmt struct {
ParamTypes []byte
ColDefData [][]byte
IsCloudNonuser bool
IsInsertValues bool
InsertBat *batch.Batch
proc *process.Process

exprList [][]colexec.ExpressionExecutor

params *vector.Vector
getFromSendLongData map[int]struct{}

Expand Down Expand Up @@ -377,17 +372,7 @@ func (prepareStmt *PrepareStmt) Close() {
if prepareStmt.params != nil {
prepareStmt.params.Free(prepareStmt.proc.Mp())
}
if prepareStmt.InsertBat != nil {
prepareStmt.InsertBat.Clean(prepareStmt.proc.Mp())
prepareStmt.InsertBat = nil
}
if prepareStmt.exprList != nil {
for _, exprs := range prepareStmt.exprList {
for _, expr := range exprs {
expr.Free()
}
}
}

if prepareStmt.compile != nil {
prepareStmt.compile.FreeOperator()
prepareStmt.compile.SetIsPrepare(false)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/colexec/evalExpression.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ func NewExpressionExecutor(proc *process.Process, planExpr *plan.Expr) (Expressi
return ce, nil

case *plan.Expr_P:
return NewParamExpressionExecutor(proc.Mp(), int(t.P.Pos), types.T_text.ToType()), nil
typ := types.New(types.T(planExpr.Typ.Id), planExpr.Typ.Width, planExpr.Typ.Scale)
return NewParamExpressionExecutor(proc.Mp(), int(t.P.Pos), typ), nil

case *plan.Expr_V:
typ := types.New(types.T(planExpr.Typ.Id), planExpr.Typ.Width, planExpr.Typ.Scale)
Expand Down
59 changes: 52 additions & 7 deletions pkg/sql/colexec/value_scan/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package value_scan
import (
"github.com/matrixorigin/matrixone/pkg/common/reuse"
"github.com/matrixorigin/matrixone/pkg/container/batch"
plan2 "github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
Expand All @@ -34,11 +34,12 @@ type ValueScan struct {
// this means all the batches were saved other place.
// there is no need clean them after operator done.
dataInProcess bool
ColCount int
NodeType plan2.Node_NodeType

Batchs []*batch.Batch
RowsetData *plan.RowsetData
ColCount int
Uuid []byte
Batchs []*batch.Batch
RowsetData *plan2.RowsetData
ExprExecLists [][]colexec.ExpressionExecutor
}

type container struct {
Expand All @@ -59,6 +60,10 @@ func init() {
)
}

func NewArgument() *ValueScan {
return reuse.Alloc[ValueScan](nil)
}

func NewValueScanFromProcess() *ValueScan {
vs := getFromReusePool()
vs.dataInProcess = true
Expand Down Expand Up @@ -92,13 +97,43 @@ func (valueScan *ValueScan) Release() {

func (valueScan *ValueScan) Reset(proc *process.Process, _ bool, _ error) {
valueScan.runningCtx.nowIdx = 0
valueScan.doBatchClean(proc)
// valueScan.doBatchClean(proc)
if valueScan.Batchs != nil {
valueScan.resetBatchs()
}
for i := 0; i < valueScan.ColCount; i++ {
exprExecList := valueScan.ExprExecLists[i]
for _, expr := range exprExecList {
expr.ResetForNextQuery()
}
}
valueScan.ResetProjection(proc)
}

func (valueScan *ValueScan) Free(proc *process.Process, _ bool, _ error) {
valueScan.FreeProjection(proc)
valueScan.doBatchClean(proc)
// valueScan.doBatchClean(proc)
if valueScan.Batchs != nil {
valueScan.cleanBatchs(proc)
}
for i := range valueScan.ExprExecLists {
exprExecList := valueScan.ExprExecLists[i]
for i, expr := range exprExecList {
if expr != nil {
expr.Free()
exprExecList[i] = nil
}
}
}
}

func (valueScan *ValueScan) cleanBatchs(proc *process.Process) {
for _, bat := range valueScan.Batchs {
if bat != nil {
bat.Clean(proc.Mp())
}
}
valueScan.Batchs = nil
}

func (valueScan *ValueScan) doBatchClean(proc *process.Process) {
Expand All @@ -118,6 +153,16 @@ func (valueScan *ValueScan) doBatchClean(proc *process.Process) {
valueScan.Batchs = nil
}

func (valueScan *ValueScan) resetBatchs() {
for _, bat := range valueScan.Batchs {
if bat != nil {
for _, vec := range bat.Vecs {
vec.CleanOnlyData()
}
}
}
}

// TypeName implement the `reuse.ReusableObject` interface.
func (valueScan ValueScan) TypeName() string {
return thisOperator
Expand Down
Loading

0 comments on commit 19bbe15

Please sign in to comment.