Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a new controller in karmada-controller-manager to process MultiClusterService of CrossCluster type #3769

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions artifacts/deploy/webhook-configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ webhooks:
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
- name: resourcebinding.karmada.io
chaunceyjiang marked this conversation as resolved.
Show resolved Hide resolved
rules:
- operations: ["CREATE", "UPDATE"]
apiGroups: ["work.karmada.io"]
apiVersions: ["v1alpha2"]
resources: ["resourcebindings"]
scope: "Namespaced"
clientConfig:
url: https://karmada-webhook.karmada-system.svc:443/mutate-resourcebinding
caBundle: {{caBundle}}
failurePolicy: Fail
sideEffects: None
admissionReviewVersions: [ "v1" ]
timeoutSeconds: 3
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
Expand Down
13 changes: 13 additions & 0 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ func init() {
controllers["workStatus"] = startWorkStatusController
controllers["namespace"] = startNamespaceController
controllers["serviceExport"] = startServiceExportController
controllers["multiClusterService"] = startMultiClusterServiceController
controllers["endpointSlice"] = startEndpointSliceController
controllers["serviceImport"] = startServiceImportController
controllers["unifiedAuth"] = startUnifiedAuthController
Expand Down Expand Up @@ -462,6 +463,18 @@ func startServiceImportController(ctx controllerscontext.Context) (enabled bool,
return true, nil
}

func startMultiClusterServiceController(ctx controllerscontext.Context) (enabled bool, err error) {
multiClusterServiceController := &mcs.MultiClusterServiceController{
Client: ctx.Mgr.GetClient(),
EventRecorder: ctx.Mgr.GetEventRecorderFor(mcs.MultiClusterServiceControllerName),
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
}
if err := multiClusterServiceController.SetupWithManager(ctx.Mgr); err != nil {
return false, err
}
return true, nil
}

func startUnifiedAuthController(ctx controllerscontext.Context) (enabled bool, err error) {
unifiedAuthController := &unifiedauth.Controller{
Client: ctx.Mgr.GetClient(),
Expand Down
2 changes: 2 additions & 0 deletions cmd/webhook/app/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/karmada-io/karmada/pkg/webhook/multiclusterservice"
"github.com/karmada-io/karmada/pkg/webhook/overridepolicy"
"github.com/karmada-io/karmada/pkg/webhook/propagationpolicy"
"github.com/karmada-io/karmada/pkg/webhook/resourcebinding"
"github.com/karmada-io/karmada/pkg/webhook/resourceinterpretercustomization"
"github.com/karmada-io/karmada/pkg/webhook/work"
)
Expand Down Expand Up @@ -139,6 +140,7 @@ func Run(ctx context.Context, opts *options.Options) error {
hookServer.Register("/validate-multiclusteringress", &webhook.Admission{Handler: &multiclusteringress.ValidatingAdmission{Decoder: decoder}})
hookServer.Register("/validate-multiclusterservice", &webhook.Admission{Handler: &multiclusterservice.ValidatingAdmission{Decoder: decoder}})
hookServer.Register("/mutate-federatedhpa", &webhook.Admission{Handler: &federatedhpa.MutatingAdmission{Decoder: decoder}})
hookServer.Register("/mutate-resourcebinding", &webhook.Admission{Handler: &resourcebinding.MutatingAdmission{Decoder: decoder}})
hookServer.WebhookMux().Handle("/readyz/", http.StripPrefix("/readyz/", &healthz.Handler{}))

// blocks until the context is done.
Expand Down
11 changes: 0 additions & 11 deletions pkg/apis/networking/v1alpha1/ingress_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// ResourceKindMultiClusterIngress is kind name of MultiClusterIngress.
ResourceKindMultiClusterIngress = "MultiClusterIngress"
// ResourceSingularMultiClusterIngress is singular name of MultiClusterIngress.
ResourceSingularMultiClusterIngress = "multiclusteringress"
// ResourcePluralMultiClusterIngress is plural name of MultiClusterIngress.
ResourcePluralMultiClusterIngress = "multiclusteringresses"
// ResourceNamespaceScopedMultiClusterIngress indicates if MultiClusterIngress is NamespaceScoped.
ResourceNamespaceScopedMultiClusterIngress = true
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status
Expand Down
21 changes: 21 additions & 0 deletions pkg/apis/networking/v1alpha1/service_types_helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package v1alpha1

// HasExposureTypeCrossCluster determines whether ExposureTypeCrossCluster exists.
func (m *MultiClusterServiceSpec) HasExposureTypeCrossCluster() bool {
for _, t := range m.Types {
if t == ExposureTypeCrossCluster {
return true
}
}
return false
}

// HasExposureTypeLoadBalancer determines whether ExposureTypeLoadBalancer exists.
func (m *MultiClusterServiceSpec) HasExposureTypeLoadBalancer() bool {
for _, t := range m.Types {
if t == ExposureTypeLoadBalancer {
return true
}
}
return false
}
16 changes: 16 additions & 0 deletions pkg/apis/networking/v1alpha1/well_known_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package v1alpha1

const (
// ResourceKindMultiClusterIngress is kind name of MultiClusterIngress.
ResourceKindMultiClusterIngress = "MultiClusterIngress"
// ResourceKindMultiMultiClusterService is kind name of MultiClusterService.
ResourceKindMultiMultiClusterService = "MultiClusterService"
// ResourceSingularMultiClusterIngress is singular name of MultiClusterIngress.
ResourceSingularMultiClusterIngress = "multiclusteringress"
// ResourcePluralMultiClusterIngress is plural name of MultiClusterIngress.
ResourcePluralMultiClusterIngress = "multiclusteringresses"
// ResourceNamespaceScopedMultiClusterIngress indicates if MultiClusterIngress is NamespaceScoped.
ResourceNamespaceScopedMultiClusterIngress = true
// ResourceKindMultiMultiClusterServiceUsedBy indicates which type of MultiClusterService is currently using the resource.
ResourceKindMultiMultiClusterServiceUsedBy = "multiclusterservices.networking.karmada.io/used-by"
chaunceyjiang marked this conversation as resolved.
Show resolved Hide resolved
)
225 changes: 225 additions & 0 deletions pkg/controllers/mcs/mcs_serviceexport_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
package mcs

import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
mcsv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)

const (
// MultiClusterServiceControllerName MultiClusterServiceController name.
MultiClusterServiceControllerName = "mcs-serviceexport-controller"

// karmadaMCSFinalizer is added to MultiClusterService to ensure downstream resource,
// such as ServiceExport and ServiceImport are deleted before itself is deleted.
karmadaMCSFinalizer = "karmada.io/karmada-mcs-controller"
)

// MultiClusterServiceController lists/watches the multiClusterService resource,
// generates ServiceExport and propagate it to the source clusters.
// Note: This controller only focus on multiClusterService of type CrossCluster.
type MultiClusterServiceController struct {
client.Client
EventRecorder record.EventRecorder
RateLimiterOptions ratelimiterflag.Options
}

// Reconcile performs a full reconciliation for the object referred to by the Request.
func (c *MultiClusterServiceController) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
klog.V(4).Infof("Reconciling MultiClusterService %s.", req.NamespacedName.String())

mcs := &networkingv1alpha1.MultiClusterService{}
if err := c.Client.Get(ctx, req.NamespacedName, mcs); err != nil {
if apierrors.IsNotFound(err) {
return controllerruntime.Result{}, nil
}
return controllerruntime.Result{Requeue: true}, err
}

if err := c.reconcileServiceExport(ctx, mcs); err != nil {
klog.Errorf("Failed to handle ServiceExport for MultiClusterService(%s), error: %v", req.NamespacedName.String(), err)
return controllerruntime.Result{}, err
}
if !mcs.DeletionTimestamp.IsZero() || !mcs.Spec.HasExposureTypeCrossCluster() {
return controllerruntime.Result{}, c.removeFinalizer(ctx, mcs)
}
return controllerruntime.Result{}, c.ensureFinalizer(ctx, mcs)
}

func (c *MultiClusterServiceController) deriveServiceExportFromMultiClusterService(
chaunceyjiang marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context, mcs *networkingv1alpha1.MultiClusterService, ownSvc *corev1.Service) error {
oldSvcExport := &mcsv1alpha1.ServiceExport{}
err := c.Client.Get(ctx, types.NamespacedName{Name: mcs.Name, Namespace: mcs.Namespace}, oldSvcExport)
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
newSvcExport := &mcsv1alpha1.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: mcs.Namespace,
Name: mcs.Name,
Annotations: map[string]string{
networkingv1alpha1.ResourceKindMultiMultiClusterServiceUsedBy: string(networkingv1alpha1.ExposureTypeCrossCluster),
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(ownSvc, corev1.SchemeGroupVersion.WithKind("Service")),
},
},
}
err = c.Client.Create(ctx, newSvcExport)
if err != nil {
msg := fmt.Sprintf("Create derived ServiceExport(%s/%s) failed, Error: %v", newSvcExport.Namespace, newSvcExport.Name, err)
klog.Errorf(msg)
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventSyncDerivedServiceExportFailed, msg)
return err
}
msg := fmt.Sprintf("Sync the serviceExport of MultiClusterService(%s/%s) successful", mcs.Namespace, mcs.Name)
klog.V(4).Infof(msg)
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventSyncDerivedServiceExportSucceed, msg)
return nil
}
if addCrossClusterToUsedByAnnotationIfNeed(oldSvcExport) {
oldSvcExport.OwnerReferences = append(oldSvcExport.OwnerReferences, *metav1.NewControllerRef(ownSvc, corev1.SchemeGroupVersion.WithKind("Service")))
chaunceyjiang marked this conversation as resolved.
Show resolved Hide resolved
err = c.Client.Update(ctx, oldSvcExport)
if err != nil {
msg := fmt.Sprintf("Update derived ServiceExport(%s/%s) failed, Error: %v", oldSvcExport.Namespace, oldSvcExport.Name, err)
klog.Errorf(msg)
c.EventRecorder.Eventf(mcs, corev1.EventTypeWarning, events.EventSyncDerivedServiceExportFailed, msg)
return err
}
msg := fmt.Sprintf("Sync the serviceExport of MultiClusterService(%s/%s) successful", mcs.Namespace, mcs.Name)
klog.V(4).Infof(msg)
c.EventRecorder.Eventf(mcs, corev1.EventTypeNormal, events.EventSyncDerivedServiceExportSucceed, msg)
return nil
}
return nil
}

func (c *MultiClusterServiceController) cleanupDerivedServiceExport(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
derivedSvcExport := &mcsv1alpha1.ServiceExport{}
derivedSvcExportNamespacedName := types.NamespacedName{
Namespace: mcs.Namespace,
Name: mcs.Name,
}
err := c.Client.Get(ctx, derivedSvcExportNamespacedName, derivedSvcExport)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
if removeCrossClusterToUsedByAnnotationIfNeed(derivedSvcExport) {
err = c.Client.Update(ctx, derivedSvcExport)
}
used := derivedSvcExport.Annotations[networkingv1alpha1.ResourceKindMultiMultiClusterServiceUsedBy]
if len(used) == 0 {
err = c.Client.Delete(ctx, derivedSvcExport)
}
if err != nil && !apierrors.IsNotFound(err) {
chaunceyjiang marked this conversation as resolved.
Show resolved Hide resolved
klog.ErrorS(err, "failed to delete ServiceExport or remove multiclusterservices.networking.karmada.io/used-by annotation of the ServiceExport",
"namespacedName", derivedSvcExportNamespacedName.String(), "used-by", used)
return err
}
klog.V(4).InfoS("success to delete ServiceExport or remove multiclusterservices.networking.karmada.io/used-by annotation of the ServiceExport",
"namespacedName", derivedSvcExportNamespacedName.String(), "used-by", used)
return nil
}

func (c *MultiClusterServiceController) reconcileServiceExport(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
if !mcs.Spec.HasExposureTypeCrossCluster() || !mcs.DeletionTimestamp.IsZero() {
return c.cleanupDerivedServiceExport(ctx, mcs)
}
ownerSvc := &corev1.Service{}
err := c.Client.Get(ctx, types.NamespacedName{Name: mcs.Name, Namespace: mcs.Namespace}, ownerSvc)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
return c.deriveServiceExportFromMultiClusterService(ctx, mcs, ownerSvc)
}

// SetupWithManager creates a controller and register to controller manager.
func (c *MultiClusterServiceController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).
For(&networkingv1alpha1.MultiClusterService{}).
Watches(&corev1.Service{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool { return true },
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return false },
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false },
})).
WithOptions(controller.Options{
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
}).
Complete(c)
}

// removeFinalizer remove finalizer from the given MultiClusterService
func (c *MultiClusterServiceController) removeFinalizer(ctx context.Context, obj client.Object) error {
if !controllerutil.ContainsFinalizer(obj, karmadaMCSFinalizer) {
return nil
}
controllerutil.RemoveFinalizer(obj, karmadaMCSFinalizer)
return c.Client.Update(ctx, obj)
}

// ensureFinalizer add finalizer from the given MultiClusterService
func (c *MultiClusterServiceController) ensureFinalizer(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
if controllerutil.ContainsFinalizer(mcs, karmadaMCSFinalizer) {
return nil
}
controllerutil.AddFinalizer(mcs, karmadaMCSFinalizer)
return c.Client.Update(ctx, mcs)
}

func removeCrossClusterToUsedByAnnotationIfNeed(se *mcsv1alpha1.ServiceExport) bool {
if se.Annotations == nil {
return false
}
used := se.Annotations[networkingv1alpha1.ResourceKindMultiMultiClusterServiceUsedBy]
usedTypes := strings.Split(used, ",")
for i, usedType := range usedTypes {
if usedType == string(networkingv1alpha1.ExposureTypeCrossCluster) {
se.Annotations[networkingv1alpha1.ResourceKindMultiMultiClusterServiceUsedBy] = strings.Join(append(usedTypes[:i], usedTypes[i+1:]...), ",")
return true
}
}

return false
}

func addCrossClusterToUsedByAnnotationIfNeed(se *mcsv1alpha1.ServiceExport) bool {
if se.Annotations == nil {
se.Annotations = map[string]string{}
}
used := se.Annotations[networkingv1alpha1.ResourceKindMultiMultiClusterServiceUsedBy]
usedTypes := strings.Split(used, ",")
for _, usedType := range usedTypes {
if usedType == string(networkingv1alpha1.ExposureTypeCrossCluster) {
return false
}
}
se.Annotations[networkingv1alpha1.ResourceKindMultiMultiClusterServiceUsedBy] = strings.Trim(strings.Join([]string{string(networkingv1alpha1.ExposureTypeCrossCluster), used}, ","), ",")
return true
}
12 changes: 12 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,15 @@ const (
// EventReasonSyncDerivedServiceFailed indicates that sync derived service failed.
EventReasonSyncDerivedServiceFailed = "SyncDerivedServiceFailed"
)

// Define events for MultiClusterService objects.
const (
// EventSyncDerivedServiceExportSucceed indicates that sync derived ServiceExport succeed.
EventSyncDerivedServiceExportSucceed = "SyncServiceExportSucceed"
// EventSyncDerivedServiceExportFailed indicates that sync derived ServiceExport failed.
EventSyncDerivedServiceExportFailed = "SyncServiceExportFailed"
// EventSyncDerivedServiceImportSucceed indicates that sync derived ServiceImport succeed.
EventSyncDerivedServiceImportSucceed = "SyncServiceImportSucceed"
// EventSyncDerivedServiceImportFailed indicates that sync derived ServiceImport failed.
EventSyncDerivedServiceImportFailed = "SyncServiceImportFailed"
)
12 changes: 12 additions & 0 deletions pkg/resourceinterpreter/default/native/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func getAllDefaultDependenciesInterpreter() map[schema.GroupVersionKind]dependen
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = getJobDependencies
s[batchv1.SchemeGroupVersion.WithKind(util.CronJobKind)] = getCronJobDependencies
s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = getPodDependencies
s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = getServiceDependencies
s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = getDaemonSetDependencies
s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = getStatefulSetDependencies
s[networkingv1.SchemeGroupVersion.WithKind(util.IngressKind)] = getIngressDependencies
Expand Down Expand Up @@ -163,3 +164,14 @@ func getServiceImportDependencies(object *unstructured.Unstructured) ([]configv1
},
}, nil
}

func getServiceDependencies(object *unstructured.Unstructured) ([]configv1alpha1.DependentObjectReference, error) {
return []configv1alpha1.DependentObjectReference{
{
APIVersion: mcsv1alpha1.SchemeGroupVersion.String(),
Kind: util.ServiceExportKind,
Namespace: object.GetNamespace(),
Name: object.GetName(),
},
}, nil
}
Loading
Loading