From d52dc729160d4f4c0b3b71ca73a03bad7265b491 Mon Sep 17 00:00:00 2001 From: reus Date: Mon, 21 Oct 2024 16:18:54 +0800 Subject: [PATCH] fileservice: add QCloudSDK --- go.mod | 5 + go.sum | 15 + pkg/fileservice/disk_cache_test.go | 13 +- pkg/fileservice/file_service_bench_test.go | 28 +- pkg/fileservice/file_service_test.go | 34 +- pkg/fileservice/memory_fs_test.go | 26 +- pkg/fileservice/object_storage_test.go | 7 + pkg/fileservice/qcloud_sdk.go | 503 +++++++++++++++++++++ pkg/fileservice/qcloud_sdk_test.go | 83 ++++ pkg/fileservice/s3_fs.go | 11 +- 10 files changed, 675 insertions(+), 50 deletions(-) create mode 100644 pkg/fileservice/qcloud_sdk.go create mode 100644 pkg/fileservice/qcloud_sdk_test.go diff --git a/go.mod b/go.mod index ac23d685eec8..1b18e8196d4c 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/spkg/bom v1.0.0 github.com/stretchr/testify v1.9.0 + github.com/tencentyun/cos-go-sdk-v5 v0.7.55 github.com/ti-mo/conntrack v0.5.1 github.com/ti-mo/netfilter v0.5.2 github.com/tidwall/btree v1.6.0 @@ -93,6 +94,7 @@ require ( github.com/andybalholm/brotli v1.1.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect github.com/cilium/ebpf v0.9.1 // indirect + github.com/clbanning/mxj v1.8.4 // indirect github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/go-ini/ini v1.67.0 // indirect @@ -101,6 +103,7 @@ require ( github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 // indirect github.com/google/go-cmp v0.6.0 // indirect + github.com/google/go-querystring v1.1.0 // indirect github.com/gosimple/slug v1.13.1 // indirect github.com/gosimple/unidecode v1.0.1 // indirect github.com/itchyny/timefmt-go v0.1.6 // indirect @@ -110,8 +113,10 @@ require ( github.com/mdlayher/netlink v1.7.2 // indirect github.com/mdlayher/socket v0.5.1 // indirect github.com/minio/md5-simd v1.1.2 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/mozillazg/go-httpheader v0.2.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b // indirect diff --git a/go.sum b/go.sum index 14ae71a8b4f0..18929d9fdb94 100644 --- a/go.sum +++ b/go.sum @@ -42,6 +42,7 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5 github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8= github.com/Microsoft/hcsshim v0.11.4/go.mod h1:smjE4dvqPX9Zldna+t5FG3rnoHhaB7QYxPRqGcpAD9w= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= github.com/RoaringBitmap/roaring v1.2.3 h1:yqreLINqIrX22ErkKI0vY47/ivtJr6n+kMhVOVmhWBY= github.com/RoaringBitmap/roaring v1.2.3/go.mod h1:plvDsJQpxOC5bw8LRteu/MLWHsHez/3y6cubLI4/1yE= github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0= @@ -133,6 +134,8 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5P github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/cilium/ebpf v0.9.1 h1:64sn2K3UKw8NbP/blsixRpF3nXuyhz/VjRlRzvlBRu4= github.com/cilium/ebpf v0.9.1/go.mod h1:+OhNOIXx/Fnu1IE8bJz2dzOA+VSfyTfdNUVdlQnxUFY= +github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I= +github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -295,6 +298,7 @@ github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptG github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4= github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= +github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -329,6 +333,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= @@ -337,6 +342,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= +github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -348,6 +355,7 @@ github.com/google/pprof v0.0.0-20230510103437-eeec1cb781c3/go.mod h1:79YE0hCXdHa github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4= github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -543,6 +551,7 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= @@ -579,6 +588,8 @@ github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwd github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= +github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSrwhe88TQQ= +github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -738,6 +749,10 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.563/go.mod h1:7sCQWVkxcsR38nffDW057DRGk8mUjK1Ing/EFOK8s8Y= +github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/kms v1.0.563/go.mod h1:uom4Nvi9W+Qkom0exYiJ9VWJjXwyxtPYTkKkaLMlfE0= +github.com/tencentyun/cos-go-sdk-v5 v0.7.55 h1:9DfH3umWUd0I2jdqcUxrU1kLfUPOydULNy4T9qN5PF8= +github.com/tencentyun/cos-go-sdk-v5 v0.7.55/go.mod h1:8+hG+mQMuRP/OIS9d83syAvXvrMj9HhkND6Q1fLghw0= github.com/testcontainers/testcontainers-go v0.29.1 h1:z8kxdFlovA2y97RWx98v/TQ+tR+SXZm6p35M+xB92zk= github.com/testcontainers/testcontainers-go v0.29.1/go.mod h1:SnKnKQav8UcgtKqjp/AD8bE1MqZm+3TDb/B8crE3XnI= github.com/testcontainers/testcontainers-go/modules/compose v0.29.1 h1:47ipPM+s+ltCDOP3Sa1j95AkNb+z+WGiHLDbLU8ixuc= diff --git a/pkg/fileservice/disk_cache_test.go b/pkg/fileservice/disk_cache_test.go index 9b5cf527c114..0d003c88ef44 100644 --- a/pkg/fileservice/disk_cache_test.go +++ b/pkg/fileservice/disk_cache_test.go @@ -537,13 +537,11 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) { } var counter perfcounter.CounterSet ctx := perfcounter.WithCounterSet(ctx, &counter) - err := cache.Read( - ctx, - &IOVector{ - FilePath: "foo", - Entries: entries, - }, - ) + vec := &IOVector{ + FilePath: "foo", + Entries: entries, + } + err := cache.Read(ctx, vec) if err != nil { b.Fatal(err) } @@ -551,6 +549,7 @@ func BenchmarkDiskCacheMultipleIOEntries(b *testing.B) { if numOpen != 1 { b.Fatal() } + vec.Release() } } diff --git a/pkg/fileservice/file_service_bench_test.go b/pkg/fileservice/file_service_bench_test.go index e5a873976f45..14835de51f5a 100644 --- a/pkg/fileservice/file_service_bench_test.go +++ b/pkg/fileservice/file_service_bench_test.go @@ -53,26 +53,22 @@ func benchmarkFileService(ctx context.Context, b *testing.B, newFS func() FileSe b.RunParallel(func(pb *testing.PB) { - readVector := &IOVector{ - FilePath: "foo", - } - offset := int64(0) - for _, part := range parts2 { - readVector.Entries = append(readVector.Entries, IOEntry{ - Offset: offset, - Size: int64(len(part)), - ToCacheData: CacheOriginalData, - }) - offset += int64(len(part)) - } - for pb.Next() { - for i := range readVector.Entries { - readVector.Entries[i].done = false - readVector.Entries[i].Data = nil + readVector := &IOVector{ + FilePath: "foo", + } + offset := int64(0) + for _, part := range parts2 { + readVector.Entries = append(readVector.Entries, IOEntry{ + Offset: offset, + Size: int64(len(part)), + ToCacheData: CacheOriginalData, + }) + offset += int64(len(part)) } err := fs.Read(ctx, readVector) assert.Nil(b, err) + readVector.Release() } }) diff --git a/pkg/fileservice/file_service_test.go b/pkg/fileservice/file_service_test.go index 93abe794f14e..4b5d6f0d7dd2 100644 --- a/pkg/fileservice/file_service_test.go +++ b/pkg/fileservice/file_service_test.go @@ -323,7 +323,7 @@ func testFileService( content := make([]byte, _BlockContentSize*4) _, err := rand.Read(content) assert.Nil(t, err) - parts := randomSplit(content, len(content)/16) + parts := randomCut(content, 16) // write writeVector := IOVector{ @@ -361,7 +361,7 @@ func testFileService( readVector.Release() // read, random entry - parts = randomSplit(content, len(content)/16) + parts = randomCut(content, 16) readVector.Entries = readVector.Entries[:0] offset = int64(0) for _, part := range parts { @@ -374,12 +374,12 @@ func testFileService( err = fs.Read(ctx, readVector) assert.Nil(t, err) for i, entry := range readVector.Entries { - assert.Equal(t, parts[i], entry.Data, "path: %s, entry: %+v, content %v", filePath, entry, content) + assert.Equal(t, parts[i], entry.Data, "path: %s, entry: %+v", filePath, entry) } readVector.Release() // read, random entry with ReadCloserForRead - parts = randomSplit(content, len(content)/16) + parts = randomCut(content, 16) readVector.Entries = readVector.Entries[:0] offset = int64(0) readers := make([]io.ReadCloser, len(parts)) @@ -1014,19 +1014,25 @@ func testFileService( } -func randomSplit(data []byte, maxLen int) (ret [][]byte) { - for { - if len(data) == 0 { - return +func randomCut(data []byte, parts int) [][]byte { + positions := mrand.Perm(len(data))[:parts-1] + sort.Ints(positions) + slices := make([][]byte, 0, parts) + slices = append(slices, data[:positions[0]]) + for i, pos := range positions { + if i == len(positions)-1 { + break } - if len(data) < maxLen { - ret = append(ret, data) - return + slices = append(slices, data[pos:positions[i+1]]) + } + slices = append(slices, data[positions[len(positions)-1]:]) + ret := slices[:0] + for _, slice := range slices { + if len(slice) > 0 { + ret = append(ret, slice) } - cut := 1 + mrand.Intn(maxLen) - ret = append(ret, data[:cut]) - data = data[cut:] } + return ret } func fixedSplit(data []byte, l int) (ret [][]byte) { diff --git a/pkg/fileservice/memory_fs_test.go b/pkg/fileservice/memory_fs_test.go index 2d1a044295be..7d13c9e9bce5 100644 --- a/pkg/fileservice/memory_fs_test.go +++ b/pkg/fileservice/memory_fs_test.go @@ -57,15 +57,18 @@ func BenchmarkMemoryFSWithMemoryCache(b *testing.B) { var counterSet perfcounter.CounterSet ctx = perfcounter.WithCounterSet(ctx, &counterSet) + cache := NewMemCache( + fscache.ConstCapacity(128*1024*1024), + nil, + nil, + "", + ) + defer cache.Close() + benchmarkFileService(ctx, b, func() FileService { fs, err := NewMemoryFS("memory", DisabledCacheConfig, nil) assert.Nil(b, err) - fs.caches = append(fs.caches, NewMemCache( - fscache.ConstCapacity(128*1024*1024), - nil, - nil, - "", - )) + fs.caches = append(fs.caches, cache) return fs }) @@ -79,13 +82,16 @@ func BenchmarkMemoryFSWithMemoryCacheLowCapacity(b *testing.B) { var counterSet perfcounter.CounterSet ctx = perfcounter.WithCounterSet(ctx, &counterSet) + cache := NewMemCache( + fscache.ConstCapacity(2*1024*1024), nil, nil, + "", + ) + defer cache.Close() + benchmarkFileService(ctx, b, func() FileService { fs, err := NewMemoryFS("memory", DisabledCacheConfig, nil) assert.Nil(b, err) - fs.caches = append(fs.caches, NewMemCache( - fscache.ConstCapacity(2*1024*1024), nil, nil, - "", - )) + fs.caches = append(fs.caches, cache) return fs }) diff --git a/pkg/fileservice/object_storage_test.go b/pkg/fileservice/object_storage_test.go index 8cf174cc4c45..37bfa77282e5 100644 --- a/pkg/fileservice/object_storage_test.go +++ b/pkg/fileservice/object_storage_test.go @@ -146,6 +146,13 @@ func TestObjectStorages(t *testing.T) { } return storage }) + testObjectStorage(t, "qcloud", func(t *testing.T) *QCloudSDK { + storage, err := NewQCloudSDK(context.Background(), args, nil) + if err != nil { + t.Fatal(err) + } + return storage + }) case strings.Contains(args.Endpoint, "aws"): // AWS diff --git a/pkg/fileservice/qcloud_sdk.go b/pkg/fileservice/qcloud_sdk.go new file mode 100644 index 000000000000..aa34b74dd6a4 --- /dev/null +++ b/pkg/fileservice/qcloud_sdk.go @@ -0,0 +1,503 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileservice + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/url" + gotrace "runtime/trace" + "strconv" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/perfcounter" + "github.com/matrixorigin/matrixone/pkg/util/trace" + "github.com/tencentyun/cos-go-sdk-v5" + "go.uber.org/zap" +) + +type QCloudSDK struct { + name string + client *cos.Client + perfCounterSets []*perfcounter.CounterSet + listMaxKeys int +} + +func NewQCloudSDK( + ctx context.Context, + args ObjectStorageArguments, + perfCounterSets []*perfcounter.CounterSet, +) (_ *QCloudSDK, err error) { + defer catch(&err) + + // args + if err := args.validate(); err != nil { + return nil, err + } + + // bucket url + baseURL, err := url.Parse(fmt.Sprintf( + "https://%s.cos.%s.myqcloud.com", + args.Bucket, + args.Region, + )) + if err != nil { + return nil, err + } + + // transport + transport := &cos.AuthorizationTransport{ + SecretID: args.KeyID, + SecretKey: args.KeySecret, + SessionToken: args.SessionToken, + Transport: newHTTPClient(args).Transport, + } + + // client + client := cos.NewClient( + &cos.BaseURL{BucketURL: baseURL}, + &http.Client{ + Transport: transport, + }, + ) + + logutil.Info("new object storage", + zap.Any("sdk", "qcloud"), + zap.Any("arguments", args), + ) + + if !args.NoBucketValidation { + // validate bucket + _, err := client.Bucket.Head(ctx, &cos.BucketHeadOptions{}) + if err != nil { + return nil, err + } + } + + return &QCloudSDK{ + name: args.Name, + client: client, + perfCounterSets: perfCounterSets, + }, nil +} + +var _ ObjectStorage = new(QCloudSDK) + +func (a *QCloudSDK) List( + ctx context.Context, + prefix string, + fn func(bool, string, int64) (bool, error), +) error { + + if err := ctx.Err(); err != nil { + return err + } + + var cont string + +loop1: + for { + result, err := a.listObjects(ctx, prefix, cont) + if err != nil { + return err + } + + for _, obj := range result.Contents { + more, err := fn(false, obj.Key, obj.Size) + if err != nil { + return err + } + if !more { + break loop1 + } + } + + for _, prefix := range result.CommonPrefixes { + more, err := fn(true, prefix, 0) + if err != nil { + return err + } + if !more { + break loop1 + } + } + + if !result.IsTruncated { + break + } + cont = result.NextMarker + } + + return nil +} + +func (a *QCloudSDK) Stat( + ctx context.Context, + key string, +) ( + size int64, + err error, +) { + + defer func() { + if a.is404(err) { + err = moerr.NewFileNotFoundNoCtx(key) + } + }() + + if err := ctx.Err(); err != nil { + return 0, err + } + + header, err := a.statObject( + ctx, + key, + ) + if err != nil { + return + } + + if str := header.Get("Content-Length"); str != "" { + size, err = strconv.ParseInt(str, 10, 64) + if err != nil { + return + } + } + + return +} + +func (a *QCloudSDK) Exists( + ctx context.Context, + key string, +) ( + bool, + error, +) { + + if err := ctx.Err(); err != nil { + return false, err + } + + _, err := a.statObject( + ctx, + key, + ) + if err != nil { + if a.is404(err) { + return false, nil + } + return false, err + } + + return true, nil +} + +func (a *QCloudSDK) Write( + ctx context.Context, + key string, + r io.Reader, + size int64, + expire *time.Time, +) ( + err error, +) { + + err = a.putObject( + ctx, + key, + r, + size, + expire, + ) + if err != nil { + return err + } + + return +} + +func (a *QCloudSDK) Read( + ctx context.Context, + key string, + min *int64, + max *int64, +) ( + r io.ReadCloser, + err error, +) { + + defer func() { + if a.is404(err) { + err = moerr.NewFileNotFoundNoCtx(key) + } + }() + + if max == nil { + // read to end + r, err := a.getObject( + ctx, + key, + min, + nil, + ) + if err != nil { + return nil, err + } + return r, nil + } + + r, err = a.getObject( + ctx, + key, + min, + max, + ) + if err != nil { + return nil, err + } + return &readCloser{ + r: io.LimitReader(r, int64(*max-*min)), + closeFunc: r.Close, + }, nil +} + +func (a *QCloudSDK) Delete( + ctx context.Context, + keys ...string, +) ( + err error, +) { + + if err := ctx.Err(); err != nil { + return err + } + + if len(keys) == 0 { + return nil + } + if len(keys) == 1 { + return a.deleteSingle(ctx, keys[0]) + } + + for i := 0; i < len(keys); i += 1000 { + end := i + 1000 + if end > len(keys) { + end = len(keys) + } + if _, err := a.deleteObjects(ctx, keys[i:end]...); err != nil { + return err + } + } + + return nil +} + +func (a *QCloudSDK) deleteSingle(ctx context.Context, key string) error { + ctx, span := trace.Start(ctx, "QCloudSDK.deleteSingle") + defer span.End() + + _, err := a.deleteObject( + ctx, + key, + ) + if err != nil { + return err + } + + return nil +} + +func (a *QCloudSDK) listObjects(ctx context.Context, prefix string, marker string) (*cos.BucketGetResult, error) { + ctx, task := gotrace.NewTask(ctx, "QCloudSDK.listObjects") + defer task.End() + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.List.Add(1) + }, a.perfCounterSets...) + + opts := &cos.BucketGetOptions{ + Delimiter: "/", + } + if prefix != "" { + opts.Prefix = prefix + } + if marker != "" { + opts.Marker = marker + } + if a.listMaxKeys > 0 { + opts.MaxKeys = a.listMaxKeys + } + + return DoWithRetry( + "s3 list objects", + func() (*cos.BucketGetResult, error) { + result, _, err := a.client.Bucket.Get(ctx, opts) + if err != nil { + return nil, err + } + return result, nil + }, + maxRetryAttemps, + IsRetryableError, + ) +} + +func (a *QCloudSDK) statObject(ctx context.Context, key string) (http.Header, error) { + ctx, task := gotrace.NewTask(ctx, "QCloudSDK.statObject") + defer task.End() + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Head.Add(1) + }, a.perfCounterSets...) + + return DoWithRetry( + "s3 head object", + func() (http.Header, error) { + resp, err := a.client.Object.Head(ctx, key, &cos.ObjectHeadOptions{}) + if err != nil { + return nil, err + } + return resp.Header, nil + }, + maxRetryAttemps, + IsRetryableError, + ) +} + +func (a *QCloudSDK) putObject( + ctx context.Context, + key string, + r io.Reader, + size int64, + expire *time.Time, +) (err error) { + ctx, task := gotrace.NewTask(ctx, "QCloudSDK.putObject") + defer task.End() + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Put.Add(1) + }, a.perfCounterSets...) + + // not retryable because Reader may be half consumed + _, err = a.client.Object.Put(ctx, key, r, &cos.ObjectPutOptions{}) + if err != nil { + return err + } + return nil +} + +func (a *QCloudSDK) getObject(ctx context.Context, key string, min *int64, max *int64) (io.ReadCloser, error) { + ctx, task := gotrace.NewTask(ctx, "QCloudSDK.getObject") + defer task.End() + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Get.Add(1) + }, a.perfCounterSets...) + + if min == nil { + min = ptrTo[int64](0) + } + + return newRetryableReader( + func(offset int64) (io.ReadCloser, error) { + var rang string + if max != nil { + rang = fmt.Sprintf("bytes=%d-%d", offset, *max) + } else { + rang = fmt.Sprintf("bytes=%d-", offset) + } + opts := &cos.ObjectGetOptions{ + Range: rang, + } + + return DoWithRetry( + "s3 get object", + func() (io.ReadCloser, error) { + resp, err := a.client.Object.Get(ctx, key, opts) + if err != nil { + return nil, err + } + return resp.Body, nil + }, + maxRetryAttemps, + IsRetryableError, + ) + + }, + *min, + IsRetryableError, + ) +} + +func (a *QCloudSDK) deleteObject(ctx context.Context, key string) (bool, error) { + ctx, task := gotrace.NewTask(ctx, "QCloudSDK.deleteObject") + defer task.End() + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.Delete.Add(1) + }, a.perfCounterSets...) + return DoWithRetry( + "s3 delete object", + func() (bool, error) { + if _, err := a.client.Object.Delete(ctx, key); err != nil { + return false, err + } + return true, nil + }, + maxRetryAttemps, + IsRetryableError, + ) +} + +func (a *QCloudSDK) deleteObjects(ctx context.Context, keys ...string) (bool, error) { + ctx, task := gotrace.NewTask(ctx, "QCloudSDK.deleteObjects") + defer task.End() + perfcounter.Update(ctx, func(counter *perfcounter.CounterSet) { + counter.FileService.S3.DeleteMulti.Add(1) + }, a.perfCounterSets...) + return DoWithRetry( + "s3 delete objects", + func() (bool, error) { + objects := make([]cos.Object, 0, len(keys)) + for _, key := range keys { + objects = append(objects, cos.Object{ + Key: key, + }) + } + _, _, err := a.client.Object.DeleteMulti(ctx, &cos.ObjectDeleteMultiOptions{ + Quiet: true, + Objects: objects, + }) + if err != nil { + return false, err + } + return true, nil + }, + maxRetryAttemps, + IsRetryableError, + ) +} + +func (a *QCloudSDK) is404(err error) bool { + if err == nil { + return false + } + var resp *cos.ErrorResponse + if errors.As(err, &resp) { + return resp.Response.StatusCode == 404 + } + return false +} diff --git a/pkg/fileservice/qcloud_sdk_test.go b/pkg/fileservice/qcloud_sdk_test.go new file mode 100644 index 000000000000..409b68e4eaad --- /dev/null +++ b/pkg/fileservice/qcloud_sdk_test.go @@ -0,0 +1,83 @@ +// Copyright 2024 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fileservice + +import ( + "context" + "fmt" + "math/rand/v2" + "strings" + "testing" +) + +func TestQCloudSDK(t *testing.T) { + + t.Run("object storage", func(t *testing.T) { + for _, args := range objectStorageArgumentsForTest("test", t) { + if !strings.Contains(args.Endpoint, "myqcloud") { + continue + } + + t.Run(fmt.Sprintf("%s %s", args.Name, args.Bucket), func(t *testing.T) { + + testObjectStorage(t, "qcloud", func(t *testing.T) *QCloudSDK { + args.KeyPrefix = fmt.Sprintf("%v", rand.Int64()) + ret, err := NewQCloudSDK( + context.Background(), + args, + nil, + ) + if err != nil { + t.Fatal(err) + } + return ret + }) + + }) + } + }) + + t.Run("file service", func(t *testing.T) { + for _, args := range objectStorageArgumentsForTest("test", t) { + if !strings.Contains(args.Endpoint, "myqcloud") { + continue + } + + t.Run(fmt.Sprintf("%s %s", args.Name, args.Bucket), func(t *testing.T) { + + t.Run("file service", func(t *testing.T) { + testFileService(t, 0, func(name string) FileService { + args.Name = name + args.KeyPrefix = fmt.Sprintf("%v", rand.Int64()) + ret, err := NewS3FS( + context.Background(), + args, + DisabledCacheConfig, + nil, + true, + true, + ) + if err != nil { + t.Fatal(err) + } + return ret + }) + }) + + }) + } + }) + +} diff --git a/pkg/fileservice/s3_fs.go b/pkg/fileservice/s3_fs.go index e6db2895f9f5..93d98da39e9d 100644 --- a/pkg/fileservice/s3_fs.go +++ b/pkg/fileservice/s3_fs.go @@ -85,15 +85,20 @@ func NewS3FS( case args.IsMinio || // 天翼云 - strings.Contains(args.Endpoint, "ctyunapi.cn") || - // 腾讯云 - strings.Contains(args.Endpoint, "myqcloud.com"): + strings.Contains(args.Endpoint, "ctyunapi.cn"): // MinIO SDK fs.storage, err = NewMinioSDK(ctx, args, perfCounterSets) if err != nil { return nil, err } + case strings.Contains(args.Endpoint, "myqcloud.com"): + // 腾讯云 + fs.storage, err = NewQCloudSDK(ctx, args, perfCounterSets) + if err != nil { + return nil, err + } + case strings.Contains(args.Endpoint, "aliyuncs.com"): // 阿里云 fs.storage, err = NewAliyunSDK(ctx, args, perfCounterSets)