This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Support allocating shared LB for services #1

Merged: 2 commits, Nov 3, 2023
141 changes: 131 additions & 10 deletions controllers/service/service_controller.go
@@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/aws-load-balancer-controller/controllers/service/eventhandlers"
@@ -33,6 +36,8 @@ const (
serviceTagPrefix = "service.k8s.aws"
serviceAnnotationPrefix = "service.beta.kubernetes.io"
controllerName = "service"

nlbListenerLimit = 50
)

func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder,
@@ -48,7 +53,7 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider,
elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags,
controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils,
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules)
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, k8sClient)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger)
return &serviceReconciler{
@@ -65,6 +70,9 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
stackDeployer: stackDeployer,
logger: logger,

allocatedServices: map[string]map[int32]struct{}{},
lock: sync.Mutex{},

maxConcurrentReconciles: controllerConfig.ServiceMaxConcurrentReconciles,
}
}
@@ -83,6 +91,9 @@ type serviceReconciler struct {
stackDeployer deploy.StackDeployer
logger logr.Logger

allocatedServices map[string]map[int32]struct{}
initialized bool
lock sync.Mutex
maxConcurrentReconciles int
}

@@ -99,16 +110,101 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err
if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil {
return client.IgnoreNotFound(err)
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
// allocatedService must be called under the lock to guarantee thread safety, since it reads and writes the shared allocation map concurrently
r.lock.Lock()
if err := r.allocatedService(ctx, svc); err != nil {
r.lock.Unlock()
return err
}
r.lock.Unlock()
}

stack, lb, backendSGRequired, err := r.buildModel(ctx, svc)
if err != nil {
return err
}
if lb == nil {
return r.cleanupLoadBalancerResources(ctx, svc, stack)
if lb == nil || !svc.DeletionTimestamp.IsZero() {
return r.cleanupLoadBalancerResources(ctx, svc, stack, lb == nil)
}
return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired)
}

// allocatedService ensures that each service is assigned to a virtual stack, and that no stack holds more than 50 services/listeners (the per-NLB listener limit).
// It maintains an in-memory cache to track usage. If no existing stack has room, it creates a new one.
func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error {
if !r.initialized {
var serviceList corev1.ServiceList
if err := r.k8sClient.List(ctx, &serviceList); err != nil {
return err
}
for _, svc := range serviceList.Items {
if svc.Annotations[service.LoadBalancerStackKey] != "" {
if r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] == nil {
r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] = map[int32]struct{}{}
}
for _, port := range svc.Spec.Ports {
r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]][port.NodePort] = struct{}{}
}
}
}
r.initialized = true
}

if !svc.DeletionTimestamp.IsZero() {
if _, ok := r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]; !ok {
return nil
}

for _, port := range svc.Spec.Ports {
delete(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]], port.NodePort)
}

if len(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]) == 0 {
delete(r.allocatedServices, svc.Annotations[service.LoadBalancerStackKey])
}

return nil
}

// If the service is not of type LoadBalancer, is not opting into a shared LB, or has already been allocated to a stack, skip allocation
if svc.Spec.Type != corev1.ServiceTypeLoadBalancer || svc.Annotations[service.LoadBalancerAllocatingPortKey] != "true" || svc.Annotations[service.LoadBalancerStackKey] != "" {
return nil
}

allocated := false
for stackName := range r.allocatedServices {
usedPort := r.allocatedServices[stackName]
if len(usedPort) <= nlbListenerLimit-len(svc.Spec.Ports) {
svc.Annotations[service.LoadBalancerStackKey] = stackName
if err := r.k8sClient.Update(ctx, svc); err != nil {
return err
}
for _, port := range svc.Spec.Ports {
usedPort[port.NodePort] = struct{}{}
}
r.allocatedServices[stackName] = usedPort
allocated = true
break
}
}

if !allocated {
stackName := uuid.New().String()
svc.Annotations[service.LoadBalancerStackKey] = stackName
if err := r.k8sClient.Update(ctx, svc); err != nil {
return err
}
if r.allocatedServices[stackName] == nil {
r.allocatedServices[stackName] = map[int32]struct{}{}
}
for _, port := range svc.Spec.Ports {
r.allocatedServices[stackName][port.NodePort] = struct{}{}
}
}
return nil
}
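
A note on the capacity arithmetic: the first-fit check above admits a service only if its ports keep the stack at or under the 50-listener NLB limit. A minimal sketch, where fitsInStack is a hypothetical helper that mirrors the check (it is not part of this PR):

package main

import "fmt"

// fitsInStack mirrors the first-fit check in allocatedService: a service
// fits if adding its ports keeps the stack at or under the 50-listener
// NLB limit (nlbListenerLimit above).
func fitsInStack(used map[int32]struct{}, newPorts int) bool {
	return len(used) <= 50-newPorts
}

func main() {
	used := make(map[int32]struct{})
	for p := int32(30000); p < 30047; p++ { // 47 listeners in use
		used[p] = struct{}{}
	}
	fmt.Println(fitsInStack(used, 3)) // true: 47 <= 50-3, filling the stack exactly
	fmt.Println(fitsInStack(used, 4)) // false: 47 > 50-4, so a new stack is created
}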

func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) {
stack, lb, backendSGRequired, err := r.modelBuilder.Build(ctx, svc)
if err != nil {
@@ -140,10 +236,15 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context,
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
return err
}
// Always lock the stack while deploying models. Since a shared stack can be accessed by multiple goroutines from multiple service reconciliation loops,
// ensure only one goroutine deploys its model at a time to guarantee thread safety.
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
stack.Unlock()
lbDNS, err := lb.DNSName().Resolve(ctx)
if err != nil {
return err
@@ -163,15 +264,28 @@
return nil
}

func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error {
func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, cleanlb bool) error {
if k8s.HasFinalizer(svc, serviceFinalizer) {
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(svc)}); err != nil {
return err
stack.Unlock()
if cleanlb {
nsName := k8s.NamespacedName(svc)
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
nsName = types.NamespacedName{
Namespace: "stack",
Name: svc.Annotations[service.LoadBalancerStackKey],
}
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}

if err = r.cleanupServiceStatus(ctx, svc); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
@@ -189,11 +303,17 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin
svc.Status.LoadBalancer.Ingress[0].IP != "" ||
svc.Status.LoadBalancer.Ingress[0].Hostname != lbDNS {
svcOld := svc.DeepCopy()
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
{
Hostname: lbDNS,
},
ingress := corev1.LoadBalancerIngress{
Hostname: lbDNS,
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
Author comment: When a service is in "shared" mode, each port's nodePort is used as the NLB listener port. This avoids allocating a dedicated port per service, since nodePorts are already cluster-unique and allocated by the Kubernetes API server.

for _, port := range svc.Spec.Ports {
ingress.Ports = append(ingress.Ports, corev1.PortStatus{
Port: port.NodePort,
})
}
}
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ingress}
if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil {
return errors.Wrapf(err, "failed to update service status: %v", k8s.NamespacedName(svc))
}
@@ -221,6 +341,7 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
if err := r.setupWatches(ctx, c); err != nil {
return err
}

return nil
}

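For context between the two files: a user opts a service into shared-LB mode with the allocating-port annotation, and the reconciler writes the stack assignment back as a second annotation. A minimal sketch, assuming the annotation-key constants live in the controller's service package (the import path below is an assumption, and the literal key strings are not shown in this diff):

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	// assumed import path for the package defining the annotation keys
	"sigs.k8s.io/aws-load-balancer-controller/pkg/service"
)

// sharedLBService builds a Service that opts into shared-LB allocation.
// After reconciliation, the controller sets service.LoadBalancerStackKey
// to the name of the stack the service was placed on.
func sharedLBService() *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "web",
			Namespace: "default",
			Annotations: map[string]string{
				service.LoadBalancerAllocatingPortKey: "true",
			},
		},
		Spec: corev1.ServiceSpec{
			Type: corev1.ServiceTypeLoadBalancer,
			// The NLB listener uses each port's cluster-unique nodePort,
			// so no dedicated port allocation is needed per service.
			Ports: []corev1.ServicePort{{Name: "http", Port: 80}},
		},
	}
}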
3 changes: 2 additions & 1 deletion pkg/deploy/ec2/security_group_manager.go
@@ -2,6 +2,8 @@ package ec2

import (
"context"
"time"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
@@ -12,7 +14,6 @@ import (
ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"time"
)

const (
1 change: 1 addition & 0 deletions pkg/deploy/ec2/security_group_synthesizer.go
@@ -2,6 +2,7 @@ package ec2

import (
"context"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
20 changes: 20 additions & 0 deletions pkg/model/core/graph/resource_graph.go
@@ -13,6 +13,8 @@ type ResourceGraph interface {
// Add a node into ResourceGraph.
AddNode(node ResourceUID)

// Remove a node from ResourceGraph, along with any edges that reference it.
RemoveNode(node ResourceUID)

// Add an edge into ResourceGraph, where dstNode depends on srcNode.
AddEdge(srcNode ResourceUID, dstNode ResourceUID)

@@ -44,6 +46,24 @@ func (g *defaultResourceGraph) AddNode(node ResourceUID) {
g.nodes = append(g.nodes, node)
}

// RemoveNode removes a node from ResourceGraph, along with any edges that reference it.
func (g *defaultResourceGraph) RemoveNode(node ResourceUID) {
for i, n := range g.nodes {
if n == node {
g.nodes = append(g.nodes[:i], g.nodes[i+1:]...)
break
}
}
delete(g.outEdges, node)
for src, nodes := range g.outEdges {
for i, n := range nodes {
if n == node {
// Reassign the map entry: appending via the local slice variable alone
// would leave the stored slice header (and its length) unchanged.
g.outEdges[src] = append(nodes[:i], nodes[i+1:]...)
break
}
}
}
}
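
A test-style sketch of the intended RemoveNode semantics, written in-package (it assumes only the ResID field of ResourceUID, taking it to be a string; any other fields keep their zero values):

func ExampleRemoveNode() {
	g := NewDefaultResourceGraph()
	lb := ResourceUID{ResID: "load-balancer"}
	ls := ResourceUID{ResID: "listener"}
	g.AddNode(lb)
	g.AddNode(ls)
	g.AddEdge(lb, ls) // listener depends on load-balancer

	g.RemoveNode(ls)
	// ls is now absent from both the node list and lb's out-edges,
	// so a later topological traversal never visits it.
}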

// Add an edge into ResourceGraph, where dstNode depends on srcNode.
func (g *defaultResourceGraph) AddEdge(srcNode ResourceUID, dstNode ResourceUID) {
g.outEdges[srcNode] = append(g.outEdges[srcNode], dstNode)
54 changes: 53 additions & 1 deletion pkg/model/core/stack.go
@@ -1,8 +1,11 @@
package core

import (
"github.com/pkg/errors"
"reflect"
"sync"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core/graph"
)

Expand All @@ -14,6 +17,19 @@ type Stack interface {
// Add a resource into stack.
AddResource(res Resource) error

// Remove a resource from the stack.
RemoveResource(id graph.ResourceUID)

// AddService registers a service that shares this stack.
AddService(service *corev1.Service)

// RemoveService deregisters a service from this stack.
RemoveService(service *corev1.Service)

// ListServices returns the services currently sharing this stack.
ListServices() []corev1.Service

// Lock and Unlock serialize access to the stack across reconcile goroutines.
Lock()

Unlock()

// Add a dependency relationship between resources.
AddDependency(dependee Resource, depender Resource) error

Expand All @@ -30,8 +46,10 @@ func NewDefaultStack(stackID StackID) *defaultStack {
return &defaultStack{
stackID: stackID,

services: make(map[string]corev1.Service),
resources: make(map[graph.ResourceUID]Resource),
resourceGraph: graph.NewDefaultResourceGraph(),
lock: sync.Mutex{},
}
}

Expand All @@ -41,8 +59,11 @@ var _ Stack = &defaultStack{}
type defaultStack struct {
stackID StackID

services map[string]corev1.Service
resources map[graph.ResourceUID]Resource
resourceGraph graph.ResourceGraph

lock sync.Mutex
}

func (s *defaultStack) StackID() StackID {
Expand All @@ -60,6 +81,26 @@ func (s *defaultStack) AddResource(res Resource) error {
return nil
}

func (s *defaultStack) RemoveResource(id graph.ResourceUID) {
delete(s.resources, id)
s.resourceGraph.RemoveNode(id)
}

func (s *defaultStack) AddService(service *corev1.Service) {
s.services[string(service.UID)] = *service
}

func (s *defaultStack) RemoveService(service *corev1.Service) {
delete(s.services, string(service.UID))
}

func (s *defaultStack) ListServices() (result []corev1.Service) {
for _, service := range s.services {
result = append(result, service)
}
return
}

// Add a dependency relationship between resources.
func (s *defaultStack) AddDependency(dependee Resource, depender Resource) error {
dependeeResUID := s.computeResourceUID(dependee)
@@ -101,6 +142,9 @@ func (s *defaultStack) ListResources(pResourceSlice interface{}) error {

func (s *defaultStack) TopologicalTraversal(visitor ResourceVisitor) error {
return graph.TopologicalTraversal(s.resourceGraph, func(uid graph.ResourceUID) error {
// Skip graph nodes whose resource has been removed from the stack.
if _, ok := s.resources[uid]; !ok {
return nil
}
return visitor.Visit(s.resources[uid])
})
}
@@ -112,3 +156,11 @@ func (s *defaultStack) computeResourceUID(res Resource) graph.ResourceUID {
ResID: res.ID(),
}
}

func (s *defaultStack) Lock() {
s.lock.Lock()
}

func (s *defaultStack) Unlock() {
s.lock.Unlock()
}
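
Putting the stack additions together, the reconciler in the controller diff above is expected to use them roughly like this (an in-package sketch only; deployModel stands in for the reconciler's actual deploy step, and the controller itself unlocks explicitly rather than via defer):

// reconcileShared sketches the shared-stack pattern: serialize the
// deploy step across reconcile goroutines, and keep the stack's
// service membership in sync with deletions.
func reconcileShared(ctx context.Context, stack Stack, svc *corev1.Service,
	deployModel func(context.Context, Stack) error) error {
	stack.Lock()
	defer stack.Unlock()

	if !svc.DeletionTimestamp.IsZero() {
		stack.RemoveService(svc)
	} else {
		stack.AddService(svc)
	}
	return deployModel(ctx, stack)
}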