Skip to content

Commit

Permalink
Operator analyze counter set reuse2.0-dev (#19708)
Browse files Browse the repository at this point in the history
Operator analyzer adds CounterSet member for S3 resource statistics, reducing memory allocation

Approved by: @XuPeng-SH, @m-schen, @sukki37
  • Loading branch information
qingxinhome authored Nov 2, 2024
1 parent 1130481 commit fd77e50
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pkg/sql/colexec/deletion/deletion.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, er
if tempRows > 0 {
affectedRows += tempRows

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
err = deletion.ctr.partitionSources[partIdx].Delete(newCtx, deletion.ctr.resBat, catalog.Row_ID)
if err != nil {
Expand All @@ -281,7 +281,7 @@ func (deletion *Deletion) normalDelete(proc *process.Process) (vm.CallResult, er
}
affectedRows = uint64(deletion.ctr.resBat.RowCount())
if affectedRows > 0 {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
err = deletion.ctr.source.Delete(newCtx, deletion.ctr.resBat, catalog.Row_ID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/deletion/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (ctr *container) flush(proc *process.Process, analyzer process.Analyzer) (u
delete(blockId_rowIdBatch, blkid)
}

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
_, stats, err := s3writer.SortAndSync(newCtx, proc)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func scanZonemapFile(ctx context.Context, param *ExternalParam, proc *process.Pr
return err
}

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
// getZonemapBatch will access Fileservice
if err := getZonemapBatch(newCtx, param, proc, param.Filter.blockReader, bat); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/insert/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy
return input, err
}

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
err = insert.ctr.partitionSources[partIdx].Write(newCtx, insert.ctr.buf)
if err != nil {
Expand All @@ -257,7 +257,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy
}
insert.ctr.buf.SetRowCount(input.Batch.RowCount())

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

// insert into table, insertBat will be deeply copied into txn's workspace.
Expand All @@ -279,7 +279,7 @@ func (insert *Insert) insert_table(proc *process.Process, analyzer process.Analy

func writeBatch(proc *process.Process, writer *colexec.S3Writer, bat *batch.Batch, analyzer process.Analyzer) error {
if writer.StashBatch(proc, bat) {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

blockInfos, stats, err := writer.SortAndSync(newCtx, proc)
Expand All @@ -298,7 +298,7 @@ func writeBatch(proc *process.Process, writer *colexec.S3Writer, bat *batch.Batc
}

func flushTailBatch(proc *process.Process, writer *colexec.S3Writer, result *vm.CallResult, analyzer process.Analyzer) error {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

blockInfos, stats, err := writer.FlushTailBatch(newCtx, proc)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ func hasNewVersionInRange(
}
}

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
defer func() {
if analyzer != nil {
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colexec/mergeblock/mergeblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
// 'i' aligns with partition number
for i := range mergeBlock.container.partitionSources {
if mergeBlock.container.mp[i].RowCount() > 0 {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

// batches in mp will be deeply copied into txn's workspace.
Expand All @@ -103,7 +103,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
}

for _, bat := range mergeBlock.container.mp2[i] {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
// batches in mp2 will be deeply copied into txn's workspace.
if err = mergeBlock.container.partitionSources[i].Write(newCtx, bat); err != nil {
Expand All @@ -119,7 +119,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
} else {
// handle origin/main table.
if mergeBlock.container.mp[0].RowCount() > 0 {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

//batches in mp will be deeply copied into txn's workspace.
Expand All @@ -131,7 +131,7 @@ func (mergeBlock *MergeBlock) Call(proc *process.Process) (vm.CallResult, error)
}

for _, bat := range mergeBlock.container.mp2[0] {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
//batches in mp2 will be deeply copied into txn's workspace.
if err = mergeBlock.container.source.Write(newCtx, bat); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/mergeblock/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func splitObjectStats(mergeBlock *MergeBlock, proc *process.Process,
destVec := mergeBlock.container.mp[int(tblIdx[idx])].Vecs[1]

if needLoad {
crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

// comes from old version cn
Expand Down
17 changes: 8 additions & 9 deletions pkg/sql/colexec/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"bytes"

"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
Expand Down Expand Up @@ -74,13 +73,13 @@ func (output *Output) Call(proc *process.Process) (vm.CallResult, error) {
}
bat := result.Batch

retrievedCounter := new(perfcounter.CounterSet)
if err = output.Func(bat, retrievedCounter); err != nil {
crs := analyzer.GetOpCounterSet()
if err = output.Func(bat, crs); err != nil {
result.Status = vm.ExecStop
return result, err
}
analyzer.AddS3RequestCount(retrievedCounter)
analyzer.AddDiskIO(retrievedCounter)
analyzer.AddS3RequestCount(crs)
analyzer.AddDiskIO(crs)

// TODO: analyzer.Output(result.Batch)
return result, nil
Expand Down Expand Up @@ -121,13 +120,13 @@ func (output *Output) Call(proc *process.Process) (vm.CallResult, error) {
bat := output.ctr.cachedBatches[output.ctr.currentIdx]
output.ctr.currentIdx = output.ctr.currentIdx + 1

retrievedCounter := new(perfcounter.CounterSet)
if err := output.Func(bat, retrievedCounter); err != nil {
crs := analyzer.GetOpCounterSet()
if err := output.Func(bat, crs); err != nil {
result.Status = vm.ExecStop
return result, err
}
analyzer.AddS3RequestCount(retrievedCounter)
analyzer.AddDiskIO(retrievedCounter)
analyzer.AddS3RequestCount(crs)
analyzer.AddDiskIO(crs)

result.Batch = bat
// same as nonBlock
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/table_function/metadata_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (s *metadataScanState) start(tf *TableFunction, proc *process.Process, nthR
return err
}

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
metaInfos, err := rel.GetColumMetadataScanInfo(newCtx, colname)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/table_scan/table_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (tableScan *TableScan) Call(proc *process.Process) (vm.CallResult, error) {
// read data from storage engine
tableScan.ctr.buf.CleanOnlyData()

crs := new(perfcounter.CounterSet)
crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)
isEnd, err := tableScan.Reader.Read(newCtx, tableScan.Attrs, nil, proc.Mp(), tableScan.ctr.buf)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/vm/process/operator_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Analyzer interface {
AddWaitLockTime(t time.Time)
AddS3RequestCount(counter *perfcounter.CounterSet)
AddDiskIO(counter *perfcounter.CounterSet)
GetOpCounterSet() *perfcounter.CounterSet
GetOpStats() *OperatorStats
Reset()

Expand All @@ -62,6 +63,7 @@ type operatorAnalyzer struct {
start time.Time
wait time.Duration
childrenCallDuration time.Duration
crs *perfcounter.CounterSet
opStats *OperatorStats
}

Expand All @@ -75,20 +77,31 @@ func NewAnalyzer(idx int, isFirst bool, isLast bool, operatorName string) Analyz
isLast: isLast,
wait: 0,
childrenCallDuration: 0,
crs: new(perfcounter.CounterSet),
opStats: NewOperatorStats(operatorName),
}
}

// NewTempAnalyzer is used to provide resource statistics services for non operator logic
func NewTempAnalyzer() Analyzer {
return &operatorAnalyzer{
wait: 0,
crs: new(perfcounter.CounterSet),
opStats: NewOperatorStats("temp Analyzer"),
}
}

// GetOpCounterSet returns the current CounterSet and resets it.
// This method should be used when you want to start fresh with the performance counters.
func (opAlyzr *operatorAnalyzer) GetOpCounterSet() *perfcounter.CounterSet {
opAlyzr.crs.Reset()
return opAlyzr.crs
}

func (opAlyzr *operatorAnalyzer) Reset() {
opAlyzr.wait = 0
opAlyzr.childrenCallDuration = 0
opAlyzr.crs.Reset()
opAlyzr.opStats.Reset()
}

Expand Down

0 comments on commit fd77e50

Please sign in to comment.