Skip to content

Commit

Permalink
feat: Add a new controller in karmada-controller-manager to process M…
Browse files Browse the repository at this point in the history
…ultiClusterService of CrossCluster type, enable service discovery between clusters.

Signed-off-by: chaunceyjiang <[email protected]>
  • Loading branch information
chaunceyjiang committed Jul 6, 2023
1 parent b6aa0b7 commit 837b676
Show file tree
Hide file tree
Showing 10 changed files with 939 additions and 38 deletions.
29 changes: 29 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func init() {
controllers["workStatus"] = startWorkStatusController
controllers["namespace"] = startNamespaceController
controllers["serviceExport"] = startServiceExportController
controllers["multiClusterService"] = startMultiClusterServiceController
controllers["endpointSlice"] = startEndpointSliceController
controllers["serviceImport"] = startServiceImportController
controllers["unifiedAuth"] = startUnifiedAuthController
Expand Down Expand Up @@ -477,6 +478,34 @@ func startServiceImportController(ctx controllerscontext.Context) (enabled bool,
return true, nil
}

func startMultiClusterServiceController(ctx controllerscontext.Context) (enabled bool, err error) {
multiClusterServiceController := &mcs.MultiClusterServiceController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.MultiClusterServiceControllerName),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err := multiClusterServiceController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
serviceImportWorkController := &mcs.ServiceImportWorkController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.MultiClusterServiceServiceImportControllerName),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err := serviceImportWorkController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
serviceExportWorkController := &mcs.ServiceExportWorkController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.MultiClusterServiceServiceExportControllerName),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err := serviceExportWorkController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}

func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, err error) {
unifiedAuthController := &unifiedauth.Controller{
Client: ctx.Mgr.GetClient(),
Expand Down
11 changes: 0 additions & 11 deletions pkg/apis/networking/v1alpha1/ingress_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// ResourceKindMultiClusterIngress is kind name of MultiClusterIngress.
ResourceKindMultiClusterIngress = "MultiClusterIngress"
// ResourceSingularMultiClusterIngress is singular name of MultiClusterIngress.
ResourceSingularMultiClusterIngress = "multiclusteringress"
// ResourcePluralMultiClusterIngress is plural name of MultiClusterIngress.
ResourcePluralMultiClusterIngress = "multiclusteringresses"
// ResourceNamespaceScopedMultiClusterIngress indicates if MultiClusterIngress is NamespaceScoped.
ResourceNamespaceScopedMultiClusterIngress = true
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
Expand Down
21 changes: 21 additions & 0 deletions pkg/apis/networking/v1alpha1/service_types_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package v1alpha1

// HasExposureTypeCrossCluster determines whether ExposureTypeCrossCluster exists.
func (m *MultiClusterServiceSpec) HasExposureTypeCrossCluster() bool {
for _, t := range m.Types {
if t == ExposureTypeCrossCluster {
return true
}
}
return false
}

// HasExposureTypeLoadBalancer determines whether ExposureTypeLoadBalancer exists.
func (m *MultiClusterServiceSpec) HasExposureTypeLoadBalancer() bool {
for _, t := range m.Types {
if t == ExposureTypeLoadBalancer {
return true
}
}
return false
}
20 changes: 20 additions & 0 deletions pkg/apis/networking/v1alpha1/well_known_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package v1alpha1

const (
// ResourceKindMultiClusterIngress is kind name of MultiClusterIngress.
ResourceKindMultiClusterIngress = "MultiClusterIngress"
// ResourceKindMultiMultiClusterService is kind name of MultiClusterService.
ResourceKindMultiMultiClusterService = "MultiClusterService"
// ResourceSingularMultiClusterIngress is singular name of MultiClusterIngress.
ResourceSingularMultiClusterIngress = "multiclusteringress"
// ResourcePluralMultiClusterIngress is plural name of MultiClusterIngress.
ResourcePluralMultiClusterIngress = "multiclusteringresses"
// ResourceNamespaceScopedMultiClusterIngress indicates if MultiClusterIngress is NamespaceScoped.
ResourceNamespaceScopedMultiClusterIngress = true
// MultiClusterServiceDerivedWorkKind represents the source kind of derived work, ServiceExport or ServiceImport.
MultiClusterServiceDerivedWorkKind = "multiclusterservice.networking.karmada.io/source-kind"
// MultiClusterServiceDerivedWorkName represents the name of the source for derived work.
MultiClusterServiceDerivedWorkName = "multiclusterservice.networking.karmada.io/source-name"
// MultiClusterServiceDerivedWorkNamespace represents the namespace of the source for derived work.
MultiClusterServiceDerivedWorkNamespace = "multiclusterservice.networking.karmada.io/source-namespace"
)
144 changes: 144 additions & 0 deletions pkg/controllers/mcs/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package mcs

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/names"
)

func cleanupDerivedWork(c client.Client, key keys.FederatedKey) error {
clusters := &clusterv1alpha1.ClusterList{}
if err := c.List(context.TODO(), clusters); err != nil {
return err
}
for _, cluster := range clusters.Items {
key.Cluster = cluster.Name
if err := cleanupWorkWithLabelsSelectorDelete(c, key,
labels.SelectorFromSet(labels.Set{
networkingv1alpha1.MultiClusterServiceDerivedWorkNamespace: key.Namespace,
networkingv1alpha1.MultiClusterServiceDerivedWorkName: key.Name,
networkingv1alpha1.MultiClusterServiceDerivedWorkKind: key.Kind,
})); err != nil {
return err
}
}
return nil
}

func cleanupWorkWithLabelsSelectorDelete(c client.Client, key keys.FederatedKey, labelsSelector labels.Selector) error {
executionSpace := names.GenerateExecutionSpaceName(key.Cluster)

workList := &workv1alpha1.WorkList{}
if err := c.List(context.TODO(), workList, &client.ListOptions{
Namespace: executionSpace,
LabelSelector: labelsSelector,
}); err != nil {
klog.Errorf("Failed to list workList reported by %s(%s) in executionSpace(%s), Error: %v",
key.Kind, key.NamespaceKey(), executionSpace, err)
return err
}

var errs []error
for index, work := range workList.Items {
if err := c.Delete(context.TODO(), &workList.Items[index]); err != nil && !apierrors.IsNotFound(err) {
klog.Errorf("Failed to delete work(%s/%s), Error: %v", work.Namespace, work.Name, err)
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
}

// needCleanupDerivedWork If ServiceImport or ServiceExport is not found, delete the associated work.
func needCleanupDerivedWork(obj client.Object) bool {
return obj != nil && !obj.GetDeletionTimestamp().IsZero() ||
obj != nil && obj.GetDeletionTimestamp().IsZero() &&
!controllerutil.ContainsFinalizer(obj, MultiClusterServiceControllerFinalizer)
}

func getServiceImportOrServiceExport(c client.Client, fedKey keys.FederatedKey) (*unstructured.Unstructured, error) {
switch fedKey.Kind {
case util.ServiceImportKind:
svcImport := &mcsv1alpha1.ServiceImport{}
err := c.Get(context.TODO(), types.NamespacedName{Namespace: fedKey.Namespace, Name: fedKey.Name}, svcImport)
if err != nil {
return nil, err
}
return helper.ToUnstructured(svcImport)

case util.ServiceExportKind:
svcExport := &mcsv1alpha1.ServiceExport{}
err := c.Get(context.TODO(), types.NamespacedName{Namespace: fedKey.Namespace, Name: fedKey.Name}, svcExport)
if err != nil {
return nil, err
}
return helper.ToUnstructured(svcExport)
}
return nil, fmt.Errorf("unsupported kind(%s)", fedKey.Kind)
}

func handleServiceExportOrServiceExportEvent(c client.Client, record record.EventRecorder, fedKey keys.FederatedKey) (err error) {
var obj *unstructured.Unstructured
obj, err = getServiceImportOrServiceExport(c, fedKey)
if err != nil || needCleanupDerivedWork(obj) {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
workMeta := metav1.ObjectMeta{
Name: names.GenerateWorkName(fedKey.Kind, fedKey.Name, fedKey.Namespace),
Namespace: names.GenerateExecutionSpaceName(fedKey.Cluster),
Labels: map[string]string{
networkingv1alpha1.MultiClusterServiceDerivedWorkNamespace: fedKey.Namespace,
networkingv1alpha1.MultiClusterServiceDerivedWorkName: fedKey.Name,
networkingv1alpha1.MultiClusterServiceDerivedWorkKind: fedKey.Kind,
},
Finalizers: []string{util.ExecutionControllerFinalizer},
}

util.MergeLabel(obj, workv1alpha1.WorkNamespaceLabel, workMeta.Namespace)
util.MergeLabel(obj, workv1alpha1.WorkNameLabel, workMeta.Name)
util.MergeLabel(obj, util.ManagedByKarmadaLabel, util.ManagedByKarmadaLabelValue)

if err = helper.CreateOrUpdateWork(c, workMeta, obj); err != nil {
return err
}
record.Eventf(obj, corev1.EventTypeNormal, events.EventReasonSyncDerivedWorkSucceed, "Sync derived Work for %s(%s/%s) succeed.", fedKey.Kind, fedKey.Namespace, fedKey.Name)
return nil
}

// removeFinalizer remove finalizer from the given MultiClusterService
func removeFinalizer(cli client.Client, obj client.Object) (controllerruntime.Result, error) {
if !controllerutil.ContainsFinalizer(obj, MultiClusterServiceControllerFinalizer) {
return controllerruntime.Result{}, nil
}

controllerutil.RemoveFinalizer(obj, MultiClusterServiceControllerFinalizer)
err := cli.Update(context.TODO(), obj)
if err != nil {
return controllerruntime.Result{Requeue: true}, err
}
return controllerruntime.Result{}, nil
}
Loading

0 comments on commit 837b676

Please sign in to comment.