This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Support allocating shared LB for services
Signed-off-by: Daishan Peng <[email protected]>
StrongMonkey committed Nov 2, 2023
1 parent 5e07dce commit 01317fc
Showing 11 changed files with 407 additions and 56 deletions.
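
In short, a Service of type LoadBalancer opts into sharing a load balancer by setting the service.LoadBalancerAllocatingPortKey annotation to "true"; the reconciler then assigns it to a shared stack recorded under service.LoadBalancerStackKey, packing at most 50 node ports per stack and reporting per-port status. A minimal sketch of the opt-in side, assuming the annotation constants added by this commit are exported from the pkg/service package (their string values and defining file are not part of this excerpt):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"sigs.k8s.io/aws-load-balancer-controller/pkg/service"
)

func main() {
	// A Service that asks the controller to place it on a shared load balancer
	// instead of provisioning a dedicated one.
	svc := &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "demo",
			Namespace: "default",
			Annotations: map[string]string{
				// Opt-in flag the reconciler checks before running port allocation.
				service.LoadBalancerAllocatingPortKey: "true",
				// service.LoadBalancerStackKey is filled in by the controller once
				// the Service has been placed on a stack; it is not set by the user.
			},
		},
		Spec: corev1.ServiceSpec{
			Type:  corev1.ServiceTypeLoadBalancer,
			Ports: []corev1.ServicePort{{Port: 80}},
		},
	}
	fmt.Println(svc.Name, svc.Annotations)
}
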
132 changes: 122 additions & 10 deletions controllers/service/service_controller.go
@@ -3,10 +3,13 @@ package service
import (
"context"
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/google/uuid"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/aws-load-balancer-controller/controllers/service/eventhandlers"
@@ -48,7 +51,7 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
modelBuilder := service.NewDefaultModelBuilder(annotationParser, subnetsResolver, vpcInfoProvider, cloud.VpcID(), trackingProvider,
elbv2TaggingManager, cloud.EC2(), controllerConfig.FeatureGates, controllerConfig.ClusterName, controllerConfig.DefaultTags, controllerConfig.ExternalManagedTags,
controllerConfig.DefaultSSLPolicy, controllerConfig.DefaultTargetType, controllerConfig.FeatureGates.Enabled(config.EnableIPTargetType), serviceUtils,
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules)
backendSGProvider, sgResolver, controllerConfig.EnableBackendSecurityGroup, controllerConfig.DisableRestrictedSGRules, k8sClient)
stackMarshaller := deploy.NewDefaultStackMarshaller()
stackDeployer := deploy.NewDefaultStackDeployer(cloud, k8sClient, networkingSGManager, networkingSGReconciler, controllerConfig, serviceTagPrefix, logger)
return &serviceReconciler{
@@ -65,6 +68,9 @@ func NewServiceReconciler(cloud aws.Cloud, k8sClient client.Client, eventRecorde
stackDeployer: stackDeployer,
logger: logger,

allocatedServices: map[string]map[int32]struct{}{},
lock: sync.Mutex{},

maxConcurrentReconciles: controllerConfig.ServiceMaxConcurrentReconciles,
}
}
@@ -83,6 +89,9 @@ type serviceReconciler struct {
stackDeployer deploy.StackDeployer
logger logr.Logger

allocatedServices map[string]map[int32]struct{}
initialized bool
lock sync.Mutex
maxConcurrentReconciles int
}

@@ -99,16 +108,95 @@ func (r *serviceReconciler) reconcile(ctx context.Context, req ctrl.Request) err
if err := r.k8sClient.Get(ctx, req.NamespacedName, svc); err != nil {
return client.IgnoreNotFound(err)
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
r.lock.Lock()
if err := r.allocatedService(ctx, svc); err != nil {
r.lock.Unlock()
return err
}
r.lock.Unlock()
}

stack, lb, backendSGRequired, err := r.buildModel(ctx, svc)
if err != nil {
return err
}
if lb == nil {
return r.cleanupLoadBalancerResources(ctx, svc, stack)
if lb == nil || !svc.DeletionTimestamp.IsZero() {
return r.cleanupLoadBalancerResources(ctx, svc, stack, lb == nil)
}
return r.reconcileLoadBalancerResources(ctx, svc, stack, lb, backendSGRequired)
}

// allocatedService allocates a stack to a service so that it can share load balancer resources with other services.
func (r *serviceReconciler) allocatedService(ctx context.Context, svc *corev1.Service) error {
if !r.initialized {
var serviceList corev1.ServiceList
if err := r.k8sClient.List(ctx, &serviceList); err != nil {
return err
}
for _, svc := range serviceList.Items {
if svc.Annotations[service.LoadBalancerStackKey] != "" {
r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]] = map[int32]struct{}{}
for _, port := range svc.Spec.Ports {
r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]][port.NodePort] = struct{}{}
}
}
}
r.initialized = true
}

if !svc.DeletionTimestamp.IsZero() {
if _, ok := r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]; !ok {
return nil
}

for _, port := range svc.Spec.Ports {
delete(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]], port.NodePort)
}

if len(r.allocatedServices[svc.Annotations[service.LoadBalancerStackKey]]) == 0 {
delete(r.allocatedServices, svc.Annotations[service.LoadBalancerStackKey])
}

return nil
}

// If the service is not of type LoadBalancer, is not opted into sharing an LB, or has already been allocated to a stack, skip allocation
if svc.Spec.Type != corev1.ServiceTypeLoadBalancer || svc.Annotations[service.LoadBalancerAllocatingPortKey] != "true" || svc.Annotations[service.LoadBalancerStackKey] != "" {
return nil
}

allocated := false
for stackName := range r.allocatedServices {
usedPort := r.allocatedServices[stackName]
// Reuse this stack only if it still has room for all of the service's ports within the hard-coded budget of 50 ports per shared stack.
if len(usedPort) <= 50-len(svc.Spec.Ports) {
svc.Annotations[service.LoadBalancerStackKey] = stackName
if err := r.k8sClient.Update(ctx, svc); err != nil {
return err
}
for _, port := range svc.Spec.Ports {
usedPort[port.NodePort] = struct{}{}
}
allocated = true
break
}
}

if !allocated {
stackName := uuid.New().String()
svc.Annotations[service.LoadBalancerStackKey] = stackName
if err := r.k8sClient.Update(ctx, svc); err != nil {
return err
}
// Record every node port of the service under the new stack (initialize the map once, not per port).
r.allocatedServices[stackName] = map[int32]struct{}{}
for _, port := range svc.Spec.Ports {
r.allocatedServices[stackName][port.NodePort] = struct{}{}
}
}
return nil
}

func (r *serviceReconciler) buildModel(ctx context.Context, svc *corev1.Service) (core.Stack, *elbv2model.LoadBalancer, bool, error) {
stack, lb, backendSGRequired, err := r.modelBuilder.Build(ctx, svc)
if err != nil {
@@ -140,10 +228,15 @@ func (r *serviceReconciler) reconcileLoadBalancerResources(ctx context.Context,
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedAddFinalizer, fmt.Sprintf("Failed add finalizer due to %v", err))
return err
}
// Always lock the stack while deploying its model. Since a shared stack can be accessed by multiple goroutines from multiple service reconciliation loops,
// make sure only one goroutine deploys the model at a time to guarantee thread safety.
stack.Lock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
stack.Unlock()
return err
}
stack.Unlock()
lbDNS, err := lb.DNSName().Resolve(ctx)
if err != nil {
return err
@@ -163,15 +256,27 @@
return nil
}

func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack) error {
func (r *serviceReconciler) cleanupLoadBalancerResources(ctx context.Context, svc *corev1.Service, stack core.Stack, cleanlb bool) error {
if k8s.HasFinalizer(svc, serviceFinalizer) {
stack.Lock()
defer stack.Unlock()
err := r.deployModel(ctx, svc, stack)
if err != nil {
return err
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{k8s.NamespacedName(svc)}); err != nil {
return err
if cleanlb {
nsName := k8s.NamespacedName(svc)
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
nsName = types.NamespacedName{
Namespace: "stack",
Name: svc.Annotations[service.LoadBalancerStackKey],
}
}
if err := r.backendSGProvider.Release(ctx, networking.ResourceTypeService, []types.NamespacedName{nsName}); err != nil && !apierrors.IsNotFound(err) {
return err
}
}

if err = r.cleanupServiceStatus(ctx, svc); err != nil {
r.eventRecorder.Event(svc, corev1.EventTypeWarning, k8s.ServiceEventReasonFailedCleanupStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
@@ -189,11 +294,17 @@ func (r *serviceReconciler) updateServiceStatus(ctx context.Context, lbDNS strin
svc.Status.LoadBalancer.Ingress[0].IP != "" ||
svc.Status.LoadBalancer.Ingress[0].Hostname != lbDNS {
svcOld := svc.DeepCopy()
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
{
Hostname: lbDNS,
},
ingress := corev1.LoadBalancerIngress{
Hostname: lbDNS,
}
if svc.Annotations[service.LoadBalancerAllocatingPortKey] == "true" {
for _, port := range svc.Spec.Ports {
ingress.Ports = append(ingress.Ports, corev1.PortStatus{
Port: port.NodePort,
})
}
}
svc.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{ingress}
if err := r.k8sClient.Status().Patch(ctx, svc, client.MergeFrom(svcOld)); err != nil {
return errors.Wrapf(err, "failed to update service status: %v", k8s.NamespacedName(svc))
}
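
For a port-allocating Service, the patched status therefore carries the shared hostname plus one PortStatus entry per service port. Roughly the shape of what gets written, with an illustrative DNS name and node ports that are not taken from this commit:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

func main() {
	// Status written for a Service sharing a load balancer: one hostname,
	// plus the node ports allocated to this Service on that load balancer.
	status := corev1.LoadBalancerStatus{
		Ingress: []corev1.LoadBalancerIngress{{
			Hostname: "shared-lb-123456.elb.us-west-2.amazonaws.com", // illustrative
			Ports: []corev1.PortStatus{
				{Port: 31380}, // illustrative node ports
				{Port: 31443},
			},
		}},
	}
	fmt.Printf("%+v\n", status)
}
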
@@ -221,6 +332,7 @@ func (r *serviceReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
if err := r.setupWatches(ctx, c); err != nil {
return err
}

return nil
}

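The allocation in allocatedService above boils down to simple bookkeeping: reuse an existing stack while it still has room for all of the service's ports (at most 50 per stack), otherwise mint a fresh stack name. A self-contained sketch of that bookkeeping, detached from the Kubernetes objects (the function and variable names here are illustrative, not part of the controller):

package main

import "fmt"

// allocate mirrors the reconciler's shared-stack bookkeeping: reuse a stack
// that still has capacity for every requested port, otherwise open a new one.
func allocate(allocated map[string]map[int32]struct{}, need []int32, newStackName func() string) string {
	const maxPortsPerStack = 50 // same hard-coded budget as in allocatedService
	for name, used := range allocated {
		if len(used)+len(need) <= maxPortsPerStack {
			for _, p := range need {
				used[p] = struct{}{}
			}
			return name
		}
	}
	name := newStackName()
	used := map[int32]struct{}{}
	for _, p := range need {
		used[p] = struct{}{}
	}
	allocated[name] = used
	return name
}

func main() {
	allocated := map[string]map[int32]struct{}{}
	newName := func() string { return fmt.Sprintf("stack-%d", len(allocated)+1) }
	fmt.Println(allocate(allocated, []int32{31380, 31443}, newName)) // stack-1
	fmt.Println(allocate(allocated, []int32{30000}, newName))        // still stack-1: capacity remains
}
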
22 changes: 9 additions & 13 deletions main.go
@@ -33,15 +33,11 @@ import (
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/throttle"
"sigs.k8s.io/aws-load-balancer-controller/pkg/config"
"sigs.k8s.io/aws-load-balancer-controller/pkg/inject"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"sigs.k8s.io/aws-load-balancer-controller/pkg/targetgroupbinding"
"sigs.k8s.io/aws-load-balancer-controller/pkg/version"
corewebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/core"
elbv2webhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/elbv2"
networkingwebhook "sigs.k8s.io/aws-load-balancer-controller/webhooks/networking"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
@@ -91,7 +87,7 @@ func main() {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr)
//config.ConfigureWebhookServer(controllerCFG.RuntimeConfig, mgr)
clientSet, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
setupLog.Error(err, "unable to obtain clientSet")
@@ -149,14 +145,14 @@ func main() {
os.Exit(1)
}

podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig,
mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector"))
corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr)
corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr)
elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr)
elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr)
//podReadinessGateInjector := inject.NewPodReadinessGate(controllerCFG.PodWebhookConfig,
// mgr.GetClient(), ctrl.Log.WithName("pod-readiness-gate-injector"))
//corewebhook.NewPodMutator(podReadinessGateInjector).SetupWithManager(mgr)
//corewebhook.NewServiceMutator(controllerCFG.ServiceConfig.LoadBalancerClass, ctrl.Log).SetupWithManager(mgr)
//elbv2webhook.NewIngressClassParamsValidator().SetupWithManager(mgr)
//elbv2webhook.NewTargetGroupBindingMutator(cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
//elbv2webhook.NewTargetGroupBindingValidator(mgr.GetClient(), cloud.ELBV2(), ctrl.Log).SetupWithManager(mgr)
//networkingwebhook.NewIngressValidator(mgr.GetClient(), controllerCFG.IngressConfig, ctrl.Log).SetupWithManager(mgr)
//+kubebuilder:scaffold:builder

go func() {
3 changes: 2 additions & 1 deletion pkg/deploy/ec2/security_group_manager.go
@@ -2,6 +2,8 @@ package ec2

import (
"context"
"time"

awssdk "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
ec2sdk "github.com/aws/aws-sdk-go/service/ec2"
@@ -12,7 +14,6 @@
ec2model "sigs.k8s.io/aws-load-balancer-controller/pkg/model/ec2"
"sigs.k8s.io/aws-load-balancer-controller/pkg/networking"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
"time"
)

const (
1 change: 1 addition & 0 deletions pkg/deploy/ec2/security_group_synthesizer.go
@@ -2,6 +2,7 @@ package ec2

import (
"context"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/sets"
20 changes: 20 additions & 0 deletions pkg/model/core/graph/resource_graph.go
@@ -13,6 +13,8 @@ type ResourceGraph interface {
// Add a node into ResourceGraph.
AddNode(node ResourceUID)

// Remove a node from ResourceGraph, along with every edge that references it.
RemoveNode(node ResourceUID)

// Add an edge into ResourceGraph, where dstNode depends on srcNode.
AddEdge(srcNode ResourceUID, dstNode ResourceUID)

@@ -44,6 +46,24 @@ func (g *defaultResourceGraph) AddNode(node ResourceUID) {
g.nodes = append(g.nodes, node)
}

// Remove a node from ResourceGraph, dropping its outgoing edges and any edges that point at it.
func (g *defaultResourceGraph) RemoveNode(node ResourceUID) {
for i, n := range g.nodes {
if n == node {
g.nodes = append(g.nodes[:i], g.nodes[i+1:]...)
break
}
}
delete(g.outEdges, node)
for src, nodes := range g.outEdges {
for i, n := range nodes {
if n == node {
// Write the shortened slice back into the map; mutating the ranged copy alone would not update g.outEdges.
g.outEdges[src] = append(nodes[:i], nodes[i+1:]...)
break
}
}
}
}

// Add an edge into ResourceGraph, where dstNode depends on srcNode.
func (g *defaultResourceGraph) AddEdge(srcNode ResourceUID, dstNode ResourceUID) {
g.outEdges[srcNode] = append(g.outEdges[srcNode], dstNode)
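RemoveNode complements AddNode, presumably so that the resources of a service leaving a shared stack can be dropped from the stack's dependency graph. The subtle point is that appending to the slice obtained by ranging over the map does not change the slice stored in the map unless it is written back, which is why the loop assigns to g.outEdges[src]. A self-contained sketch of the same pattern using plain strings (names are illustrative):

package main

import "fmt"

// removeNode deletes a node from a small adjacency-list graph together with
// every edge that points at it. Note the write-back into the map: mutating
// the ranged slice copy alone would leave the map holding a stale slice header.
func removeNode(nodes []string, outEdges map[string][]string, target string) []string {
	for i, n := range nodes {
		if n == target {
			nodes = append(nodes[:i], nodes[i+1:]...)
			break
		}
	}
	delete(outEdges, target)
	for src, dsts := range outEdges {
		for i, d := range dsts {
			if d == target {
				outEdges[src] = append(dsts[:i], dsts[i+1:]...)
				break
			}
		}
	}
	return nodes
}

func main() {
	nodes := []string{"lb", "listener", "target-group"}
	outEdges := map[string][]string{"lb": {"listener"}, "listener": {"target-group"}}
	nodes = removeNode(nodes, outEdges, "listener")
	fmt.Println(nodes, outEdges) // [lb target-group] map[lb:[]]
}
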
(Diffs for the remaining 6 changed files are not shown here.)
