diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go
index f3a3743343dd..728046b0497b 100644
--- a/pkg/sql/compile/compile.go
+++ b/pkg/sql/compile/compile.go
@@ -1041,7 +1041,7 @@ func (c *Compile) compilePlanScope(step int32, curNodeIdx int32, ns []*plan.Node
         if n.Stats.HashmapStats != nil && n.Stats.HashmapStats.Shuffle {
             ss = c.compileSort(n, c.compileShuffleGroup(n, ss, ns))
             return ss, nil
-        } else if c.IsSingleScope(ss) && ss[0].PartialResults == nil {
+        } else if c.IsSingleScope(ss) {
             ss = c.compileSort(n, c.compileProjection(n, c.compileRestrict(n, c.compileTPGroup(n, ss, ns))))
             return ss, nil
         } else {
@@ -1808,7 +1808,7 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) {
         stats.AddCompileTableScanConsumption(time.Since(compileStart))
     }()

-    nodes, partialResults, partialResultTypes, err := c.generateNodes(n)
+    nodes, err := c.generateNodes(n)
     if err != nil {
         return nil, err
     }
@@ -1824,12 +1824,13 @@ func (c *Compile) compileTableScan(n *plan.Node) ([]*Scope, error) {
     }
     c.anal.isFirst = false

-    if len(n.OrderBy) > 0 {
-        ss[0].NodeInfo.Mcpu = 1
+    if len(n.AggList) > 0 {
+        partialResults, _, _ := checkAggOptimize(n)
+        if partialResults != nil {
+            ss[0].HasPartialResults = true
+        }
     }
-    ss[0].PartialResults = partialResults
-    ss[0].PartialResultTypes = partialResultTypes

     return ss, nil
 }

@@ -3013,9 +3014,19 @@ func (c *Compile) compileSample(n *plan.Node, ss []*Scope) []*Scope {

 func (c *Compile) compileTPGroup(n *plan.Node, ss []*Scope, ns []*plan.Node) []*Scope {
     currentFirstFlag := c.anal.isFirst
-    op := constructGroup(c.proc.Ctx, n, ns[n.Children[0]], true, 0, c.proc)
-    op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
-    ss[0].setRootOperator(op)
+    if ss[0].HasPartialResults {
+        op := constructGroup(c.proc.Ctx, n, ns[n.Children[0]], false, 0, c.proc)
+        op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
+        ss[0].setRootOperator(op)
+        arg := constructMergeGroup(true)
+        arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
+        ss[0].setRootOperator(arg)
+    } else {
+        op := constructGroup(c.proc.Ctx, n, ns[n.Children[0]], true, 0, c.proc)
+        op.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
+        ss[0].setRootOperator(op)
+    }
+    ss[0].HasPartialResults = false
     c.anal.isFirst = false
     return ss
 }
@@ -3043,12 +3054,6 @@ func (c *Compile) compileMergeGroup(n *plan.Node, ss []*Scope, ns []*plan.Node,

     currentFirstFlag = c.anal.isFirst
     arg := constructMergeGroup(true)
-    if ss[0].PartialResults != nil {
-        arg.PartialResults = ss[0].PartialResults
-        arg.PartialResultTypes = ss[0].PartialResultTypes
-        ss[0].PartialResults = nil
-        ss[0].PartialResultTypes = nil
-    }
     arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
     rs.setRootOperator(arg)
     c.anal.isFirst = false
@@ -3068,12 +3073,6 @@ func (c *Compile) compileMergeGroup(n *plan.Node, ss []*Scope, ns []*plan.Node,

     currentFirstFlag = c.anal.isFirst
     arg := constructMergeGroup(true)
-    if ss[0].PartialResults != nil {
-        arg.PartialResults = ss[0].PartialResults
-        arg.PartialResultTypes = ss[0].PartialResultTypes
-        ss[0].PartialResults = nil
-        ss[0].PartialResultTypes = nil
-    }
     arg.SetAnalyzeControl(c.anal.curNodeIdx, currentFirstFlag)
     rs.setRootOperator(arg)
     c.anal.isFirst = false
@@ -3899,18 +3898,12 @@ func (c *Compile) generateCPUNumber(cpunum, blocks int) int {
 }

 func (c *Compile) determinExpandRanges(n *plan.Node) bool {
-    if len(n.AggList) > 0 {
-        partialResult, _, _ := checkAggOptimize(n)
-        if partialResult != nil {
-            return true
-        }
-    }
     return len(c.cnList) > 1 && !n.Stats.ForceOneCN && c.execType == plan2.ExecTypeAP_MULTICN && n.Stats.BlockNum > int32(plan2.BlockThresholdForOneCN)
 }

 func collectTombstones(
     c *Compile,
-    n *plan.Node,
+    node *plan.Node,
     rel engine.Relation,
 ) (engine.Tombstoner, error) {
     var err error
@@ -3922,35 +3915,35 @@ func collectTombstones(
     //-----------------------------------------------------------------------------------------------------
     ctx := c.proc.GetTopContext()
     txnOp = c.proc.GetTxnOperator()
-    if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil {
+    if node.ScanSnapshot != nil && node.ScanSnapshot.TS != nil {
         zeroTS := timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}
         snapTS := c.proc.GetTxnOperator().Txn().SnapshotTS
-        if !n.ScanSnapshot.TS.Equal(zeroTS) && n.ScanSnapshot.TS.Less(snapTS) {
+        if !node.ScanSnapshot.TS.Equal(zeroTS) && node.ScanSnapshot.TS.Less(snapTS) {
             if c.proc.GetCloneTxnOperator() != nil {
                 txnOp = c.proc.GetCloneTxnOperator()
             } else {
-                txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS)
+                txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*node.ScanSnapshot.TS)
                 c.proc.SetCloneTxnOperator(txnOp)
             }

-            if n.ScanSnapshot.Tenant != nil {
-                ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID)
+            if node.ScanSnapshot.Tenant != nil {
+                ctx = context.WithValue(ctx, defines.TenantIDKey{}, node.ScanSnapshot.Tenant.TenantID)
             }
         }
     }
     //-----------------------------------------------------------------------------------------------------
-    if util.TableIsClusterTable(n.TableDef.GetTableType()) {
+    if util.TableIsClusterTable(node.TableDef.GetTableType()) {
         ctx = defines.AttachAccountId(ctx, catalog.System_Account)
     }
-    if n.ObjRef.PubInfo != nil {
-        ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId()))
+    if node.ObjRef.PubInfo != nil {
+        ctx = defines.AttachAccountId(ctx, uint32(node.ObjRef.PubInfo.GetTenantId()))
     }
-    if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) {
+    if util.TableIsLoggingTable(node.ObjRef.SchemaName, node.ObjRef.ObjName) {
         ctx = defines.AttachAccountId(ctx, catalog.System_Account)
     }

-    db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
+    db, err = c.e.Database(ctx, node.ObjRef.SchemaName, txnOp)
     if err != nil {
         return nil, err
     }
@@ -3959,9 +3952,9 @@ func collectTombstones(
         return nil, err
     }

-    if n.TableDef.Partition != nil {
-        if n.PartitionPrune != nil && n.PartitionPrune.IsPruned {
-            for _, partitionItem := range n.PartitionPrune.SelectedPartitions {
+    if node.TableDef.Partition != nil {
+        if node.PartitionPrune != nil && node.PartitionPrune.IsPruned {
+            for _, partitionItem := range node.PartitionPrune.SelectedPartitions {
                 partTableName := partitionItem.PartitionTableName
                 subrelation, err := db.Relation(ctx, partTableName, c.proc)
                 if err != nil {
@@ -3977,7 +3970,7 @@ func collectTombstones(
             }
         }
     } else {
-        partitionInfo := n.TableDef.Partition
+        partitionInfo := node.TableDef.Partition
         partitionNum := int(partitionInfo.PartitionNum)
         partitionTableNames := partitionInfo.PartitionTableNames
         for i := 0; i < partitionNum; i++ {
@@ -4002,10 +3995,10 @@ func collectTombstones(
 }

 func (c *Compile) expandRanges(
-    n *plan.Node,
+    node *plan.Node,
     blockFilterList []*plan.Expr,
     crs *perfcounter.CounterSet) (engine.RelData, error) {
-    rel, db, ctx, err := c.handleDbRelContext(n)
+    rel, db, ctx, err := c.handleDbRelContext(node)
     if err != nil {
         return nil, err
     }
@@ -4015,7 +4008,7 @@ func (c *Compile) expandRanges(
         if len(blockFilterList) > 0 {
             preAllocSize = 64
         } else {
-            preAllocSize = int(n.Stats.BlockNum)
+            preAllocSize = int(node.Stats.BlockNum)
         }
     }

@@ -4027,9 +4020,9 @@ func (c *Compile) expandRanges(
     }

     //tombstones, err := rel.CollectTombstones(ctx, c.TxnOffset)
-    if n.TableDef.Partition != nil {
-        if n.PartitionPrune != nil && n.PartitionPrune.IsPruned {
-            for i, partitionItem := range n.PartitionPrune.SelectedPartitions {
+    if node.TableDef.Partition != nil {
+        if node.PartitionPrune != nil && node.PartitionPrune.IsPruned {
+            for i, partitionItem := range node.PartitionPrune.SelectedPartitions {
                 partTableName := partitionItem.PartitionTableName
                 subrelation, err := db.Relation(newCtx, partTableName, c.proc)
                 if err != nil {
@@ -4048,7 +4041,7 @@ func (c *Compile) expandRanges(
                 })
             }
         } else {
-            partitionInfo := n.TableDef.Partition
+            partitionInfo := node.TableDef.Partition
             partitionNum := int(partitionInfo.PartitionNum)
             partitionTableNames := partitionInfo.PartitionTableNames
             for i := 0; i < partitionNum; i++ {
@@ -4076,7 +4069,7 @@ func (c *Compile) expandRanges(

 }

-func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) {
+func (c *Compile) handleDbRelContext(node *plan.Node) (engine.Relation, engine.Database, context.Context, error) {
     var err error
     var db engine.Database
     var rel engine.Relation
@@ -4085,34 +4078,34 @@ func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Data
     //------------------------------------------------------------------------------------------------------------------
     ctx := c.proc.GetTopContext()
     txnOp = c.proc.GetTxnOperator()
-    if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil {
-        if !n.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) &&
-            n.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) {
+    if node.ScanSnapshot != nil && node.ScanSnapshot.TS != nil {
+        if !node.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) &&
+            node.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) {

-            txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS)
+            txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*node.ScanSnapshot.TS)
             c.proc.SetCloneTxnOperator(txnOp)

-            if n.ScanSnapshot.Tenant != nil {
-                ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID)
+            if node.ScanSnapshot.Tenant != nil {
+                ctx = context.WithValue(ctx, defines.TenantIDKey{}, node.ScanSnapshot.Tenant.TenantID)
             }
         }
     }
     //-------------------------------------------------------------------------------------------------------------
-    if util.TableIsClusterTable(n.TableDef.GetTableType()) {
+    if util.TableIsClusterTable(node.TableDef.GetTableType()) {
         ctx = defines.AttachAccountId(ctx, catalog.System_Account)
     }
-    if n.ObjRef.PubInfo != nil {
-        ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId()))
+    if node.ObjRef.PubInfo != nil {
+        ctx = defines.AttachAccountId(ctx, uint32(node.ObjRef.PubInfo.GetTenantId()))
     }
-    if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) {
+    if util.TableIsLoggingTable(node.ObjRef.SchemaName, node.ObjRef.ObjName) {
         ctx = defines.AttachAccountId(ctx, catalog.System_Account)
     }
-    db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
+    db, err = c.e.Database(ctx, node.ObjRef.SchemaName, txnOp)
     if err != nil {
         return nil, nil, nil, err
     }
-    rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
+    rel, err = db.Relation(ctx, node.TableDef.Name, c.proc)
     if err != nil {
         if txnOp.IsSnapOp() {
             return nil, nil, nil, err
         }
@@ -4124,7 +4117,7 @@ func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Data
         }

         // if temporary table, just scan at local cn.
-        rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
+        rel, e = db.Relation(ctx, engine.GetTempTableName(node.ObjRef.SchemaName, node.TableDef.Name), c.proc)
         if e != nil {
             return nil, nil, nil, err
         }
@@ -4139,20 +4132,29 @@ func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Data
     return rel, db, ctx, nil
 }

-func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) {
+func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, error) {
     var relData engine.RelData
-    var partialResults []any
-    var partialResultTypes []types.T
     var nodes engine.Nodes

     rel, _, ctx, err := c.handleDbRelContext(n)
     if err != nil {
-        return nil, nil, nil, err
+        return nil, err
+    }
+
+    forceSingle := false
+    if len(n.AggList) > 0 {
+        partialResults, _, _ := checkAggOptimize(n)
+        if partialResults != nil {
+            forceSingle = true
+        }
+    }
+    if len(n.OrderBy) > 0 {
+        forceSingle = true
     }

     if c.determinExpandRanges(n) {
         if c.isPrepare {
-            return nil, nil, nil, cantCompileForPrepareErr
+            return nil, cantCompileForPrepareErr
         }

         //@todo need remove expandRanges from Compile.
@@ -4163,13 +4165,13 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e
         for _, e := range newFilterExpr {
             _, err := plan2.ReplaceFoldExpr(c.proc, e, &c.filterExprExes)
             if err != nil {
-                return nil, nil, nil, err
+                return nil, err
             }
         }
         for _, e := range newFilterExpr {
             err = plan2.EvalFoldExpr(c.proc, e, &c.filterExprExes)
             if err != nil {
-                return nil, nil, nil, err
+                return nil, err
             }
         }
     }
@@ -4177,7 +4179,7 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e
         counterset := new(perfcounter.CounterSet)
         relData, err = c.expandRanges(n, newFilterExpr, counterset)
         if err != nil {
-            return nil, nil, nil, err
+            return nil, err
         }

         stats := statistic.StatsInfoFromContext(ctx)
@@ -4191,65 +4193,18 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e
         })
     } else {
         // add current CN
+        mcpu := c.generateCPUNumber(ncpu, int(n.Stats.BlockNum))
+        if forceSingle {
+            mcpu = 1
+        }
         nodes = append(nodes, engine.Node{
             Addr: c.addr,
-            Mcpu: c.generateCPUNumber(ncpu, int(n.Stats.BlockNum)),
+            Mcpu: mcpu,
         })
         nodes[0].NeedExpandRanges = true
-        return nodes, nil, nil, nil
+        return nodes, nil
     }

-    if len(n.AggList) > 0 && relData.DataCnt() > 1 {
-        var columnMap map[int]int
-        partialResults, partialResultTypes, columnMap = checkAggOptimize(n)
-        if partialResults != nil {
-            newRelData := relData.BuildEmptyRelData(1)
-            blk := relData.GetBlockInfo(0)
-            newRelData.AppendBlockInfo(&blk)
-
-            tombstones, err := collectTombstones(c, n, rel)
-            if err != nil {
-                return nil, nil, nil, err
-            }
-
-            fs, err := fileservice.Get[fileservice.FileService](c.proc.GetFileService(), defines.SharedFileServiceName)
-            if err != nil {
-                return nil, nil, nil, err
-            }
-            //For each blockinfo in relData, if blk has no tombstones, then compute the agg result,
-            //otherwise put it into newRelData.
-            var (
-                hasTombstone bool
-                err2         error
-            )
-            if err = engine.ForRangeBlockInfo(1, relData.DataCnt(), relData, func(blk *objectio.BlockInfo) (bool, error) {
-                if hasTombstone, err2 = tombstones.HasBlockTombstone(
-                    ctx, &blk.BlockID, fs,
-                ); err2 != nil {
-                    return false, err2
-                } else if blk.IsAppendable() || hasTombstone {
-                    newRelData.AppendBlockInfo(blk)
-                    return true, nil
-                }
-                if c.evalAggOptimize(n, blk, partialResults, partialResultTypes, columnMap) != nil {
-                    partialResults = nil
-                    return false, nil
-                }
-                return true, nil
-            }); err != nil {
-                return nil, nil, nil, err
-            }
-            if partialResults != nil {
-                relData = newRelData
-            }
-        }
-    }
-
-    // some log for finding a bug.
-    tblId := rel.GetTableID(ctx)
-    expectedLen := relData.DataCnt()
-    c.proc.Debugf(ctx, "cn generateNodes, tbl %d ranges is %d", tblId, expectedLen)
-
     // if len(ranges) == 0 indicates that it's a temporary table.
     if relData.DataCnt() == 0 && n.TableDef.TableType != catalog.SystemOrdinaryRel {
         nodes = make(engine.Nodes, len(c.cnList))
@@ -4261,19 +4216,19 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e
                 Data: engine.BuildEmptyRelData(),
             }
         }
-        return nodes, partialResults, partialResultTypes, nil
+        return nodes, nil
     }

     engineType := rel.GetEngineType()
     // for an ordered scan, put all payloads in current CN
     // or sometimes force on one CN
     // if not disttae engine, just put all payloads in current CN
-    if len(c.cnList) == 1 || len(n.OrderBy) > 0 || relData.DataCnt() < plan2.BlockThresholdForOneCN || n.Stats.ForceOneCN || engineType != engine.Disttae {
-        return putBlocksInCurrentCN(c, relData), partialResults, partialResultTypes, nil
+    if len(c.cnList) == 1 || relData.DataCnt() < plan2.BlockThresholdForOneCN || n.Stats.ForceOneCN || engineType != engine.Disttae || forceSingle {
+        return putBlocksInCurrentCN(c, relData, forceSingle), nil
     }
     // only support disttae engine for now
     nodes, err = shuffleBlocksToMultiCN(c, rel, relData, n)
-    return nodes, partialResults, partialResultTypes, err
+    return nodes, err
 }

 func checkAggOptimize(n *plan.Node) ([]any, []types.T, map[int]int) {
@@ -4299,6 +4254,7 @@ func checkAggOptimize(n *plan.Node) ([]any, []types.T, map[int]int) {
             if !ok {
                 if _, ok := args.Expr.(*plan.Expr_Lit); ok {
                     agg.F.Func.ObjName = "starcount"
+                    return partialResults, partialResultTypes, columnMap
                 }
                 return nil, nil, nil
             } else {
@@ -4785,12 +4741,16 @@ func shuffleBlocksByRange(c *Compile, relData engine.RelData, n *plan.Node, node
     return nil
 }

-func putBlocksInCurrentCN(c *Compile, relData engine.RelData) engine.Nodes {
+func putBlocksInCurrentCN(c *Compile, relData engine.RelData, forceSingle bool) engine.Nodes {
     var nodes engine.Nodes
     // add current CN
+    mcpu := c.generateCPUNumber(ncpu, relData.DataCnt())
+    if forceSingle {
+        mcpu = 1
+    }
     nodes = append(nodes, engine.Node{
         Addr: c.addr,
-        Mcpu: c.generateCPUNumber(ncpu, relData.DataCnt()),
+        Mcpu: mcpu,
     })
     nodes[0].Data = relData
     return nodes
diff --git a/pkg/sql/compile/compile_test.go b/pkg/sql/compile/compile_test.go
index 9826044842be..b4c47e2daf85 100644
--- a/pkg/sql/compile/compile_test.go
+++ b/pkg/sql/compile/compile_test.go
@@ -308,7 +308,7 @@ func TestPutBlocksInCurrentCN(t *testing.T) {
     }
     reldata := &engine_util.BlockListRelData{}
     reldata.SetBlockList(s)
-    putBlocksInCurrentCN(testCompile, reldata)
+    putBlocksInCurrentCN(testCompile, reldata, true)
 }

 func TestShuffleBlocksToMultiCN(t *testing.T) {
diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go
index 762213838fe2..2ebc8524820e 100644
--- a/pkg/sql/compile/scope.go
+++ b/pkg/sql/compile/scope.go
@@ -21,6 +21,14 @@ import (
     "sync"
     "time"

+    "github.com/matrixorigin/matrixone/pkg/sql/colexec/group"
+    "github.com/matrixorigin/matrixone/pkg/sql/colexec/projection"
+
+    "github.com/matrixorigin/matrixone/pkg/sql/colexec/mergegroup"
+
+    "github.com/matrixorigin/matrixone/pkg/fileservice"
+    "github.com/matrixorigin/matrixone/pkg/objectio"
+
     "github.com/panjf2000/ants/v2"
     "go.uber.org/zap"

@@ -652,8 +660,8 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
             return err
         }

-        ctx1 := c.proc.GetTopContext()
-        stats := statistic.StatsInfoFromContext(ctx1)
+        ctx := c.proc.GetTopContext()
+        stats := statistic.StatsInfoFromContext(ctx)
         stats.AddScopePrepareS3Request(statistic.S3Request{
             List: counterSet.FileService.S3.List.Load(),
             Head: counterSet.FileService.S3.Head.Load(),
@@ -673,6 +681,11 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
             return err
         }
     }
+
+    err = s.aggOptimize(c)
+    if err != nil {
+        return err
+    }
     return nil
 }

@@ -875,6 +888,88 @@ func removeStringBetween(s, start, end string) string {
     return s
 }

+func (s *Scope) aggOptimize(c *Compile) error {
+    scanNode := s.DataSource.node
+    if scanNode != nil && len(scanNode.AggList) > 0 {
+        partialResults, partialResultTypes, columnMap := checkAggOptimize(scanNode)
+        if partialResults != nil && s.NodeInfo.Data.DataCnt() > 1 {
+            rel, _, ctx, err := c.handleDbRelContext(scanNode)
+            if err != nil {
+                return err
+            }
+
+            newRelData := s.NodeInfo.Data.BuildEmptyRelData(1)
+            blk := s.NodeInfo.Data.GetBlockInfo(0)
+            newRelData.AppendBlockInfo(&blk)
+
+            tombstones, err := collectTombstones(c, scanNode, rel)
+            if err != nil {
+                return err
+            }
+
+            fs, err := fileservice.Get[fileservice.FileService](c.proc.GetFileService(), defines.SharedFileServiceName)
+            if err != nil {
+                return err
+            }
+            //For each blockinfo in relData, if blk has no tombstones, then compute the agg result,
+            //otherwise put it into newRelData.
+            var (
+                hasTombstone bool
+                err2         error
+            )
+            if err = engine.ForRangeBlockInfo(1, s.NodeInfo.Data.DataCnt(), s.NodeInfo.Data, func(blk *objectio.BlockInfo) (bool, error) {
+                if hasTombstone, err2 = tombstones.HasBlockTombstone(
+                    ctx, &blk.BlockID, fs,
+                ); err2 != nil {
+                    return false, err2
+                } else if blk.IsAppendable() || hasTombstone {
+                    newRelData.AppendBlockInfo(blk)
+                    return true, nil
+                }
+                if c.evalAggOptimize(scanNode, blk, partialResults, partialResultTypes, columnMap) != nil {
+                    partialResults = nil
+                    return false, nil
+                }
+                return true, nil
+            }); err != nil {
+                return err
+            }
+            if partialResults != nil {
+                s.NodeInfo.Data = newRelData
+                //find the last mergegroup
+                mergeGroup := findMergeGroup(s.RootOp)
+                if mergeGroup != nil {
+                    mergeGroup.PartialResults = partialResults
+                    mergeGroup.PartialResultTypes = partialResultTypes
+                } else {
+                    panic("can't find merge group operator for agg optimize!")
+                }
+            }
+        }
+    }
+    return nil
+}
+
+// find scan->proj->group->mergegroup
+func findMergeGroup(op vm.Operator) *mergegroup.MergeGroup {
+    if op == nil {
+        return nil
+    }
+    if mergeGroup, ok := op.(*mergegroup.MergeGroup); ok {
+        child := op.GetOperatorBase().GetChildren(0)
+        if _, ok = child.(*group.Group); ok {
+            child = child.GetOperatorBase().GetChildren(0)
+            if _, ok = child.(*projection.Projection); ok {
+                child = child.GetOperatorBase().GetChildren(0)
+                if _, ok = child.(*table_scan.TableScan); ok {
+                    return mergeGroup
+                }
+            }
+        }
+    }
+    return findMergeGroup(op.GetOperatorBase().GetChildren(0))
+}
+
 func (s *Scope) buildReaders(c *Compile) (readers []engine.Reader, err error) {
     // receive runtime filter and optimized the datasource.
     if err = s.handleRuntimeFilter(c); err != nil {
diff --git a/pkg/sql/compile/types.go b/pkg/sql/compile/types.go
index 07382db9adc2..dd34bdda754b 100644
--- a/pkg/sql/compile/types.go
+++ b/pkg/sql/compile/types.go
@@ -182,8 +182,7 @@ type Scope struct {

     RemoteReceivRegInfos []RemoteReceivRegInfo

-    PartialResults     []any
-    PartialResultTypes []types.T
+    HasPartialResults bool
 }

 // ipAddrMatch return true if the node-addr of the scope matches to local address.
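
Note on the optimization this patch relocates: checkAggOptimize/evalAggOptimize fold MIN/MAX/COUNT from per-block metadata for blocks that are neither appendable nor covered by tombstones, keep only the remaining blocks for the regular scan -> group -> mergegroup chain, and attach the folded values to the MergeGroup operator as PartialResults. The sketch below is illustrative only and not MatrixOne code; blockMeta and foldPartialResults are hypothetical stand-ins for the real block metadata and helpers.

// Minimal sketch of the partial-result folding idea, under the assumptions above.
package main

import "fmt"

// blockMeta stands in for a block's zonemap summary.
type blockMeta struct {
    min, max     int64
    rows         int64
    hasTombstone bool
    appendable   bool
}

// foldPartialResults returns partial MIN/MAX/COUNT over the "clean" blocks and
// the blocks that still need a real scan by the group operator.
func foldPartialResults(blocks []blockMeta) (minV, maxV, count int64, needScan []blockMeta) {
    first := true
    for _, b := range blocks {
        if b.hasTombstone || b.appendable {
            // deleted or still-growing rows: metadata alone is not trustworthy
            needScan = append(needScan, b)
            continue
        }
        if first || b.min < minV {
            minV = b.min
        }
        if first || b.max > maxV {
            maxV = b.max
        }
        count += b.rows
        first = false
    }
    return
}

func main() {
    blocks := []blockMeta{
        {min: 1, max: 10, rows: 8192},
        {min: 5, max: 42, rows: 8192, hasTombstone: true},
        {min: -3, max: 7, rows: 4096},
    }
    minV, maxV, count, needScan := foldPartialResults(blocks)
    fmt.Println(minV, maxV, count, len(needScan)) // -3 10 12288 1
}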