diff --git a/pkg/sql/colexec/deletion/deletion.go b/pkg/sql/colexec/deletion/deletion.go index a8d32e46dc49..ecbe3621dd5e 100644 --- a/pkg/sql/colexec/deletion/deletion.go +++ b/pkg/sql/colexec/deletion/deletion.go @@ -262,7 +262,7 @@ func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, er if tempRows > 0 { affectedRows += tempRows - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) err = deletion.ctr.partitionSources[partIdx].Delete(newCtx, deletion.ctr.resBat, catalog.Row_ID) if err != nil { @@ -281,7 +281,7 @@ func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, er } affectedRows = uint64(deletion.ctr.resBat.RowCount()) if affectedRows > 0 { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) err = deletion.ctr.source.Delete(newCtx, deletion.ctr.resBat, catalog.Row_ID) if err != nil { diff --git a/pkg/sql/colexec/deletion/types.go b/pkg/sql/colexec/deletion/types.go index 2cac007f379d..11589911f712 100644 --- a/pkg/sql/colexec/deletion/types.go +++ b/pkg/sql/colexec/deletion/types.go @@ -276,7 +276,7 @@ func (ctr *container) flush(proc *process.Process, analyzer process.Analyzer) (u delete(blockId_rowIdBatch, blkid) } - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) _, stats, err := s3writer.SortAndSync(newCtx, proc) if err != nil { diff --git a/pkg/sql/colexec/external/external.go b/pkg/sql/colexec/external/external.go index 13e8a17c197f..af9f660e46d3 100644 --- a/pkg/sql/colexec/external/external.go +++ b/pkg/sql/colexec/external/external.go @@ -1080,7 +1080,7 @@ func scanZonemapFile(ctx context.Context, param *ExternalParam, proc *process.Pr return err } - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) // getZonemapBatch will access Fileservice if err := getZonemapBatch(newCtx, param, proc, param.Filter.blockReader, bat); err != nil { diff --git a/pkg/sql/colexec/insert/insert.go b/pkg/sql/colexec/insert/insert.go index 57425ff92ee2..c61982370af3 100644 --- a/pkg/sql/colexec/insert/insert.go +++ b/pkg/sql/colexec/insert/insert.go @@ -236,7 +236,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy return input, err } - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) err = insert.ctr.partitionSources[partIdx].Write(newCtx, insert.ctr.buf) if err != nil { @@ -257,7 +257,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy } insert.ctr.buf.SetRowCount(input.Batch.RowCount()) - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) // insert into table, insertBat will be deeply copied into txn's workspace. @@ -279,7 +279,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy func writeBatch(proc *process.Process, writer *colexec.S3Writer, bat *batch.Batch, analyzer process.Analyzer) error { if writer.StashBatch(proc, bat) { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) blockInfos, stats, err := writer.SortAndSync(newCtx, proc) @@ -298,7 +298,7 @@ func writeBatch(proc *process.Process, writer *colexec.S3Writer, bat *batch.Batc } func flushTailBatch(proc *process.Process, writer *colexec.S3Writer, result *vm.CallResult, analyzer process.Analyzer) error { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) blockInfos, stats, err := writer.FlushTailBatch(newCtx, proc) diff --git a/pkg/sql/colexec/lockop/lock_op.go b/pkg/sql/colexec/lockop/lock_op.go index 24d3c03bc89c..647dc4af19c0 100644 --- a/pkg/sql/colexec/lockop/lock_op.go +++ b/pkg/sql/colexec/lockop/lock_op.go @@ -951,7 +951,7 @@ func hasNewVersionInRange( } } - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) defer func() { if analyzer != nil { diff --git a/pkg/sql/colexec/mergeblock/mergeblock.go b/pkg/sql/colexec/mergeblock/mergeblock.go index d35cfb16b4c9..596566ec957f 100644 --- a/pkg/sql/colexec/mergeblock/mergeblock.go +++ b/pkg/sql/colexec/mergeblock/mergeblock.go @@ -91,7 +91,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error) // 'i' aligns with partition number for i := range mergeBlock.container.partitionSources { if mergeBlock.container.mp[i].RowCount() > 0 { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) // batches in mp will be deeply copied into txn's workspace. @@ -103,7 +103,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error) } for _, bat := range mergeBlock.container.mp2[i] { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) // batches in mp2 will be deeply copied into txn's workspace. if err = mergeBlock.container.partitionSources[i].Write(newCtx, bat); err != nil { @@ -119,7 +119,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error) } else { // handle origin/main table. if mergeBlock.container.mp[0].RowCount() > 0 { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) //batches in mp will be deeply copied into txn's workspace. @@ -131,7 +131,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error) } for _, bat := range mergeBlock.container.mp2[0] { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) //batches in mp2 will be deeply copied into txn's workspace. if err = mergeBlock.container.source.Write(newCtx, bat); err != nil { diff --git a/pkg/sql/colexec/mergeblock/types.go b/pkg/sql/colexec/mergeblock/types.go index f170c9af2836..21ff5be4d712 100644 --- a/pkg/sql/colexec/mergeblock/types.go +++ b/pkg/sql/colexec/mergeblock/types.go @@ -199,7 +199,7 @@ func splitObjectStats(mergeBlock *MergeBlock, proc *process.Process, destVec := mergeBlock.container.mp[int(tblIdx[idx])].Vecs[1] if needLoad { - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) // comes from old version cn diff --git a/pkg/sql/colexec/output/output.go b/pkg/sql/colexec/output/output.go index 8174130c3b71..f277a6586b78 100644 --- a/pkg/sql/colexec/output/output.go +++ b/pkg/sql/colexec/output/output.go @@ -18,7 +18,6 @@ import ( "bytes" "github.com/matrixorigin/matrixone/pkg/container/batch" - "github.com/matrixorigin/matrixone/pkg/perfcounter" "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -74,13 +73,13 @@ func (output *Output) Call(proc *process.Process) (vm.CallResult, error) { } bat := result.Batch - retrievedCounter := new(perfcounter.CounterSet) - if err = output.Func(bat, retrievedCounter); err != nil { + crs := analyzer.GetOpCounterSet() + if err = output.Func(bat, crs); err != nil { result.Status = vm.ExecStop return result, err } - analyzer.AddS3RequestCount(retrievedCounter) - analyzer.AddDiskIO(retrievedCounter) + analyzer.AddS3RequestCount(crs) + analyzer.AddDiskIO(crs) // TODO: analyzer.Output(result.Batch) return result, nil @@ -121,13 +120,13 @@ func (output *Output) Call(proc *process.Process) (vm.CallResult, error) { bat := output.ctr.cachedBatches[output.ctr.currentIdx] output.ctr.currentIdx = output.ctr.currentIdx + 1 - retrievedCounter := new(perfcounter.CounterSet) - if err := output.Func(bat, retrievedCounter); err != nil { + crs := analyzer.GetOpCounterSet() + if err := output.Func(bat, crs); err != nil { result.Status = vm.ExecStop return result, err } - analyzer.AddS3RequestCount(retrievedCounter) - analyzer.AddDiskIO(retrievedCounter) + analyzer.AddS3RequestCount(crs) + analyzer.AddDiskIO(crs) result.Batch = bat // same as nonBlock diff --git a/pkg/sql/colexec/table_function/metadata_scan.go b/pkg/sql/colexec/table_function/metadata_scan.go index ae95deffde7e..f3cddbc6537c 100644 --- a/pkg/sql/colexec/table_function/metadata_scan.go +++ b/pkg/sql/colexec/table_function/metadata_scan.go @@ -74,7 +74,7 @@ func (s *metadataScanState) start(tf *TableFunction, proc *process.Process, nthR return err } - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) metaInfos, err := rel.GetColumMetadataScanInfo(newCtx, colname) if err != nil { diff --git a/pkg/sql/colexec/table_scan/table_scan.go b/pkg/sql/colexec/table_scan/table_scan.go index 8ab329f69256..6b6462bd18cf 100644 --- a/pkg/sql/colexec/table_scan/table_scan.go +++ b/pkg/sql/colexec/table_scan/table_scan.go @@ -117,7 +117,7 @@ func (tableScan *TableScan) Call(proc *process.Process) (vm.CallResult, error) { // read data from storage engine tableScan.ctr.buf.CleanOnlyData() - crs := new(perfcounter.CounterSet) + crs := analyzer.GetOpCounterSet() newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs) isEnd, err := tableScan.Reader.Read(newCtx, tableScan.Attrs, nil, proc.Mp(), tableScan.ctr.buf) if err != nil { diff --git a/pkg/vm/process/operator_analyzer.go b/pkg/vm/process/operator_analyzer.go index 2d983d3183fb..0523d2d9d563 100644 --- a/pkg/vm/process/operator_analyzer.go +++ b/pkg/vm/process/operator_analyzer.go @@ -47,6 +47,7 @@ type Analyzer interface { AddWaitLockTime(t time.Time) AddS3RequestCount(counter *perfcounter.CounterSet) AddDiskIO(counter *perfcounter.CounterSet) + GetOpCounterSet() *perfcounter.CounterSet GetOpStats() *OperatorStats Reset() @@ -62,6 +63,7 @@ type operatorAnalyzer struct { start time.Time wait time.Duration childrenCallDuration time.Duration + crs *perfcounter.CounterSet opStats *OperatorStats } @@ -75,6 +77,7 @@ func NewAnalyzer(idx int, isFirst bool, isLast bool, operatorName string) Analyz isLast: isLast, wait: 0, childrenCallDuration: 0, + crs: new(perfcounter.CounterSet), opStats: NewOperatorStats(operatorName), } } @@ -82,13 +85,23 @@ func NewAnalyzer(idx int, isFirst bool, isLast bool, operatorName string) Analyz // NewTempAnalyzer is used to provide resource statistics services for non operator logic func NewTempAnalyzer() Analyzer { return &operatorAnalyzer{ + wait: 0, + crs: new(perfcounter.CounterSet), opStats: NewOperatorStats("temp Analyzer"), } } +// GetOpCounterSet returns the current CounterSet and resets it. +// This method should be used when you want to start fresh with the performance counters. +func (opAlyzr *operatorAnalyzer) GetOpCounterSet() *perfcounter.CounterSet { + opAlyzr.crs.Reset() + return opAlyzr.crs +} + func (opAlyzr *operatorAnalyzer) Reset() { opAlyzr.wait = 0 opAlyzr.childrenCallDuration = 0 + opAlyzr.crs.Reset() opAlyzr.opStats.Reset() }