From 0b7c8878f8d9c557e6f7a41996d6c8d9012f44df Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Tue, 5 Nov 2024 14:42:08 +0800 Subject: [PATCH 01/10] fix --- pkg/sql/compile/compile.go | 84 +++++--------------------------------- pkg/sql/compile/scope.go | 68 +++++++++++++++++++++++++++++- 2 files changed, 77 insertions(+), 75 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index b96c60deb2b4..77b441b61f0f 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -1795,7 +1795,7 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) { stats.AddCompileTableScanConsumption(time.Since(compileStart)) }() - nodes, partialResults, partialResultTypes, err := c.generateNodes(n) + nodes, err := c.generateNodes(n) if err != nil { return nil, err } @@ -1815,8 +1815,6 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) { ss[0].NodeInfo.Mcpu = 1 } - ss[0].PartialResults = partialResults - ss[0].PartialResultTypes = partialResultTypes return ss, nil } @@ -3801,12 +3799,6 @@ func (c *Compile) generateCPUNumber(cpunum, blocks int) int { } func (c *Compile) determinExpandRanges(n *plan.Node) bool { - if len(n.AggList) > 0 { - partialResult, _, _ := checkAggOptimize(n) - if partialResult != nil { - return true - } - } return len(c.cnList) > 1 && !n.Stats.ForceOneCN && c.execType == plan2.ExecTypeAP_MULTICN && n.Stats.BlockNum > int32(plan2.BlockThresholdForOneCN) } @@ -4041,20 +4033,17 @@ func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Data return rel, db, ctx, nil } -func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) { +func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) { var relData engine.RelData - var partialResults []any - var partialResultTypes []types.T var nodes engine.Nodes rel, _, ctx, err := c.handleDbRelContext(n) if err != nil { - return nil, nil, nil, err + return nil, err } - if c.determinExpandRanges(n) { if c.isPrepare { - return nil, nil, nil, cantCompileForPrepareErr + return nil, cantCompileForPrepareErr } //@todo need remove expandRanges from Compile. @@ -4065,13 +4054,13 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e for _, e := range newFilterExpr { _, err := plan2.ReplaceFoldExpr(c.proc, e, &c.filterExprExes) if err != nil { - return nil, nil, nil, err + return nil, err } } for _, e := range newFilterExpr { err = plan2.EvalFoldExpr(c.proc, e, &c.filterExprExes) if err != nil { - return nil, nil, nil, err + return nil, err } } } @@ -4079,7 +4068,7 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e counterset := new(perfcounter.CounterSet) relData, err = c.expandRanges(n, newFilterExpr, counterset) if err != nil { - return nil, nil, nil, err + return nil, err } stats := statistic.StatsInfoFromContext(ctx) @@ -4098,60 +4087,9 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e Mcpu: c.generateCPUNumber(ncpu, int(n.Stats.BlockNum)), }) nodes[0].NeedExpandRanges = true - return nodes, nil, nil, nil + return nodes, nil } - if len(n.AggList) > 0 && relData.DataCnt() > 1 { - var columnMap map[int]int - partialResults, partialResultTypes, columnMap = checkAggOptimize(n) - if partialResults != nil { - newRelData := relData.BuildEmptyRelData(1) - blk := relData.GetBlockInfo(0) - newRelData.AppendBlockInfo(&blk) - - tombstones, err := collectTombstones(c, n, rel) - if err != nil { - return nil, nil, nil, err - } - - fs, err := fileservice.Get[fileservice.FileService](c.proc.GetFileService(), defines.SharedFileServiceName) - if err != nil { - return nil, nil, nil, err - } - //For each blockinfo in relData, if blk has no tombstones, then compute the agg result, - //otherwise put it into newRelData. - var ( - hasTombstone bool - err2 error - ) - if err = engine.ForRangeBlockInfo(1, relData.DataCnt(), relData, func(blk *objectio.BlockInfo) (bool, error) { - if hasTombstone, err2 = tombstones.HasBlockTombstone( - ctx, &blk.BlockID, fs, - ); err2 != nil { - return false, err2 - } else if blk.IsAppendable() || hasTombstone { - newRelData.AppendBlockInfo(blk) - return true, nil - } - if c.evalAggOptimize(n, blk, partialResults, partialResultTypes, columnMap) != nil { - partialResults = nil - return false, nil - } - return true, nil - }); err != nil { - return nil, nil, nil, err - } - if partialResults != nil { - relData = newRelData - } - } - } - - // some log for finding a bug. - tblId := rel.GetTableID(ctx) - expectedLen := relData.DataCnt() - c.proc.Debugf(ctx, "cn generateNodes, tbl %d ranges is %d", tblId, expectedLen) - // if len(ranges) == 0 indicates that it's a temporary table. if relData.DataCnt() == 0 && n.TableDef.TableType != catalog.SystemOrdinaryRel { nodes = make(engine.Nodes, len(c.cnList)) @@ -4163,7 +4101,7 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e Data: engine.BuildEmptyRelData(), } } - return nodes, partialResults, partialResultTypes, nil + return nodes, nil } engineType := rel.GetEngineType() @@ -4171,11 +4109,11 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e // or sometimes force on one CN // if not disttae engine, just put all payloads in current CN if len(c.cnList) == 1 || len(n.OrderBy) > 0 || relData.DataCnt() < plan2.BlockThresholdForOneCN || n.Stats.ForceOneCN || engineType != engine.Disttae { - return putBlocksInCurrentCN(c, relData), partialResults, partialResultTypes, nil + return putBlocksInCurrentCN(c, relData), nil } // only support disttae engine for now nodes, err = shuffleBlocksToMultiCN(c, rel, relData, n) - return nodes, partialResults, partialResultTypes, err + return nodes, err } func checkAggOptimize(n *plan.Node) ([]any, []types.T, map[int]int) { diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 845f583c8f73..4b7c860c2673 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -21,6 +21,9 @@ import ( "sync" "time" + "github.com/matrixorigin/matrixone/pkg/fileservice" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" @@ -611,8 +614,8 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error { return err } - ctx1 := c.proc.GetTopContext() - stats := statistic.StatsInfoFromContext(ctx1) + ctx := c.proc.GetTopContext() + stats := statistic.StatsInfoFromContext(ctx) stats.AddScopePrepareS3Request(statistic.S3Request{ List: counterSet.FileService.S3.List.Load(), Head: counterSet.FileService.S3.Head.Load(), @@ -632,6 +635,11 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error { return err } } + + err = s.aggOptimize(c) + if err != nil { + return err + } return nil } @@ -835,6 +843,62 @@ func removeStringBetween(s, start, end string) string { return s } +func (s *Scope) aggOptimize(c *Compile) error { + scanNode := s.DataSource.node + rel, _, ctx, err := c.handleDbRelContext(scanNode) + if err != nil { + return err + } + if len(scanNode.AggList) > 0 { + + partialResults, partialResultTypes, columnMap := checkAggOptimize(scanNode) + if partialResults != nil { + newRelData := s.NodeInfo.Data.BuildEmptyRelData(1) + blk := s.NodeInfo.Data.GetBlockInfo(0) + newRelData.AppendBlockInfo(&blk) + + tombstones, err := collectTombstones(c, scanNode, rel) + if err != nil { + return err + } + + fs, err := fileservice.Get[fileservice.FileService](c.proc.GetFileService(), defines.SharedFileServiceName) + if err != nil { + return err + } + //For each blockinfo in relData, if blk has no tombstones, then compute the agg result, + //otherwise put it into newRelData. + var ( + hasTombstone bool + err2 error + ) + if err = engine.ForRangeBlockInfo(1, s.NodeInfo.Data.DataCnt(), s.NodeInfo.Data, func(blk *objectio.BlockInfo) (bool, error) { + if hasTombstone, err2 = tombstones.HasBlockTombstone( + ctx, &blk.BlockID, fs, + ); err2 != nil { + return false, err2 + } else if blk.IsAppendable() || hasTombstone { + newRelData.AppendBlockInfo(blk) + return true, nil + } + if c.evalAggOptimize(scanNode, blk, partialResults, partialResultTypes, columnMap) != nil { + partialResults = nil + return false, nil + } + return true, nil + }); err != nil { + return err + } + if partialResults != nil { + s.NodeInfo.Data = newRelData + s.PartialResults = partialResults + s.PartialResultTypes = partialResultTypes + } + } + } + return nil +} + func (s *Scope) buildReaders(c *Compile) (readers []engine.Reader, err error) { // receive runtime filter and optimized the datasource. if err = s.handleRuntimeFilter(c); err != nil { From 47693f50b33bcb9f56299393fd1d3179c9d88315 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Tue, 5 Nov 2024 15:47:08 +0800 Subject: [PATCH 02/10] fix --- pkg/sql/compile/scope.go | 12 ++++++++++-- pkg/vm/types.go | 13 +++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 4b7c860c2673..5d2e70ce7ad7 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -17,6 +17,7 @@ package compile import ( "context" "fmt" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" "strings" "sync" "time" @@ -891,8 +892,15 @@ func (s *Scope) aggOptimize(c *Compile) error { } if partialResults != nil { s.NodeInfo.Data = newRelData - s.PartialResults = partialResults - s.PartialResultTypes = partialResultTypes + //find the last mergegroup + child := s.RootOp.GetOperatorBase().GetChildren(0) + op := vm.GetLeafOpGrandParent(s.RootOp, child, child.GetOperatorBase().GetChildren(0)) + if mergeGroup, ok := op.(*mergegroup.MergeGroup); ok { + mergeGroup.PartialResults = partialResults + mergeGroup.PartialResultTypes = partialResultTypes + } else { + panic("can't find merge group operator for agg optimize!") + } } } } diff --git a/pkg/vm/types.go b/pkg/vm/types.go index a6120f54c7b2..5e6eda5d19a8 100644 --- a/pkg/vm/types.go +++ b/pkg/vm/types.go @@ -615,3 +615,16 @@ func GetLeafOpParent(parentOp Operator, op Operator) Operator { } return GetLeafOpParent(op, op.GetOperatorBase().GetChildren(0)) } + +// suppose that the op tree is like a list, only one leaf child +// return grand parent of leaf op +func GetLeafOpGrandParent(grandParentOp, parentOp, op Operator) Operator { + if op == nil || parentOp == nil { + return nil + } + if op.GetOperatorBase().NumChildren() == 0 { + return grandParentOp + } + child := op.GetOperatorBase().GetChildren(0) + return GetLeafOpGrandParent(parentOp, op, child) +} From 437eabccc5562a8316c9ee9a49e6659a3cb4226f Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Tue, 5 Nov 2024 16:40:23 +0800 Subject: [PATCH 03/10] fix --- pkg/sql/compile/compile.go | 61 ++++++++++++++++++++++++-------------- pkg/sql/compile/scope.go | 31 +++++++++++++++---- pkg/sql/compile/types.go | 3 +- pkg/vm/types.go | 13 -------- 4 files changed, 66 insertions(+), 42 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 77b441b61f0f..ab4d3cf4d5e8 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -1040,7 +1040,7 @@ func (c *Compile) compilePlanScope(step int32, curNodeIdx int32, ns []*plan.Node if n.Stats.HashmapStats != nil && n.Stats.HashmapStats.Shuffle { ss = c.compileSort(n, c.compileShuffleGroup(n, ss, ns)) return ss, nil - } else if c.IsSingleScope(ss) && ss[0].PartialResults == nil { + } else if c.IsSingleScope(ss) { ss = c.compileSort(n, c.compileProjection(n, c.compileRestrict(n, c.compileTPGroup(n, ss, ns)))) return ss, nil } else { @@ -1811,8 +1811,11 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) { } c.anal.isFirst = false - if len(n.OrderBy) > 0 { - ss[0].NodeInfo.Mcpu = 1 + if len(n.AggList) > 0 { + partialResults, _, _ := checkAggOptimize(n) + if partialResults != nil { + ss[0].HasPartialResults = true + } } return ss, nil @@ -2981,9 +2984,19 @@ func (c *Compile) compileSample(n *plan.Node, ss []*Scope) []*Scope { func (c *Compile) compileTPGroup(n *plan.Node, ss []*Scope, ns []*plan.Node) []*Scope { currentFirstFlag := c.anal.isFirst - op := constructGroup(c.proc.Ctx, n, ns[n.Children[0]], true, 0, c.proc) - op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) - ss[0].setRootOperator(op) + if ss[0].HasPartialResults { + op := constructGroup(c.proc.Ctx, n, ns[n.Children[0]], false, 0, c.proc) + op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + ss[0].setRootOperator(op) + arg := constructMergeGroup(true) + arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + ss[0].setRootOperator(arg) + } else { + op := constructGroup(c.proc.Ctx, n, ns[n.Children[0]], true, 0, c.proc) + op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) + ss[0].setRootOperator(op) + } + ss[0].HasPartialResults = false c.anal.isFirst = false return ss } @@ -3011,12 +3024,6 @@ func (c *Compile) compileMergeGroup(n *plan.Node, ss []*Scope, ns []*plan.Node, currentFirstFlag = c.anal.isFirst arg := constructMergeGroup(true) - if ss[0].PartialResults != nil { - arg.PartialResults = ss[0].PartialResults - arg.PartialResultTypes = ss[0].PartialResultTypes - ss[0].PartialResults = nil - ss[0].PartialResultTypes = nil - } arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) rs.setRootOperator(arg) c.anal.isFirst = false @@ -3036,12 +3043,6 @@ func (c *Compile) compileMergeGroup(n *plan.Node, ss []*Scope, ns []*plan.Node, currentFirstFlag = c.anal.isFirst arg := constructMergeGroup(true) - if ss[0].PartialResults != nil { - arg.PartialResults = ss[0].PartialResults - arg.PartialResultTypes = ss[0].PartialResultTypes - ss[0].PartialResults = nil - ss[0].PartialResultTypes = nil - } arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag) rs.setRootOperator(arg) c.anal.isFirst = false @@ -4041,6 +4042,18 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) { if err != nil { return nil, err } + + forceSingle := false + if len(n.AggList) > 0 { + partialResults, _, _ := checkAggOptimize(n) + if partialResults != nil { + forceSingle = true + } + } + if len(n.OrderBy) > 0 { + forceSingle = true + } + if c.determinExpandRanges(n) { if c.isPrepare { return nil, cantCompileForPrepareErr @@ -4108,8 +4121,8 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) { // for an ordered scan, put all payloads in current CN // or sometimes force on one CN // if not disttae engine, just put all payloads in current CN - if len(c.cnList) == 1 || len(n.OrderBy) > 0 || relData.DataCnt() < plan2.BlockThresholdForOneCN || n.Stats.ForceOneCN || engineType != engine.Disttae { - return putBlocksInCurrentCN(c, relData), nil + if len(c.cnList) == 1 || relData.DataCnt() < plan2.BlockThresholdForOneCN || n.Stats.ForceOneCN || engineType != engine.Disttae || forceSingle { + return putBlocksInCurrentCN(c, relData, forceSingle), nil } // only support disttae engine for now nodes, err = shuffleBlocksToMultiCN(c, rel, relData, n) @@ -4625,12 +4638,16 @@ func shuffleBlocksByRange(c *Compile, relData engine.RelData, n *plan.Node, node return nil } -func putBlocksInCurrentCN(c *Compile, relData engine.RelData) engine.Nodes { +func putBlocksInCurrentCN(c *Compile, relData engine.RelData, forceSingle bool) engine.Nodes { var nodes engine.Nodes // add current CN + mcpu := c.generateCPUNumber(ncpu, relData.DataCnt()) + if forceSingle { + mcpu = 1 + } nodes = append(nodes, engine.Node{ Addr: c.addr, - Mcpu: c.generateCPUNumber(ncpu, relData.DataCnt()), + Mcpu: mcpu, }) nodes[0].Data = relData return nodes diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 5d2e70ce7ad7..904dec48f918 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -17,11 +17,14 @@ package compile import ( "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/projection" "strings" "sync" "time" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" + "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -851,7 +854,6 @@ func (s *Scope) aggOptimize(c *Compile) error { return err } if len(scanNode.AggList) > 0 { - partialResults, partialResultTypes, columnMap := checkAggOptimize(scanNode) if partialResults != nil { newRelData := s.NodeInfo.Data.BuildEmptyRelData(1) @@ -893,9 +895,8 @@ func (s *Scope) aggOptimize(c *Compile) error { if partialResults != nil { s.NodeInfo.Data = newRelData //find the last mergegroup - child := s.RootOp.GetOperatorBase().GetChildren(0) - op := vm.GetLeafOpGrandParent(s.RootOp, child, child.GetOperatorBase().GetChildren(0)) - if mergeGroup, ok := op.(*mergegroup.MergeGroup); ok { + mergeGroup := findMergeGroup(s.RootOp) + if mergeGroup != nil { mergeGroup.PartialResults = partialResults mergeGroup.PartialResultTypes = partialResultTypes } else { @@ -907,6 +908,26 @@ func (s *Scope) aggOptimize(c *Compile) error { return nil } +// find scan->proj->group->mergegroup +func findMergeGroup(op vm.Operator) *mergegroup.MergeGroup { + if op == nil { + return nil + } + if mergeGroup, ok := op.(*mergegroup.MergeGroup); ok { + child := op.GetOperatorBase().GetChildren(0) + if _, ok = child.(*group.Group); ok { + child = child.GetOperatorBase().GetChildren(0) + if _, ok = child.(*projection.Projection); ok { + child = child.GetOperatorBase().GetChildren(0) + if _, ok = child.(*table_scan.TableScan); ok { + return mergeGroup + } + } + } + } + return findMergeGroup(op.GetOperatorBase().GetChildren(0)) +} + func (s *Scope) buildReaders(c *Compile) (readers []engine.Reader, err error) { // receive runtime filter and optimized the datasource. if err = s.handleRuntimeFilter(c); err != nil { diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go index c34c8eb96fa0..74affd04e1fe 100644 --- a/pkg/sql/compile/types.go +++ b/pkg/sql/compile/types.go @@ -180,8 +180,7 @@ type Scope struct { RemoteReceivRegInfos []RemoteReceivRegInfo - PartialResults []any - PartialResultTypes []types.T + HasPartialResults bool } func canScopeOpRemote(rootOp vm.Operator) bool { diff --git a/pkg/vm/types.go b/pkg/vm/types.go index 5e6eda5d19a8..a6120f54c7b2 100644 --- a/pkg/vm/types.go +++ b/pkg/vm/types.go @@ -615,16 +615,3 @@ func GetLeafOpParent(parentOp Operator, op Operator) Operator { } return GetLeafOpParent(op, op.GetOperatorBase().GetChildren(0)) } - -// suppose that the op tree is like a list, only one leaf child -// return grand parent of leaf op -func GetLeafOpGrandParent(grandParentOp, parentOp, op Operator) Operator { - if op == nil || parentOp == nil { - return nil - } - if op.GetOperatorBase().NumChildren() == 0 { - return grandParentOp - } - child := op.GetOperatorBase().GetChildren(0) - return GetLeafOpGrandParent(parentOp, op, child) -} From 085a9366ce64009f0117d280d8953d12430ba1b2 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Tue, 5 Nov 2024 17:06:06 +0800 Subject: [PATCH 04/10] fix --- pkg/sql/compile/compile.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index ab4d3cf4d5e8..e4e587907f9a 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -431,7 +431,6 @@ func (c *Compile) FreeOperator() { } } -/* func (c *Compile) printPipeline() { if c.IsTpQuery() { fmt.Println("pipeline for tp query!", "sql: ", c.originSQL) @@ -440,7 +439,6 @@ func (c *Compile) printPipeline() { } fmt.Println(DebugShowScopes(c.scopes, OldLevel)) } -*/ // prePipelineInitializer is responsible for handling some tasks that need to be done before truly launching the pipeline. // @@ -467,7 +465,7 @@ func (c *Compile) prePipelineInitializer() (err error) { // run once func (c *Compile) runOnce() (err error) { - //c.printPipeline() + c.printPipeline() // defer cleanup at the end of runOnce() defer func() { @@ -4095,9 +4093,13 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) { }) } else { // add current CN + mcpu := c.generateCPUNumber(ncpu, int(n.Stats.BlockNum)) + if forceSingle { + mcpu = 1 + } nodes = append(nodes, engine.Node{ Addr: c.addr, - Mcpu: c.generateCPUNumber(ncpu, int(n.Stats.BlockNum)), + Mcpu: mcpu, }) nodes[0].NeedExpandRanges = true return nodes, nil @@ -4110,7 +4112,7 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) { nodes[i] = engine.Node{ Id: node.Id, Addr: node.Addr, - Mcpu: c.generateCPUNumber(node.Mcpu, int(n.Stats.BlockNum)), + Mcpu: 1, Data: engine.BuildEmptyRelData(), } } From 10b6b06df382709b2f323a265dc8e80f49a724ec Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Tue, 5 Nov 2024 17:50:53 +0800 Subject: [PATCH 05/10] fix --- pkg/sql/compile/scope.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 904dec48f918..f8a71f069ffd 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -17,12 +17,13 @@ package compile import ( "context" "fmt" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" - "github.com/matrixorigin/matrixone/pkg/sql/colexec/projection" "strings" "sync" "time" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/group" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/projection" + "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup" "github.com/matrixorigin/matrixone/pkg/fileservice" @@ -849,13 +850,14 @@ func removeStringBetween(s, start, end string) string { func (s *Scope) aggOptimize(c *Compile) error { scanNode := s.DataSource.node - rel, _, ctx, err := c.handleDbRelContext(scanNode) - if err != nil { - return err - } - if len(scanNode.AggList) > 0 { + if scanNode != nil && len(scanNode.AggList) > 0 { partialResults, partialResultTypes, columnMap := checkAggOptimize(scanNode) if partialResults != nil { + rel, _, ctx, err := c.handleDbRelContext(scanNode) + if err != nil { + return err + } + newRelData := s.NodeInfo.Data.BuildEmptyRelData(1) blk := s.NodeInfo.Data.GetBlockInfo(0) newRelData.AppendBlockInfo(&blk) From ea4ed0ed90dfc22cc0d166d59747faa53027fc06 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Tue, 5 Nov 2024 17:51:36 +0800 Subject: [PATCH 06/10] fix --- pkg/sql/compile/compile_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/compile/compile_test.go b/pkg/sql/compile/compile_test.go index 9826044842be..b4c47e2daf85 100644 --- a/pkg/sql/compile/compile_test.go +++ b/pkg/sql/compile/compile_test.go @@ -308,7 +308,7 @@ func TestPutBlocksInCurrentCN(t *testing.T) { } reldata := &engine_util.BlockListRelData{} reldata.SetBlockList(s) - putBlocksInCurrentCN(testCompile, reldata) + putBlocksInCurrentCN(testCompile, reldata, true) } func TestShuffleBlocksToMultiCN(t *testing.T) { From 531a989e08b533278273e51a79e49ae35584c0e3 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Wed, 6 Nov 2024 08:50:14 +0800 Subject: [PATCH 07/10] fix --- pkg/sql/compile/compile.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index e4e587907f9a..d322903fa1f8 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -431,6 +431,7 @@ func (c *Compile) FreeOperator() { } } +/* func (c *Compile) printPipeline() { if c.IsTpQuery() { fmt.Println("pipeline for tp query!", "sql: ", c.originSQL) @@ -439,6 +440,7 @@ func (c *Compile) printPipeline() { } fmt.Println(DebugShowScopes(c.scopes, OldLevel)) } +*/ // prePipelineInitializer is responsible for handling some tasks that need to be done before truly launching the pipeline. // @@ -465,7 +467,7 @@ func (c *Compile) prePipelineInitializer() (err error) { // run once func (c *Compile) runOnce() (err error) { - c.printPipeline() + //c.printPipeline() // defer cleanup at the end of runOnce() defer func() { From 5da2d9528585b544eacf6afc0d12de1ee43486b7 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Wed, 6 Nov 2024 10:08:41 +0800 Subject: [PATCH 08/10] fix --- pkg/sql/compile/scope.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index f8a71f069ffd..21a5083615b9 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -852,7 +852,7 @@ func (s *Scope) aggOptimize(c *Compile) error { scanNode := s.DataSource.node if scanNode != nil && len(scanNode.AggList) > 0 { partialResults, partialResultTypes, columnMap := checkAggOptimize(scanNode) - if partialResults != nil { + if partialResults != nil && s.NodeInfo.Data.DataCnt() > 1 { rel, _, ctx, err := c.handleDbRelContext(scanNode) if err != nil { return err From e5ff561280af7dde5f568e2c0fbc5e93fa06b060 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Wed, 6 Nov 2024 13:10:55 +0800 Subject: [PATCH 09/10] fix --- pkg/sql/compile/compile.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index d322903fa1f8..1d651701ad9d 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -4156,6 +4156,7 @@ func checkAggOptimize(n *plan.Node) ([]any, []types.T, map[int]int) { if !ok { if _, ok := args.Expr.(*plan.Expr_Lit); ok { agg.F.Func.ObjName = "starcount" + return partialResults, partialResultTypes, columnMap } return nil, nil, nil } else { From 448c36d35cc03ec1f65880a31f0e2d9f151004b4 Mon Sep 17 00:00:00 2001 From: badboynt1 Date: Wed, 6 Nov 2024 14:13:40 +0800 Subject: [PATCH 10/10] fix --- pkg/sql/compile/compile.go | 74 +++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 1d651701ad9d..13d36571a21e 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3805,7 +3805,7 @@ func (c *Compile) determinExpandRanges(n *plan.Node) bool { func collectTombstones( c *Compile, - n *plan.Node, + node *plan.Node, rel engine.Relation, ) (engine.Tombstoner, error) { var err error @@ -3817,35 +3817,35 @@ func collectTombstones( //----------------------------------------------------------------------------------------------------- ctx := c.proc.GetTopContext() txnOp = c.proc.GetTxnOperator() - if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil { + if node.ScanSnapshot != nil && node.ScanSnapshot.TS != nil { zeroTS := timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0} snapTS := c.proc.GetTxnOperator().Txn().SnapshotTS - if !n.ScanSnapshot.TS.Equal(zeroTS) && n.ScanSnapshot.TS.Less(snapTS) { + if !node.ScanSnapshot.TS.Equal(zeroTS) && node.ScanSnapshot.TS.Less(snapTS) { if c.proc.GetCloneTxnOperator() != nil { txnOp = c.proc.GetCloneTxnOperator() } else { - txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS) + txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*node.ScanSnapshot.TS) c.proc.SetCloneTxnOperator(txnOp) } - if n.ScanSnapshot.Tenant != nil { - ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID) + if node.ScanSnapshot.Tenant != nil { + ctx = context.WithValue(ctx, defines.TenantIDKey{}, node.ScanSnapshot.Tenant.TenantID) } } } //----------------------------------------------------------------------------------------------------- - if util.TableIsClusterTable(n.TableDef.GetTableType()) { + if util.TableIsClusterTable(node.TableDef.GetTableType()) { ctx = defines.AttachAccountId(ctx, catalog.System_Account) } - if n.ObjRef.PubInfo != nil { - ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId())) + if node.ObjRef.PubInfo != nil { + ctx = defines.AttachAccountId(ctx, uint32(node.ObjRef.PubInfo.GetTenantId())) } - if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) { + if util.TableIsLoggingTable(node.ObjRef.SchemaName, node.ObjRef.ObjName) { ctx = defines.AttachAccountId(ctx, catalog.System_Account) } - db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp) + db, err = c.e.Database(ctx, node.ObjRef.SchemaName, txnOp) if err != nil { return nil, err } @@ -3854,9 +3854,9 @@ func collectTombstones( return nil, err } - if n.TableDef.Partition != nil { - if n.PartitionPrune != nil && n.PartitionPrune.IsPruned { - for _, partitionItem := range n.PartitionPrune.SelectedPartitions { + if node.TableDef.Partition != nil { + if node.PartitionPrune != nil && node.PartitionPrune.IsPruned { + for _, partitionItem := range node.PartitionPrune.SelectedPartitions { partTableName := partitionItem.PartitionTableName subrelation, err := db.Relation(ctx, partTableName, c.proc) if err != nil { @@ -3872,7 +3872,7 @@ func collectTombstones( } } } else { - partitionInfo := n.TableDef.Partition + partitionInfo := node.TableDef.Partition partitionNum := int(partitionInfo.PartitionNum) partitionTableNames := partitionInfo.PartitionTableNames for i := 0; i < partitionNum; i++ { @@ -3897,10 +3897,10 @@ func collectTombstones( } func (c *Compile) expandRanges( - n *plan.Node, + node *plan.Node, blockFilterList []*plan.Expr, crs *perfcounter.CounterSet) (engine.RelData, error) { - rel, db, ctx, err := c.handleDbRelContext(n) + rel, db, ctx, err := c.handleDbRelContext(node) if err != nil { return nil, err } @@ -3910,7 +3910,7 @@ func (c *Compile) expandRanges( if len(blockFilterList) > 0 { preAllocSize = 64 } else { - preAllocSize = int(n.Stats.BlockNum) + preAllocSize = int(node.Stats.BlockNum) } } @@ -3922,9 +3922,9 @@ func (c *Compile) expandRanges( } //tombstones, err := rel.CollectTombstones(ctx, c.TxnOffset) - if n.TableDef.Partition != nil { - if n.PartitionPrune != nil && n.PartitionPrune.IsPruned { - for i, partitionItem := range n.PartitionPrune.SelectedPartitions { + if node.TableDef.Partition != nil { + if node.PartitionPrune != nil && node.PartitionPrune.IsPruned { + for i, partitionItem := range node.PartitionPrune.SelectedPartitions { partTableName := partitionItem.PartitionTableName subrelation, err := db.Relation(newCtx, partTableName, c.proc) if err != nil { @@ -3943,7 +3943,7 @@ func (c *Compile) expandRanges( }) } } else { - partitionInfo := n.TableDef.Partition + partitionInfo := node.TableDef.Partition partitionNum := int(partitionInfo.PartitionNum) partitionTableNames := partitionInfo.PartitionTableNames for i := 0; i < partitionNum; i++ { @@ -3971,7 +3971,7 @@ func (c *Compile) expandRanges( } -func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) { +func (c *Compile) handleDbRelContext(node *plan.Node) (engine.Relation, engine.Database, context.Context, error) { var err error var db engine.Database var rel engine.Relation @@ -3980,34 +3980,34 @@ func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Data //------------------------------------------------------------------------------------------------------------------ ctx := c.proc.GetTopContext() txnOp = c.proc.GetTxnOperator() - if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil { - if !n.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) && - n.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) { + if node.ScanSnapshot != nil && node.ScanSnapshot.TS != nil { + if !node.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) && + node.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) { - txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS) + txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*node.ScanSnapshot.TS) c.proc.SetCloneTxnOperator(txnOp) - if n.ScanSnapshot.Tenant != nil { - ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID) + if node.ScanSnapshot.Tenant != nil { + ctx = context.WithValue(ctx, defines.TenantIDKey{}, node.ScanSnapshot.Tenant.TenantID) } } } //------------------------------------------------------------------------------------------------------------- - if util.TableIsClusterTable(n.TableDef.GetTableType()) { + if util.TableIsClusterTable(node.TableDef.GetTableType()) { ctx = defines.AttachAccountId(ctx, catalog.System_Account) } - if n.ObjRef.PubInfo != nil { - ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId())) + if node.ObjRef.PubInfo != nil { + ctx = defines.AttachAccountId(ctx, uint32(node.ObjRef.PubInfo.GetTenantId())) } - if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) { + if util.TableIsLoggingTable(node.ObjRef.SchemaName, node.ObjRef.ObjName) { ctx = defines.AttachAccountId(ctx, catalog.System_Account) } - db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp) + db, err = c.e.Database(ctx, node.ObjRef.SchemaName, txnOp) if err != nil { return nil, nil, nil, err } - rel, err = db.Relation(ctx, n.TableDef.Name, c.proc) + rel, err = db.Relation(ctx, node.TableDef.Name, c.proc) if err != nil { if txnOp.IsSnapOp() { return nil, nil, nil, err @@ -4019,7 +4019,7 @@ func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Data } // if temporary table, just scan at local cn. - rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc) + rel, e = db.Relation(ctx, engine.GetTempTableName(node.ObjRef.SchemaName, node.TableDef.Name), c.proc) if e != nil { return nil, nil, nil, err } @@ -4114,7 +4114,7 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) { nodes[i] = engine.Node{ Id: node.Id, Addr: node.Addr, - Mcpu: 1, + Mcpu: c.generateCPUNumber(node.Mcpu, int(n.Stats.BlockNum)), Data: engine.BuildEmptyRelData(), } }