
Support allocating shared LB for services #1

Merged
merged 2 commits on Nov 3, 2023
139 changes: 129 additions & 10 deletions controllers/service/service_controller.go
@@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/aws-load-balancer-controller/controllers/service/eventhandlers"
@@ -33,6 +36,8 @@ const (
serviceTagPrefix = "service.k8s.aws"
serviceAnnotationPrefix = "service.beta.kubernetes.io"
controllerName = "service"

nlbListenerLimit = 50
)

func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorder record.EventRecorder,
@@ -48,7 +53,7 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider,
elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags,
controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils,
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules)
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, k8sClient)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger)
return &serviceReconciler{
@@ -65,6 +70,9 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
stackDeployer: stackDeployer,
logger: logger,

allocatedServices: map[string]map[int32]struct{}{},
lock: sync.Mutex{},

maxConcurrentReconciles: controllerConfig.ServiceMaxConcurrentReconciles,
}
}
@@ -83,6 +91,9 @@ type serviceReconciler struct {
stackDeployer deploy.StackDeployer
logger logr.Logger

allocatedServices map[string]map[int32]struct{}
initialized bool
lock sync.Mutex
maxConcurrentReconciles int
}

@@ -99,16 +110,99 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err
if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil {
return client.IgnoreNotFound(err)
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
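// Allocation mutates shared in-memory bookkeeping, so serialize allocation decisions across concurrent reconciles.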
r.lock.Lock()
if err := r.allocatedService(ctx, svc); err != nil {
r.lock.Unlock()
return err
}
r.lock.Unlock()
}

stack, lb, backendSGRequired, err := r.buildModel(ctx, svc)
if err != nil {
return err
}
if lb == nil {
return r.cleanupLoadBalancerResources(ctx, svc, stack)
if lb == nil || !svc.DeletionTimestamp.IsZero() {
return r.cleanupLoadBalancerResources(ctx, svc, stack, lb == nil)
}
return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired)
}

// allocatedService assigns the service to a stack so that it can share load balancer resources with other services.
func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error {
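// On the first reconcile, rebuild the in-memory port allocations from the stack annotations of existing services.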
if !r.initialized {
var serviceList corev1.ServiceList
if err := r.k8sClient.List(ctx, &serviceList); err != nil {
return err
}
for _, item := range serviceList.Items {
stackName := item.Annotations[service.LoadBalancerStackKey]
if stackName == "" {
continue
}
if r.allocatedServices[stackName] == nil {
r.allocatedServices[stackName] = map[int32]struct{}{}
}
for _, port := range item.Spec.Ports {
r.allocatedServices[stackName][port.NodePort] = struct{}{}
}
}
r.initialized = true
}

if !svc.DeletionTimestamp.IsZero() {
if _, ok := r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]; !ok {
return nil
}

for _, port := range svc.Spec.Ports {
delete(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]], port.NodePort)
}

if len(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]) == 0 {
delete(r.allocatedServices, svc.Annotations[service.LoadBalancerStackKey])
}

return nil
}

// If the service is not of type LoadBalancer, is not opting into a shared LB, or already has a stack allocated, skip allocation.
if svc.Spec.Type != corev1.ServiceTypeLoadBalancer || svc.Annotations[service.LoadBalancerAllocatingPortKey] != "true" || svc.Annotations[service.LoadBalancerStackKey] != "" {
return nil
}

allocated := false
for stackName, usedPorts := range r.allocatedServices {
// Reuse this stack only if adding all of this service's listeners keeps it within the NLB listener quota.
if len(usedPorts) <= nlbListenerLimit-len(svc.Spec.Ports) {
svc.Annotations[service.LoadBalancerStackKey] = stackName
if err := r.k8sClient.Update(ctx, svc); err != nil {
return err
}
// usedPorts aliases the map entry, so these inserts persist in r.allocatedServices.
for _, port := range svc.Spec.Ports {
usedPorts[port.NodePort] = struct{}{}
}
allocated = true
break
}
}

if !allocated {
stackName := uuid.New().String()
svc.Annotations[service.LoadBalancerStackKey] = stackName
if err := r.k8sClient.Update(ctx, svc); err != nil {
return err
}
if r.allocatedServices[stackName] == nil {
r.allocatedServices[stackName] = map[int32]struct{}{}
}
for _, port := range svc.Spec.Ports {
r.allocatedServices[stackName][port.NodePort] = struct{}{}
}
}
return nil
}
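
For reviewers, a standalone sketch (not part of this diff) of the invariant the allocation above maintains: a service's NodePorts join an existing stack only when every new listener still fits under the 50-listener NLB quota; otherwise a fresh stack is created. The allocate helper and the stack names are hypothetical.

```go
package main

import "fmt"

const nlbListenerLimit = 50

// allocate assigns the given NodePorts to an existing stack with listener
// headroom, or to a fresh stack named newStack, and returns the chosen stack.
func allocate(stacks map[string]map[int32]struct{}, newStack string, ports []int32) string {
	for name, used := range stacks {
		// A stack is eligible only if adding every new listener keeps it
		// at or under the NLB listener quota.
		if len(used)+len(ports) <= nlbListenerLimit {
			for _, p := range ports {
				used[p] = struct{}{}
			}
			return name
		}
	}
	used := map[int32]struct{}{}
	for _, p := range ports {
		used[p] = struct{}{}
	}
	stacks[newStack] = used
	return newStack
}

func main() {
	stacks := map[string]map[int32]struct{}{}
	a := allocate(stacks, "stack-a", []int32{30080, 30443}) // creates stack-a
	b := allocate(stacks, "stack-b", []int32{31000})        // reuses stack-a
	fmt.Println(a, b, len(stacks["stack-a"]))               // stack-a stack-a 3
}
```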

func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) {
stack, lb, backendSGRequired, err := r.modelBuilder.Build(ctx, svc)
if err != nil {
@@ -140,10 +234,15 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context,
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
return err
}
// Always lock the stack before deploying its model: a shared stack can be accessed by multiple goroutines
// from multiple service reconciliation loops, so only one may deploy it at a time.
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
stack.Unlock()
lbDNS, err := lb.DNSName().Resolve(ctx)
if err != nil {
return err
@@ -163,15 +262,28 @@
return nil
}

func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error {
func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, cleanLB bool) error {
if k8s.HasFinalizer(svc, serviceFinalizer) {
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(svc)}); err != nil {
return err
stack.Unlock()
if cleanLB {
nsName := k8s.NamespacedName(svc)
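// A shared LB's backend SG is tracked under a synthetic namespaced name keyed by the stack UUID, since no single service owns the shared resources.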
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
nsName = types.NamespacedName{
Namespace: "stack",
Name: svc.Annotations[service.LoadBalancerStackKey],
}
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}

if err = r.cleanupServiceStatus(ctx, svc); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
@@ -189,11 +301,17 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin
svc.Status.LoadBalancer.Ingress[0].IP != "" ||
svc.Status.LoadBalancer.Ingress[0].Hostname != lbDNS {
svcOld := svc.DeepCopy()
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
{
Hostname: lbDNS,
},
ingress := corev1.LoadBalancerIngress{
Hostname: lbDNS,
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
Author: When a service is in "shared" mode, its NodePort is used as the listener port. This avoids allocating a dedicated port per service, since NodePorts are already unique across the cluster, allocated by the Kubernetes API server.
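
For illustration (not part of the PR), a minimal sketch of the resulting status shape: two services sharing one NLB report the same hostname, but each exposes its own NodePorts as listener ports. buildSharedIngress and the DNS name are hypothetical.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// buildSharedIngress builds the status entry the way the code below does:
// the shared LB's DNS name plus one PortStatus per NodePort.
func buildSharedIngress(lbDNS string, ports []corev1.ServicePort) corev1.LoadBalancerIngress {
	ingress := corev1.LoadBalancerIngress{Hostname: lbDNS}
	for _, port := range ports {
		ingress.Ports = append(ingress.Ports, corev1.PortStatus{Port: port.NodePort})
	}
	return ingress
}

func main() {
	// Same hostname, disjoint listener ports: the cluster has already
	// guaranteed the NodePorts are unique.
	a := buildSharedIngress("shared.elb.amazonaws.com", []corev1.ServicePort{{Port: 80, NodePort: 30080}})
	b := buildSharedIngress("shared.elb.amazonaws.com", []corev1.ServicePort{{Port: 443, NodePort: 31443}})
	fmt.Println(a.Ports[0].Port, b.Ports[0].Port) // 30080 31443
}
```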

for _, port := range svc.Spec.Ports {
ingress.Ports = append(ingress.Ports, corev1.PortStatus{
Port: port.NodePort,
})
}
}
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ingress}
if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil {
return errors.Wrapf(err, "failed to update service status: %v", k8s.NamespacedName(svc))
}
Expand Down Expand Up @@ -221,6 +339,7 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
if err := r.setupWatches(ctx, c); err != nil {
return err
}

return nil
}

22 changes: 9 additions & 13 deletions main.go
@@ -33,15 +33,11 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/inject"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
"sigs.k8s.io/aws-load-balancer-controller/pkg/version"
corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core"
elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2"
networkingwebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/networking"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -91,7 +87,7 @@ func main() {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr)
//config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr)
clientSet, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
setupLog.Error(err, "unable to obtain clientSet")
@@ -149,14 +145,14 @@ func main() {
os.Exit(1)
}

podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig,
mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector"))
corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr)
corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr)
elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr)
elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr)
//podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig,
Author: Need to uncomment this; it was commented out only for local development.

// mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector"))
//corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr)
//corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr)
//elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr)
//elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
//elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
//networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr)
//+kubebuilder:scaffold:builder

go func() {
3 changes: 2 additions & 1 deletion pkg/deploy/ec2/security_group_manager.go
@@ -2,6 +2,8 @@ package ec2

import (
"context"
"time"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
@@ -12,7 +14,6 @@ import (
ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"time"
)

const (
1 change: 1 addition & 0 deletions pkg/deploy/ec2/security_group_synthesizer.go
@@ -2,6 +2,7 @@ package ec2

import (
"context"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
20 changes: 20 additions & 0 deletions pkg/model/core/graph/resource_graph.go
@@ -13,6 +13,8 @@ type ResourceGraph interface {
// Add a node into ResourceGraph.
AddNode(node ResourceUID)

// Remove a node from ResourceGraph, along with any edges that reference it.
RemoveNode(node ResourceUID)

// Add an edge into ResourceGraph, where dstNode depends on srcNode.
AddEdge(srcNode ResourceUID, dstNode ResourceUID)

@@ -44,6 +46,24 @@ func (g *defaultResourceGraph) AddNode(node ResourceUID) {
g.nodes = append(g.nodes, node)
}

func (g *defaultResourceGraph) RemoveNode(node ResourceUID) {
for i, n := range g.nodes {
if n == node {
g.nodes = append(g.nodes[:i], g.nodes[i+1:]...)
break
}
}
delete(g.outEdges, node)
// Also drop the node from every other node's out-edge list; the shrunken
// slice must be written back into the map to take effect.
for src, nodes := range g.outEdges {
for i, n := range nodes {
if n == node {
g.outEdges[src] = append(nodes[:i], nodes[i+1:]...)
break
}
}
}
}
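
A reviewer-side note with a standalone sketch (not part of this diff): slices read from a map during range are copies of the slice header, so the shrunken slice must be written back for the removal to stick. This is the pattern used above.

```go
package main

import "fmt"

func main() {
	edges := map[string][]string{"a": {"b", "c"}}
	// Remove "b" from every out-edge list. append shrinks a local copy of
	// the slice header, so the result must be stored back into the map.
	for src, nodes := range edges {
		for i, n := range nodes {
			if n == "b" {
				edges[src] = append(nodes[:i], nodes[i+1:]...)
				break
			}
		}
	}
	fmt.Println(edges) // map[a:[c]]
}
```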

// Add an edge into ResourceGraph, where dstNode depends on srcNode.
func (g *defaultResourceGraph) AddEdge(srcNode ResourceUID, dstNode ResourceUID) {
g.outEdges[srcNode] = append(g.outEdges[srcNode], dstNode)