Merge pull request #5597 from a7i/amir/preserveResourcesOnDeletion
implement preserveResourcesOnDeletion to support migration rollback
karmada-bot authored Oct 16, 2024
2 parents 4db8867 + 68c0104 commit 19b32a2
Showing 14 changed files with 335 additions and 43 deletions.
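The change wires an optional preserveResourcesOnDeletion flag from the bindings down to every Work they produce. When the flag is set, deleting a Work no longer deletes the propagated resource in the member cluster; the execution controller instead strips Karmada's tracking labels and annotations so the workload keeps running, which is what makes a migration rollback possible. As rough orientation, the field has the shape sketched below; the literal declarations live in the API type files, which are among the 14 changed files but are not shown on this page, so treat the snippet as an assumption rather than part of the diff.

// Sketch only (assumed declaration): an optional pointer field on the Work spec,
// mirrored on the ResourceBinding and ClusterResourceBinding specs.
type WorkSpec struct {
	// ...existing fields such as Workload and SuspendDispatching...

	// PreserveResourcesOnDeletion controls whether the resources propagated by
	// this Work should be preserved on the member cluster when the Work is
	// deleted. Nil or false keeps the previous behavior of deleting them.
	PreserveResourcesOnDeletion *bool `json:"preserveResourcesOnDeletion,omitempty"`
}

In the hunks below the value is always read with ptr.Deref(..., false), so an unset field behaves exactly like false.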
14 changes: 11 additions & 3 deletions pkg/controllers/binding/common.go
@@ -48,6 +48,7 @@ func ensureWork(
var replicas int32
var conflictResolutionInBinding policyv1alpha1.ConflictResolution
var suspension *policyv1alpha1.Suspension
var preserveResourcesOnDeletion *bool
switch scope {
case apiextensionsv1.NamespaceScoped:
bindingObj := binding.(*workv1alpha2.ResourceBinding)
@@ -57,6 +58,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
case apiextensionsv1.ClusterScoped:
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
targetClusters = bindingObj.Spec.Clusters
@@ -65,6 +67,7 @@ func ensureWork(
replicas = bindingObj.Spec.Replicas
conflictResolutionInBinding = bindingObj.Spec.ConflictResolution
suspension = bindingObj.Spec.Suspension
preserveResourcesOnDeletion = bindingObj.Spec.PreserveResourcesOnDeletion
}

targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
@@ -133,9 +136,14 @@ func ensureWork(
Annotations: annotations,
}

suspendDispatching := shouldSuspendDispatching(suspension, targetCluster)

if err = helper.CreateOrUpdateWork(ctx, c, workMeta, clonedWorkload, &suspendDispatching); err != nil {
if err = helper.CreateOrUpdateWork(
ctx,
c,
workMeta,
clonedWorkload,
helper.WithSuspendDispatching(shouldSuspendDispatching(suspension, targetCluster)),
helper.WithPreserveResourcesOnDeletion(ptr.Deref(preserveResourcesOnDeletion, false)),
); err != nil {
return err
}
}
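The call site above also reflects a signature change: helper.CreateOrUpdateWork previously took a trailing *bool for dispatch suspension and now accepts variadic functional options, which is why the other controllers later in this commit simply drop their trailing nil argument. A minimal sketch of that option style, with the option names taken from the call site but the signatures and bodies assumed:

// Hypothetical sketch of the variadic options consumed by CreateOrUpdateWork;
// the real helpers live in Karmada's helper package and may differ in detail.
type WorkOption func(work *workv1alpha1.Work)

func WithSuspendDispatching(suspend bool) WorkOption {
	return func(work *workv1alpha1.Work) {
		work.Spec.SuspendDispatching = ptr.To(suspend)
	}
}

func WithPreserveResourcesOnDeletion(preserve bool) WorkOption {
	return func(work *workv1alpha1.Work) {
		work.Spec.PreserveResourcesOnDeletion = ptr.To(preserve)
	}
}

// CreateOrUpdateWork builds the Work from workMeta and the manifest, applies
// each option, then creates or updates the object in the control plane.
func CreateOrUpdateWork(ctx context.Context, c client.Client, workMeta metav1.ObjectMeta,
	resource *unstructured.Unstructured, options ...WorkOption) error {
	work := &workv1alpha1.Work{ObjectMeta: workMeta}
	// ...fill work.Spec.Workload.Manifests from resource...
	for _, opt := range options {
		opt(work)
	}
	// ...create or update the Work via the client...
	return nil
}

Defaulting happens at the call sites: ensureWork passes ptr.Deref(preserveResourcesOnDeletion, false), so bindings that never set the field produce Works with the old delete-on-removal behavior.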
97 changes: 86 additions & 11 deletions pkg/controllers/execution/execution_controller.go
@@ -30,14 +30,18 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/predicate"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/metrics"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
@@ -102,15 +106,8 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
}

if !work.DeletionTimestamp.IsZero() {
// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will be failed.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, clusterName, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return controllerruntime.Result{}, err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return controllerruntime.Result{}, fmt.Errorf("cluster(%s) not ready", cluster.Name)
if err := c.handleWorkDelete(ctx, work, cluster); err != nil {
return controllerruntime.Result{}, err
}

return c.removeFinalizer(ctx, work)
@@ -151,16 +148,94 @@ func (c *Controller) syncWork(ctx context.Context, clusterName string, work *wor
metrics.ObserveSyncWorkloadLatency(err, start)
if err != nil {
msg := fmt.Sprintf("Failed to sync work(%s/%s) to cluster(%s), err: %v", work.Namespace, work.Name, clusterName, err)
klog.Errorf(msg)
klog.Error(msg)
c.EventRecorder.Event(work, corev1.EventTypeWarning, events.EventReasonSyncWorkloadFailed, msg)
return controllerruntime.Result{}, err
}
msg := fmt.Sprintf("Sync work(%s/%s) to cluster(%s) successful.", work.Namespace, work.Name, clusterName)
klog.V(4).Infof(msg)
klog.V(4).Info(msg)
c.EventRecorder.Event(work, corev1.EventTypeNormal, events.EventReasonSyncWorkloadSucceed, msg)
return controllerruntime.Result{}, nil
}

func (c *Controller) handleWorkDelete(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
if ptr.Deref(work.Spec.PreserveResourcesOnDeletion, false) {
if err := c.cleanupPolicyClaimMetadata(ctx, work, cluster); err != nil {
klog.Errorf("Failed to remove annotations and labels in on cluster(%s)", cluster.Name)
return err
}
klog.V(4).Infof("Preserving resource on deletion from work(%s/%s) on cluster(%s)", work.Namespace, work.Name, cluster.Name)
return nil
}

// Abort deleting workload if cluster is unready when unjoining cluster, otherwise the unjoin process will fail.
if util.IsClusterReady(&cluster.Status) {
err := c.tryDeleteWorkload(ctx, cluster.Name, work)
if err != nil {
klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
return err
}
} else if cluster.DeletionTimestamp.IsZero() { // cluster is unready, but not terminating
return fmt.Errorf("cluster(%s) not ready", cluster.Name)
}

return nil
}

func (c *Controller) cleanupPolicyClaimMetadata(ctx context.Context, work *workv1alpha1.Work, cluster *clusterv1alpha1.Cluster) error {
for _, manifest := range work.Spec.Workload.Manifests {
workload := &unstructured.Unstructured{}
if err := workload.UnmarshalJSON(manifest.Raw); err != nil {
klog.Errorf("Failed to unmarshal workload from work(%s/%s), error is: %v", err, work.GetNamespace(), work.GetName())
return err
}

fedKey, err := keys.FederatedKeyFunc(cluster.Name, workload)
if err != nil {
klog.Errorf("Failed to get the federated key resource(kind=%s, %s/%s) from member cluster(%s), err is %v ",
workload.GetKind(), workload.GetNamespace(), workload.GetName(), cluster.Name, err)
return err
}

clusterObj, err := helper.GetObjectFromCache(c.RESTMapper, c.InformerManager, fedKey)
if err != nil {
klog.Errorf("Failed to get the resource(kind=%s, %s/%s) from member cluster(%s) cache, err is %v ",
workload.GetKind(), workload.GetNamespace(), workload.GetName(), cluster.Name, err)
return err
}

if workload.GetNamespace() == corev1.NamespaceAll {
detector.CleanupCPPClaimMetadata(workload)
} else {
detector.CleanupPPClaimMetadata(workload)
}
util.RemoveLabels(
workload,
workv1alpha2.ResourceBindingPermanentIDLabel,
workv1alpha2.WorkPermanentIDLabel,
util.ManagedByKarmadaLabel,
)
util.RemoveAnnotations(
workload,
workv1alpha2.ManagedAnnotation,
workv1alpha2.ManagedLabels,
workv1alpha2.ResourceBindingNamespaceAnnotationKey,
workv1alpha2.ResourceBindingNameAnnotationKey,
workv1alpha2.ResourceTemplateUIDAnnotation,
workv1alpha2.ResourceTemplateGenerationAnnotationKey,
workv1alpha2.WorkNameAnnotation,
workv1alpha2.WorkNamespaceAnnotation,
)

if err := c.ObjectWatcher.Update(ctx, cluster.Name, workload, clusterObj); err != nil {
klog.Errorf("Failed to update metadata in the given member cluster %v, err is %v", cluster.Name, err)
return err
}
}

return nil
}

// tryDeleteWorkload tries to delete resources in the given member cluster.
func (c *Controller) tryDeleteWorkload(ctx context.Context, clusterName string, work *workv1alpha1.Work) error {
for _, manifest := range work.Spec.Workload.Manifests {
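Taken together, handleWorkDelete now gives deletion two outcomes: the pre-existing path that removes the workload from a ready member cluster, and the new preserve path that only detaches the member-cluster copy from Karmada. Condensed to its effect on a single manifest, and shown here purely as an illustration rather than additional code from this commit, the preserve path amounts to:

// Drop Karmada's bookkeeping metadata from the live object in the member
// cluster and push the update; the object itself keeps running and is no
// longer tracked by Karmada.
workload := &unstructured.Unstructured{}
if err := workload.UnmarshalJSON(manifest.Raw); err != nil {
	return err
}
util.RemoveLabels(workload, util.ManagedByKarmadaLabel)           // plus the permanent-ID labels
util.RemoveAnnotations(workload, workv1alpha2.WorkNameAnnotation,
	workv1alpha2.WorkNamespaceAnnotation)                         // plus the other tracking annotations
if err := c.ObjectWatcher.Update(ctx, cluster.Name, workload, clusterObj); err != nil {
	return err
}

Only metadata is touched; the preserved object's spec and status are left as they are, which is what keeps the rollback non-disruptive.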
98 changes: 90 additions & 8 deletions pkg/controllers/execution/execution_controller_test.go
@@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -33,18 +34,33 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/resourceinterpreter/default/native"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
"github.com/karmada-io/karmada/pkg/util/gclient"
"github.com/karmada-io/karmada/pkg/util/objectwatcher"
testhelper "github.com/karmada-io/karmada/test/helper"
)

type FakeResourceInterpreter struct {
*native.DefaultInterpreter
}

var _ resourceinterpreter.ResourceInterpreter = &FakeResourceInterpreter{}

const (
podNamespace = "default"
podName = "test"
clusterName = "cluster"
)

func TestExecutionController_Reconcile(t *testing.T) {
tests := []struct {
name string
@@ -54,6 +70,7 @@ func TestExecutionController_Reconcile(t *testing.T) {
expectCondition *metav1.Condition
expectEventMessage string
existErr bool
resourceExists *bool
}{
{
name: "work dispatching is suspended, no error, no apply",
@@ -112,10 +129,52 @@
work.Spec.SuspendDispatching = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=true, deletion timestamp set, does not delete resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(true),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(true)
}),
},
{
name: "PreserveResourcesOnDeletion=false, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
work.Spec.PreserveResourcesOnDeletion = ptr.To(false)
}),
},
{
name: "PreserveResourcesOnDeletion unset, deletion timestamp set, deletes resource",
ns: "karmada-es-cluster",
expectRes: controllerruntime.Result{},
existErr: false,
resourceExists: ptr.To(false),
work: newWork(func(work *workv1alpha1.Work) {
now := metav1.Now()
work.SetDeletionTimestamp(&now)
work.SetFinalizers([]string{util.ExecutionControllerFinalizer})
}),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Cleanup(func() {
genericmanager.GetInstance().Stop(clusterName)
})

req := controllerruntime.Request{
NamespacedName: types.NamespacedName{
Name: "work",
@@ -143,32 +202,51 @@
e := <-eventRecorder.Events
assert.Equal(t, tt.expectEventMessage, e)
}

if tt.resourceExists != nil {
resourceInterface := c.InformerManager.GetSingleClusterManager(clusterName).GetClient().
Resource(corev1.SchemeGroupVersion.WithResource("pods")).Namespace(podNamespace)
_, err = resourceInterface.Get(context.TODO(), podName, metav1.GetOptions{})
if *tt.resourceExists {
assert.NoErrorf(t, err, "unable to query pod (%s/%s)", podNamespace, podName)
} else {
assert.True(t, apierrors.IsNotFound(err), "pod (%s/%s) was not deleted", podNamespace, podName)
}
}
})
}
}

func newController(work *workv1alpha1.Work, eventRecorder *record.FakeRecorder) Controller {
cluster := newCluster("cluster", clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod("default", "test")
client := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work, pod).WithStatusSubresource(work).Build()
func newController(work *workv1alpha1.Work, recorder *record.FakeRecorder) Controller {
cluster := newCluster(clusterName, clusterv1alpha1.ClusterConditionReady, metav1.ConditionTrue)
pod := testhelper.NewPod(podNamespace, podName)
pod.SetLabels(map[string]string{util.ManagedByKarmadaLabel: util.ManagedByKarmadaLabelValue})
restMapper := meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
restMapper.Add(corev1.SchemeGroupVersion.WithKind(pod.Kind), meta.RESTScopeNamespace)
fakeClient := fake.NewClientBuilder().WithScheme(gclient.NewSchema()).WithObjects(cluster, work).WithStatusSubresource(work).WithRESTMapper(restMapper).Build()
dynamicClientSet := dynamicfake.NewSimpleDynamicClient(scheme.Scheme, pod)
informerManager := genericmanager.GetInstance()
informerManager.ForCluster(cluster.Name, dynamicClientSet, 0).Lister(corev1.SchemeGroupVersion.WithResource("pods"))
informerManager.Start(cluster.Name)
informerManager.WaitForCacheSync(cluster.Name)
clusterClientSetFunc := func(string, client.Client) (*util.DynamicClusterClient, error) {
return &util.DynamicClusterClient{
ClusterName: clusterName,
DynamicClientSet: dynamicClientSet,
}, nil
}
resourceInterpreter := FakeResourceInterpreter{DefaultInterpreter: native.NewDefaultInterpreter()}
return Controller{
Client: client,
Client: fakeClient,
InformerManager: informerManager,
EventRecorder: eventRecorder,
EventRecorder: recorder,
RESTMapper: restMapper,
ObjectWatcher: objectwatcher.NewObjectWatcher(client, restMapper, util.NewClusterDynamicClientSetForAgent, nil),
ObjectWatcher: objectwatcher.NewObjectWatcher(fakeClient, restMapper, clusterClientSetFunc, resourceInterpreter),
}
}

func newWork(applyFunc func(work *workv1alpha1.Work)) *workv1alpha1.Work {
pod := testhelper.NewPod("default", "test")
pod := testhelper.NewPod(podNamespace, podName)
bytes, _ := json.Marshal(pod)
work := testhelper.NewWork("work", "karmada-es-cluster", string(uuid.NewUUID()), bytes)
if applyFunc != nil {
@@ -193,3 +271,7 @@ func newCluster(name string, clusterType string, clusterStatus metav1.ConditionS
},
}
}

func (f FakeResourceInterpreter) Start(context.Context) error {
return nil
}
@@ -183,7 +183,7 @@ func (c *SyncController) buildWorks(ctx context.Context, quota *policyv1alpha1.F
},
}

err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj, nil)
err = helper.CreateOrUpdateWork(ctx, c.Client, objectMeta, resourceQuotaObj)
if err != nil {
errs = append(errs, err)
}
2 changes: 1 addition & 1 deletion pkg/controllers/mcs/service_export_controller.go
@@ -494,7 +494,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
return err
}

@@ -381,7 +381,7 @@ func reportEndpointSlice(ctx context.Context, c client.Client, endpointSlice *un
return err
}

if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c, workMeta, endpointSlice); err != nil {
klog.Errorf("Failed to create or update work(%s/%s), Error: %v", workMeta.Namespace, workMeta.Name, err)
return err
}
@@ -393,7 +393,7 @@ func (c *EndpointsliceDispatchController) ensureEndpointSliceWork(ctx context.Co
klog.Errorf("Failed to convert typed object to unstructured object, error is: %v", err)
return err
}
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS, nil); err != nil {
if err := helper.CreateOrUpdateWork(ctx, c.Client, workMeta, unstructuredEPS); err != nil {
klog.Errorf("Failed to dispatch EndpointSlice %s/%s from %s to cluster %s:%v",
work.GetNamespace(), work.GetName(), providerCluster, consumerCluster, err)
return err
2 changes: 1 addition & 1 deletion pkg/controllers/multiclusterservice/mcs_controller.go
@@ -309,7 +309,7 @@ func (c *MCSController) propagateMultiClusterService(ctx context.Context, mcs *n
klog.Errorf("Failed to convert MultiClusterService(%s/%s) to unstructured object, err is %v", mcs.Namespace, mcs.Name, err)
return err
}
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj, nil); err != nil {
if err = helper.CreateOrUpdateWork(ctx, c, workMeta, mcsObj); err != nil {
klog.Errorf("Failed to create or update MultiClusterService(%s/%s) work in the given member cluster %s, err is %v",
mcs.Namespace, mcs.Name, clusterName, err)
return err