From 337919fcb32db193f95800899de2aad74dda25fd Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Tue, 31 Oct 2023 03:10:36 -0700 Subject: [PATCH 1/2] Support allocating shared LB for services Signed-off-by: Daishan Peng --- controllers/service/service_controller.go | 139 +++++++++++++++++-- main.go | 22 ++- pkg/deploy/ec2/security_group_manager.go | 3 +- pkg/deploy/ec2/security_group_synthesizer.go | 1 + pkg/model/core/graph/resource_graph.go | 20 +++ pkg/model/core/stack.go | 54 ++++++- pkg/model/ec2/security_group.go | 15 ++ pkg/service/model_build_listener.go | 23 ++- pkg/service/model_build_load_balancer.go | 20 ++- pkg/service/model_build_managed_sg.go | 56 +++++--- pkg/service/model_builder.go | 132 +++++++++++++++++- pkg/service/service_utils.go | 11 ++ 12 files changed, 440 insertions(+), 56 deletions(-) diff --git a/controllers/service/service_controller.go b/controllers/service/service_controller.go index 1b51e9c72..2a3b13525 100644 --- a/controllers/service/service_controller.go +++ b/controllers/service/service_controller.go @@ -3,10 +3,13 @@ package service import ( "context" "fmt" + "sync" "github.com/go-logr/logr" + "github.com/google/uuid" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/aws-load-balancer-controller/controllers/service/eventhandlers" @@ -33,6 +36,8 @@ const ( serviceTagPrefix = "service.k8s.aws" serviceAnnotationPrefix = "service.beta.kubernetes.io" controllerName = "service" + + nlbListenerLimit = 50 ) func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder, @@ -48,7 +53,7 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider, elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags, controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils, - backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules) + backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, k8sClient) stackMarshaller := deploy.NewDefaultStackMarshaller() stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger) return &serviceReconciler{ @@ -65,6 +70,9 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde stackDeployer: stackDeployer, logger: logger, + allocatedServices: map[string]map[int32]struct{}{}, + lock: sync.Mutex{}, + maxConcurrentReconciles: controllerConfig.ServiceMaxConcurrentReconciles, } } @@ -83,6 +91,9 @@ type serviceReconciler struct { stackDeployer deploy.StackDeployer logger logr.Logger + allocatedServices map[string]map[int32]struct{} + initialized bool + lock sync.Mutex maxConcurrentReconciles int } @@ -99,16 +110,99 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil { return client.IgnoreNotFound(err) } + if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + r.lock.Lock() + if err 
:= r.allocatedService(ctx, svc); err != nil { + r.lock.Unlock() + return err + } + r.lock.Unlock() + } + stack, lb, backendSGRequired, err := r.buildModel(ctx, svc) if err != nil { return err } - if lb == nil { - return r.cleanupLoadBalancerResources(ctx, svc, stack) + if lb == nil || !svc.DeletionTimestamp.IsZero() { + return r.cleanupLoadBalancerResources(ctx, svc, stack, lb == nil) } return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired) } +// allocatedService allocates a stack to a service so that it can share load balancer resources with other services. +func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error { + if !r.initialized { + var serviceList corev1.ServiceList + if err := r.k8sClient.List(ctx, &serviceList); err != nil { + return err + } + for _, svc := range serviceList.Items { + if svc.Annotations[service.LoadBalancerStackKey] != "" { + if r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] == nil { + r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] = map[int32]struct{}{} + } + for _, port := range svc.Spec.Ports { + r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]][port.NodePort] = struct{}{} + } + } + } + r.initialized = true + } + + if !svc.DeletionTimestamp.IsZero() { + if _, ok := r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]; !ok { + return nil + } + + for _, port := range svc.Spec.Ports { + delete(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]], port.NodePort) + } + + if len(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]) == 0 { + delete(r.allocatedServices, svc.Annotations[service.LoadBalancerStackKey]) + } + + return nil + } + + // If service is not type loadbalancer, or it is not intended to share LB, or it has been allocated, skip the controller + if svc.Spec.Type != corev1.ServiceTypeLoadBalancer || svc.Annotations[service.LoadBalancerAllocatingPortKey] != "true" || svc.Annotations[service.LoadBalancerStackKey] != "" { + return nil + } + + allocated := false + for stackName := range r.allocatedServices { + usedPort := r.allocatedServices[stackName] + if len(usedPort) <= nlbListenerLimit-len(svc.Spec.Ports) { + svc.Annotations[service.LoadBalancerStackKey] = stackName + if err := r.k8sClient.Update(ctx, svc); err != nil { + return err + } + for _, port := range svc.Spec.Ports { + usedPort[port.NodePort] = struct{}{} + } + r.allocatedServices[stackName] = usedPort + allocated = true + break + } + } + + if !allocated { + stackName := uuid.New().String() + svc.Annotations[service.LoadBalancerStackKey] = stackName + if err := r.k8sClient.Update(ctx, svc); err != nil { + return err + } + if r.allocatedServices[stackName] == nil { + r.allocatedServices[stackName] = map[int32]struct{}{} + } + for _, port := range svc.Spec.Ports { + r.allocatedServices[stackName][port.NodePort] = struct{}{} + } + } + return nil +} + func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) { stack, lb, backendSGRequired, err := r.modelBuilder.Build(ctx, svc) if err != nil { @@ -140,10 +234,15 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context, r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err)) return err } + // always lock the stack deploying models. 
Since stack can be accessed by multiple goroutines from multiple service reconciliation loops, + // make sure only one goroutine is building the model at a time to guarantee thread safety. + stack.Lock() err := r.deployModel(ctx, svc, stack) if err != nil { + stack.Unlock() return err } + stack.Unlock() lbDNS, err := lb.DNSName().Resolve(ctx) if err != nil { return err @@ -163,15 +262,28 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context, return nil } -func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error { +func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, cleanlb bool) error { if k8s.HasFinalizer(svc, serviceFinalizer) { + stack.Lock() err := r.deployModel(ctx, svc, stack) if err != nil { + stack.Unlock() return err } - if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(svc)}); err != nil { - return err + stack.Unlock() + if cleanlb { + nsName := k8s.NamespacedName(svc) + if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + nsName = types.NamespacedName{ + Namespace: "stack", + Name: svc.Annotations[service.LoadBalancerStackKey], + } + } + if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}); err != nil && !apierrors.IsNotFound(err) { + return err + } } + if err = r.cleanupServiceStatus(ctx, svc); err != nil { r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err)) return err @@ -189,11 +301,17 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin svc.Status.LoadBalancer.Ingress[0].IP != "" || svc.Status.LoadBalancer.Ingress[0].Hostname != lbDNS { svcOld := svc.DeepCopy() - svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ - { - Hostname: lbDNS, - }, + ingress := corev1.LoadBalancerIngress{ + Hostname: lbDNS, } + if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + for _, port := range svc.Spec.Ports { + ingress.Ports = append(ingress.Ports, corev1.PortStatus{ + Port: port.NodePort, + }) + } + } + svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ingress} if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil { return errors.Wrapf(err, "failed to update service status: %v", k8s.NamespacedName(svc)) } @@ -221,6 +339,7 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag if err := r.setupWatches(ctx, c); err != nil { return err } + return nil } diff --git a/main.go b/main.go index f690a069e..810a44d44 100644 --- a/main.go +++ b/main.go @@ -33,15 +33,11 @@ import ( "sigs.k8s.io/aws-load-balancer-controller/pkg/aws" "sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle" "sigs.k8s.io/aws-load-balancer-controller/pkg/config" - "sigs.k8s.io/aws-load-balancer-controller/pkg/inject" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" "sigs.k8s.io/aws-load-balancer-controller/pkg/runtime" "sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding" "sigs.k8s.io/aws-load-balancer-controller/pkg/version" - corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core" - elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2" - networkingwebhook 
"sigs.k8s.io/aws-load-balancer-controller/webhooks/networking" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -91,7 +87,7 @@ func main() { setupLog.Error(err, "unable to start manager") os.Exit(1) } - config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr) + //config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr) clientSet, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { setupLog.Error(err, "unable to obtain clientSet") @@ -149,14 +145,14 @@ func main() { os.Exit(1) } - podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig, - mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector")) - corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr) - corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr) - elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr) - elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) - elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) - networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr) + //podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig, + // mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector")) + //corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr) + //corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr) + //elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr) + //elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) + //elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) + //networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr) //+kubebuilder:scaffold:builder go func() { diff --git a/pkg/deploy/ec2/security_group_manager.go b/pkg/deploy/ec2/security_group_manager.go index 9fc84ceeb..37bd33247 100644 --- a/pkg/deploy/ec2/security_group_manager.go +++ b/pkg/deploy/ec2/security_group_manager.go @@ -2,6 +2,8 @@ package ec2 import ( "context" + "time" + awssdk "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" ec2sdk "github.com/aws/aws-sdk-go/service/ec2" @@ -12,7 +14,6 @@ import ( ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" "sigs.k8s.io/aws-load-balancer-controller/pkg/runtime" - "time" ) const ( diff --git a/pkg/deploy/ec2/security_group_synthesizer.go b/pkg/deploy/ec2/security_group_synthesizer.go index b5e76845d..e3244452f 100644 --- a/pkg/deploy/ec2/security_group_synthesizer.go +++ b/pkg/deploy/ec2/security_group_synthesizer.go @@ -2,6 +2,7 @@ package ec2 import ( "context" + "github.com/go-logr/logr" "github.com/pkg/errors" "k8s.io/apimachinery/pkg/util/sets" diff --git a/pkg/model/core/graph/resource_graph.go b/pkg/model/core/graph/resource_graph.go index 78413c9aa..157552ffb 100644 --- a/pkg/model/core/graph/resource_graph.go +++ b/pkg/model/core/graph/resource_graph.go @@ -13,6 +13,8 @@ type ResourceGraph interface { // Add a node into ResourceGraph. 
AddNode(node ResourceUID) + RemoveNode(node ResourceUID) + // Add a edge into ResourceGraph, where dstNode depends on srcNode. AddEdge(srcNode ResourceUID, dstNode ResourceUID) @@ -44,6 +46,24 @@ func (g *defaultResourceGraph) AddNode(node ResourceUID) { g.nodes = append(g.nodes, node) } +func (g *defaultResourceGraph) RemoveNode(node ResourceUID) { + for i, n := range g.nodes { + if n == node { + g.nodes = append(g.nodes[:i], g.nodes[i+1:]...) + break + } + } + delete(g.outEdges, node) + for _, nodes := range g.outEdges { + for i, n := range nodes { + if n == node { + nodes = append(nodes[:i], nodes[i+1:]...) + break + } + } + } +} + // Add a edge into ResourceGraph, where dstNode depends on srcNode. func (g *defaultResourceGraph) AddEdge(srcNode ResourceUID, dstNode ResourceUID) { g.outEdges[srcNode] = append(g.outEdges[srcNode], dstNode) diff --git a/pkg/model/core/stack.go b/pkg/model/core/stack.go index 9329bc121..ef808936f 100644 --- a/pkg/model/core/stack.go +++ b/pkg/model/core/stack.go @@ -1,8 +1,11 @@ package core import ( - "github.com/pkg/errors" "reflect" + "sync" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph" ) @@ -14,6 +17,19 @@ type Stack interface { // Add a resource into stack. AddResource(res Resource) error + // Remove a resource + RemoveResource(id graph.ResourceUID) + + AddService(service *corev1.Service) + + RemoveService(service *corev1.Service) + + ListServices() []corev1.Service + + Lock() + + Unlock() + // Add a dependency relationship between resources. AddDependency(dependee Resource, depender Resource) error @@ -30,8 +46,10 @@ func NewDefaultStack(stackID StackID) *defaultStack { return &defaultStack{ stackID: stackID, + services: make(map[string]corev1.Service), resources: make(map[graph.ResourceUID]Resource), resourceGraph: graph.NewDefaultResourceGraph(), + lock: sync.Mutex{}, } } @@ -41,8 +59,11 @@ var _ Stack = &defaultStack{} type defaultStack struct { stackID StackID + services map[string]corev1.Service resources map[graph.ResourceUID]Resource resourceGraph graph.ResourceGraph + + lock sync.Mutex } func (s *defaultStack) StackID() StackID { @@ -60,6 +81,26 @@ func (s *defaultStack) AddResource(res Resource) error { return nil } +func (s *defaultStack) RemoveResource(id graph.ResourceUID) { + delete(s.resources, id) + s.resourceGraph.RemoveNode(id) +} + +func (s *defaultStack) AddService(service *corev1.Service) { + s.services[string(service.UID)] = *service +} + +func (s *defaultStack) RemoveService(service *corev1.Service) { + delete(s.services, string(service.UID)) +} + +func (s *defaultStack) ListServices() (result []corev1.Service) { + for _, service := range s.services { + result = append(result, service) + } + return +} + // Add a dependency relationship between resources. 
func (s *defaultStack) AddDependency(dependee Resource, depender Resource) error { dependeeResUID := s.computeResourceUID(dependee) @@ -101,6 +142,9 @@ func (s *defaultStack) ListResources(pResourceSlice interface{}) error { func (s *defaultStack) TopologicalTraversal(visitor ResourceVisitor) error { return graph.TopologicalTraversal(s.resourceGraph, func(uid graph.ResourceUID) error { + if _, ok := s.resources[uid]; !ok { + return nil + } return visitor.Visit(s.resources[uid]) }) } @@ -112,3 +156,11 @@ func (s *defaultStack) computeResourceUID(res Resource) graph.ResourceUID { ResID: res.ID(), } } + +func (s *defaultStack) Lock() { + s.lock.Lock() +} + +func (s *defaultStack) Unlock() { + s.lock.Unlock() +} diff --git a/pkg/model/ec2/security_group.go b/pkg/model/ec2/security_group.go index 3d40369f2..8b575f3e2 100644 --- a/pkg/model/ec2/security_group.go +++ b/pkg/model/ec2/security_group.go @@ -2,8 +2,11 @@ package ec2 import ( "context" + "reflect" + "github.com/pkg/errors" "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core" + "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph" ) var _ core.Resource = &SecurityGroup{} @@ -26,6 +29,18 @@ func NewSecurityGroup(stack core.Stack, id string, spec SecurityGroupSpec) *Secu Spec: spec, Status: nil, } + // since security can be overwritten, remove and re-add it to the stack to make sure latest spec is updated + var sgRes []*SecurityGroup + stack.ListResources(&sgRes) + for _, res := range sgRes { + if res.ID() == id { + sg.Status = res.Status + } + } + stack.RemoveResource(graph.ResourceUID{ + ResType: reflect.TypeOf(&SecurityGroup{}), + ResID: id, + }) stack.AddResource(sg) return sg } diff --git a/pkg/service/model_build_listener.go b/pkg/service/model_build_listener.go index d320fe5ea..1258e535c 100644 --- a/pkg/service/model_build_listener.go +++ b/pkg/service/model_build_listener.go @@ -19,12 +19,18 @@ func (t *defaultModelBuildTask) buildListeners(ctx context.Context, scheme elbv2 return err } - for _, port := range t.service.Spec.Ports { - _, err := t.buildListener(ctx, port, *cfg, scheme) - if err != nil { - return err + originalSvc := t.service + for _, service := range t.stack.ListServices() { + t.service = &service + for _, port := range service.Spec.Ports { + _, err := t.buildListener(ctx, port, *cfg, scheme) + if err != nil { + return err + } } } + t.service = originalSvc + return nil } @@ -35,6 +41,9 @@ func (t *defaultModelBuildTask) buildListener(ctx context.Context, port corev1.S return nil, err } listenerResID := fmt.Sprintf("%v", port.Port) + if t.service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + listenerResID = fmt.Sprintf("%v", port.NodePort) + } ls := elbv2model.NewListener(t.stack, listenerResID, lsSpec) return ls, nil } @@ -73,9 +82,13 @@ func (t *defaultModelBuildTask) buildListenerSpec(ctx context.Context, port core } defaultActions := t.buildListenerDefaultActions(ctx, targetGroup) + portNumber := int64(port.Port) + if t.service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + portNumber = int64(port.NodePort) + } return elbv2model.ListenerSpec{ LoadBalancerARN: t.loadBalancer.LoadBalancerARN(), - Port: int64(port.Port), + Port: portNumber, Protocol: listenerProtocol, Certificates: certificates, SSLPolicy: sslPolicy, diff --git a/pkg/service/model_build_load_balancer.go b/pkg/service/model_build_load_balancer.go index 3d8535c49..30eb11243 100644 --- a/pkg/service/model_build_load_balancer.go +++ b/pkg/service/model_build_load_balancer.go @@ -43,6 +43,13 @@ func (t 
*defaultModelBuildTask) buildLoadBalancer(ctx context.Context, scheme el if err != nil { return err } + var resLBs []*elbv2model.LoadBalancer + t.stack.ListResources(&resLBs) + if len(resLBs) > 0 { + t.loadBalancer = resLBs[0] + t.loadBalancer.Spec = spec + return nil + } t.loadBalancer = elbv2model.NewLoadBalancer(t.stack, resourceIDLoadBalancer, spec) return nil } @@ -109,7 +116,14 @@ func (t *defaultModelBuildTask) buildLoadBalancerSecurityGroups(ctx context.Cont if !t.enableBackendSG { t.backendSGIDToken = managedSG.GroupID() } else { - backendSGID, err := t.backendSGProvider.Get(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(t.service)}) + nsName := k8s.NamespacedName(t.service) + if t.service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + nsName = types.NamespacedName{ + Namespace: "stack", + Name: t.service.Annotations[LoadBalancerStackKey], + } + } + backendSGID, err := t.backendSGProvider.Get(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}) if err != nil { return nil, err } @@ -473,6 +487,10 @@ func (t *defaultModelBuildTask) getAnnotationSpecificLbAttributes() (map[string] var invalidLoadBalancerNamePattern = regexp.MustCompile("[[:^alnum:]]") func (t *defaultModelBuildTask) buildLoadBalancerName(_ context.Context, scheme elbv2model.LoadBalancerScheme) (string, error) { + stackName := t.serviceUtils.GetServiceStackName(t.service) + if stackName != "" { + return stackName, nil + } var name string if exists := t.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixLoadBalancerName, &name, t.service.Annotations); exists { // The name of the loadbalancer can only have up to 32 characters diff --git a/pkg/service/model_build_managed_sg.go b/pkg/service/model_build_managed_sg.go index 60381f6cf..44e39dbcd 100644 --- a/pkg/service/model_build_managed_sg.go +++ b/pkg/service/model_build_managed_sg.go @@ -25,6 +25,7 @@ func (t *defaultModelBuildTask) buildManagedSecurityGroup(ctx context.Context, i if err != nil { return nil, err } + sg := ec2model.NewSecurityGroup(t.stack, resourceIDManagedSecurityGroup, sgSpec) return sg, nil } @@ -50,6 +51,10 @@ func (t *defaultModelBuildTask) buildManagedSecurityGroupSpec(ctx context.Contex var invalidSecurityGroupNamePtn, _ = regexp.Compile("[[:^alnum:]]") func (t *defaultModelBuildTask) buildManagedSecurityGroupName(_ context.Context) string { + stackName := t.serviceUtils.GetServiceStackName(t.service) + if stackName != "" { + return stackName + } uuidHash := sha256.New() _, _ = uuidHash.Write([]byte(t.clusterName)) _, _ = uuidHash.Write([]byte(t.service.Name)) @@ -68,31 +73,36 @@ func (t *defaultModelBuildTask) buildManagedSecurityGroupIngressPermissions(ctx if err != nil { return nil, err } - for _, port := range t.service.Spec.Ports { - listenPort := int64(port.Port) - for _, cidr := range cidrs { - if !strings.Contains(cidr, ":") { - permissions = append(permissions, ec2model.IPPermission{ - IPProtocol: strings.ToLower(string(port.Protocol)), - FromPort: awssdk.Int64(listenPort), - ToPort: awssdk.Int64(listenPort), - IPRanges: []ec2model.IPRange{ - { - CIDRIP: cidr, + for _, service := range t.stack.ListServices() { + for _, port := range service.Spec.Ports { + listenPort := int64(port.Port) + if service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + listenPort = int64(port.NodePort) + } + for _, cidr := range cidrs { + if !strings.Contains(cidr, ":") { + permissions = append(permissions, ec2model.IPPermission{ + IPProtocol: strings.ToLower(string(port.Protocol)), 
+ FromPort: awssdk.Int64(listenPort), + ToPort: awssdk.Int64(listenPort), + IPRanges: []ec2model.IPRange{ + { + CIDRIP: cidr, + }, }, - }, - }) - } else { - permissions = append(permissions, ec2model.IPPermission{ - IPProtocol: strings.ToLower(string(port.Protocol)), - FromPort: awssdk.Int64(listenPort), - ToPort: awssdk.Int64(listenPort), - IPv6Range: []ec2model.IPv6Range{ - { - CIDRIPv6: cidr, + }) + } else { + permissions = append(permissions, ec2model.IPPermission{ + IPProtocol: strings.ToLower(string(port.Protocol)), + FromPort: awssdk.Int64(listenPort), + ToPort: awssdk.Int64(listenPort), + IPv6Range: []ec2model.IPv6Range{ + { + CIDRIPv6: cidr, + }, }, - }, - }) + }) + } } } } diff --git a/pkg/service/model_builder.go b/pkg/service/model_builder.go index 94ea62ee4..49212ba64 100644 --- a/pkg/service/model_builder.go +++ b/pkg/service/model_builder.go @@ -2,12 +2,16 @@ package service import ( "context" + "fmt" + "reflect" "strconv" "sync" "github.com/aws/aws-sdk-go/service/ec2" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" "sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services" @@ -16,8 +20,10 @@ import ( "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/tracking" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core" + "sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph" elbv2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/elbv2" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -26,6 +32,9 @@ const ( LoadBalancerTargetTypeIP = "ip" LoadBalancerTargetTypeInstance = "instance" lbAttrsDeletionProtection = "deletion_protection.enabled" + + LoadBalancerAllocatingPortKey = "service.beta.kubernetes.io/aws-load-balancer-allocating-port" + LoadBalancerStackKey = "service.beta.kubernetes.io/aws-load-balancer-stack-name" ) // ModelBuilder builds the model stack for the service resource. 
@@ -40,7 +49,7 @@ func NewDefaultModelBuilder(annotationParser annotations.Parser, subnetsResolver elbv2TaggingManager elbv2deploy.TaggingManager, ec2Client services.EC2, featureGates config.FeatureGates, clusterName string, defaultTags map[string]string, externalManagedTags []string, defaultSSLPolicy string, defaultTargetType string, enableIPTargetType bool, serviceUtils ServiceUtils, backendSGProvider networking.BackendSGProvider, sgResolver networking.SecurityGroupResolver, enableBackendSG bool, - disableRestrictedSGRules bool) *defaultModelBuilder { + disableRestrictedSGRules bool, k8sClient client.Client) *defaultModelBuilder { return &defaultModelBuilder{ annotationParser: annotationParser, subnetsResolver: subnetsResolver, @@ -61,6 +70,8 @@ func NewDefaultModelBuilder(annotationParser annotations.Parser, subnetsResolver ec2Client: ec2Client, enableBackendSG: enableBackendSG, disableRestrictedSGRules: disableRestrictedSGRules, + stackGlobalCache: map[core.StackID]core.Stack{}, + client: k8sClient, } } @@ -80,6 +91,8 @@ type defaultModelBuilder struct { enableBackendSG bool disableRestrictedSGRules bool + stackGlobalCache map[core.StackID]core.Stack + clusterName string vpcID string defaultTags map[string]string @@ -87,10 +100,64 @@ type defaultModelBuilder struct { defaultSSLPolicy string defaultTargetType elbv2model.TargetType enableIPTargetType bool + + client client.Client + + initialized bool + lock sync.RWMutex } func (b *defaultModelBuilder) Build(ctx context.Context, service *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) { - stack := core.NewDefaultStack(core.StackID(k8s.NamespacedName(service))) + // Initialize the global cache if not initialized + if !b.initialized { + // if not initialized, we need to build the global cache based on existing services + var serviceList corev1.ServiceList + if err := b.client.List(ctx, &serviceList); err != nil { + return nil, nil, false, err + } + for _, svc := range serviceList.Items { + if svc.Annotations[LoadBalancerStackKey] != "" && svc.DeletionTimestamp.IsZero() { + stackID := core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: svc.Annotations[LoadBalancerStackKey], + }) + if b.stackGlobalCache[stackID] == nil { + b.lock.Lock() + b.stackGlobalCache[stackID] = core.NewDefaultStack(stackID) + b.lock.Unlock() + } + b.lock.Lock() + b.stackGlobalCache[stackID].AddService(&svc) + b.lock.Unlock() + } + } + b.initialized = true + } + + // For each stack ID, if we found the stack annotation, this means the service will be sharing the same stack with other services + // If so, we should reuse the same stack in the cache so that we can reuse the load balancer with shared listeners + stackID := core.StackID(k8s.NamespacedName(service)) + var stack core.Stack + stack = core.NewDefaultStack(stackID) + if service.Annotations[LoadBalancerAllocatingPortKey] == "true" { + // service will be allocated to a stack with shared loadbalancer + if service.Annotations[LoadBalancerStackKey] == "" { + return nil, nil, false, errors.Errorf("service %v/%v is waiting to allocated for a stack", service.Namespace, service.Name) + } + stackID = core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: service.Annotations[LoadBalancerStackKey], + }) + if b.stackGlobalCache[stackID] == nil { + s := core.NewDefaultStack(stackID) + b.lock.Lock() + b.stackGlobalCache[stackID] = s + b.lock.Unlock() + } + b.lock.RLock() + stack = b.stackGlobalCache[stackID] + b.lock.RUnlock() + } task := &defaultModelBuildTask{ clusterName: 
b.clusterName, vpcID: b.vpcID, @@ -221,13 +288,63 @@ func (t *defaultModelBuildTask) run(ctx context.Context) error { return errors.Errorf("deletion_protection is enabled, cannot delete the service: %v", t.service.Name) } } + + t.cleanupStackWithRemovingService() return nil } + t.stack.AddService(t.service) err := t.buildModel(ctx) return err } +// When service is deleted, update resources in the stack to make sure things are cleaned up properly. +func (t *defaultModelBuildTask) cleanupStackWithRemovingService() { + for _, port := range t.service.Spec.Ports { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: fmt.Sprintf("%v", port.NodePort), + ResType: reflect.TypeOf(&elbv2model.Listener{}), + }) + svcPort := intstr.FromInt(int(port.Port)) + tgResourceID := t.buildTargetGroupResourceID(k8s.NamespacedName(t.service), svcPort) + var targetGroups []*elbv2model.TargetGroup + var targetGroupBindingResources []*elbv2model.TargetGroupBindingResource + t.stack.ListResources(&targetGroups) + t.stack.ListResources(&targetGroupBindingResources) + for _, tg := range targetGroups { + if tg.ID() == tgResourceID { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: tg.ID(), + ResType: reflect.TypeOf(tg), + }) + } + } + for _, tgBinding := range targetGroupBindingResources { + if tgBinding.ID() == tgResourceID { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: tgBinding.ID(), + ResType: reflect.TypeOf(tgBinding), + }) + } + } + } + // Delete the load balancer if there is no listener left. + var resLSs []*elbv2model.Listener + t.stack.ListResources(&resLSs) + if len(resLSs) == 0 { + t.stack.RemoveResource(graph.ResourceUID{ + ResID: "LoadBalancer", + ResType: reflect.TypeOf(&elbv2model.LoadBalancer{}), + }) + t.loadBalancer = nil + } + t.stack.RemoveService(t.service) +} + func (t *defaultModelBuildTask) buildModel(ctx context.Context) error { + // always lock the stack building models. Since stack can be accessed by multiple goroutines from multiple service reconciliation loops, + // make sure only one goroutine is building the model at a time to guarantee thread safety. 
+ t.stack.Lock() + defer t.stack.Unlock() scheme, err := t.buildLoadBalancerScheme(ctx) if err != nil { return err @@ -262,3 +379,14 @@ func (t *defaultModelBuildTask) getDeletionProtectionViaAnnotation(svc corev1.Se } return false, nil } + +func (t *defaultModelBuildTask) stackID() core.StackID { + stackID := core.StackID(k8s.NamespacedName(t.service)) + if t.service.Annotations[LoadBalancerStackKey] != "" { + stackID = core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: t.service.Annotations[LoadBalancerStackKey], + }) + } + return stackID +} diff --git a/pkg/service/service_utils.go b/pkg/service/service_utils.go index f367e2579..7cfa86097 100644 --- a/pkg/service/service_utils.go +++ b/pkg/service/service_utils.go @@ -1,6 +1,8 @@ package service import ( + "fmt" + corev1 "k8s.io/api/core/v1" "sigs.k8s.io/aws-load-balancer-controller/pkg/annotations" "sigs.k8s.io/aws-load-balancer-controller/pkg/config" @@ -14,6 +16,8 @@ type ServiceUtils interface { // IsServicePendingFinalization returns true if the service contains the aws-load-balancer-controller finalizer IsServicePendingFinalization(service *corev1.Service) bool + + GetServiceStackName(service *corev1.Service) string } func NewServiceUtils(annotationsParser annotations.Parser, serviceFinalizer string, loadBalancerClass string, @@ -61,6 +65,13 @@ func (u *defaultServiceUtils) IsServiceSupported(service *corev1.Service) bool { return u.checkAWSLoadBalancerTypeAnnotation(service) } +func (u *defaultServiceUtils) GetServiceStackName(service *corev1.Service) string { + if service.Annotations[LoadBalancerStackKey] != "" { + return fmt.Sprintf("k8s-%.8s", service.Annotations[LoadBalancerStackKey]) + } + return "" +} + func (u *defaultServiceUtils) checkAWSLoadBalancerTypeAnnotation(service *corev1.Service) bool { lbType := "" _ = u.annotationParser.ParseStringAnnotation(annotations.SvcLBSuffixLoadBalancerType, &lbType, service.Annotations) From 6c74e5f56e9a5b079960ebb85be96d546e83056a Mon Sep 17 00:00:00 2001 From: Daishan Peng Date: Thu, 2 Nov 2023 12:51:12 -0700 Subject: [PATCH 2/2] Address comments Signed-off-by: Daishan Peng --- controllers/service/service_controller.go | 4 ++- main.go | 22 +++++++----- pkg/service/model_build_load_balancer_test.go | 1 + pkg/service/model_builder.go | 34 +++++++++---------- pkg/service/model_builder_test.go | 2 +- 5 files changed, 34 insertions(+), 29 deletions(-) diff --git a/controllers/service/service_controller.go b/controllers/service/service_controller.go index 2a3b13525..fef585330 100644 --- a/controllers/service/service_controller.go +++ b/controllers/service/service_controller.go @@ -111,6 +111,7 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err return client.IgnoreNotFound(err) } if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" { + // AllocateService has to be locked to guarantee thread-safe since it read/writes map concurrently r.lock.Lock() if err := r.allocatedService(ctx, svc); err != nil { r.lock.Unlock() @@ -129,7 +130,8 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired) } -// allocatedService allocates a stack to a service so that it can share load balancer resources with other services. +// AllocateService makes sure that each service is allocated to a virtual stack, and a stack will not have more than 50 service/listener(the limit of listener on NLB). 
+// It maintains an in-memory cache to be able to track the usage. If no stack is available, it will create a new stack. func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error { if !r.initialized { var serviceList corev1.ServiceList diff --git a/main.go b/main.go index 810a44d44..f690a069e 100644 --- a/main.go +++ b/main.go @@ -33,11 +33,15 @@ import ( "sigs.k8s.io/aws-load-balancer-controller/pkg/aws" "sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle" "sigs.k8s.io/aws-load-balancer-controller/pkg/config" + "sigs.k8s.io/aws-load-balancer-controller/pkg/inject" "sigs.k8s.io/aws-load-balancer-controller/pkg/k8s" "sigs.k8s.io/aws-load-balancer-controller/pkg/networking" "sigs.k8s.io/aws-load-balancer-controller/pkg/runtime" "sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding" "sigs.k8s.io/aws-load-balancer-controller/pkg/version" + corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core" + elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2" + networkingwebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/networking" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -87,7 +91,7 @@ func main() { setupLog.Error(err, "unable to start manager") os.Exit(1) } - //config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr) + config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr) clientSet, err := kubernetes.NewForConfig(mgr.GetConfig()) if err != nil { setupLog.Error(err, "unable to obtain clientSet") @@ -145,14 +149,14 @@ func main() { os.Exit(1) } - //podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig, - // mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector")) - //corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr) - //corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr) - //elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr) - //elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) - //elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) - //networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr) + podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig, + mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector")) + corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr) + corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr) + elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr) + elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) + elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr) + networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr) //+kubebuilder:scaffold:builder go func() { diff --git a/pkg/service/model_build_load_balancer_test.go b/pkg/service/model_build_load_balancer_test.go index 46de842a6..018a744c7 100644 --- a/pkg/service/model_build_load_balancer_test.go +++ b/pkg/service/model_build_load_balancer_test.go @@ -1431,6 +1431,7 @@ func Test_defaultModelBuildTask_buildLoadBalancerName(t *testing.T) { service: tt.service, clusterName: 
tt.clusterName, annotationParser: annotations.NewSuffixAnnotationParser("service.beta.kubernetes.io"), + serviceUtils: &defaultServiceUtils{}, } got, err := task.buildLoadBalancerName(context.Background(), tt.scheme) if err != nil { diff --git a/pkg/service/model_builder.go b/pkg/service/model_builder.go index 49212ba64..3648aa252 100644 --- a/pkg/service/model_builder.go +++ b/pkg/service/model_builder.go @@ -112,23 +112,23 @@ func (b *defaultModelBuilder) Build(ctx context.Context, service *corev1.Service if !b.initialized { // if not initialized, we need to build the global cache based on existing services var serviceList corev1.ServiceList - if err := b.client.List(ctx, &serviceList); err != nil { - return nil, nil, false, err - } - for _, svc := range serviceList.Items { - if svc.Annotations[LoadBalancerStackKey] != "" && svc.DeletionTimestamp.IsZero() { - stackID := core.StackID(types.NamespacedName{ - Namespace: "stack", - Name: svc.Annotations[LoadBalancerStackKey], - }) - if b.stackGlobalCache[stackID] == nil { + if b.client != nil { + if err := b.client.List(ctx, &serviceList); err != nil { + return nil, nil, false, err + } + for _, svc := range serviceList.Items { + if svc.Annotations[LoadBalancerStackKey] != "" && svc.DeletionTimestamp.IsZero() { + stackID := core.StackID(types.NamespacedName{ + Namespace: "stack", + Name: svc.Annotations[LoadBalancerStackKey], + }) b.lock.Lock() - b.stackGlobalCache[stackID] = core.NewDefaultStack(stackID) + if b.stackGlobalCache[stackID] == nil { + b.stackGlobalCache[stackID] = core.NewDefaultStack(stackID) + } + b.stackGlobalCache[stackID].AddService(&svc) b.lock.Unlock() } - b.lock.Lock() - b.stackGlobalCache[stackID].AddService(&svc) - b.lock.Unlock() } } b.initialized = true @@ -148,15 +148,13 @@ func (b *defaultModelBuilder) Build(ctx context.Context, service *corev1.Service Namespace: "stack", Name: service.Annotations[LoadBalancerStackKey], }) + b.lock.Lock() if b.stackGlobalCache[stackID] == nil { s := core.NewDefaultStack(stackID) - b.lock.Lock() b.stackGlobalCache[stackID] = s - b.lock.Unlock() } - b.lock.RLock() stack = b.stackGlobalCache[stackID] - b.lock.RUnlock() + b.lock.Unlock() } task := &defaultModelBuildTask{ clusterName: b.clusterName, diff --git a/pkg/service/model_builder_test.go b/pkg/service/model_builder_test.go index 3bbe57fcf..ded7809d3 100644 --- a/pkg/service/model_builder_test.go +++ b/pkg/service/model_builder_test.go @@ -6417,7 +6417,7 @@ func Test_defaultModelBuilderTask_Build(t *testing.T) { } builder := NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, "vpc-xxx", trackingProvider, elbv2TaggingManager, ec2Client, featureGates, "my-cluster", nil, nil, "ELBSecurityPolicy-2016-08", defaultTargetType, enableIPTargetType, serviceUtils, - backendSGProvider, sgResolver, tt.enableBackendSG, tt.disableRestrictedSGRules) + backendSGProvider, sgResolver, tt.enableBackendSG, tt.disableRestrictedSGRules, nil) ctx := context.Background() stack, _, _, err := builder.Build(ctx, tt.svc) if tt.wantError {
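
Two annotation keys introduced in pkg/service/model_builder.go carry the feature end to end: service.beta.kubernetes.io/aws-load-balancer-allocating-port opts a LoadBalancer-type Service into sharing, and service.beta.kubernetes.io/aws-load-balancer-stack-name is written back by the reconciler once allocatedService has picked (or created) a stack. Below is a minimal sketch of an opted-in Service built with the same corev1 types the controller consumes; the name, namespace, and port values are hypothetical.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Hypothetical Service that asks the controller for a shared NLB. The
	// stack-name annotation is intentionally absent: the reconciler sets it
	// via allocatedService before the model is built.
	svc := corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "demo",
			Namespace: "default",
			Annotations: map[string]string{
				"service.beta.kubernetes.io/aws-load-balancer-allocating-port": "true",
			},
		},
		Spec: corev1.ServiceSpec{
			Type: corev1.ServiceTypeLoadBalancer,
			Ports: []corev1.ServicePort{
				{Name: "http", Port: 80, Protocol: corev1.ProtocolTCP},
			},
		},
	}
	fmt.Println(svc.Namespace+"/"+svc.Name, svc.Annotations)
}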
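
allocatedService in controllers/service/service_controller.go packs services onto stacks first-fit, bounded by nlbListenerLimit (50, the NLB listener quota): the first existing stack that still has room for all of the service's ports is reused, otherwise a new UUID-named stack is minted. The following is a simplified, self-contained sketch of that bookkeeping with the Kubernetes API calls and deletion handling stripped out; allocate and the allocated map loosely mirror the reconciler's allocatedServices field rather than reproducing it.

package main

import (
	"fmt"

	"github.com/google/uuid"
)

// Hard limit of listeners per NLB, as in the controller.
const nlbListenerLimit = 50

// allocate returns the stack that a service's node ports should live on: the
// first stack that still has room for every port is reused (first fit),
// otherwise a fresh UUID-named stack is created. allocated maps stack name to
// the node ports already claimed on it.
func allocate(allocated map[string]map[int32]struct{}, ports []int32) string {
	for name, used := range allocated {
		if len(used)+len(ports) <= nlbListenerLimit {
			for _, p := range ports {
				used[p] = struct{}{}
			}
			return name
		}
	}
	name := uuid.New().String()
	used := make(map[int32]struct{}, len(ports))
	for _, p := range ports {
		used[p] = struct{}{}
	}
	allocated[name] = used
	return name
}

func main() {
	allocated := map[string]map[int32]struct{}{}
	a := allocate(allocated, []int32{30080, 30443}) // first service: new stack
	b := allocate(allocated, []int32{31000})        // second service: same stack
	fmt.Println(a == b, len(allocated[a]))          // true 3
}

The real reconciler additionally rebuilds this map from existing Services' stack annotations on its first run, and releases a service's ports (and the stack itself, once empty) when the service is deleted.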
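
Because several services now resolve to one stack, the builder keeps a stackGlobalCache keyed by StackID, and both model building and deployment run under the stack's mutex. Here is a rough sketch of that shape, reduced to a map of named stacks guarded by their own locks; the types are stand-ins, not the real core.Stack.

package main

import (
	"fmt"
	"sync"
)

// sharedStack stands in for core.Stack: it owns a mutex because listeners and
// target groups from several services are added to it concurrently.
type sharedStack struct {
	sync.Mutex
	listeners map[string]struct{}
}

// stackCache stands in for the builder's stackGlobalCache: every service that
// carries the same stack annotation resolves to the same *sharedStack.
type stackCache struct {
	mu     sync.Mutex
	stacks map[string]*sharedStack
}

func (c *stackCache) get(name string) *sharedStack {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.stacks[name] == nil {
		c.stacks[name] = &sharedStack{listeners: map[string]struct{}{}}
	}
	return c.stacks[name]
}

func main() {
	cache := &stackCache{stacks: map[string]*sharedStack{}}
	var wg sync.WaitGroup
	for _, port := range []string{"30080", "30443"} {
		wg.Add(1)
		go func(p string) { // two reconcile loops building into one stack
			defer wg.Done()
			s := cache.get("0b4c9a36") // hypothetical stack name
			s.Lock()
			s.listeners[p] = struct{}{}
			s.Unlock()
		}(port)
	}
	wg.Wait()
	fmt.Println(len(cache.get("0b4c9a36").listeners)) // 2
}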
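
On a shared load balancer the listener resource ID, the listener port, and the managed security group rules all switch from the Service port to the NodePort, which is unique cluster-wide, so two services that both expose, say, port 443 do not collide. A small sketch of that selection rule, mirroring buildListenerSpec; the helper name is invented for illustration.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// listenerPort mirrors the choice made in buildListenerSpec (and reused for
// the managed security group ingress rules): a shared load balancer keys its
// listeners by NodePort rather than by the Service port.
func listenerPort(port corev1.ServicePort, sharedLB bool) int64 {
	if sharedLB {
		return int64(port.NodePort)
	}
	return int64(port.Port)
}

func main() {
	p := corev1.ServicePort{Port: 443, NodePort: 31443}
	fmt.Println(listenerPort(p, false)) // 443 on a dedicated NLB
	fmt.Println(listenerPort(p, true))  // 31443 on a shared NLB
}

The same NodePorts are surfaced in the Service's load-balancer ingress status (updateServiceStatus), so callers know which port to reach on the shared DNS name.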
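
The stack annotation also becomes the AWS-visible name: stackID() keys the builder cache under a synthetic "stack" namespace plus the allocated UUID, and GetServiceStackName truncates that UUID to eight characters for the load balancer and managed security group names. A sketch of both derivations; the UUID below is made up.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// sharedStackID mirrors stackID() in pkg/service/model_builder.go: services
// that share a load balancer are keyed under a synthetic "stack" namespace.
func sharedStackID(stackAnnotation string) types.NamespacedName {
	return types.NamespacedName{Namespace: "stack", Name: stackAnnotation}
}

// awsName mirrors GetServiceStackName: the shared load balancer and its
// managed security group are named from the first eight characters of the
// allocated UUID.
func awsName(stackAnnotation string) string {
	return fmt.Sprintf("k8s-%.8s", stackAnnotation)
}

func main() {
	stack := "0b4c9a36-9f6e-4c1e-8f57-2f1d2a9b7c41" // made-up allocated UUID
	fmt.Println(sharedStackID(stack))               // stack/0b4c9a36-9f6e-...
	fmt.Println(awsName(stack))                     // k8s-0b4c9a36
}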