From dec4e42101cf5e3438e166abc0e73f8fb597d7a8 Mon Sep 17 00:00:00 2001 From: RainbowMango Date: Wed, 19 Jul 2023 19:27:52 +0800 Subject: [PATCH] refactor cached restmapper based on Kubernetes restmapper Signed-off-by: RainbowMango --- pkg/util/restmapper/restmapper.go | 76 ++++++++++++++++++-------- pkg/util/restmapper/restmapper_test.go | 55 ++++++++++--------- 2 files changed, 80 insertions(+), 51 deletions(-) diff --git a/pkg/util/restmapper/restmapper.go b/pkg/util/restmapper/restmapper.go index 29afa1a33f0b..fe2317d3ce8f 100644 --- a/pkg/util/restmapper/restmapper.go +++ b/pkg/util/restmapper/restmapper.go @@ -8,7 +8,6 @@ import ( "k8s.io/client-go/discovery" "k8s.io/client-go/rest" "k8s.io/client-go/restmapper" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) // GetGroupVersionResource is a helper to map GVK(schema.GroupVersionKind) to GVR(schema.GroupVersionResource). @@ -23,39 +22,42 @@ func GetGroupVersionResource(restMapper meta.RESTMapper, gvk schema.GroupVersion // cachedRESTMapper caches the previous result to accelerate subsequent queries. // Note: now the acceleration applies only to RESTMapping() which is heavily used by Karmada. type cachedRESTMapper struct { - restMapper meta.RESTMapper - gvkToGVR sync.Map + restMapper meta.RESTMapper + discoveryClient discovery.DiscoveryInterface + gvkToGVR sync.Map + // mu is used to provide thread-safe mapper reloading. + mu sync.RWMutex } func (g *cachedRESTMapper) KindFor(resource schema.GroupVersionResource) (schema.GroupVersionKind, error) { - return g.restMapper.KindFor(resource) + return g.getMapper().KindFor(resource) } func (g *cachedRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { - return g.restMapper.KindsFor(resource) + return g.getMapper().KindsFor(resource) } func (g *cachedRESTMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) { - return g.restMapper.ResourceFor(input) + return g.getMapper().ResourceFor(input) } func (g *cachedRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { - return g.restMapper.ResourcesFor(input) + return g.getMapper().ResourcesFor(input) } func (g *cachedRESTMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) { - return g.restMapper.RESTMappings(gk, versions...) + return g.getMapper().RESTMappings(gk, versions...) } func (g *cachedRESTMapper) ResourceSingularizer(resource string) (singular string, err error) { - return g.restMapper.ResourceSingularizer(resource) + return g.getMapper().ResourceSingularizer(resource) } func (g *cachedRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { // in case of multi-versions or no versions, cachedRESTMapper don't know which is the preferred version, // so just bypass the cache and consult the underlying mapper. if len(versions) != 1 { - return g.restMapper.RESTMapping(gk, versions...) + return g.getMapper().RESTMapping(gk, versions...) } gvk := gk.WithVersion(versions[0]) @@ -65,16 +67,49 @@ func (g *cachedRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) } // consult underlying mapper and then update cache - restMapping, err := g.restMapper.RESTMapping(gk, versions...) + restMapping, err := g.getMapper().RESTMapping(gk, versions...) + if meta.IsNoMatchError(err) { + // hit here means a resource might be missing from the current rest mapper, + // probably because a new resource(CRD) has been added, we have to reload + // resource and rebuild the rest mapper. + + var groupResources []*restmapper.APIGroupResources + groupResources, err = restmapper.GetAPIGroupResources(g.discoveryClient) + if err != nil { + return nil, err + } + + newMapper := restmapper.NewDiscoveryRESTMapper(groupResources) + restMapping, err = newMapper.RESTMapping(gk, versions...) + if err == nil { + // hit here means after reloading, the new rest mapper can recognize + // the resource, we have to replace the mapper and clear cache. + g.mu.Lock() + g.restMapper = newMapper + g.mu.Unlock() + g.gvkToGVR.Range(func(key, value any) bool { + g.gvkToGVR.Delete(key) + return true + }) + } + } + if err != nil { return restMapping, err } g.gvkToGVR.Store(gvk, restMapping) + return restMapping, nil } +func (g *cachedRESTMapper) getMapper() meta.RESTMapper { + g.mu.RLock() + defer g.mu.RUnlock() + return g.restMapper +} + // NewCachedRESTMapper builds a cachedRESTMapper with a customized underlyingMapper. -// If underlyingMapper is nil, defaults to DynamicRESTMapper. +// If underlyingMapper is nil, defaults to DiscoveryRESTMapper. func NewCachedRESTMapper(cfg *rest.Config, underlyingMapper meta.RESTMapper) (meta.RESTMapper, error) { cachedMapper := cachedRESTMapper{} @@ -89,21 +124,14 @@ func NewCachedRESTMapper(cfg *rest.Config, underlyingMapper meta.RESTMapper) (me return nil, err } - option := apiutil.WithCustomMapper(func() (meta.RESTMapper, error) { - groupResources, err := restmapper.GetAPIGroupResources(client) - if err != nil { - return nil, err - } - // clear the cache map when reloading DiscoveryRESTMapper - cachedMapper.gvkToGVR = sync.Map{} - return restmapper.NewDiscoveryRESTMapper(groupResources), nil - }) - - underlyingMapper, err = apiutil.NewDynamicRESTMapper(cfg, option) + // loading current resources for building a base rest mapper. + groupResources, err := restmapper.GetAPIGroupResources(client) if err != nil { return nil, err } - cachedMapper.restMapper = underlyingMapper + + cachedMapper.restMapper = restmapper.NewDiscoveryRESTMapper(groupResources) + cachedMapper.discoveryClient = client return &cachedMapper, nil } diff --git a/pkg/util/restmapper/restmapper_test.go b/pkg/util/restmapper/restmapper_test.go index c0708660f91b..ba8d42cd4241 100644 --- a/pkg/util/restmapper/restmapper_test.go +++ b/pkg/util/restmapper/restmapper_test.go @@ -1,7 +1,6 @@ package restmapper import ( - "sync" "testing" "k8s.io/apimachinery/pkg/api/meta" @@ -128,26 +127,36 @@ func BenchmarkGetGroupVersionResource(b *testing.B) { } } } +func BenchmarkGetGroupVersionResourceWithoutCache(b *testing.B) { + groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) + if err != nil { + b.Fatalf("Failed to load resources: %v", err) + } -func BenchmarkGetGroupVersionResourceWithCache(b *testing.B) { - cachedMapper := &cachedRESTMapper{} + mapper := restmapper.NewDiscoveryRESTMapper(groupResources) - var option = apiutil.WithCustomMapper(func() (meta.RESTMapper, error) { - groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) - if err != nil { - return nil, err + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, tc := range getGVRTestCases { + _, err := GetGroupVersionResource(mapper, tc.inputGVK) + if (err != nil && !tc.expectErr) || (err == nil && tc.expectErr) { + b.Errorf("GetGroupVersionResource For %#v Error: %v, wantErr: %v", tc.inputGVK, err, tc.expectErr) + } } - // clear the cache map when reloading DiscoveryRESTMapper - cachedMapper.gvkToGVR = sync.Map{} - return restmapper.NewDiscoveryRESTMapper(groupResources), nil - }) + } +} - mapper, err := apiutil.NewDynamicRESTMapper(&rest.Config{}, option) +func BenchmarkGetGroupVersionResourceWithCache(b *testing.B) { + cachedMapper := &cachedRESTMapper{} + + groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) if err != nil { - b.Error(err) + b.Fatalf("Failed to load resources: %v", err) } - cachedMapper.restMapper = mapper + newMapper := restmapper.NewDiscoveryRESTMapper(groupResources) + cachedMapper.restMapper = newMapper + cachedMapper.discoveryClient = discoveryClient b.ResetTimer() for i := 0; i < b.N; i++ { @@ -163,22 +172,14 @@ func BenchmarkGetGroupVersionResourceWithCache(b *testing.B) { func TestGetGroupVersionResourceWithCache(t *testing.T) { cachedMapper := &cachedRESTMapper{} - var option = apiutil.WithCustomMapper(func() (meta.RESTMapper, error) { - groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) - if err != nil { - return nil, err - } - // clear the cache map when reloading DiscoveryRESTMapper - cachedMapper.gvkToGVR = sync.Map{} - return restmapper.NewDiscoveryRESTMapper(groupResources), nil - }) - - mapper, err := apiutil.NewDynamicRESTMapper(&rest.Config{}, option) + groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) if err != nil { - t.Error(err) + t.Fatalf("Failed to load resources: %v", err) } - cachedMapper.restMapper = mapper + newMapper := restmapper.NewDiscoveryRESTMapper(groupResources) + cachedMapper.restMapper = newMapper + cachedMapper.discoveryClient = discoveryClient for _, tc := range getGVRTestCases { t.Run(tc.name, func(t *testing.T) {