Skip to content

Commit

Permalink
delay expand ranges for system table (#19788)
Browse files Browse the repository at this point in the history
delay expand ranges for system table

Approved by: @ouyuanning, @aunjgr
  • Loading branch information
badboynt1 authored Nov 5, 2024
1 parent 3d0127a commit ec22fe6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 136 deletions.
172 changes: 56 additions & 116 deletions pkg/sql/compile/compile.go
Original file line number Diff line number Diff line change
Expand Up @@ -3801,10 +3801,6 @@ func (c *Compile) generateCPUNumber(cpunum, blocks int) int {
}

func (c *Compile) determinExpandRanges(n *plan.Node) bool {
// Each time the three tables are opened, a new txnTable is created, which can result in Compile and Run holding different partition states. To avoid this, delay opening the three tables until the Run phase
if n.ObjRef.SchemaName == catalog.MO_CATALOG && n.TableDef.Name != catalog.MO_TABLES && n.TableDef.Name != catalog.MO_COLUMNS && n.TableDef.Name != catalog.MO_DATABASE {
return true //avoid bugs
}
if len(n.AggList) > 0 {
partialResult, _, _ := checkAggOptimize(n)
if partialResult != nil {
Expand Down Expand Up @@ -3907,78 +3903,11 @@ func collectTombstones(
return tombstone, nil
}

func (c *Compile) handleRelationAndContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) {
var rel engine.Relation
var err error
var db engine.Database
var txnOp client.TxnOperator

//-----------------------------------------------------------------------------------------------------
ctx := c.proc.Ctx
txnOp = c.proc.GetTxnOperator()
if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil {
if !n.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) &&
n.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) {
if c.proc.GetCloneTxnOperator() != nil {
txnOp = c.proc.GetCloneTxnOperator()
} else {
txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS)
c.proc.SetCloneTxnOperator(txnOp)
}

if n.ScanSnapshot.Tenant != nil {
ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID)
}
}
}
//-----------------------------------------------------------------------------------------------------

if util.TableIsClusterTable(n.TableDef.GetTableType()) {
ctx = defines.AttachAccountId(ctx, catalog.System_Account)
}
if n.ObjRef.PubInfo != nil {
ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId()))
}
if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) {
ctx = defines.AttachAccountId(ctx, catalog.System_Account)
}

db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
if err != nil {
return nil, nil, nil, err
}

rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
if err != nil {
if txnOp.IsSnapOp() {
return nil, nil, nil, err
}
var e error // avoid contamination of error messages
db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp)
if e != nil {
return nil, nil, nil, err
}

// if temporary table, just scan at local cn.
rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
if e != nil {
return nil, nil, nil, err
}
c.cnList = engine.Nodes{
engine.Node{
Addr: c.addr,
Mcpu: 1,
},
}
}
return rel, db, ctx, nil
}

func (c *Compile) expandRanges(
n *plan.Node,
blockFilterList []*plan.Expr, crs *perfcounter.CounterSet) (engine.RelData, error) {

rel, db, ctx, err := c.handleRelationAndContext(n)
rel, db, ctx, err := c.handleDbRelContext(n)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -4049,15 +3978,10 @@ func (c *Compile) expandRanges(

}

func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) {
func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) {
var err error
var db engine.Database
var rel engine.Relation
//var ranges engine.Ranges
var relData engine.RelData
var partialResults []any
var partialResultTypes []types.T
var nodes engine.Nodes
var txnOp client.TxnOperator

//------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -4086,70 +4010,86 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e
ctx = defines.AttachAccountId(ctx, catalog.System_Account)
}

if c.determinExpandRanges(n) {
if c.isPrepare {
return nil, nil, nil, cantCompileForPrepareErr
db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
if err != nil {
return nil, nil, nil, err
}
rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
if err != nil {
if txnOp.IsSnapOp() {
return nil, nil, nil, err
}
db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp)
if err != nil {
var e error // avoid contamination of error messages
db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp)
if e != nil {
return nil, nil, nil, err
}
rel, err = db.Relation(ctx, n.TableDef.Name, c.proc)
if err != nil {
if txnOp.IsSnapOp() {
return nil, nil, nil, err
}
var e error // avoid contamination of error messages
db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp)
if e != nil {
return nil, nil, nil, err
}

// if temporary table, just scan at local cn.
rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
if e != nil {
return nil, nil, nil, err
}
c.cnList = engine.Nodes{
engine.Node{
Addr: c.addr,
Mcpu: 1,
},
}
// if temporary table, just scan at local cn.
rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc)
if e != nil {
return nil, nil, nil, err
}
c.cnList = engine.Nodes{
engine.Node{
Addr: c.addr,
Mcpu: 1,
},
}
}

return rel, db, ctx, nil
}

func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) {
var relData engine.RelData
var partialResults []any
var partialResultTypes []types.T
var nodes engine.Nodes

rel, _, ctx, err := c.handleDbRelContext(n)
if err != nil {
return nil, nil, nil, err
}

if c.determinExpandRanges(n) {
if c.isPrepare {
return nil, nil, nil, cantCompileForPrepareErr
}

//@todo need remove expandRanges from Compile.
// all expandRanges should be called by Run
var filterExpr []*plan.Expr
var newFilterExpr []*plan.Expr
if len(n.BlockFilterList) > 0 {
filterExpr = plan2.DeepCopyExprList(n.BlockFilterList)
for _, e := range filterExpr {
newFilterExpr = plan2.DeepCopyExprList(n.BlockFilterList)
for _, e := range newFilterExpr {
_, err := plan2.ReplaceFoldExpr(c.proc, e, &c.filterExprExes)
if err != nil {
return nil, nil, nil, err
}
}
for _, e := range filterExpr {
for _, e := range newFilterExpr {
err = plan2.EvalFoldExpr(c.proc, e, &c.filterExprExes)
if err != nil {
return nil, nil, nil, err
}
}
}

crs := new(perfcounter.CounterSet)
relData, err = c.expandRanges(n, filterExpr, crs)
counterset := new(perfcounter.CounterSet)
relData, err = c.expandRanges(n, newFilterExpr, counterset)
if err != nil {
return nil, nil, nil, err
}

stats := statistic.StatsInfoFromContext(ctx)
stats.CompileExpandRangesS3Request(statistic.S3Request{
List: crs.FileService.S3.List.Load(),
Head: crs.FileService.S3.Head.Load(),
Put: crs.FileService.S3.Put.Load(),
Get: crs.FileService.S3.Get.Load(),
Delete: crs.FileService.S3.Delete.Load(),
DeleteMul: crs.FileService.S3.DeleteMulti.Load(),
List: counterset.FileService.S3.List.Load(),
Head: counterset.FileService.S3.Head.Load(),
Put: counterset.FileService.S3.Put.Load(),
Get: counterset.FileService.S3.Get.Load(),
Delete: counterset.FileService.S3.Delete.Load(),
DeleteMul: counterset.FileService.S3.DeleteMulti.Load(),
})
} else {
// add current CN
Expand Down
40 changes: 20 additions & 20 deletions pkg/sql/compile/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) {

func (s *Scope) handleRuntimeFilter(c *Compile) error {
var err error
var inExprList []*plan.Expr
var runtimeInExprList []*plan.Expr
exprs := make([]*plan.Expr, 0, len(s.DataSource.RuntimeFilterSpecs))
filters := make([]message.RuntimeFilterMessage, 0, len(exprs))

Expand All @@ -538,7 +538,7 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
return nil
case message.RuntimeFilter_IN:
inExpr := plan2.MakeInExpr(c.proc.Ctx, spec.Expr, msg.Card, msg.Data, spec.MatchPrefix)
inExprList = append(inExprList, inExpr)
runtimeInExprList = append(runtimeInExprList, inExpr)

// TODO: implement BETWEEN expression
}
Expand All @@ -549,15 +549,15 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
}

var appendNotPkFilter []*plan.Expr
for i := range inExprList {
fn := inExprList[i].GetF()
for i := range runtimeInExprList {
fn := runtimeInExprList[i].GetF()
col := fn.Args[0].GetCol()
if col == nil {
panic("only support col in runtime filter's left child!")
}
pkPos := s.DataSource.TableDef.Name2ColIndex[s.DataSource.TableDef.Pkey.PkeyColName]
if pkPos != col.ColPos {
appendNotPkFilter = append(appendNotPkFilter, plan2.DeepCopyExpr(inExprList[i]))
appendNotPkFilter = append(appendNotPkFilter, plan2.DeepCopyExpr(runtimeInExprList[i]))
}
}

Expand All @@ -579,8 +579,8 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
}

// reset datasource
if len(inExprList) > 0 {
newExprList := plan2.DeepCopyExprList(inExprList)
if len(runtimeInExprList) > 0 {
newExprList := plan2.DeepCopyExprList(runtimeInExprList)
if s.DataSource.FilterExpr != nil {
newExprList = append(newExprList, s.DataSource.FilterExpr)
}
Expand All @@ -600,33 +600,33 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error {
}
}

newList := plan2.DeepCopyExprList(inExprList)
newExprList := plan2.DeepCopyExprList(runtimeInExprList)
if len(s.DataSource.node.BlockFilterList) > 0 {
newList = append(newList, s.DataSource.BlockFilterList...)
newExprList = append(newExprList, s.DataSource.BlockFilterList...)
}

crs := new(perfcounter.CounterSet)
relData, err := c.expandRanges(s.DataSource.node, newList, crs)
counterSet := new(perfcounter.CounterSet)
relData, err := c.expandRanges(s.DataSource.node, newExprList, counterSet)
if err != nil {
return err
}

ctx := c.proc.GetTopContext()
stats := statistic.StatsInfoFromContext(ctx)
ctx1 := c.proc.GetTopContext()
stats := statistic.StatsInfoFromContext(ctx1)
stats.AddScopePrepareS3Request(statistic.S3Request{
List: crs.FileService.S3.List.Load(),
Head: crs.FileService.S3.Head.Load(),
Put: crs.FileService.S3.Put.Load(),
Get: crs.FileService.S3.Get.Load(),
Delete: crs.FileService.S3.Delete.Load(),
DeleteMul: crs.FileService.S3.DeleteMulti.Load(),
List: counterSet.FileService.S3.List.Load(),
Head: counterSet.FileService.S3.Head.Load(),
Put: counterSet.FileService.S3.Put.Load(),
Get: counterSet.FileService.S3.Get.Load(),
Delete: counterSet.FileService.S3.Delete.Load(),
DeleteMul: counterSet.FileService.S3.DeleteMulti.Load(),
})

//FIXME:: Do need to attache tombstones? No, because the scope runs on local CN
//relData.AttachTombstones()
s.NodeInfo.Data = relData

} else if len(inExprList) > 0 {
} else if len(runtimeInExprList) > 0 {
s.NodeInfo.Data, err = ApplyRuntimeFilters(c.proc.Ctx, s.Proc, s.DataSource.TableDef, s.NodeInfo.Data, exprs, filters)
if err != nil {
return err
Expand Down

0 comments on commit ec22fe6

Please sign in to comment.