Refactor merge memory control #19798

Merged 16 commits on Nov 7, 2024
157 changes: 9 additions & 148 deletions pkg/vm/engine/tae/db/merge/executor.go
@@ -20,13 +20,8 @@ import (
"errors"
"fmt"
"math"
"os"
"sync"
"sync/atomic"

"github.com/KimMachineGun/automemlimit/memlimit"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/api"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
@@ -35,33 +30,13 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
"github.com/shirou/gopsutil/v3/process"
)

type activeTaskStats map[uint64]struct {
blk int
estBytes int
}

// executor consider resources to decide to merge or not.
type executor struct {
tableName string
rt *dbutils.Runtime
cnSched CNMergeScheduler
memLimit int
memUsing int
transPageLimit uint64
cpuPercent float64
activeMergeBlkCount atomic.Int32
activeEstimateBytes atomic.Int64
roundMergeRows uint64
taskConsume struct {
sync.Mutex
o map[objectio.ObjectId]struct{}
m activeTaskStats
}
tableName string
rt *dbutils.Runtime
cnSched CNMergeScheduler
}

func newMergeExecutor(rt *dbutils.Runtime, sched CNMergeScheduler) *executor {
@@ -71,108 +46,15 @@ func newMergeExecutor(rt *dbutils.Runtime, sched CNMergeScheduler) *executor {
}
}

func (e *executor) setMemLimit(total uint64) {
containerMLimit, err := memlimit.FromCgroup()
if containerMLimit > 0 && containerMLimit < total {
e.memLimit = int(float64(containerMLimit) * 0.9)
} else {
e.memLimit = int(float64(total) * 0.9)
}

if e.memLimit > 200*common.Const1GBytes {
e.transPageLimit = uint64(e.memLimit / 25 * 2) // 8%
} else if e.memLimit > 100*common.Const1GBytes {
e.transPageLimit = uint64(e.memLimit / 25 * 3) // 12%
} else if e.memLimit > 40*common.Const1GBytes {
e.transPageLimit = uint64(e.memLimit / 25 * 4) // 16%
} else {
e.transPageLimit = math.MaxUint64 // no limit
}

logutil.Info(
"MergeExecutorMemoryInfo",
common.AnyField("container-limit", common.HumanReadableBytes(int(containerMLimit))),
common.AnyField("host-memory", common.HumanReadableBytes(int(total))),
common.AnyField("process-limit", common.HumanReadableBytes(e.memLimit)),
common.AnyField("transfer-page-limit", common.HumanReadableBytes(int(e.transPageLimit))),
common.AnyField("error", err),
)
}

var proc *process.Process

func (e *executor) refreshMemInfo() {
if proc == nil {
proc, _ = process.NewProcess(int32(os.Getpid()))
} else if mem, err := proc.MemoryInfo(); err == nil {
e.memUsing = int(mem.RSS)
}

if e.memLimit == 0 {
if stats, err := mem.VirtualMemory(); err == nil {
e.setMemLimit(stats.Total)
}
}

if percents, err := cpu.Percent(0, false); err == nil {
e.cpuPercent = percents[0]
}
e.roundMergeRows = 0
}

func (e *executor) printStats() {
cnt := e.activeMergeBlkCount.Load()
if cnt == 0 && e.memAvailBytes() > 512*common.Const1MBytes {
return
}

logutil.Info(
"MergeExecutorMemoryStats",
common.AnyField("process-limit", common.HumanReadableBytes(e.memLimit)),
common.AnyField("process-mem", common.HumanReadableBytes(e.memUsing)),
common.AnyField("inuse-mem", common.HumanReadableBytes(int(e.activeEstimateBytes.Load()))),
common.AnyField("inuse-cnt", cnt),
)
}

func (e *executor) addActiveTask(taskId uint64, blkn, esize int) {
e.activeEstimateBytes.Add(int64(esize))
e.activeMergeBlkCount.Add(int32(blkn))
e.taskConsume.Lock()
if e.taskConsume.m == nil {
e.taskConsume.m = make(activeTaskStats)
}
e.taskConsume.m[taskId] = struct {
blk int
estBytes int
}{blkn, esize}
e.taskConsume.Unlock()
}

func (e *executor) OnExecDone(v any) {
task := v.(tasks.MScopedTask)

e.taskConsume.Lock()
stat := e.taskConsume.m[task.ID()]
delete(e.taskConsume.m, task.ID())
e.taskConsume.Unlock()

e.activeMergeBlkCount.Add(-int32(stat.blk))
e.activeEstimateBytes.Add(-int64(stat.estBytes))
}

func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
if e.roundMergeRows*36 /*28 * 1.3 */ > e.transPageLimit/8 {
return
}
e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema(false).Name)

if ActiveCNObj.CheckOverlapOnCNActive(mobjs) {
return
}

if kind == TaskHostCN {
osize, esize := estimateMergeConsume(mobjs)
_, esize := estimateMergeConsume(mobjs)
blkCnt := 0
for _, obj := range mobjs {
blkCnt += obj.BlockCnt()
@@ -201,7 +83,7 @@ func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.Object
}
if err := e.cnSched.SendMergeTask(context.TODO(), cntask); err == nil {
ActiveCNObj.AddActiveCNObj(mobjs)
logMergeTask(e.tableName, math.MaxUint64, mobjs, blkCnt, osize, esize)
logMergeTask(e.tableName, math.MaxUint64, mobjs, blkCnt)
} else {
logutil.Info(
"MergeExecutorError",
@@ -244,7 +126,7 @@ func (e *executor) scheduleMergeObjects(scopes []common.ID, mobjs []*catalog.Obj
txn.GetMemo().IsFlushOrMerge = true
return jobs.NewMergeObjectsTask(ctx, txn, mobjs, e.rt, common.DefaultMaxOsizeObjMB*common.Const1MBytes, isTombstone)
}
task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTaskWithObserver(nil, tasks.DataCompactionTask, scopes, factory, e)
task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTask(nil, tasks.DataCompactionTask, scopes, factory)
if err != nil {
if !errors.Is(err, tasks.ErrScheduleScopeConflict) {
logutil.Info(
@@ -256,35 +138,14 @@ func (e *executor) scheduleMergeObjects(scopes []common.ID, mobjs []*catalog.Obj
}
return
}
osize, esize := estimateMergeConsume(mobjs)
e.addActiveTask(task.ID(), blkCnt, esize)
for _, obj := range mobjs {
e.roundMergeRows += uint64(obj.Rows())
}
logMergeTask(e.tableName, task.ID(), mobjs, blkCnt, osize, esize)
logMergeTask(e.tableName, task.ID(), mobjs, blkCnt)
entry.Stats.SetLastMergeTime()
}

func (e *executor) memAvailBytes() int {
merging := int(e.activeEstimateBytes.Load())
avail := e.memLimit - e.memUsing - merging
if avail < 0 {
avail = 0
}
return avail
}

func (e *executor) transferPageSizeLimit() uint64 {
return e.transPageLimit
}

func (e *executor) CPUPercent() int64 {
return int64(e.cpuPercent)
}

func logMergeTask(name string, taskId uint64, merges []*catalog.ObjectEntry, blkn, osize, esize int) {
func logMergeTask(name string, taskId uint64, merges []*catalog.ObjectEntry, blkn int) {
rows := 0
infoBuf := &bytes.Buffer{}
osize, esize := estimateMergeConsume(merges)
for _, obj := range merges {
r := int(obj.Rows())
rows += r
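To make the signature change in this file concrete: the osize/esize parameters were dropped because logMergeTask now calls estimateMergeConsume itself, as shown in the hunk above. A minimal sketch of the call-site difference, using only names that appear in the hunks above:

// Before this refactor, the caller computed the sizes and passed them in:
osize, esize := estimateMergeConsume(mobjs)
logMergeTask(e.tableName, task.ID(), mobjs, blkCnt, osize, esize)

// After this refactor, the caller passes only the objects and block count;
// logMergeTask derives osize/esize internally from estimateMergeConsume(merges).
logMergeTask(e.tableName, task.ID(), mobjs, blkCnt)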
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/merge/mod.go
@@ -196,7 +196,7 @@ const (

type policy interface {
onObject(*catalog.ObjectEntry, *BasicPolicyConfig) bool
revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult
revise(*resourceController, *BasicPolicyConfig) []reviseResult
resetForTable(*catalog.TableEntry, *BasicPolicyConfig)
}

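The resourceController type referenced in this interface is not defined in the files shown here; it is added elsewhere in this PR. As rough orientation only, the sketch below is inferred from its call sites in this diff (cpuPercent, resourceAvailable, reserveResources) and from the memory-accounting code removed from executor.go. Field names and the exact accounting are assumptions, not the PR's actual implementation, and it assumes the package's existing helpers (estimateMergeConsume, catalog.ObjectEntry).

// Sketch only: a plausible shape for resourceController, inferred from how the
// policies in this PR use it. The real type may differ in naming and details.
type resourceController struct {
    limit      int64   // memory budget for merging (roughly the old executor.memLimit)
    using      int64   // process memory observed at refresh time (old executor.memUsing)
    reserved   int64   // bytes reserved by merges already picked in this round
    cpuPercent float64 // sampled CPU usage, checked by the policies
}

// resourceAvailable reports whether the estimated cost of merging objs still
// fits into the remaining budget.
func (rc *resourceController) resourceAvailable(objs []*catalog.ObjectEntry) bool {
    _, eSize := estimateMergeConsume(objs)
    avail := rc.limit - rc.using - rc.reserved
    if avail < 0 {
        avail = 0
    }
    return int64(eSize) <= avail
}

// reserveResources earmarks the estimated cost of objs so that later revise
// calls in the same scheduling round see a smaller remaining budget.
func (rc *resourceController) reserveResources(objs []*catalog.ObjectEntry) {
    _, eSize := estimateMergeConsume(objs)
    rc.reserved += int64(eSize)
}

Under this reading, reserving resources on the controller replaces the per-policy bookkeeping removed below, namely the controlMem helper in policyBasic.go and the ad-hoc "mem -= eSize" adjustments after each accepted result.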
29 changes: 9 additions & 20 deletions pkg/vm/engine/tae/db/merge/policyBasic.go
@@ -57,15 +57,13 @@ func (g *policyGroup) onObject(obj *catalog.ObjectEntry) {
}
}

func (g *policyGroup) revise(cpu, mem int64) []reviseResult {
func (g *policyGroup) revise(rc *resourceController) []reviseResult {
results := make([]reviseResult, 0, len(g.policies))
for _, p := range g.policies {
pResult := p.revise(cpu, mem, g.config)
pResult := p.revise(rc, g.config)
for _, r := range pResult {
if len(r.objs) > 0 {
results = append(results, r)
_, eSize := estimateMergeConsume(r.objs)
mem -= int64(eSize)
}
}
}
@@ -236,7 +234,7 @@ func (o *basic) onObject(obj *catalog.ObjectEntry, config *BasicPolicyConfig) bo
return false
}

func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
func (o *basic) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult {
slices.SortFunc(o.objects, func(a, b *catalog.ObjectEntry) int {
return cmp.Compare(a.Rows(), b.Rows())
})
@@ -246,21 +244,23 @@ func (o *basic) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult
isStandalone := common.IsStandaloneBoost.Load()
mergeOnDNIfStandalone := !common.ShouldStandaloneCNTakeOver.Load()
*/
if ok, _ := controlMem(objs, mem); !ok {
dnobjs := o.optimize(objs, config)

if !rc.resourceAvailable(objs) {
return nil
}
dnobjs := o.optimize(objs, config)

dnosize, _ := estimateMergeConsume(dnobjs)

schedDN := func() []reviseResult {
if cpu > 85 {
if rc.cpuPercent > 85 {
if dnosize > 25*common.Const1MBytes {
logutil.Infof("mergeblocks skip big merge for high level cpu usage, %d", cpu)
logutil.Infof("mergeblocks skip big merge for high level cpu usage, %f", rc.cpuPercent)
return nil
}
}
if len(dnobjs) > 1 {
rc.reserveResources(dnobjs)
return []reviseResult{{dnobjs, TaskHostDN}}
}
return nil
@@ -323,17 +323,6 @@ func (o *basic) optimize(objs []*catalog.ObjectEntry, config *BasicPolicyConfig)
return objs
}

func controlMem(objs []*catalog.ObjectEntry, mem int64) (bool, int64) {
if mem > constMaxMemCap {
mem = constMaxMemCap
}
_, eSize := estimateMergeConsume(objs)
if eSize > int(2*mem/3) {
return false, 0
}
return true, int64(eSize)
}

func (o *basic) resetForTable(entry *catalog.TableEntry, config *BasicPolicyConfig) {
o.schema = entry.GetLastestSchemaLocked(false)
o.lastMergeTime = entry.Stats.GetLastMergeTime()
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/db/merge/policyCompact.go
@@ -72,16 +72,16 @@ func (o *objCompactPolicy) onObject(entry *catalog.ObjectEntry, config *BasicPol
return false
}

func (o *objCompactPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
func (o *objCompactPolicy) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult {
if o.tblEntry == nil {
return nil
}
o.filterValidTombstones()
results := make([]reviseResult, 0, len(o.segObjects)+1)
for _, objs := range o.segObjects {
if ok, eSize := controlMem(objs, mem); ok {
if rc.resourceAvailable(objs) {
rc.reserveResources(objs)
results = append(results, reviseResult{objs, TaskHostDN})
mem -= eSize
}
}
if len(o.tombstones) > 0 {
14 changes: 7 additions & 7 deletions pkg/vm/engine/tae/db/merge/policyOverlap.go
@@ -61,7 +61,7 @@ func (m *objOverlapPolicy) onObject(obj *catalog.ObjectEntry, config *BasicPolic
return true
}

func (m *objOverlapPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
func (m *objOverlapPolicy) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult {
for _, objects := range m.segments {
segLevel := segLevel(len(objects))
for obj := range objects {
@@ -70,21 +70,21 @@ func (m *objOverlapPolicy) revise(rc *resourceController, config *BasicPolicyCon
}

reviseResults := make([]reviseResult, len(levels))
for i := range len(levels) {
for i := range 4 {
if len(m.leveledObjects[i]) < 2 {
continue
}

if cpu > 80 {
if rc.cpuPercent > 80 {
continue
}

m.overlappingObjsSet = m.overlappingObjsSet[:0]

objs, taskHostKind := m.reviseLeveledObjs(i)
if ok, eSize := controlMem(objs, mem); ok && len(objs) > 1 {
if len(objs) > 1 && rc.resourceAvailable(objs) {
rc.reserveResources(objs)
reviseResults[i] = reviseResult{slices.Clone(objs), taskHostKind}
mem -= eSize
}
}
return reviseResults
@@ -144,7 +144,7 @@ func (m *objOverlapPolicy) reviseLeveledObjs(level int) ([]*catalog.ObjectEntry,
return nil, TaskHostDN
}

if level < 2 && len(objs) > levels[3] {
if level < 3 && len(objs) > levels[3] {
objs = objs[:levels[3]]
}

@@ -181,7 +181,7 @@ func (s *entrySet) add(obj *catalog.ObjectEntry) {
}

func segLevel(length int) int {
l := 5
l := len(levels) - 1
for i, level := range levels {
if length < level {
l = i - 1
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/db/merge/policyTombstone.go
@@ -33,7 +33,7 @@ func (t *tombstonePolicy) onObject(entry *catalog.ObjectEntry, config *BasicPoli
return true
}

func (t *tombstonePolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
func (t *tombstonePolicy) revise(resourceControl *resourceController, config *BasicPolicyConfig) []reviseResult {
if len(t.tombstones) < 2 {
return nil
}