From ec22fe677026aa22d737e19d412f31888ac9e3df Mon Sep 17 00:00:00 2001 From: nitao Date: Tue, 5 Nov 2024 14:16:46 +0800 Subject: [PATCH] delay expand ranges for system table (#19788) delay expand ranges for system table Approved by: @ouyuanning, @aunjgr --- pkg/sql/compile/compile.go | 172 ++++++++++++------------------------- pkg/sql/compile/scope.go | 40 ++++----- 2 files changed, 76 insertions(+), 136 deletions(-) diff --git a/pkg/sql/compile/compile.go b/pkg/sql/compile/compile.go index 57e6c6a19966..b96c60deb2b4 100644 --- a/pkg/sql/compile/compile.go +++ b/pkg/sql/compile/compile.go @@ -3801,10 +3801,6 @@ func (c *Compile) generateCPUNumber(cpunum, blocks int) int { } func (c *Compile) determinExpandRanges(n *plan.Node) bool { - // Each time the three tables are opened, a new txnTable is created, which can result in Compile and Run holding different partition states. To avoid this, delay opening the three tables until the Run phase - if n.ObjRef.SchemaName == catalog.MO_CATALOG && n.TableDef.Name != catalog.MO_TABLES && n.TableDef.Name != catalog.MO_COLUMNS && n.TableDef.Name != catalog.MO_DATABASE { - return true //avoid bugs - } if len(n.AggList) > 0 { partialResult, _, _ := checkAggOptimize(n) if partialResult != nil { @@ -3907,78 +3903,11 @@ func collectTombstones( return tombstone, nil } -func (c *Compile) handleRelationAndContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) { - var rel engine.Relation - var err error - var db engine.Database - var txnOp client.TxnOperator - - //----------------------------------------------------------------------------------------------------- - ctx := c.proc.Ctx - txnOp = c.proc.GetTxnOperator() - if n.ScanSnapshot != nil && n.ScanSnapshot.TS != nil { - if !n.ScanSnapshot.TS.Equal(timestamp.Timestamp{LogicalTime: 0, PhysicalTime: 0}) && - n.ScanSnapshot.TS.Less(c.proc.GetTxnOperator().Txn().SnapshotTS) { - if c.proc.GetCloneTxnOperator() != nil { - txnOp = c.proc.GetCloneTxnOperator() - } else { - txnOp = c.proc.GetTxnOperator().CloneSnapshotOp(*n.ScanSnapshot.TS) - c.proc.SetCloneTxnOperator(txnOp) - } - - if n.ScanSnapshot.Tenant != nil { - ctx = context.WithValue(ctx, defines.TenantIDKey{}, n.ScanSnapshot.Tenant.TenantID) - } - } - } - //----------------------------------------------------------------------------------------------------- - - if util.TableIsClusterTable(n.TableDef.GetTableType()) { - ctx = defines.AttachAccountId(ctx, catalog.System_Account) - } - if n.ObjRef.PubInfo != nil { - ctx = defines.AttachAccountId(ctx, uint32(n.ObjRef.PubInfo.GetTenantId())) - } - if util.TableIsLoggingTable(n.ObjRef.SchemaName, n.ObjRef.ObjName) { - ctx = defines.AttachAccountId(ctx, catalog.System_Account) - } - - db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp) - if err != nil { - return nil, nil, nil, err - } - - rel, err = db.Relation(ctx, n.TableDef.Name, c.proc) - if err != nil { - if txnOp.IsSnapOp() { - return nil, nil, nil, err - } - var e error // avoid contamination of error messages - db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp) - if e != nil { - return nil, nil, nil, err - } - - // if temporary table, just scan at local cn. - rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc) - if e != nil { - return nil, nil, nil, err - } - c.cnList = engine.Nodes{ - engine.Node{ - Addr: c.addr, - Mcpu: 1, - }, - } - } - return rel, db, ctx, nil -} - func (c *Compile) expandRanges( n *plan.Node, blockFilterList []*plan.Expr, crs *perfcounter.CounterSet) (engine.RelData, error) { - rel, db, ctx, err := c.handleRelationAndContext(n) + rel, db, ctx, err := c.handleDbRelContext(n) if err != nil { return nil, err } @@ -4049,15 +3978,10 @@ func (c *Compile) expandRanges( } -func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) { +func (c *Compile) handleDbRelContext(n *plan.Node) (engine.Relation, engine.Database, context.Context, error) { var err error var db engine.Database var rel engine.Relation - //var ranges engine.Ranges - var relData engine.RelData - var partialResults []any - var partialResultTypes []types.T - var nodes engine.Nodes var txnOp client.TxnOperator //------------------------------------------------------------------------------------------------------------------ @@ -4086,49 +4010,65 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e ctx = defines.AttachAccountId(ctx, catalog.System_Account) } - if c.determinExpandRanges(n) { - if c.isPrepare { - return nil, nil, nil, cantCompileForPrepareErr + db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp) + if err != nil { + return nil, nil, nil, err + } + rel, err = db.Relation(ctx, n.TableDef.Name, c.proc) + if err != nil { + if txnOp.IsSnapOp() { + return nil, nil, nil, err } - db, err = c.e.Database(ctx, n.ObjRef.SchemaName, txnOp) - if err != nil { + var e error // avoid contamination of error messages + db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp) + if e != nil { return nil, nil, nil, err } - rel, err = db.Relation(ctx, n.TableDef.Name, c.proc) - if err != nil { - if txnOp.IsSnapOp() { - return nil, nil, nil, err - } - var e error // avoid contamination of error messages - db, e = c.e.Database(ctx, defines.TEMPORARY_DBNAME, txnOp) - if e != nil { - return nil, nil, nil, err - } - // if temporary table, just scan at local cn. - rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc) - if e != nil { - return nil, nil, nil, err - } - c.cnList = engine.Nodes{ - engine.Node{ - Addr: c.addr, - Mcpu: 1, - }, - } + // if temporary table, just scan at local cn. + rel, e = db.Relation(ctx, engine.GetTempTableName(n.ObjRef.SchemaName, n.TableDef.Name), c.proc) + if e != nil { + return nil, nil, nil, err } + c.cnList = engine.Nodes{ + engine.Node{ + Addr: c.addr, + Mcpu: 1, + }, + } + } + + return rel, db, ctx, nil +} + +func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, error) { + var relData engine.RelData + var partialResults []any + var partialResultTypes []types.T + var nodes engine.Nodes + + rel, _, ctx, err := c.handleDbRelContext(n) + if err != nil { + return nil, nil, nil, err + } + + if c.determinExpandRanges(n) { + if c.isPrepare { + return nil, nil, nil, cantCompileForPrepareErr + } + //@todo need remove expandRanges from Compile. // all expandRanges should be called by Run - var filterExpr []*plan.Expr + var newFilterExpr []*plan.Expr if len(n.BlockFilterList) > 0 { - filterExpr = plan2.DeepCopyExprList(n.BlockFilterList) - for _, e := range filterExpr { + newFilterExpr = plan2.DeepCopyExprList(n.BlockFilterList) + for _, e := range newFilterExpr { _, err := plan2.ReplaceFoldExpr(c.proc, e, &c.filterExprExes) if err != nil { return nil, nil, nil, err } } - for _, e := range filterExpr { + for _, e := range newFilterExpr { err = plan2.EvalFoldExpr(c.proc, e, &c.filterExprExes) if err != nil { return nil, nil, nil, err @@ -4136,20 +4076,20 @@ func (c *Compile) generateNodes(n *plan.Node) (engine.Nodes, []any, []types.T, e } } - crs := new(perfcounter.CounterSet) - relData, err = c.expandRanges(n, filterExpr, crs) + counterset := new(perfcounter.CounterSet) + relData, err = c.expandRanges(n, newFilterExpr, counterset) if err != nil { return nil, nil, nil, err } stats := statistic.StatsInfoFromContext(ctx) stats.CompileExpandRangesS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + List: counterset.FileService.S3.List.Load(), + Head: counterset.FileService.S3.Head.Load(), + Put: counterset.FileService.S3.Put.Load(), + Get: counterset.FileService.S3.Get.Load(), + Delete: counterset.FileService.S3.Delete.Load(), + DeleteMul: counterset.FileService.S3.DeleteMulti.Load(), }) } else { // add current CN diff --git a/pkg/sql/compile/scope.go b/pkg/sql/compile/scope.go index 4daa4c29dba0..845f583c8f73 100644 --- a/pkg/sql/compile/scope.go +++ b/pkg/sql/compile/scope.go @@ -512,7 +512,7 @@ func buildScanParallelRun(s *Scope, c *Compile) (*Scope, error) { func (s *Scope) handleRuntimeFilter(c *Compile) error { var err error - var inExprList []*plan.Expr + var runtimeInExprList []*plan.Expr exprs := make([]*plan.Expr, 0, len(s.DataSource.RuntimeFilterSpecs)) filters := make([]message.RuntimeFilterMessage, 0, len(exprs)) @@ -538,7 +538,7 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error { return nil case message.RuntimeFilter_IN: inExpr := plan2.MakeInExpr(c.proc.Ctx, spec.Expr, msg.Card, msg.Data, spec.MatchPrefix) - inExprList = append(inExprList, inExpr) + runtimeInExprList = append(runtimeInExprList, inExpr) // TODO: implement BETWEEN expression } @@ -549,15 +549,15 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error { } var appendNotPkFilter []*plan.Expr - for i := range inExprList { - fn := inExprList[i].GetF() + for i := range runtimeInExprList { + fn := runtimeInExprList[i].GetF() col := fn.Args[0].GetCol() if col == nil { panic("only support col in runtime filter's left child!") } pkPos := s.DataSource.TableDef.Name2ColIndex[s.DataSource.TableDef.Pkey.PkeyColName] if pkPos != col.ColPos { - appendNotPkFilter = append(appendNotPkFilter, plan2.DeepCopyExpr(inExprList[i])) + appendNotPkFilter = append(appendNotPkFilter, plan2.DeepCopyExpr(runtimeInExprList[i])) } } @@ -579,8 +579,8 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error { } // reset datasource - if len(inExprList) > 0 { - newExprList := plan2.DeepCopyExprList(inExprList) + if len(runtimeInExprList) > 0 { + newExprList := plan2.DeepCopyExprList(runtimeInExprList) if s.DataSource.FilterExpr != nil { newExprList = append(newExprList, s.DataSource.FilterExpr) } @@ -600,33 +600,33 @@ func (s *Scope) handleRuntimeFilter(c *Compile) error { } } - newList := plan2.DeepCopyExprList(inExprList) + newExprList := plan2.DeepCopyExprList(runtimeInExprList) if len(s.DataSource.node.BlockFilterList) > 0 { - newList = append(newList, s.DataSource.BlockFilterList...) + newExprList = append(newExprList, s.DataSource.BlockFilterList...) } - crs := new(perfcounter.CounterSet) - relData, err := c.expandRanges(s.DataSource.node, newList, crs) + counterSet := new(perfcounter.CounterSet) + relData, err := c.expandRanges(s.DataSource.node, newExprList, counterSet) if err != nil { return err } - ctx := c.proc.GetTopContext() - stats := statistic.StatsInfoFromContext(ctx) + ctx1 := c.proc.GetTopContext() + stats := statistic.StatsInfoFromContext(ctx1) stats.AddScopePrepareS3Request(statistic.S3Request{ - List: crs.FileService.S3.List.Load(), - Head: crs.FileService.S3.Head.Load(), - Put: crs.FileService.S3.Put.Load(), - Get: crs.FileService.S3.Get.Load(), - Delete: crs.FileService.S3.Delete.Load(), - DeleteMul: crs.FileService.S3.DeleteMulti.Load(), + List: counterSet.FileService.S3.List.Load(), + Head: counterSet.FileService.S3.Head.Load(), + Put: counterSet.FileService.S3.Put.Load(), + Get: counterSet.FileService.S3.Get.Load(), + Delete: counterSet.FileService.S3.Delete.Load(), + DeleteMul: counterSet.FileService.S3.DeleteMulti.Load(), }) //FIXME:: Do need to attache tombstones? No, because the scope runs on local CN //relData.AttachTombstones() s.NodeInfo.Data = relData - } else if len(inExprList) > 0 { + } else if len(runtimeInExprList) > 0 { s.NodeInfo.Data, err = ApplyRuntimeFilters(c.proc.Ctx, s.Proc, s.DataSource.TableDef, s.NodeInfo.Data, exprs, filters) if err != nil { return err