delay expand ranges for system table #19788

Merged
7 commits, merged on Nov 5, 2024
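Summary (reconstructed from the diff below): the special case in determinExpandRanges that forced compile-time range expansion for mo_catalog tables other than mo_tables, mo_columns and mo_database is removed, so range expansion for those system tables can be deferred to the Run phase, per the PR title. The database/relation/context resolution is consolidated into a new helper handleDbRelContext (replacing handleRelationAndContext), which both expandRanges and generateNodes now call, and several locals are renamed for clarity (inExprList to runtimeInExprList, crs to counterSet, filterExpr to newFilterExpr). The sketch below is a minimal, self-contained illustration of the compile-time vs. Run-time decision, using hypothetical stand-in types rather than the project's real plan/engine API.

```go
package main

import "fmt"

// Hypothetical stand-in for a scan node; not the project's plan.Node.
type nodeSketch struct {
	schema, table  string
	hasAggOptimize bool
}

// determinExpandRangesSketch loosely mirrors the remaining logic after this PR:
// expand at compile time only when something (e.g. an aggregate partial-result
// optimization) needs the ranges early. The removed special case was roughly:
// schema == "mo_catalog" && table not in {mo_tables, mo_columns, mo_database}
// => expand now.
func determinExpandRangesSketch(n nodeSketch) bool {
	return n.hasAggOptimize
}

func main() {
	n := nodeSketch{schema: "mo_catalog", table: "mo_user", hasAggOptimize: false}
	if determinExpandRangesSketch(n) {
		fmt.Println("expand ranges at compile time")
	} else {
		fmt.Println("delay range expansion to the Run phase")
	}
}
```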
172 changes: 56 additions & 116 deletions pkg/sql/compile/compile.go
@@ -3801,10 +3801,6 @@ func (c *Compile) generateCPUNumber(cpunum, blocks int) int {
}

func (c *Compile) determinExpandRanges(n *plan.Node) bool {
// Each time the three tables are opened, a new txnTable is created, which can result in Compile and Run holding different partition states. To avoid this, delay opening the three tables until the Run phase
if n.ObjRef.SchemaName == catalog.MO_CATALOG && n.TableDef.Name != catalog.MO_TABLES && n.TableDef.Name != catalog.MO_COLUMNS && n.TableDef.Name != catalog.MO_DATABASE {
return true //avoid bugs
}
if len(n.AggList) > 0 {
partialResult, _, _ := checkAggOptimize(n)
if partialResult != nil {
@@ -3907,78 +3903,11 @@ func collectTombstones(
return tombstone, nil
}

func (c *Compile) handleRelationAndContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) {
var rel engine.Relation
var err error
var db engine.Database
var txnOp client.TxnOperator

//-----------------------------------------------------------------------------------------------------
ctx := c.proc.Ctx
txnOp = c.proc.GetTxnOperator()
if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil {
if !n.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) &&
n.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) {
if c.proc.GetCloneTxnOperator() != nil {
txnOp = c.proc.GetCloneTxnOperator()
} else {
txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS)
c.proc.SetCloneTxnOperator(txnOp)
}

if n.ScanSnapshot.Tenant != nil {
ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID)
}
}
}
//-----------------------------------------------------------------------------------------------------

if util.TableIsClusterTable(n.TableDef.GetTableType()) {
ctx = defines.AttachAccountId(ctx, catalog.System_Account)
}
if n.ObjRef.PubInfo != nil {
ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId()))
}
if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) {
ctx = defines.AttachAccountId(ctx, catalog.System_Account)
}

db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
if err != nil {
return nil, nil, nil, err
}

rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
if err != nil {
if txnOp.IsSnapOp() {
return nil, nil, nil, err
}
var e error // avoid contamination of error messages
db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp)
if e != nil {
return nil, nil, nil, err
}

// if temporary table, just scan at local cn.
rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
if e != nil {
return nil, nil, nil, err
}
c.cnList = engine.Nodes{
engine.Node{
Addr: c.addr,
Mcpu: 1,
},
}
}
return rel, db, ctx, nil
}

func (c *Compile) expandRanges(
n *plan.Node,
blockFilterList []*plan.Expr, crs *perfcounter.CounterSet) (engine.RelData, error) {

rel, db, ctx, err := c.handleRelationAndContext(n)
rel, db, ctx, err := c.handleDbRelContext(n)
if err != nil {
return nil, err
}
@@ -4049,15 +3978,10 @@ func (c *Compile) expandRanges(

}

func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) {
func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) {
var err error
var db engine.Database
var rel engine.Relation
//var ranges engine.Ranges
var relData engine.RelData
var partialResults []any
var partialResultTypes []types.T
var nodes engine.Nodes
var txnOp client.TxnOperator

//------------------------------------------------------------------------------------------------------------------
@@ -4086,70 +4010,86 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e
ctx = defines.AttachAccountId(ctx, catalog.System_Account)
}

if c.determinExpandRanges(n) {
if c.isPrepare {
return nil, nil, nil, cantCompileForPrepareErr
db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
if err != nil {
return nil, nil, nil, err
}
rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
if err != nil {
if txnOp.IsSnapOp() {
return nil, nil, nil, err
}
db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
if err != nil {
var e error // avoid contamination of error messages
db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp)
if e != nil {
return nil, nil, nil, err
}
rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
if err != nil {
if txnOp.IsSnapOp() {
return nil, nil, nil, err
}
var e error // avoid contamination of error messages
db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp)
if e != nil {
return nil, nil, nil, err
}

// if temporary table, just scan at local cn.
rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
if e != nil {
return nil, nil, nil, err
}
c.cnList = engine.Nodes{
engine.Node{
Addr: c.addr,
Mcpu: 1,
},
}
// if temporary table, just scan at local cn.
rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
if e != nil {
return nil, nil, nil, err
}
c.cnList = engine.Nodes{
engine.Node{
Addr: c.addr,
Mcpu: 1,
},
}
}

return rel, db, ctx, nil
}

func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) {
var relData engine.RelData
var partialResults []any
var partialResultTypes []types.T
var nodes engine.Nodes

rel, _, ctx, err := c.handleDbRelContext(n)
if err != nil {
return nil, nil, nil, err
}

if c.determinExpandRanges(n) {
if c.isPrepare {
return nil, nil, nil, cantCompileForPrepareErr
}

//@todo need to remove expandRanges from Compile.
// all expandRanges should be called by Run
var filterExpr []*plan.Expr
var newFilterExpr []*plan.Expr
if len(n.BlockFilterList) > 0 {
filterExpr = plan2.DeepCopyExprList(n.BlockFilterList)
for _, e := range filterExpr {
newFilterExpr = plan2.DeepCopyExprList(n.BlockFilterList)
for _, e := range newFilterExpr {
_, err := plan2.ReplaceFoldExpr(c.proc, e, &c.filterExprExes)
if err != nil {
return nil, nil, nil, err
}
}
for _, e := range filterExpr {
for _, e := range newFilterExpr {
err = plan2.EvalFoldExpr(c.proc, e, &c.filterExprExes)
if err != nil {
return nil, nil, nil, err
}
}
}

crs := new(perfcounter.CounterSet)
relData, err = c.expandRanges(n, filterExpr, crs)
counterset := new(perfcounter.CounterSet)
relData, err = c.expandRanges(n, newFilterExpr, counterset)
if err != nil {
return nil, nil, nil, err
}

stats := statistic.StatsInfoFromContext(ctx)
stats.CompileExpandRangesS3Request(statistic.S3Request{
List: crs.FileService.S3.List.Load(),
Head: crs.FileService.S3.Head.Load(),
Put: crs.FileService.S3.Put.Load(),
Get: crs.FileService.S3.Get.Load(),
Delete: crs.FileService.S3.Delete.Load(),
DeleteMul: crs.FileService.S3.DeleteMulti.Load(),
List: counterset.FileService.S3.List.Load(),
Head: counterset.FileService.S3.Head.Load(),
Put: counterset.FileService.S3.Put.Load(),
Get: counterset.FileService.S3.Get.Load(),
Delete: counterset.FileService.S3.Delete.Load(),
DeleteMul: counterset.FileService.S3.DeleteMulti.Load(),
})
} else {
// add current CN
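Both in compile.go above and in scope.go below, the S3 traffic generated by expandRanges is recorded into the query statistics through a fresh perfcounter.CounterSet. The snippet below is a hypothetical, self-contained illustration of that pattern with stand-in types (not the project's perfcounter/statistic packages): counters are accumulated during the expensive step, then snapshotted into a request record.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Stand-in for the per-call S3 counter set passed into expandRanges.
type s3Counters struct {
	list, head, put, get, del, delMulti atomic.Int64
}

// Stand-in for the statistic.S3Request record the counters are copied into.
type s3Request struct {
	List, Head, Put, Get, Delete, DeleteMul int64
}

// expandRangesSketch pretends the range expansion issued two LIST and one GET.
func expandRangesSketch(crs *s3Counters) {
	crs.list.Add(2)
	crs.get.Add(1)
}

func main() {
	crs := new(s3Counters)
	expandRangesSketch(crs)

	// Snapshot the counters into the statistics record, mirroring what the
	// diff does with CompileExpandRangesS3Request / AddScopePrepareS3Request.
	req := s3Request{
		List:      crs.list.Load(),
		Head:      crs.head.Load(),
		Put:       crs.put.Load(),
		Get:       crs.get.Load(),
		Delete:    crs.del.Load(),
		DeleteMul: crs.delMulti.Load(),
	}
	fmt.Printf("%+v\n", req)
}
```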
40 changes: 20 additions & 20 deletions pkg/sql/compile/scope.go
@@ -512,7 +512,7 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {

func (s *Scope) handleRuntimeFilter(c *Compile) error {
var err error
var inExprList []*plan.Expr
var runtimeInExprList []*plan.Expr
exprs := make([]*plan.Expr, 0, len(s.DataSource.RuntimeFilterSpecs))
filters := make([]message.RuntimeFilterMessage, 0, len(exprs))

@@ -538,7 +538,7 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
return nil
case message.RuntimeFilter_IN:
inExpr := plan2.MakeInExpr(c.proc.Ctx, spec.Expr, msg.Card, msg.Data, spec.MatchPrefix)
inExprList = append(inExprList, inExpr)
runtimeInExprList = append(runtimeInExprList, inExpr)

// TODO: implement BETWEEN expression
}
@@ -549,15 +549,15 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
}

var appendNotPkFilter []*plan.Expr
for i := range inExprList {
fn := inExprList[i].GetF()
for i := range runtimeInExprList {
fn := runtimeInExprList[i].GetF()
col := fn.Args[0].GetCol()
if col == nil {
panic("only support col in runtime filter's left child!")
}
pkPos := s.DataSource.TableDef.Name2ColIndex[s.DataSource.TableDef.Pkey.PkeyColName]
if pkPos != col.ColPos {
appendNotPkFilter = append(appendNotPkFilter, plan2.DeepCopyExpr(inExprList[i]))
appendNotPkFilter = append(appendNotPkFilter, plan2.DeepCopyExpr(runtimeInExprList[i]))
}
}

@@ -579,8 +579,8 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
}

// reset datasource
if len(inExprList) > 0 {
newExprList := plan2.DeepCopyExprList(inExprList)
if len(runtimeInExprList) > 0 {
newExprList := plan2.DeepCopyExprList(runtimeInExprList)
if s.DataSource.FilterExpr != nil {
newExprList = append(newExprList, s.DataSource.FilterExpr)
}
@@ -600,33 +600,33 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
}
}

newList := plan2.DeepCopyExprList(inExprList)
newExprList := plan2.DeepCopyExprList(runtimeInExprList)
if len(s.DataSource.node.BlockFilterList) > 0 {
newList = append(newList, s.DataSource.BlockFilterList...)
newExprList = append(newExprList, s.DataSource.BlockFilterList...)
}

crs := new(perfcounter.CounterSet)
relData, err := c.expandRanges(s.DataSource.node, newList, crs)
counterSet := new(perfcounter.CounterSet)
relData, err := c.expandRanges(s.DataSource.node, newExprList, counterSet)
if err != nil {
return err
}

ctx := c.proc.GetTopContext()
stats := statistic.StatsInfoFromContext(ctx)
ctx1 := c.proc.GetTopContext()
stats := statistic.StatsInfoFromContext(ctx1)
stats.AddScopePrepareS3Request(statistic.S3Request{
List: crs.FileService.S3.List.Load(),
Head: crs.FileService.S3.Head.Load(),
Put: crs.FileService.S3.Put.Load(),
Get: crs.FileService.S3.Get.Load(),
Delete: crs.FileService.S3.Delete.Load(),
DeleteMul: crs.FileService.S3.DeleteMulti.Load(),
List: counterSet.FileService.S3.List.Load(),
Head: counterSet.FileService.S3.Head.Load(),
Put: counterSet.FileService.S3.Put.Load(),
Get: counterSet.FileService.S3.Get.Load(),
Delete: counterSet.FileService.S3.Delete.Load(),
DeleteMul: counterSet.FileService.S3.DeleteMulti.Load(),
})

//FIXME: Do we need to attach tombstones? No, because the scope runs on the local CN
//relData.AttachTombstones()
s.NodeInfo.Data = relData

} else if len(inExprList) > 0 {
} else if len(runtimeInExprList) > 0 {
s.NodeInfo.Data, err = ApplyRuntimeFilters(c.proc.Ctx, s.Proc, s.DataSource.TableDef, s.NodeInfo.Data, exprs, filters)
if err != nil {
return err
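For reference, the runtime-filter path in handleRuntimeFilter above does two things with the IN expressions it receives: IN filters on non-primary-key columns are kept as additional row-level filters on the data source, while the full runtime list is combined with the node's block filter list and used to re-expand the scan's block ranges. The following is a hypothetical, self-contained sketch of that split, with stand-in types rather than the real plan.Expr machinery.

```go
package main

import "fmt"

// Stand-in for a filter expression that references a single column.
type expr struct {
	col string
}

func main() {
	pkCol := "id"
	runtimeInExprList := []expr{{col: "id"}, {col: "name"}} // runtime IN filters
	blockFilterList := []expr{{col: "age"}}                 // node's block filters

	// Step 1: non-PK runtime filters become extra row-level filters
	// (analogous to appendNotPkFilter in the diff).
	var appendNotPkFilter []expr
	for _, e := range runtimeInExprList {
		if e.col != pkCol {
			appendNotPkFilter = append(appendNotPkFilter, e)
		}
	}

	// Step 2: runtime filters plus block filters drive the range re-expansion
	// (analogous to newExprList passed to c.expandRanges).
	newExprList := append(append([]expr{}, runtimeInExprList...), blockFilterList...)

	fmt.Println("extra row filters:", appendNotPkFilter)
	fmt.Println("filters used to re-expand ranges:", newExprList)
}
```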