Skip to content

Commit

Permalink
feat: gw ha - label lb
Browse files Browse the repository at this point in the history
  • Loading branch information
cheina97 committed Oct 7, 2024
1 parent 8d4fd1f commit afac6ed
Show file tree
Hide file tree
Showing 17 changed files with 217 additions and 34 deletions.
8 changes: 7 additions & 1 deletion cmd/fabric/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
sourcedetector "github.com/liqotech/liqo/pkg/fabric/source-detector"
"github.com/liqotech/liqo/pkg/firewall"
"github.com/liqotech/liqo/pkg/gateway"
"github.com/liqotech/liqo/pkg/gateway/concurrent"
"github.com/liqotech/liqo/pkg/liqo-controller-manager/networking/external-network/remapping"
"github.com/liqotech/liqo/pkg/route"
flagsutils "github.com/liqotech/liqo/pkg/utils/flags"
Expand Down Expand Up @@ -100,6 +101,11 @@ func run(cmd *cobra.Command, _ []string) error {
selection.Equals,
[]string{gateway.GatewayComponentGateway},
)
reqActiveGatewayPods, err := labels.NewRequirement(
concurrent.ActiveGatewayKey,
selection.Equals,
[]string{concurrent.ActiveGatewayValue},
)
utilruntime.Must(err)

// Create the manager.
Expand All @@ -114,7 +120,7 @@ func run(cmd *cobra.Command, _ []string) error {
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.ByObject = map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Label: labels.NewSelector().Add(*reqGatewayPods),
Label: labels.NewSelector().Add(*reqGatewayPods).Add(*reqActiveGatewayPods),
},
}
return cache.New(config, opts)
Expand Down
14 changes: 1 addition & 13 deletions cmd/gateway/geneve/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -33,7 +32,6 @@ import (
"github.com/liqotech/liqo/pkg/gateway"
"github.com/liqotech/liqo/pkg/gateway/fabric"
"github.com/liqotech/liqo/pkg/gateway/fabric/geneve"
"github.com/liqotech/liqo/pkg/gateway/forge"
flagsutils "github.com/liqotech/liqo/pkg/utils/flags"
"github.com/liqotech/liqo/pkg/utils/mapper"
"github.com/liqotech/liqo/pkg/utils/restcfg"
Expand Down Expand Up @@ -90,17 +88,7 @@ func run(cmd *cobra.Command, _ []string) error {
BindAddress: options.GwOptions.MetricsAddress,
},
HealthProbeBindAddress: options.GwOptions.ProbeAddr,
LeaderElection: options.GwOptions.LeaderElection,
LeaderElectionID: fmt.Sprintf(
"%s.%s.%s.genevegateway.liqo.io",
forge.GatewayResourceName(options.GwOptions.Name), options.GwOptions.Namespace, options.GwOptions.Mode,
),
LeaderElectionNamespace: options.GwOptions.Namespace,
LeaderElectionReleaseOnCancel: true,
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: &options.GwOptions.LeaderElectionLeaseDuration,
RenewDeadline: &options.GwOptions.LeaderElectionRenewDeadline,
RetryPeriod: &options.GwOptions.LeaderElectionRetryPeriod,
LeaderElection: false,
})
if err != nil {
return fmt.Errorf("unable to create manager: %w", err)
Expand Down
9 changes: 9 additions & 0 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
networkingv1beta1 "github.com/liqotech/liqo/apis/networking/v1beta1"
"github.com/liqotech/liqo/pkg/firewall"
"github.com/liqotech/liqo/pkg/gateway"
"github.com/liqotech/liqo/pkg/gateway/concurrent"
"github.com/liqotech/liqo/pkg/gateway/connection"
"github.com/liqotech/liqo/pkg/gateway/connection/conncheck"
"github.com/liqotech/liqo/pkg/liqo-controller-manager/networking/external-network/remapping"
Expand Down Expand Up @@ -191,6 +192,14 @@ func run(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("unable to setup firewall configuration reconciler: %w", err)
}

if err := mgr.Add(&concurrent.ConcurrentRunnable{
PodName: connoptions.GwOptions.PodName,
DeploymentName: connoptions.GwOptions.Name,
Namespace: connoptions.GwOptions.Namespace,
}); err != nil {
return fmt.Errorf("unable to add concurrent runnable: %w", err)
}

// Start the manager.
return mgr.Start(cmd.Context())
}
14 changes: 1 addition & 13 deletions cmd/gateway/wireguard/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -38,7 +37,6 @@ import (
ipamv1alpha1 "github.com/liqotech/liqo/apis/ipam/v1alpha1"
networkingv1beta1 "github.com/liqotech/liqo/apis/networking/v1beta1"
"github.com/liqotech/liqo/pkg/gateway"
"github.com/liqotech/liqo/pkg/gateway/forge"
"github.com/liqotech/liqo/pkg/gateway/tunnel/wireguard"
flagsutils "github.com/liqotech/liqo/pkg/utils/flags"
"github.com/liqotech/liqo/pkg/utils/mapper"
Expand Down Expand Up @@ -112,17 +110,7 @@ func run(cmd *cobra.Command, _ []string) error {
BindAddress: options.GwOptions.MetricsAddress,
},
HealthProbeBindAddress: options.GwOptions.ProbeAddr,
LeaderElection: options.GwOptions.LeaderElection,
LeaderElectionID: fmt.Sprintf(
"%s.%s.%s.wgtunnel.liqo.io",
forge.GatewayResourceName(options.GwOptions.Name), options.GwOptions.Namespace, options.GwOptions.Mode,
),
LeaderElectionNamespace: options.GwOptions.Namespace,
LeaderElectionReleaseOnCancel: true,
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaseDuration: &options.GwOptions.LeaderElectionLeaseDuration,
RenewDeadline: &options.GwOptions.LeaderElectionRenewDeadline,
RetryPeriod: &options.GwOptions.LeaderElectionRetryPeriod,
LeaderElection: false,
})
if err != nil {
return fmt.Errorf("unable to create manager: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ spec:
- --remote-cluster-id={{"{{ .ClusterID }}"}}
- --gateway-uid={{"{{ .GatewayUID }}"}}
- --node-name={{"$(NODE_NAME)"}}
- --pod-name={{"$(POD_NAME)"}}
- --mode=client
{{- if .Values.metrics.enabled }}
- --metrics-address=:8080
Expand All @@ -70,6 +71,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
privileged: true
capabilities:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ spec:
- --namespace={{"{{ .Namespace }}"}}
- --remote-cluster-id={{"{{ .ClusterID }}"}}
- --node-name={{"$(NODE_NAME)"}}
- --pod-name={{"$(POD_NAME)"}}
- --gateway-uid={{"{{ .GatewayUID }}"}}
- --mode=server
{{- if .Values.metrics.enabled }}
Expand All @@ -97,6 +98,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
privileged: true
capabilities:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ spec:
- --namespace={{"{{ .Namespace }}"}}
- --remote-cluster-id={{"{{ .ClusterID }}"}}
- --node-name={{"$(NODE_NAME)"}}
- --pod-name={{"$(POD_NAME)"}}
- --gateway-uid={{"{{ .GatewayUID }}"}}
- --mode=server
{{- if .Values.metrics.enabled }}
Expand All @@ -88,6 +89,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
securityContext:
privileged: true
capabilities:
Expand Down
2 changes: 1 addition & 1 deletion pkg/fabric/source-detector/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
func (r *GatewayReconciler) SetupWithManager(mgr ctrl.Manager) error {
filterByLabelsGatewayPods, err := predicate.LabelSelectorPredicate(
metav1.LabelSelector{
MatchLabels: gateway.ForgeGatewayPodLabels(),
MatchLabels: gateway.ForgeActiveGatewayPodLabels(),
},
)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions pkg/gateway/concurrent/concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2019-2024 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrent

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

var _ manager.Runnable = &ConcurrentRunnable{}

// ConcurrentRunnable is a Runnable that manages concurrency.
type ConcurrentRunnable struct {
Client client.Client

PodName string
DeploymentName string
Namespace string
}

// Start starts the ConcurrentRunnable.
func (cr *ConcurrentRunnable) Start(ctx context.Context) error {
pods, err := ListAllGatewaysReplicas(ctx, cr.Client, cr.Namespace, cr.DeploymentName)
if err != nil {
return err
}

for i := range pods {
if pods[i].GetName() == cr.PodName {
if err := AddActiveGatewayLabel(ctx, cr.Client, client.ObjectKeyFromObject(&pods[i])); err != nil {
return err
}
} else {
if err := RemoveActiveGatewayLabel(ctx, cr.Client, client.ObjectKeyFromObject(&pods[i])); err != nil {
return err
}
}
}
return nil
}
20 changes: 20 additions & 0 deletions pkg/gateway/concurrent/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2019-2024 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package concurrent contains the logic to manage same gateway replicas.
// They are managed using an active/passive approach.
// The gateway container try to acquire the active role using the controller manager lease.
// Then, the active gateway is labeled with the ActiveGatewayKey and ActiveGatewayValue, and the passive gateways are unlabeled.
// The gateway service target the active gateway using the ActiveGatewayKey and ActiveGatewayValue labels.
package concurrent
73 changes: 73 additions & 0 deletions pkg/gateway/concurrent/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2019-2024 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrent

import (
"context"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// AddActiveGatewayLabel adds the active gateway label to the pod.
func AddActiveGatewayLabel(ctx context.Context, cl client.Client, key client.ObjectKey) error {
pod := &corev1.Pod{}
if err := cl.Get(ctx, key, &corev1.Pod{}); err != nil {
return err
}

labels := pod.GetLabels()
if labels == nil {
labels = map[string]string{}
}
labels[ActiveGatewayKey] = ActiveGatewayValue
pod.SetLabels(labels)

if err := cl.Update(ctx, pod); err != nil {
return err
}
return nil
}

// RemoveActiveGatewayLabel removes the active gateway label from the pod.
func RemoveActiveGatewayLabel(ctx context.Context, cl client.Client, key client.ObjectKey) error {
pod := &corev1.Pod{}
if err := cl.Get(ctx, key, &corev1.Pod{}); err != nil {
return err
}

labels := pod.GetLabels()
if labels == nil {
return nil
}
delete(labels, ActiveGatewayKey)
pod.SetLabels(labels)

if err := cl.Update(ctx, pod); err != nil {
return err
}
return nil
}

// ListAllGatewaysReplicas returns the list of all the gateways replicas of the same deployment.
func ListAllGatewaysReplicas(ctx context.Context, cl client.Client, namespace, deploymentName string) ([]corev1.Pod, error) {
podList := &corev1.PodList{}
if err := cl.List(ctx, podList, client.InNamespace(namespace), client.MatchingLabels{
"app": deploymentName,
}); err != nil {
return nil, err
}
return podList.Items, nil
}
22 changes: 22 additions & 0 deletions pkg/gateway/concurrent/label.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2019-2024 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrent

const (
// ActiveGatewayKey is the key used to label the active pod gateway.
ActiveGatewayKey = "networking.liqo.io/active"
// ActiveGatewayValue is the value used to label the active pod gateway.
ActiveGatewayValue = "true"
)
4 changes: 4 additions & 0 deletions pkg/gateway/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ const (
FlagNameRemoteClusterID FlagName = "remote-cluster-id"
// FlagNameNodeName is the name of the node.
FlagNameNodeName FlagName = "node-name"
// FlagNamePodName is the name of the pod.
FlagNamePodName FlagName = "pod-name"

// FlagNameGatewayUID is the UID of the Gateway resource.
FlagNameGatewayUID FlagName = "gateway-uid"
Expand Down Expand Up @@ -72,6 +74,7 @@ var RequiredFlags = []FlagName{
FlagNameMode,
FlagNameGatewayUID,
FlagNameNodeName,
FlagNamePodName,
}

// InitFlags initializes the flags for the gateway.
Expand All @@ -80,6 +83,7 @@ func InitFlags(flagset *pflag.FlagSet, opts *Options) {
flagset.StringVar(&opts.Namespace, FlagNameNamespace.String(), "", "Parent gateway namespace")
flagset.StringVar(&opts.RemoteClusterID, FlagNameRemoteClusterID.String(), "", "ClusterID of the remote cluster")
flagset.StringVar(&opts.NodeName, FlagNameNodeName.String(), "", "Node name")
flagset.StringVar(&opts.PodName, FlagNamePodName.String(), "", "Pod name")

flagset.StringVar(&opts.GatewayUID, FlagNameGatewayUID.String(), "", "Parent gateway resource UID")

Expand Down
Loading

0 comments on commit afac6ed

Please sign in to comment.