diff --git a/pkg/vm/engine/tae/db/merge/executor.go b/pkg/vm/engine/tae/db/merge/executor.go
index 7cd22a358ef0..a8cbcc05c212 100644
--- a/pkg/vm/engine/tae/db/merge/executor.go
+++ b/pkg/vm/engine/tae/db/merge/executor.go
@@ -20,13 +20,8 @@ import (
     "errors"
     "fmt"
     "math"
-    "os"
-    "sync"
-    "sync/atomic"
 
-    "github.com/KimMachineGun/automemlimit/memlimit"
     "github.com/matrixorigin/matrixone/pkg/logutil"
-    "github.com/matrixorigin/matrixone/pkg/objectio"
     "github.com/matrixorigin/matrixone/pkg/pb/api"
     v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
@@ -35,33 +30,13 @@ import (
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks"
-    "github.com/shirou/gopsutil/v3/cpu"
-    "github.com/shirou/gopsutil/v3/mem"
-    "github.com/shirou/gopsutil/v3/process"
 )
 
-type activeTaskStats map[uint64]struct {
-    blk      int
-    estBytes int
-}
-
 // executor consider resources to decide to merge or not.
 type executor struct {
-    tableName           string
-    rt                  *dbutils.Runtime
-    cnSched             CNMergeScheduler
-    memLimit            int
-    memUsing            int
-    transPageLimit      uint64
-    cpuPercent          float64
-    activeMergeBlkCount atomic.Int32
-    activeEstimateBytes atomic.Int64
-    roundMergeRows      uint64
-    taskConsume         struct {
-        sync.Mutex
-        o map[objectio.ObjectId]struct{}
-        m activeTaskStats
-    }
+    tableName string
+    rt        *dbutils.Runtime
+    cnSched   CNMergeScheduler
 }
 
 func newMergeExecutor(rt *dbutils.Runtime, sched CNMergeScheduler) *executor {
@@ -71,100 +46,7 @@ func newMergeExecutor(rt *dbutils.Runtime, sched CNMergeScheduler) *executor {
     }
 }
 
-func (e *executor) setMemLimit(total uint64) {
-    containerMLimit, err := memlimit.FromCgroup()
-    if containerMLimit > 0 && containerMLimit < total {
-        e.memLimit = int(float64(containerMLimit) * 0.9)
-    } else {
-        e.memLimit = int(float64(total) * 0.9)
-    }
-
-    if e.memLimit > 200*common.Const1GBytes {
-        e.transPageLimit = uint64(e.memLimit / 25 * 2) // 8%
-    } else if e.memLimit > 100*common.Const1GBytes {
-        e.transPageLimit = uint64(e.memLimit / 25 * 3) // 12%
-    } else if e.memLimit > 40*common.Const1GBytes {
-        e.transPageLimit = uint64(e.memLimit / 25 * 4) // 16%
-    } else {
-        e.transPageLimit = math.MaxUint64 // no limit
-    }
-
-    logutil.Info(
-        "MergeExecutorMemoryInfo",
-        common.AnyField("container-limit", common.HumanReadableBytes(int(containerMLimit))),
-        common.AnyField("host-memory", common.HumanReadableBytes(int(total))),
-        common.AnyField("process-limit", common.HumanReadableBytes(e.memLimit)),
-        common.AnyField("transfer-page-limit", common.HumanReadableBytes(int(e.transPageLimit))),
-        common.AnyField("error", err),
-    )
-}
-
-var proc *process.Process
-
-func (e *executor) refreshMemInfo() {
-    if proc == nil {
-        proc, _ = process.NewProcess(int32(os.Getpid()))
-    } else if mem, err := proc.MemoryInfo(); err == nil {
-        e.memUsing = int(mem.RSS)
-    }
-
-    if e.memLimit == 0 {
-        if stats, err := mem.VirtualMemory(); err == nil {
-            e.setMemLimit(stats.Total)
-        }
-    }
-
-    if percents, err := cpu.Percent(0, false); err == nil {
-        e.cpuPercent = percents[0]
-    }
-    e.roundMergeRows = 0
-}
-
-func (e *executor) printStats() {
-    cnt := e.activeMergeBlkCount.Load()
-    if cnt == 0 && e.memAvailBytes() > 512*common.Const1MBytes {
-        return
-    }
-
-    logutil.Info(
-        "MergeExecutorMemoryStats",
-        common.AnyField("process-limit", common.HumanReadableBytes(e.memLimit)),
-        common.AnyField("process-mem", common.HumanReadableBytes(e.memUsing)),
-        common.AnyField("inuse-mem", common.HumanReadableBytes(int(e.activeEstimateBytes.Load()))),
-        common.AnyField("inuse-cnt", cnt),
-    )
-}
-
-func (e *executor) addActiveTask(taskId uint64, blkn, esize int) {
-    e.activeEstimateBytes.Add(int64(esize))
-    e.activeMergeBlkCount.Add(int32(blkn))
-    e.taskConsume.Lock()
-    if e.taskConsume.m == nil {
-        e.taskConsume.m = make(activeTaskStats)
-    }
-    e.taskConsume.m[taskId] = struct {
-        blk      int
-        estBytes int
-    }{blkn, esize}
-    e.taskConsume.Unlock()
-}
-
-func (e *executor) OnExecDone(v any) {
-    task := v.(tasks.MScopedTask)
-
-    e.taskConsume.Lock()
-    stat := e.taskConsume.m[task.ID()]
-    delete(e.taskConsume.m, task.ID())
-    e.taskConsume.Unlock()
-
-    e.activeMergeBlkCount.Add(-int32(stat.blk))
-    e.activeEstimateBytes.Add(-int64(stat.estBytes))
-}
-
 func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
-    if e.roundMergeRows*36 /*28 * 1.3 */ > e.transPageLimit/8 {
-        return
-    }
     e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema(false).Name)
 
     if ActiveCNObj.CheckOverlapOnCNActive(mobjs) {
@@ -172,7 +54,7 @@ func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.Object
     }
 
     if kind == TaskHostCN {
-        osize, esize := estimateMergeConsume(mobjs)
+        _, esize := estimateMergeConsume(mobjs)
         blkCnt := 0
         for _, obj := range mobjs {
             blkCnt += obj.BlockCnt()
@@ -201,7 +83,7 @@ func (e *executor) executeFor(entry *catalog.TableEntry, mobjs []*catalog.Object
         }
         if err := e.cnSched.SendMergeTask(context.TODO(), cntask); err == nil {
             ActiveCNObj.AddActiveCNObj(mobjs)
-            logMergeTask(e.tableName, math.MaxUint64, mobjs, blkCnt, osize, esize)
+            logMergeTask(e.tableName, math.MaxUint64, mobjs, blkCnt)
         } else {
             logutil.Info(
                 "MergeExecutorError",
@@ -244,7 +126,7 @@ func (e *executor) scheduleMergeObjects(scopes []common.ID, mobjs []*catalog.Obj
         txn.GetMemo().IsFlushOrMerge = true
         return jobs.NewMergeObjectsTask(ctx, txn, mobjs, e.rt, common.DefaultMaxOsizeObjMB*common.Const1MBytes, isTombstone)
     }
-    task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTaskWithObserver(nil, tasks.DataCompactionTask, scopes, factory, e)
+    task, err := e.rt.Scheduler.ScheduleMultiScopedTxnTask(nil, tasks.DataCompactionTask, scopes, factory)
     if err != nil {
         if !errors.Is(err, tasks.ErrScheduleScopeConflict) {
             logutil.Info(
@@ -256,35 +138,14 @@ func (e *executor) scheduleMergeObjects(scopes []common.ID, mobjs []*catalog.Obj
         }
         return
     }
-    osize, esize := estimateMergeConsume(mobjs)
-    e.addActiveTask(task.ID(), blkCnt, esize)
-    for _, obj := range mobjs {
-        e.roundMergeRows += uint64(obj.Rows())
-    }
-    logMergeTask(e.tableName, task.ID(), mobjs, blkCnt, osize, esize)
+    logMergeTask(e.tableName, task.ID(), mobjs, blkCnt)
     entry.Stats.SetLastMergeTime()
 }
 
-func (e *executor) memAvailBytes() int {
-    merging := int(e.activeEstimateBytes.Load())
-    avail := e.memLimit - e.memUsing - merging
-    if avail < 0 {
-        avail = 0
-    }
-    return avail
-}
-
-func (e *executor) transferPageSizeLimit() uint64 {
-    return e.transPageLimit
-}
-
-func (e *executor) CPUPercent() int64 {
-    return int64(e.cpuPercent)
-}
-
-func logMergeTask(name string, taskId uint64, merges []*catalog.ObjectEntry, blkn, osize, esize int) {
+func logMergeTask(name string, taskId uint64, merges []*catalog.ObjectEntry, blkn int) {
     rows := 0
     infoBuf := &bytes.Buffer{}
+    osize, esize := estimateMergeConsume(merges)
     for _, obj := range merges {
         r := int(obj.Rows())
         rows += r
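Note on the executor changes above: with the per-task bookkeeping gone, `logMergeTask` recomputes sizes itself via `estimateMergeConsume`, which now charges roughly 20 bytes of transfer-table memory per row (`estimateMemUsagePerRow`, introduced in utils.go below). A self-contained sketch of that arithmetic, with hypothetical row counts:

```go
package main

import "fmt"

// Mirrors estimateMemUsagePerRow from utils.go; the object row counts below
// are hypothetical.
const estimateMemUsagePerRow = 20

func main() {
	rowsPerObject := []int{1_000_000, 2_000_000, 500_000}
	estBytes := 0
	for _, rows := range rowsPerObject {
		estBytes += rows * estimateMemUsagePerRow
	}
	// 3.5M rows * 20 B/row = 70,000,000 bytes (~66 MiB) must fit in the
	// merge memory budget before the task is scheduled.
	fmt.Printf("estimated transfer-table memory: %d MiB\n", estBytes>>20)
}
```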
diff --git a/pkg/vm/engine/tae/db/merge/mod.go b/pkg/vm/engine/tae/db/merge/mod.go
index f36c31c051bd..61bcb4323bf2 100644
--- a/pkg/vm/engine/tae/db/merge/mod.go
+++ b/pkg/vm/engine/tae/db/merge/mod.go
@@ -196,7 +196,7 @@ const (
 
 type policy interface {
     onObject(*catalog.ObjectEntry, *BasicPolicyConfig) bool
-    revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult
+    revise(*resourceController, *BasicPolicyConfig) []reviseResult
     resetForTable(*catalog.TableEntry, *BasicPolicyConfig)
 }
 
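The interface change above is the heart of the refactor: policies now share one `*resourceController` instead of receiving raw `cpu, mem` snapshots, so a reservation made by one policy shrinks the budget seen by the next. A minimal sketch of that check-then-reserve contract, using stand-in types (`object`, `result`) rather than the real `*catalog.ObjectEntry`/`reviseResult`:

```go
package main

import "fmt"

// Stand-in types; the real code uses *catalog.ObjectEntry and reviseResult.
type object struct{ rows int64 }
type result struct{ objs []object }

const perRow = 20 // estimateMemUsagePerRow

type resourceController struct{ limit, reserved int64 }

func (c *resourceController) available(objs []object) bool {
	var est int64
	for _, o := range objs {
		est += o.rows * perRow
	}
	// resourceAvailable allows at most 2/3 of the remaining budget.
	return est <= 2*(c.limit-c.reserved)/3
}

func (c *resourceController) reserve(objs []object) {
	for _, o := range objs {
		c.reserved += o.rows * perRow
	}
}

// revise shows the check-then-reserve pattern each policy now follows.
func revise(rc *resourceController, candidates [][]object) (results []result) {
	for _, objs := range candidates {
		if len(objs) > 1 && rc.available(objs) {
			rc.reserve(objs) // later policies see a smaller budget
			results = append(results, result{objs})
		}
	}
	return
}

func main() {
	rc := &resourceController{limit: 100}
	batches := [][]object{{{rows: 2}, {rows: 1}}, {{rows: 2}, {rows: 1}}}
	fmt.Println(len(revise(rc, batches))) // 1: the second batch no longer fits
}
```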
diff --git a/pkg/vm/engine/tae/db/merge/policyBasic.go b/pkg/vm/engine/tae/db/merge/policyBasic.go
index 56ae069c7499..fe0826f8706f 100644
--- a/pkg/vm/engine/tae/db/merge/policyBasic.go
+++ b/pkg/vm/engine/tae/db/merge/policyBasic.go
@@ -57,15 +57,13 @@ func (g *policyGroup) onObject(obj *catalog.ObjectEntry) {
     }
 }
 
-func (g *policyGroup) revise(cpu, mem int64) []reviseResult {
+func (g *policyGroup) revise(rc *resourceController) []reviseResult {
     results := make([]reviseResult, 0, len(g.policies))
     for _, p := range g.policies {
-        pResult := p.revise(cpu, mem, g.config)
+        pResult := p.revise(rc, g.config)
         for _, r := range pResult {
             if len(r.objs) > 0 {
                 results = append(results, r)
-                _, eSize := estimateMergeConsume(r.objs)
-                mem -= int64(eSize)
             }
         }
     }
@@ -236,7 +234,7 @@ func (o *basic) onObject(obj *catalog.ObjectEntry, config *BasicPolicyConfig) bo
     return false
 }
 
-func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
+func (o *basic) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult {
     slices.SortFunc(o.objects, func(a, b *catalog.ObjectEntry) int {
         return cmp.Compare(a.Rows(), b.Rows())
     })
@@ -246,21 +244,23 @@ func (o *basic) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult
     isStandalone := common.IsStandaloneBoost.Load()
     mergeOnDNIfStandalone := !common.ShouldStandaloneCNTakeOver.Load()
     */
-    if ok, _ := controlMem(objs, mem); !ok {
+    dnobjs := o.optimize(objs, config)
+
+    if !rc.resourceAvailable(objs) {
         return nil
     }
-    dnobjs := o.optimize(objs, config)
 
     dnosize, _ := estimateMergeConsume(dnobjs)
 
     schedDN := func() []reviseResult {
-        if cpu > 85 {
+        if rc.cpuPercent > 85 {
             if dnosize > 25*common.Const1MBytes {
-                logutil.Infof("mergeblocks skip big merge for high level cpu usage, %d", cpu)
+                logutil.Infof("mergeblocks skip big merge for high level cpu usage, %f", rc.cpuPercent)
                 return nil
             }
         }
         if len(dnobjs) > 1 {
+            rc.reserveResources(dnobjs)
             return []reviseResult{{dnobjs, TaskHostDN}}
         }
         return nil
@@ -323,17 +323,6 @@ func (o *basic) optimize(objs []*catalog.ObjectEntry, config *BasicPolicyConfig)
     return objs
 }
 
-func controlMem(objs []*catalog.ObjectEntry, mem int64) (bool, int64) {
-    if mem > constMaxMemCap {
-        mem = constMaxMemCap
-    }
-    _, eSize := estimateMergeConsume(objs)
-    if eSize > int(2*mem/3) {
-        return false, 0
-    }
-    return true, int64(eSize)
-}
-
 func (o *basic) resetForTable(entry *catalog.TableEntry, config *BasicPolicyConfig) {
     o.schema = entry.GetLastestSchemaLocked(false)
     o.lastMergeTime = entry.Stats.GetLastMergeTime()
diff --git a/pkg/vm/engine/tae/db/merge/policyCompact.go b/pkg/vm/engine/tae/db/merge/policyCompact.go
index 4a2065156228..6ad16cab859c 100644
--- a/pkg/vm/engine/tae/db/merge/policyCompact.go
+++ b/pkg/vm/engine/tae/db/merge/policyCompact.go
@@ -72,16 +72,16 @@ func (o *objCompactPolicy) onObject(entry *catalog.ObjectEntry, config *BasicPol
     return false
 }
 
-func (o *objCompactPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
+func (o *objCompactPolicy) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult {
     if o.tblEntry == nil {
         return nil
     }
     o.filterValidTombstones()
     results := make([]reviseResult, 0, len(o.segObjects)+1)
     for _, objs := range o.segObjects {
-        if ok, eSize := controlMem(objs, mem); ok {
+        if rc.resourceAvailable(objs) {
+            rc.reserveResources(objs)
             results = append(results, reviseResult{objs, TaskHostDN})
-            mem -= eSize
         }
     }
     if len(o.tombstones) > 0 {
diff --git a/pkg/vm/engine/tae/db/merge/policyOverlap.go b/pkg/vm/engine/tae/db/merge/policyOverlap.go
index 36686b0244ef..01934d770661 100644
--- a/pkg/vm/engine/tae/db/merge/policyOverlap.go
+++ b/pkg/vm/engine/tae/db/merge/policyOverlap.go
@@ -61,7 +61,7 @@ func (m *objOverlapPolicy) onObject(obj *catalog.ObjectEntry, config *BasicPolic
     return true
 }
 
-func (m *objOverlapPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
+func (m *objOverlapPolicy) revise(rc *resourceController, config *BasicPolicyConfig) []reviseResult {
     for _, objects := range m.segments {
         segLevel := segLevel(len(objects))
         for obj := range objects {
@@ -70,21 +70,21 @@ func (m *objOverlapPolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []r
     }
 
     reviseResults := make([]reviseResult, len(levels))
-    for i := range len(levels) {
+    for i := range 4 {
         if len(m.leveledObjects[i]) < 2 {
             continue
         }
 
-        if cpu > 80 {
+        if rc.cpuPercent > 80 {
             continue
         }
 
         m.overlappingObjsSet = m.overlappingObjsSet[:0]
 
         objs, taskHostKind := m.reviseLeveledObjs(i)
-        if ok, eSize := controlMem(objs, mem); ok && len(objs) > 1 {
+        if len(objs) > 1 && rc.resourceAvailable(objs) {
+            rc.reserveResources(objs)
             reviseResults[i] = reviseResult{slices.Clone(objs), taskHostKind}
-            mem -= eSize
         }
     }
     return reviseResults
@@ -144,7 +144,7 @@ func (m *objOverlapPolicy) reviseLeveledObjs(level int) ([]*catalog.ObjectEntry,
         return nil, TaskHostDN
     }
 
-    if level < 2 && len(objs) > levels[3] {
+    if level < 3 && len(objs) > levels[3] {
         objs = objs[:levels[3]]
     }
 
@@ -181,7 +181,7 @@ func (s *entrySet) add(obj *catalog.ObjectEntry) {
 }
 
 func segLevel(length int) int {
-    l := 5
+    l := len(levels) - 1
     for i, level := range levels {
         if length < level {
             l = i - 1
diff --git a/pkg/vm/engine/tae/db/merge/policyTombstone.go b/pkg/vm/engine/tae/db/merge/policyTombstone.go
index 5eba5fb4b0ea..640a8d5eae1d 100644
--- a/pkg/vm/engine/tae/db/merge/policyTombstone.go
+++ b/pkg/vm/engine/tae/db/merge/policyTombstone.go
@@ -33,7 +33,7 @@ func (t *tombstonePolicy) onObject(entry *catalog.ObjectEntry, config *BasicPoli
     return true
 }
 
-func (t *tombstonePolicy) revise(cpu, mem int64, config *BasicPolicyConfig) []reviseResult {
+func (t *tombstonePolicy) revise(resourceControl *resourceController, config *BasicPolicyConfig) []reviseResult {
     if len(t.tombstones) < 2 {
         return nil
     }
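`segLevel` previously defaulted to the hard-coded bucket index 5; `len(levels) - 1` keeps the default correct if the `levels` table ever changes length. A runnable sketch of the bucketing, with assumed threshold values, since the real `levels` table is outside this diff:

```go
package main

import "fmt"

// Assumed thresholds for illustration; the real `levels` table lives
// elsewhere in the merge package.
var levels = []int{1, 2, 4, 16, 64, 256}

// segLevel mirrors the fixed helper: default to the last bucket instead of
// a hard-coded 5, then walk down to the bucket the length falls into.
func segLevel(length int) int {
	l := len(levels) - 1
	for i, level := range levels {
		if length < level {
			l = i - 1
			break
		}
	}
	return l
}

func main() {
	for _, n := range []int{1, 3, 20, 1000} {
		fmt.Printf("segment with %4d objects -> level %d\n", n, segLevel(n))
	}
}
```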
diff --git a/pkg/vm/engine/tae/db/merge/policy_test.go b/pkg/vm/engine/tae/db/merge/policy_test.go
index 3cda2e70cee0..1fda9afd7ee0 100644
--- a/pkg/vm/engine/tae/db/merge/policy_test.go
+++ b/pkg/vm/engine/tae/db/merge/policy_test.go
@@ -16,7 +16,6 @@ package merge
 
 import (
     "context"
-    "math"
     "math/rand/v2"
     "testing"
 
@@ -114,6 +113,7 @@ func newTestObjectEntry(t *testing.T, size uint32, isTombstone bool) *catalog.Ob
 func TestPolicyBasic(t *testing.T) {
     common.IsStandaloneBoost.Store(true)
     p := newBasicPolicy()
+    rc := new(resourceController)
 
     // only schedule objects whose size < cfg.objectMinOSize
     p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil)
@@ -121,7 +121,7 @@ func TestPolicyBasic(t *testing.T) {
     require.True(t, p.onObject(newTestObjectEntry(t, 10, false), cfg))
     require.True(t, p.onObject(newTestObjectEntry(t, 20, false), cfg))
     require.False(t, p.onObject(newTestObjectEntry(t, 120, false), cfg))
-    result := p.revise(0, math.MaxInt64, cfg)
+    result := p.revise(rc, cfg)
     require.Equal(t, 1, len(result))
     require.Equal(t, 2, len(result[0].objs))
     require.Equal(t, TaskHostDN, result[0].kind)
@@ -132,7 +132,7 @@ func TestPolicyBasic(t *testing.T) {
     require.True(t, p.onObject(newTestObjectEntry(t, 10, false), cfg))
     require.True(t, p.onObject(newTestObjectEntry(t, 20, false), cfg))
     require.False(t, p.onObject(newTestObjectEntry(t, 30, false), cfg))
-    result = p.revise(0, math.MaxInt64, cfg)
+    result = p.revise(rc, cfg)
     require.Equal(t, 1, len(result))
     require.Equal(t, 2, len(result[0].objs))
     require.Equal(t, TaskHostDN, result[0].kind)
@@ -142,7 +142,7 @@ func TestPolicyBasic(t *testing.T) {
     cfg = testConfig(100, 2)
     require.False(t, p.onObject(newTestObjectEntry(t, 10, true), cfg))
     require.False(t, p.onObject(newTestObjectEntry(t, 20, true), cfg))
-    result = p.revise(0, math.MaxInt64, cfg)
+    result = p.revise(rc, cfg)
     require.Equal(t, 0, len(result))
 
     // memory limit
@@ -151,27 +151,29 @@ func TestPolicyBasic(t *testing.T) {
     require.True(t, p.onObject(newTestObjectEntryWithRowCnt(t, 10, 1, false), cfg))
     require.True(t, p.onObject(newTestObjectEntryWithRowCnt(t, 20, 1, false), cfg))
     require.True(t, p.onObject(newTestObjectEntryWithRowCnt(t, 20, 1, false), cfg))
-    result = p.revise(0, 36, cfg)
+    rc.limit = 36
+    result = p.revise(rc, cfg)
     require.Equal(t, 0, len(result))
 }
 
 func TestPolicyTombstone(t *testing.T) {
     common.IsStandaloneBoost.Store(true)
     p := newTombstonePolicy()
+    rc := new(resourceController)
 
     // tombstone policy do not schedule data objects
     p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil)
     cfg := testConfig(100, 2)
     require.False(t, p.onObject(newTestObjectEntry(t, 10, false), cfg))
     require.False(t, p.onObject(newTestObjectEntry(t, 20, false), cfg))
-    result := p.revise(0, math.MaxInt64, cfg)
+    result := p.revise(rc, cfg)
     require.Equal(t, 0, len(result))
 
     p.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}), nil)
     cfg = testConfig(100, 2)
     require.True(t, p.onObject(newTestObjectEntry(t, 10, true), cfg))
     require.True(t, p.onObject(newTestObjectEntry(t, 20, true), cfg))
-    result = p.revise(0, math.MaxInt64, cfg)
+    result = p.revise(rc, cfg)
     require.Equal(t, 1, len(result))
     require.Equal(t, 2, len(result[0].objs))
     require.Equal(t, TaskHostDN, result[0].kind)
@@ -182,7 +184,7 @@ func TestPolicyTombstone(t *testing.T) {
     require.True(t, p.onObject(newTestObjectEntry(t, 10, true), cfg))
     require.True(t, p.onObject(newTestObjectEntry(t, 20, true), cfg))
     require.False(t, p.onObject(newTestObjectEntry(t, 30, true), cfg))
-    result = p.revise(0, math.MaxInt64, cfg)
+    result = p.revise(rc, cfg)
     require.Equal(t, 1, len(result))
     require.Equal(t, 2, len(result[0].objs))
     require.Equal(t, TaskHostDN, result[0].kind)
@@ -193,7 +195,7 @@ func TestPolicyTombstone(t *testing.T) {
     require.True(t, p.onObject(newTestObjectEntry(t, 10, true), cfg))
     require.True(t, p.onObject(newTestObjectEntry(t, 20, true), cfg))
     require.True(t, p.onObject(newTestObjectEntry(t, 120, true), cfg))
-    result = p.revise(0, math.MaxInt64, cfg)
+    result = p.revise(rc, cfg)
     require.Equal(t, 1, len(result))
     require.Equal(t, 3, len(result[0].objs))
     require.Equal(t, TaskHostDN, result[0].kind)
@@ -204,6 +206,7 @@ func TestPolicyGroup(t *testing.T) {
     g := newPolicyGroup(newBasicPolicy(), newTombstonePolicy())
     g.resetForTable(catalog.MockStaloneTableEntry(0, &catalog.Schema{Extra: &api.SchemaExtra{BlockMaxRows: options.DefaultBlockMaxRows}}))
     g.config = &BasicPolicyConfig{MergeMaxOneRun: 2, ObjectMinOsize: 100}
+    rc := new(resourceController)
 
     g.onObject(newTestObjectEntry(t, 10, false))
     g.onObject(newTestObjectEntry(t, 20, false))
@@ -212,7 +215,7 @@ func TestPolicyGroup(t *testing.T) {
     g.onObject(newTestObjectEntry(t, 20, true))
     g.onObject(newTestObjectEntry(t, 30, true))
 
-    results := g.revise(0, math.MaxInt64)
+    results := g.revise(rc)
     require.Equal(t, 2, len(results))
     require.Equal(t, TaskHostDN, results[0].kind)
     require.Equal(t, TaskHostDN, results[1].kind)
@@ -227,7 +230,9 @@ func TestObjOverlap(t *testing.T) {
 
     // empty policy
     policy := newObjOverlapPolicy()
-    objs := policy.revise(0, math.MaxInt64, defaultBasicConfig)
+    rc := new(resourceController)
+    rc.setMemLimit(estimateMemUsagePerRow * 20)
+    objs := policy.revise(rc, defaultBasicConfig)
     for _, obj := range objs {
         require.Equal(t, 0, len(obj.objs))
     }
@@ -239,7 +244,7 @@ func TestObjOverlap(t *testing.T) {
     entry2 := newSortedTestObjectEntry(t, 3, 4, overlapSizeThreshold)
     require.True(t, policy.onObject(entry1, defaultBasicConfig))
     require.True(t, policy.onObject(entry2, defaultBasicConfig))
-    objs = policy.revise(0, math.MaxInt64, defaultBasicConfig)
+    objs = policy.revise(rc, defaultBasicConfig)
     for _, obj := range objs {
         require.Equal(t, 0, len(obj.objs))
     }
@@ -251,7 +256,7 @@ func TestObjOverlap(t *testing.T) {
     entry4 := newSortedTestObjectEntry(t, 2, 3, overlapSizeThreshold)
     require.True(t, policy.onObject(entry3, defaultBasicConfig))
     require.True(t, policy.onObject(entry4, defaultBasicConfig))
-    objs = policy.revise(0, math.MaxInt64, defaultBasicConfig)
+    objs = policy.revise(rc, defaultBasicConfig)
     for i, obj := range objs {
         if i == 0 {
             require.Equal(t, 2, len(obj.objs))
@@ -269,7 +274,7 @@ func TestObjOverlap(t *testing.T) {
     require.False(t, policy.onObject(entry5, defaultBasicConfig))
     require.False(t, policy.onObject(entry6, defaultBasicConfig))
     require.Equal(t, 6, len(policy.leveledObjects))
-    objs = policy.revise(0, math.MaxInt64, defaultBasicConfig)
+    objs = policy.revise(rc, defaultBasicConfig)
     for _, obj := range objs {
         require.Equal(t, 0, len(obj.objs))
     }
@@ -292,7 +297,7 @@ func TestObjOverlap(t *testing.T) {
     require.True(t, policy.onObject(entry10, defaultBasicConfig))
     require.True(t, policy.onObject(entry11, defaultBasicConfig))
 
-    objs = policy.revise(0, math.MaxInt64, defaultBasicConfig)
+    objs = policy.revise(rc, defaultBasicConfig)
     require.Equal(t, 6, len(objs))
     require.Equal(t, 3, len(objs[0].objs))
     require.Equal(t, TaskHostDN, objs[0].kind)
@@ -306,7 +311,7 @@ func TestObjOverlap(t *testing.T) {
     require.True(t, policy.onObject(entry12, defaultBasicConfig))
     require.True(t, policy.onObject(entry13, defaultBasicConfig))
 
-    objs = policy.revise(0, 0, defaultBasicConfig)
+    objs = policy.revise(rc, defaultBasicConfig)
     for _, obj := range objs {
         require.Equal(t, 0, len(obj.objs))
     }
@@ -318,6 +323,7 @@ func TestPolicyCompact(t *testing.T) {
     fs, err := fileservice.NewMemoryFS("memory", fileservice.DisabledCacheConfig, nil)
     require.NoError(t, err)
     p := newObjCompactPolicy(fs)
+    rc := new(resourceController)
 
     cata := catalog.MockCatalog()
     defer cata.Close()
@@ -334,7 +340,7 @@ func TestPolicyCompact(t *testing.T) {
 
     p.resetForTable(tbl, nil)
 
-    objs := p.revise(0, math.MaxInt64, defaultBasicConfig)
+    objs := p.revise(rc, defaultBasicConfig)
     require.Equal(t, 0, len(objs))
 
     txn2, _ := txnMgr.StartTxn(nil)
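The reworked "memory limit" case in TestPolicyBasic pins down the arithmetic: with `rc.limit = 36` and three one-row objects, the estimate is 3 × 20 = 60 bytes, above the two-thirds budget of 24 bytes that `resourceAvailable` allows, so `revise` returns nothing. A quick check of those numbers:

```go
package main

import "fmt"

func main() {
	// Numbers from the "memory limit" case in TestPolicyBasic.
	const perRow = 20       // estimateMemUsagePerRow
	limit := int64(36)      // rc.limit; using and reserved are zero
	est := 3 * 1 * perRow   // three objects, one row each -> 60 bytes
	budget := 2 * limit / 3 // resourceAvailable allows at most 2/3 of free memory
	fmt.Println(int64(est) > budget) // true -> the merge is rejected
}
```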
diff --git a/pkg/vm/engine/tae/db/merge/scheduler.go b/pkg/vm/engine/tae/db/merge/scheduler.go
index 6deccb16d0cc..a5fd36524513 100644
--- a/pkg/vm/engine/tae/db/merge/scheduler.go
+++ b/pkg/vm/engine/tae/db/merge/scheduler.go
@@ -34,6 +34,8 @@ type Scheduler struct {
     executor *executor
 
     skipForTransPageLimit bool
+
+    rc *resourceController
 }
 
 func NewScheduler(rt *dbutils.Runtime, sched CNMergeScheduler) *Scheduler {
@@ -48,6 +50,7 @@ func NewScheduler(rt *dbutils.Runtime, sched CNMergeScheduler) *Scheduler {
         LoopProcessor: new(catalog.LoopProcessor),
         policies:      newPolicyGroup(policySlice...),
         executor:      newMergeExecutor(rt, sched),
+        rc:            new(resourceController),
     }
 
     op.DatabaseFn = op.onDataBase
@@ -76,22 +79,24 @@ func (s *Scheduler) resetForTable(entry *catalog.TableEntry) {
 }
 
 func (s *Scheduler) PreExecute() error {
-    s.executor.refreshMemInfo()
+    s.rc.refresh()
     s.skipForTransPageLimit = false
     m := &dto.Metric{}
-    v2.TaskMergeTransferPageLengthGauge.Write(m)
+    if err := v2.TaskMergeTransferPageLengthGauge.Write(m); err != nil {
+        return err
+    }
     pagesize := m.GetGauge().GetValue() * 28 /*int32 + rowid(24b)*/
-    if pagesize > float64(s.executor.transferPageSizeLimit()) {
+    if pagesize > float64(s.rc.transferPageLimit) {
         logutil.Infof("[mergeblocks] skip merge scanning due to transfer page %s, limit %s",
             common.HumanReadableBytes(int(pagesize)),
-            common.HumanReadableBytes(int(s.executor.transferPageSizeLimit())))
+            common.HumanReadableBytes(int(s.rc.transferPageLimit)))
         s.skipForTransPageLimit = true
     }
     return nil
 }
 
 func (s *Scheduler) PostExecute() error {
-    s.executor.printStats()
+    s.rc.printStats()
     return nil
 }
 
@@ -99,7 +104,7 @@ func (s *Scheduler) onDataBase(*catalog.DBEntry) (err error) {
     if StopMerge.Load() {
         return moerr.GetOkStopCurrRecur()
     }
-    if s.executor.memAvailBytes() < 100*common.Const1MBytes {
+    if s.rc.availableMem() < 100*common.Const1MBytes {
         return moerr.GetOkStopCurrRecur()
     }
 
@@ -143,7 +148,7 @@ func (s *Scheduler) onPostTable(tableEntry *catalog.TableEntry) (err error) {
     }
 
     // delObjs := s.ObjectHelper.finish()
-    results := s.policies.revise(s.executor.CPUPercent(), int64(s.executor.memAvailBytes()))
+    results := s.policies.revise(s.rc)
     for _, r := range results {
         if len(r.objs) > 0 {
             s.executor.executeFor(tableEntry, r.objs, r.kind)
@@ -190,19 +195,3 @@ func (s *Scheduler) StopMerge(tblEntry *catalog.TableEntry, reentrant bool) erro
 func (s *Scheduler) StartMerge(tblID uint64, reentrant bool) error {
     return s.executor.rt.LockMergeService.UnlockFromUser(tblID, reentrant)
 }
-
-func objectValid(objectEntry *catalog.ObjectEntry) bool {
-    if objectEntry.IsAppendable() {
-        return false
-    }
-    if !objectEntry.IsActive() {
-        return false
-    }
-    if !objectEntry.IsCommitted() {
-        return false
-    }
-    if objectEntry.IsCreatingOrAborted() {
-        return false
-    }
-    return true
-}
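PreExecute now reads the transfer-page gauge and prices each entry at 28 bytes (an int32 plus a 24-byte rowid), skipping the scan when that total exceeds `rc.transferPageLimit`; the limit itself is a slice of the process budget computed in `setMemLimit`. Worked numbers, assuming a 64 GiB host with no tighter cgroup limit:

```go
package main

import "fmt"

const gib = 1 << 30

func main() {
	// setMemLimit on a 64 GiB host with no tighter cgroup limit:
	limit := int64(64*gib) / 4 * 3 // 48 GiB process budget
	// 40 GiB < limit <= 100 GiB -> transfer pages may use 16% of it.
	pageLimit := limit / 25 * 4
	fmt.Printf("transfer-page limit: %d GiB\n", pageLimit/gib) // ~7.68 GiB

	// PreExecute: each tracked page entry costs ~28 bytes (int32 + 24B rowid).
	gauge := 100_000_000.0 // hypothetical gauge reading
	pagesize := gauge * 28 // ~2.8 GB -> below the limit, scanning proceeds
	fmt.Println(pagesize > float64(pageLimit)) // false
}
```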
diff --git a/pkg/vm/engine/tae/db/merge/utils.go b/pkg/vm/engine/tae/db/merge/utils.go
index a88a383a556d..a135c5fb5581 100644
--- a/pkg/vm/engine/tae/db/merge/utils.go
+++ b/pkg/vm/engine/tae/db/merge/utils.go
@@ -15,23 +15,31 @@
 package merge
 
 import (
+    "bufio"
+    "fmt"
+    "math"
+    "os"
+    "strings"
     "time"
 
+    "github.com/KimMachineGun/automemlimit/memlimit"
+    "github.com/matrixorigin/matrixone/pkg/logutil"
     "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog"
+    "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
+    "github.com/shirou/gopsutil/v3/cpu"
+    "github.com/shirou/gopsutil/v3/process"
 )
 
-func estimateMergeConsume(mobjs []*catalog.ObjectEntry) (origSize, estSize int) {
-    if len(mobjs) == 0 {
-        return
-    }
-    rows := 0
-    for _, m := range mobjs {
-        rows += int(m.Rows())
-        origSize += int(m.OriginSize())
+const estimateMemUsagePerRow = 20
+
+func estimateMergeConsume(objs []*catalog.ObjectEntry) (origSize, estSize int) {
+    for _, o := range objs {
+        origSize += int(o.OriginSize())
+
+        // the main memory consumption is transfer table.
+        // each row uses ~20B, so estimate size is 20 * rows.
+        estSize += int(o.Rows()) * estimateMemUsagePerRow
     }
-    // the main memory consumption is transfer table.
-    // each row uses 12B, so estimate size is 12 * rows.
-    estSize = rows * 12
     return
 }
 
@@ -39,3 +47,147 @@ func entryOutdated(entry *catalog.ObjectEntry, lifetime time.Duration) bool {
     createdAt := entry.CreatedAt.Physical()
     return time.Unix(0, createdAt).Add(lifetime).Before(time.Now())
 }
+
+type resourceController struct {
+    proc *process.Process
+
+    limit    int64
+    using    int64
+    reserved int64
+
+    reservedMergeRows int64
+    reservedMergeBlks uint64
+    transferPageLimit int64
+
+    cpuPercent float64
+}
+
+func (c *resourceController) setMemLimit(total int64) {
+    cgroup, err := memlimit.FromCgroup()
+    if cgroup != 0 && int64(cgroup) < total {
+        c.limit = int64(cgroup / 4 * 3)
+    } else {
+        c.limit = total / 4 * 3
+    }
+
+    if c.limit > 200*common.Const1GBytes {
+        c.transferPageLimit = c.limit / 25 * 2 // 8%
+    } else if c.limit > 100*common.Const1GBytes {
+        c.transferPageLimit = c.limit / 25 * 3 // 12%
+    } else if c.limit > 40*common.Const1GBytes {
+        c.transferPageLimit = c.limit / 25 * 4 // 16%
+    } else {
+        c.transferPageLimit = math.MaxInt64 // no limit
+    }
+
+    logutil.Info(
+        "MergeExecutorMemoryInfo",
+        common.AnyField("container-limit", common.HumanReadableBytes(int(cgroup))),
+        common.AnyField("host-memory", common.HumanReadableBytes(int(total))),
+        common.AnyField("process-limit", common.HumanReadableBytes(int(c.limit))),
+        common.AnyField("transfer-page-limit", common.HumanReadableBytes(int(c.transferPageLimit))),
+        common.AnyField("error", err),
+    )
+}
+
+func (c *resourceController) refresh() {
+    if c.limit == 0 {
+        c.setMemLimit(memInfo("MemTotal"))
+    }
+
+    if c.proc == nil {
+        c.proc, _ = process.NewProcess(int32(os.Getpid()))
+    }
+    if m, err := c.proc.MemoryInfo(); err == nil {
+        c.using = int64(m.RSS)
+    }
+
+    if percents, err := cpu.Percent(0, false); err == nil {
+        c.cpuPercent = percents[0]
+    }
+    c.reservedMergeRows = 0
+    c.reservedMergeBlks = 0
+    c.reserved = 0
+}
+
+func (c *resourceController) availableMem() int64 {
+    avail := c.limit - c.using - c.reserved
+    if avail < 0 {
+        avail = 0
+    }
+    return avail
+}
+
+func (c *resourceController) printStats() {
+    if c.reservedMergeBlks == 0 && c.availableMem() > 512*common.Const1MBytes {
+        return
+    }
+
+    logutil.Info("MergeExecutorMemoryStats",
+        common.AnyField("process-limit", common.HumanReadableBytes(int(c.limit))),
+        common.AnyField("process-mem", common.HumanReadableBytes(int(c.using))),
+        common.AnyField("inuse-mem", common.HumanReadableBytes(int(c.reserved))),
+        common.AnyField("inuse-cnt", c.reservedMergeBlks),
+    )
+}
+
+func (c *resourceController) reserveResources(objs []*catalog.ObjectEntry) {
+    for _, obj := range objs {
+        c.reservedMergeRows += int64(obj.Rows())
+        c.reserved += estimateMemUsagePerRow * int64(obj.Rows())
+        c.reservedMergeBlks += uint64(obj.BlkCnt())
+    }
+}
+
+func (c *resourceController) resourceAvailable(objs []*catalog.ObjectEntry) bool {
+    if c.reservedMergeRows*36 /*28 * 1.3 */ > c.transferPageLimit/8 {
+        return false
+    }
+
+    mem := c.availableMem()
+    if mem > constMaxMemCap {
+        mem = constMaxMemCap
+    }
+    _, eSize := estimateMergeConsume(objs)
+    return eSize <= int(2*mem/3)
+}
+
+func memInfo(info string) int64 {
+    f, err := os.Open("/proc/meminfo")
+    if err != nil {
+        return 0
+    }
+    defer f.Close()
+
+    s := bufio.NewScanner(f)
+    available := int64(0)
+    for s.Scan() {
+        if strings.HasPrefix(s.Text(), info) {
+            _, err = fmt.Sscanf(s.Text(), info+": %d", &available)
+            if err != nil {
+                return 0
+            }
+            return available << 10
+        }
+    }
+    if err = s.Err(); err != nil {
+        panic(err)
+    }
+    return 0
+}
+
+func objectValid(objectEntry *catalog.ObjectEntry) bool {
+    if objectEntry.IsAppendable() {
+        return false
+    }
+    if !objectEntry.IsActive() {
+        return false
+    }
+    if !objectEntry.IsCommitted() {
+        return false
+    }
+    if objectEntry.IsCreatingOrAborted() {
+        return false
+    }
+    return true
+}
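`memInfo` is the new fallback for discovering total host memory when no limit has been set; /proc/meminfo reports values in kB, hence the `<< 10`. Below is the same scan-and-Sscanf approach run over a canned sample instead of the real file, to show what the parse returns:

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// Same scan-and-Sscanf approach as memInfo, but fed a canned sample rather
// than /proc/meminfo.
func memInfoFrom(s *bufio.Scanner, key string) int64 {
	for s.Scan() {
		if strings.HasPrefix(s.Text(), key) {
			var kb int64
			if _, err := fmt.Sscanf(s.Text(), key+": %d", &kb); err != nil {
				return 0
			}
			return kb << 10 // /proc/meminfo reports kB
		}
	}
	return 0
}

func main() {
	sample := "MemTotal:       32795852 kB\nMemFree:        12345678 kB\n"
	s := bufio.NewScanner(strings.NewReader(sample))
	fmt.Println(memInfoFrom(s, "MemTotal")) // 33582952448 bytes (~31.3 GiB)
}
```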
diff --git a/pkg/vm/engine/tae/db/merge/utils_test.go b/pkg/vm/engine/tae/db/merge/utils_test.go
new file mode 100644
index 000000000000..96540d804ec8
--- /dev/null
+++ b/pkg/vm/engine/tae/db/merge/utils_test.go
@@ -0,0 +1,31 @@
+// Copyright 2024 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package merge
+
+import (
+    "github.com/stretchr/testify/require"
+    "testing"
+)
+
+func TestResourceController(t *testing.T) {
+    rc := new(resourceController)
+    rc.setMemLimit(10000)
+    require.Equal(t, int64(7500), rc.limit)
+    require.Equal(t, int64(7500), rc.availableMem())
+
+    rc.refresh()
+    rc.limit = rc.using + 1
+    require.Equal(t, int64(1), rc.availableMem())
+}
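Why TestResourceController expects 7500: `setMemLimit` keeps 3/4 of min(cgroup limit, host total), down from the 90% the old executor kept. A quick check of both branches, with a hypothetical tighter cgroup limit for the second case:

```go
package main

import "fmt"

func main() {
	// setMemLimit(10000) with no (or a larger) cgroup limit:
	total := int64(10000)
	limit := total / 4 * 3
	fmt.Println(limit) // 7500, matching TestResourceController

	// With a hypothetical tighter cgroup limit of 8000:
	cgroup := int64(8000)
	if cgroup != 0 && cgroup < total {
		limit = cgroup / 4 * 3
	}
	fmt.Println(limit) // 6000
}
```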