Skip to content

Commit

Permalink
Merge branch 'main' into patch-for-pr-19501
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Oct 29, 2024
2 parents 2e6ef5e + 895fb49 commit 406cd98
Show file tree
Hide file tree
Showing 154 changed files with 3,882 additions and 2,387 deletions.
22 changes: 21 additions & 1 deletion .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ queue_rules:
{{ body.split("___\n\n### **PR Type**")[0] | trim | get_section("## What this PR does / why we need it:") }}
Approved by: @{{ approved_reviews_by | join(', @') }}
- name: release-2.0
merge_conditions: []
checks_timeout: 4h
merge_method: squash
commit_message_template: |
{{ title }} (#{{ number }})
{{ body.split("___\n\n### **PR Type**")[0] | trim | get_section("## What this PR does / why we need it:") }}
Approved by: @{{ approved_reviews_by | join(', @') }}
pull_request_rules:
- name: Automatic queue on approval for main
Expand All @@ -40,14 +50,24 @@ pull_request_rules:
queue:
name: release-1.2

- name: Automatic queue on approval for release-2.0
conditions:
- "#changes-requested-reviews-by<=0"
- label!=do-not-merge/wip
- base=2.0-dev
- approved-reviews-by=sukki37
actions:
queue:
name: release-2.0


- name: Auto update branch
conditions:
- created-at>=00:10 ago
actions:
update:

- name: Auto Request Reviewer For 1.2-dev
- name: Auto Request Reviewer For Non Main Branch
conditions:
- base!=main
actions:
Expand Down
33 changes: 28 additions & 5 deletions cmd/mo-service/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"hash/fnv"
"io"
"net/http"
_ "net/http/pprof"
"os"
Expand Down Expand Up @@ -399,14 +400,17 @@ func saveProfilesLoop(sigs chan os.Signal) {
*profileInterval = time.Second * 10
}

cpuProfileInterval := *profileInterval / 2

quit := false
tk := time.NewTicker(*profileInterval)
logutil.GetGlobalLogger().Info("save profiles loop started", zap.Duration("profile-interval", *profileInterval))
logutil.GetGlobalLogger().Info("save profiles loop started", zap.Duration("profile-interval", *profileInterval), zap.Duration("cpuProfileInterval", cpuProfileInterval))
for {
select {
case <-tk.C:
logutil.GetGlobalLogger().Info("save profiles start")
saveProfiles()
saveCpuProfile(cpuProfileInterval)
logutil.GetGlobalLogger().Info("save profiles end")
case <-sigs:
quit = true
Expand Down Expand Up @@ -434,18 +438,37 @@ func saveProfile(typ string) string {
return profilePath
}

func saveCpuProfile(cpuProfileInterval time.Duration) {
genCpuProfile := func(writer io.Writer) error {
err := pprof.StartCPUProfile(writer)
if err != nil {
return err
}
time.Sleep(cpuProfileInterval)
pprof.StopCPUProfile()
return err
}
saveProfileWithType("cpu", genCpuProfile)
}

func saveMallocProfile() {
saveProfileWithType("malloc", malloc.WriteProfileData)
}

func saveProfileWithType(typ string, genData func(writer io.Writer) error) {
buf := bytes.Buffer{}
w := gzip.NewWriter(&buf)
if err := malloc.WriteProfileData(w); err != nil {
logutil.GetGlobalLogger().Error("failed to write malloc profile", zap.Error(err))

if err := genData(w); err != nil {
logutil.GetGlobalLogger().Error(fmt.Sprintf("failed to generate %s profile", typ), zap.Error(err))
return
}
if err := w.Close(); err != nil {
return
}

name, _ := uuid.NewV7()
profilePath := catalog.BuildProfilePath(globalServiceType, globalNodeId, "malloc", name.String()) + ".gz"
profilePath := catalog.BuildProfilePath(globalServiceType, globalNodeId, typ, name.String()) + ".gz"
writeVec := fileservice.IOVector{
FilePath: profilePath,
Entries: []fileservice.IOEntry{
Expand All @@ -460,6 +483,6 @@ func saveMallocProfile() {
defer cancel()
if err := globalEtlFS.Write(ctx, writeVec); err != nil {
err = moerr.AttachCause(ctx, err)
logutil.GetGlobalLogger().Error("failed to save malloc profile", zap.Error(err))
logutil.GetGlobalLogger().Error(fmt.Sprintf("failed to write %s profile", typ), zap.Error(err))
}
}
47 changes: 45 additions & 2 deletions cmd/mo-service/debug_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,55 @@ package main

import (
"context"
"io"
"iter"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
)

func Test_saveProfile(t *testing.T) {
dir := t.TempDir()
fs, err := fileservice.NewLocalETLFS(defines.ETLFileServiceName, dir)
assert.Nil(t, err)
defer fs.Close()
globalEtlFS = fs
saveCpuProfile(time.Second)
saveMallocProfile()
}

func Test_saveProfile2(t *testing.T) {
fs, err := fileservice.NewMemoryFS("memory", fileservice.DisabledCacheConfig, nil)
assert.NoError(t, err)
defer fs.Close()
globalEtlFS = fs
saveCpuProfile(time.Second)
}

func Test_saveProfile3(t *testing.T) {
sigs := make(chan os.Signal, 1)
close(sigs)
*profileInterval = time.Second * 10
dir := t.TempDir()
fs, err := fileservice.NewLocalETLFS(defines.ETLFileServiceName, dir)
assert.Nil(t, err)
defer fs.Close()
globalEtlFS = fs
saveProfilesLoop(sigs)
}

func Test_saveProfile4(t *testing.T) {
saveProfileWithType("cpu", func(writer io.Writer) error {
return context.DeadlineExceeded
})
}

var _ fileservice.FileService = &testFS{}

type testFS struct {
Expand All @@ -46,7 +89,7 @@ func (tfs *testFS) ReadCache(ctx context.Context, vector *fileservice.IOVector)
panic("implement me")
}

func (tfs *testFS) List(ctx context.Context, dirPath string) ([]fileservice.DirEntry, error) {
func (tfs *testFS) List(ctx context.Context, dirPath string) iter.Seq2[*fileservice.DirEntry, error] {
//TODO implement me
panic("implement me")
}
Expand Down Expand Up @@ -76,7 +119,7 @@ func (tfs *testFS) Close() {
panic("implement me")
}

func Test_saveMallocProfile(t *testing.T) {
func Test_saveMallocProfile5(t *testing.T) {
globalEtlFS = &testFS{}
saveMallocProfile()
}
4 changes: 2 additions & 2 deletions pkg/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func Test_backupConfigFile(t *testing.T) {
},
wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
assert.NoError(t, err)
list, err2 := Fs.List(context.Background(), configDir)
list, err2 := fileservice.SortedList(Fs.List(context.Background(), configDir))
assert.NoError(t, err2)
var configFile string
for _, entry := range list {
Expand Down Expand Up @@ -587,7 +587,7 @@ func TestBackup(t *testing.T) {
assert.NotNil(t, cfg)

//checkup config files
list, err2 := cfg.GeneralDir.List(context.Background(), configDir)
list, err2 := fileservice.SortedList(cfg.GeneralDir.List(context.Background(), configDir))
assert.NoError(t, err2)
var configFile string
for _, entry := range list {
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/tae.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func copyFileAndGetMetaFiles(
decodeFunc func(string) (types.TS, types.TS, string),
copy bool,
) ([]*taeFile, []*checkpoint.MetaFile, []fileservice.DirEntry, error) {
files, err := srcFs.List(ctx, dir)
files, err := fileservice.SortedList(srcFs.List(ctx, dir))
if err != nil {
return nil, nil, nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cdc/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ func (reader *tableReader) readTableWithTxn(
_ = reader.sinker.SendRollback(ctx)
}
}

if err == nil {
reader.wMarkUpdater.UpdateMem(reader.info.SourceTblIdStr, toTs)
}
}()

var curHint engine.ChangesHandle_Hint
Expand Down
Loading

0 comments on commit 406cd98

Please sign in to comment.