diff --git a/pkg/reconciler/workload/namespace/namespace_controller.go b/pkg/reconciler/workload/namespace/namespace_controller.go index 15c535ba442..dfe61d87620 100644 --- a/pkg/reconciler/workload/namespace/namespace_controller.go +++ b/pkg/reconciler/workload/namespace/namespace_controller.go @@ -18,25 +18,21 @@ package namespace import ( "context" - "encoding/json" "fmt" "time" - jsonpatch "github.com/evanphx/json-patch" kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" kcpcorev1informers "github.com/kcp-dev/client-go/informers/core/v1" kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes" - corev1listers "github.com/kcp-dev/client-go/listers/core/v1" "github.com/kcp-dev/logicalcluster/v3" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" @@ -44,9 +40,9 @@ import ( "github.com/kcp-dev/kcp/pkg/logging" "github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexport" + "github.com/kcp-dev/kcp/pkg/reconciler/committer" schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1" schedulingv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/scheduling/v1alpha1" - schedulingv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/scheduling/v1alpha1" ) const ( @@ -64,21 +60,20 @@ func NewController( c := &controller{ queue: queue, - enqueueAfter: func(ns *corev1.Namespace, duration time.Duration) { - key, err := kcpcache.MetaClusterNamespaceKeyFunc(ns) - if err != nil { - runtime.HandleError(err) - return - } - queue.AddAfter(key, duration) - }, kubeClusterClient: kubeClusterClient, - 
namespaceLister: namespaceInformer.Lister(), - - placementLister: placementInformer.Lister(), - placementIndexer: placementInformer.Informer().GetIndexer(), + listNamespaces: func(clusterName logicalcluster.Name) ([]*corev1.Namespace, error) { + return namespaceInformer.Cluster(clusterName).Lister().List(labels.Everything()) + }, + getNamespace: func(clusterName logicalcluster.Name, name string) (*corev1.Namespace, error) { + return namespaceInformer.Cluster(clusterName).Lister().Get(name) + }, + listPlacements: func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) { + return placementInformer.Cluster(clusterName).Lister().List(labels.Everything()) + }, + commit: committer.NewCommitter[*Namespace, Patcher, *NamespaceSpec, *NamespaceStatus](kubeClusterClient.CoreV1().Namespaces()), + now: time.Now, } // namespaceBlocklist holds a set of namespaces that should never be synced from kcp to physical clusters. @@ -112,17 +107,24 @@ func NewController( // controller. type controller struct { - queue workqueue.RateLimitingInterface - enqueueAfter func(*corev1.Namespace, time.Duration) + queue workqueue.RateLimitingInterface kubeClusterClient kcpkubernetesclientset.ClusterInterface - namespaceLister corev1listers.NamespaceClusterLister - - placementLister schedulingv1alpha1listers.PlacementClusterLister - placementIndexer cache.Indexer + listNamespaces func(clusterName logicalcluster.Name) ([]*corev1.Namespace, error) + getNamespace func(clusterName logicalcluster.Name, name string) (*corev1.Namespace, error) + listPlacements func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) + commit CommitFunc + now func() time.Time } +type Namespace = corev1.Namespace +type NamespaceSpec = corev1.NamespaceSpec +type NamespaceStatus = corev1.NamespaceStatus +type Patcher = corev1client.NamespaceInterface +type Resource = committer.Resource[*NamespaceSpec, *NamespaceStatus] +type CommitFunc = func(ctx context.Context, original, updated 
*Resource) error + func (c *controller) enqueueNamespace(obj interface{}) { key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj) if err != nil { @@ -147,14 +149,14 @@ func (c *controller) enqueuePlacement(obj interface{}) { return } - nss, err := c.namespaceLister.Cluster(clusterName).List(labels.Everything()) + namespaces, err := c.listNamespaces(clusterName) if err != nil { runtime.HandleError(err) return } logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*schedulingv1alpha1.Placement)) - for _, ns := range nss { + for _, ns := range namespaces { logger = logging.WithObject(logger, ns) nsKey, err := kcpcache.MetaClusterNamespaceKeyFunc(ns) @@ -222,55 +224,29 @@ func (c *controller) process(ctx context.Context, key string) error { return nil } - obj, err := c.namespaceLister.Cluster(clusterName).Get(name) + ns, err := c.getNamespace(clusterName, name) if err != nil { if errors.IsNotFound(err) { return nil // object deleted before we handled it } return err } - old := obj - obj = obj.DeepCopy() + old := ns + ns = ns.DeepCopy() - logger = logging.WithObject(logger, obj) + logger = logging.WithObject(logger, ns) ctx = klog.NewContext(ctx, logger) - reconcileErr := c.reconcile(ctx, obj) - - // If the object being reconciled changed as a result, update it. 
- if !equality.Semantic.DeepEqual(old.Status, obj.Status) { - oldData, err := json.Marshal(corev1.Namespace{ - Status: old.Status, - }) - if err != nil { - return fmt.Errorf("failed to Marshal old data for placement %s|%s: %w", clusterName, name, err) - } - - newData, err := json.Marshal(corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - UID: old.UID, - ResourceVersion: old.ResourceVersion, - }, // to ensure they appear in the patch as preconditions - Status: obj.Status, - }) - if err != nil { - return fmt.Errorf("failed to Marshal new data for LocationDomain %s|%s: %w", clusterName, name, err) - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return fmt.Errorf("failed to create patch for LocationDomain %s|%s: %w", clusterName, name, err) - } - logger.WithValues("patch", string(patchBytes)).V(2).Info("patching Namespace") - _, uerr := c.kubeClusterClient.Cluster(clusterName.Path()).CoreV1().Namespaces().Patch(ctx, obj.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - return uerr + var errs []error + if err := c.reconcile(ctx, key, ns); err != nil { + errs = append(errs, err) } - return reconcileErr -} + oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status} + newResource := &Resource{ObjectMeta: ns.ObjectMeta, Spec: &ns.Spec, Status: &ns.Status} + if err := c.commit(ctx, oldResource, newResource); err != nil { + errs = append(errs, err) + } -func (c *controller) patchNamespace(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) { - logger := klog.FromContext(ctx) - logger.WithValues("patch", string(data)).V(2).Info("patching Namespace") - return c.kubeClusterClient.Cluster(clusterName).CoreV1().Namespaces().Patch(ctx, name, pt, data, opts, subresources...) 
+ return utilerrors.NewAggregate(errs) } diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile.go b/pkg/reconciler/workload/namespace/namespace_reconcile.go index aeb6d7c859b..44d63cfa6d0 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile.go @@ -20,60 +20,37 @@ import ( "context" "time" - "github.com/kcp-dev/logicalcluster/v3" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" - utilserrors "k8s.io/apimachinery/pkg/util/errors" - - schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1" -) - -type reconcileStatus int - -const ( - reconcileStatusStop reconcileStatus = iota - reconcileStatusContinue ) -type reconciler interface { - reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error) +type reconcileResult struct { + stop bool + requeueAfter time.Duration } -func (c *controller) reconcile(ctx context.Context, ns *corev1.Namespace) error { - reconcilers := []reconciler{ - &bindNamespaceReconciler{ - listPlacement: c.listPlacement, - patchNamespace: c.patchNamespace, - }, - &placementSchedulingReconciler{ - listPlacement: c.listPlacement, - enqueueAfter: c.enqueueAfter, - patchNamespace: c.patchNamespace, - now: time.Now, - }, - &statusConditionReconciler{ - patchNamespace: c.patchNamespace, - }, - } +type reconcileFunc func(ctx context.Context, key string, ns *corev1.Namespace) (reconcileResult, error) - var errs []error +func (c *controller) reconcile(ctx context.Context, key string, ns *corev1.Namespace) error { + reconcilers := []reconcileFunc{ + c.reconcilePlacementBind, + c.reconcileScheduling, + c.reconcileStatus, + } for _, r := range reconcilers { - var err error - var status reconcileStatus - status, ns, err = r.reconcile(ctx, ns) + result, err := r(ctx, key, ns) if err != nil { - errs = append(errs, err) + return err } - if status == reconcileStatusStop { + + if result.stop { break } - } - 
return utilserrors.NewAggregate(errs) -} + if result.requeueAfter > 0 { + c.queue.AddAfter(key, result.requeueAfter) + } + } -func (c *controller) listPlacement(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) { - return c.placementLister.Cluster(clusterName).List(labels.Everything()) + return nil } diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind.go b/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind.go index b0b58cbba77..d1e945764dd 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind.go @@ -18,73 +18,54 @@ package namespace import ( "context" - "encoding/json" "github.com/kcp-dev/logicalcluster/v3" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1" "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions" ) -// bindNamespaceReconciler updates the existing annotation and creates an empty one if -// at least one placement matches and there is no annotation. It delete the annotation +// reconcilePlacementBind updates the existing scheduling.kcp.io/placement annotation and creates an +// empty one if at least one placement matches and there is no annotation. It deletes the annotation // if there is no matched placement. +// // TODO this should be reconsidered when we want lazy binding. 
-type bindNamespaceReconciler struct { - listPlacement func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) - - patchNamespace func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) -} - -func (r *bindNamespaceReconciler) reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error) { - logger := klog.FromContext(ctx) +func (c *controller) reconcilePlacementBind( + _ context.Context, + _ string, + ns *corev1.Namespace, +) (reconcileResult, error) { clusterName := logicalcluster.From(ns) - _, foundPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] - - validPlacements, err := r.validPlacements(clusterName, ns) + validPlacements, err := c.validPlacements(clusterName, ns) if err != nil { - return reconcileStatusContinue, ns, err + return reconcileResult{stop: true}, err } - expectedAnnotations := map[string]interface{}{} // nil means to remove the key - if len(validPlacements) > 0 && !foundPlacement { - expectedAnnotations[schedulingv1alpha1.PlacementAnnotationKey] = "" - } else if len(validPlacements) == 0 && foundPlacement { - expectedAnnotations[schedulingv1alpha1.PlacementAnnotationKey] = nil - } - - if len(expectedAnnotations) == 0 { - return reconcileStatusContinue, ns, nil - } + _, hasPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] + shouldHavePlacement := len(validPlacements) > 0 - patch := map[string]interface{}{} - if err := unstructured.SetNestedField(patch, expectedAnnotations, "metadata", "annotations"); err != nil { - return reconcileStatusStop, ns, err - } - - patchBytes, err := json.Marshal(patch) - if err != nil { - return reconcileStatusStop, ns, err - } - logger.WithValues("patch", string(patchBytes)).V(3).Info("patching Namespace to update placement annotation") - updated, err := r.patchNamespace(ctx, clusterName.Path(), 
ns.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) - if err != nil { - return reconcileStatusStop, ns, err + switch { + case shouldHavePlacement && hasPlacement, !shouldHavePlacement && !hasPlacement: + return reconcileResult{}, nil + case shouldHavePlacement && !hasPlacement: + if ns.Annotations == nil { + ns.Annotations = make(map[string]string) + } + ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] = "" + case !shouldHavePlacement && hasPlacement: + delete(ns.Annotations, schedulingv1alpha1.PlacementAnnotationKey) } - return reconcileStatusContinue, updated, nil + return reconcileResult{stop: true}, nil } -func (r *bindNamespaceReconciler) validPlacements(clusterName logicalcluster.Name, ns *corev1.Namespace) ([]*schedulingv1alpha1.Placement, error) { - placements, err := r.listPlacement(clusterName) +func (c *controller) validPlacements(clusterName logicalcluster.Name, ns *corev1.Namespace) ([]*schedulingv1alpha1.Placement, error) { + placements, err := c.listPlacements(clusterName) if err != nil { return nil, err diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind_test.go b/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind_test.go index f92f6b292d9..dad6cbf2d99 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind_test.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile_placementbind_test.go @@ -18,16 +18,13 @@ package namespace import ( "context" - "encoding/json" "testing" - jsonpatch "github.com/evanphx/json-patch" "github.com/kcp-dev/logicalcluster/v3" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" @@ -44,7 +41,7 @@ func TestBindPlacement(t *testing.T) { namespaceSelector 
*metav1.LabelSelector expectedAnnotation map[string]string - wantPatch bool + expectStop bool }{ { name: "placement is pending", @@ -72,7 +69,7 @@ func TestBindPlacement(t *testing.T) { placementPhase: schedulingv1alpha1.PlacementBound, isReady: true, namespaceSelector: &metav1.LabelSelector{}, - wantPatch: true, + expectStop: true, expectedAnnotation: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -85,7 +82,6 @@ func TestBindPlacement(t *testing.T) { }, isReady: true, namespaceSelector: &metav1.LabelSelector{}, - wantPatch: false, expectedAnnotation: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -98,7 +94,7 @@ func TestBindPlacement(t *testing.T) { }, isReady: false, namespaceSelector: &metav1.LabelSelector{}, - wantPatch: true, + expectStop: true, expectedAnnotation: map[string]string{}, }, } @@ -135,36 +131,15 @@ func TestBindPlacement(t *testing.T) { return []*schedulingv1alpha1.Placement{testPlacement}, nil } - var patched bool - reconciler := &bindNamespaceReconciler{ - listPlacement: listPlacement, - patchNamespace: patchNamespaceFunc(&patched, ns), + c := &controller{ + listPlacements: listPlacement, } - _, updated, err := reconciler.reconcile(context.TODO(), ns) + result, err := c.reconcilePlacementBind(context.Background(), "key", ns) require.NoError(t, err) - require.Equal(t, testCase.wantPatch, patched) - require.Equal(t, testCase.expectedAnnotation, updated.Annotations) + require.Equal(t, testCase.expectedAnnotation, ns.Annotations) + require.Equal(t, testCase.expectStop, result.stop) + require.Zero(t, result.requeueAfter) }) } } - -func patchNamespaceFunc(patched *bool, ns *corev1.Namespace) func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) { - return func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts 
metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) { - *patched = true - - nsData, _ := json.Marshal(ns) - updatedData, err := jsonpatch.MergePatch(nsData, data) - if err != nil { - return nil, err - } - - var patchedNS corev1.Namespace - err = json.Unmarshal(updatedData, &patchedNS) - if err != nil { - return ns, err - } - - return &patchedNS, nil - } -} diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go index 091417a815c..7604dab0b49 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling.go @@ -18,16 +18,12 @@ package namespace import ( "context" - "encoding/json" "strings" "time" "github.com/kcp-dev/logicalcluster/v3" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" @@ -37,20 +33,14 @@ import ( const removingGracePeriod = 5 * time.Second -// placementSchedulingReconciler reconciles the state.workload.kcp.io/ labels according the +// reconcileScheduling reconciles the state.workload.kcp.io/ labels according to the // selected synctarget stored in the internal.workload.kcp.io/synctarget annotation // on each placement.
-type placementSchedulingReconciler struct { - listPlacement func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) - - patchNamespace func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) - - enqueueAfter func(*corev1.Namespace, time.Duration) - - now func() time.Time -} - -func (r *placementSchedulingReconciler) reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error) { +func (c *controller) reconcileScheduling( + ctx context.Context, + _ string, + ns *corev1.Namespace, +) (reconcileResult, error) { logger := klog.FromContext(ctx) clusterName := logicalcluster.From(ns) @@ -58,9 +48,9 @@ func (r *placementSchedulingReconciler) reconcile(ctx context.Context, ns *corev _, foundPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] if foundPlacement { - placements, err := r.listPlacement(clusterName) + placements, err := c.listPlacements(clusterName) if err != nil { - return reconcileStatusStop, ns, err + return reconcileResult{}, err } validPlacements = filterValidPlacements(ns, placements) @@ -77,92 +67,73 @@ func (r *placementSchedulingReconciler) reconcile(ctx context.Context, ns *corev } // 2. find the scheduled synctarget to the ns, including synced, removing - synced, removing := syncedRemovingCluster(ns) + syncStatus := syncStatusFor(ns) // 3. if the synced synctarget is not in the scheduled synctargets, mark it as removing. - expectedAnnotations := map[string]interface{}{} // nil means to remove the key - expectedLabels := map[string]interface{}{} // nil means to remove the key + changed := false + annotations := ns.Annotations + labels := ns.Labels - for syncTarget := range synced { + for syncTarget := range syncStatus.active { if !scheduledSyncTargets.Has(syncTarget) { // it is no longer a synced synctarget, mark it as removing. 
- now := r.now().UTC().Format(time.RFC3339) - expectedAnnotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+syncTarget] = now + now := c.now().UTC().Format(time.RFC3339) + annotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+syncTarget] = now + changed = true logger.WithValues("syncTarget", syncTarget).V(4).Info("setting SyncTarget as removing for Namespace since it is not a valid syncTarget anymore") } } // 4. remove the synctarget after grace period minEnqueueDuration := removingGracePeriod + 1 - for cluster, removingTime := range removing { - if removingTime.Add(removingGracePeriod).Before(r.now()) { - expectedLabels[workloadv1alpha1.ClusterResourceStateLabelPrefix+cluster] = nil - expectedAnnotations[workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+cluster] = nil + for cluster, removingTime := range syncStatus.pendingRemoval { + if removingTime.Add(removingGracePeriod).Before(c.now()) { + delete(labels, workloadv1alpha1.ClusterResourceStateLabelPrefix+cluster) + delete(annotations, workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix+cluster) + changed = true logger.WithValues("syncTarget", cluster).V(4).Info("removing SyncTarget for Namespace") } else { - enqueuDuration := time.Until(removingTime.Add(removingGracePeriod)) - if enqueuDuration < minEnqueueDuration { - minEnqueueDuration = enqueuDuration + enqueueDuration := time.Until(removingTime.Add(removingGracePeriod)) + if enqueueDuration < minEnqueueDuration { + minEnqueueDuration = enqueueDuration } } } // 5. 
if a scheduled synctarget is not in synced and removing, add it in to the label for scheduledSyncTarget := range scheduledSyncTargets { - if synced.Has(scheduledSyncTarget) { + if syncStatus.active.Has(scheduledSyncTarget) { continue } - if _, ok := removing[scheduledSyncTarget]; ok { + if _, ok := syncStatus.pendingRemoval[scheduledSyncTarget]; ok { continue } - expectedLabels[workloadv1alpha1.ClusterResourceStateLabelPrefix+scheduledSyncTarget] = string(workloadv1alpha1.ResourceStateSync) + if labels == nil { + labels = make(map[string]string) + ns.Labels = labels + } + labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+scheduledSyncTarget] = string(workloadv1alpha1.ResourceStateSync) + changed = true logger.WithValues("syncTarget", scheduledSyncTarget).V(4).Info("setting syncTarget as sync for Namespace") } - if len(expectedLabels) > 0 || len(expectedAnnotations) > 0 { - ns, err := r.patchNamespaceLabelAnnotation(ctx, clusterName.Path(), ns, expectedLabels, expectedAnnotations) - return reconcileStatusContinue, ns, err - } - // 6. Requeue at last to check if removing syncTarget should be removed later. 
+ var requeueAfter time.Duration if minEnqueueDuration <= removingGracePeriod { logger.WithValues("after", minEnqueueDuration).V(2).Info("enqueue Namespace later") - r.enqueueAfter(ns, minEnqueueDuration) + requeueAfter = minEnqueueDuration } - return reconcileStatusContinue, ns, nil + return reconcileResult{stop: changed, requeueAfter: requeueAfter}, nil } -func (r *placementSchedulingReconciler) patchNamespaceLabelAnnotation(ctx context.Context, clusterName logicalcluster.Path, ns *corev1.Namespace, labels, annotations map[string]interface{}) (*corev1.Namespace, error) { - logger := klog.FromContext(ctx) - patch := map[string]interface{}{} - if len(annotations) > 0 { - if err := unstructured.SetNestedField(patch, annotations, "metadata", "annotations"); err != nil { - return ns, err - } - } - if len(labels) > 0 { - if err := unstructured.SetNestedField(patch, labels, "metadata", "labels"); err != nil { - return ns, err - } - } - patchBytes, err := json.Marshal(patch) - if err != nil { - return ns, err +func syncStatusFor(ns *corev1.Namespace) namespaceSyncStatus { + status := namespaceSyncStatus{ + active: sets.NewString(), + pendingRemoval: make(map[string]time.Time), } - logger.WithValues("patch", string(patchBytes)).V(3).Info("patching Namespace to update SyncTarget information") - updated, err := r.patchNamespace(ctx, clusterName, ns.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) - if err != nil { - return ns, err - } - return updated, nil -} -// syncedRemovingCluster finds synced and removing clusters for this ns. 
-func syncedRemovingCluster(ns *corev1.Namespace) (sets.String, map[string]time.Time) { - synced := sets.NewString() - removing := map[string]time.Time{} for k := range ns.Labels { if !strings.HasPrefix(k, workloadv1alpha1.ClusterResourceStateLabelPrefix) { continue @@ -174,12 +145,17 @@ func syncedRemovingCluster(ns *corev1.Namespace) (sets.String, map[string]time.T if value, ok := ns.Annotations[deletionAnnotationKey]; ok { removingTime, _ := time.Parse(time.RFC3339, value) - removing[syncTarget] = removingTime + status.pendingRemoval[syncTarget] = removingTime continue } - synced.Insert(syncTarget) + status.active.Insert(syncTarget) } - return synced, removing + return status +} + +type namespaceSyncStatus struct { + active sets.String + pendingRemoval map[string]time.Time } diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go index f0c4fcbe65a..c6d5f232918 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile_scheduling_test.go @@ -27,6 +27,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/workqueue" schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1" workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1" @@ -45,9 +46,10 @@ func TestScheduling(t *testing.T) { labels map[string]string annotations map[string]string - wantPatch bool + expectStop bool expectedLabels map[string]string expectedAnnotations map[string]string + expectRequeue bool }{ { name: "placement not found", @@ -58,7 +60,7 @@ func TestScheduling(t *testing.T) { schedulingv1alpha1.PlacementAnnotationKey: "", }, noPlacements: true, - wantPatch: true, + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", 
workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "cluster1": now3339, @@ -72,8 +74,8 @@ func TestScheduling(t *testing.T) { annotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, - placement: newPlacement("test-placement", "test-location", "test-cluster"), - wantPatch: true, + placement: newPlacement("test-placement", "test-location", "test-cluster"), + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -90,7 +92,6 @@ func TestScheduling(t *testing.T) { workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync), }, placement: newPlacement("test-placement", "test-location", "test-cluster"), - wantPatch: false, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -106,8 +107,8 @@ func TestScheduling(t *testing.T) { labels: map[string]string{ workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync), }, - placement: newPlacement("test-placement", "test-location", ""), - wantPatch: true, + placement: newPlacement("test-placement", "test-location", ""), + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339, @@ -124,8 +125,8 @@ func TestScheduling(t *testing.T) { labels: map[string]string{ workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync), }, - placement: newPlacement("test-placement", "test-location", "test-cluster-2"), - wantPatch: true, + placement: newPlacement("test-placement", "test-location", "test-cluster-2"), + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", 
workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339, @@ -145,7 +146,6 @@ func TestScheduling(t *testing.T) { workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync), }, placement: newPlacement("test-placement", "test-location", "test-cluster"), - wantPatch: false, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": now3339, @@ -153,6 +153,7 @@ func TestScheduling(t *testing.T) { expectedLabels: map[string]string{ workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync), }, + expectRequeue: true, }, { name: "remove clusters which is removing after grace period", @@ -163,8 +164,8 @@ func TestScheduling(t *testing.T) { labels: map[string]string{ workloadv1alpha1.ClusterResourceStateLabelPrefix + "34sZi3721YwBLDHUuNVIOLxuYp5nEZBpsTQyDq": string(workloadv1alpha1.ResourceStateSync), }, - placement: newPlacement("test-placement", "test-location", ""), - wantPatch: true, + placement: newPlacement("test-placement", "test-location", ""), + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -189,19 +190,22 @@ func TestScheduling(t *testing.T) { return []*schedulingv1alpha1.Placement{testCase.placement}, nil } - var patched bool - reconciler := &placementSchedulingReconciler{ - listPlacement: listPlacement, - patchNamespace: patchNamespaceFunc(&patched, ns), - enqueueAfter: func(*corev1.Namespace, time.Duration) {}, + c := &controller{ + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + listPlacements: listPlacement, now: func() time.Time { return now }, } - _, updated, err := reconciler.reconcile(context.TODO(), ns) + 
result, err := c.reconcileScheduling(context.Background(), "key", ns) require.NoError(t, err) - require.Equal(t, testCase.wantPatch, patched) - require.Equal(t, testCase.expectedAnnotations, updated.Annotations) - require.Equal(t, testCase.expectedLabels, updated.Labels) + require.Equal(t, testCase.expectedAnnotations, ns.Annotations) + require.Equal(t, testCase.expectedLabels, ns.Labels) + require.Equal(t, testCase.expectStop, result.stop) + if testCase.expectRequeue { + require.NotZero(t, result.requeueAfter) + } else { + require.Zero(t, result.requeueAfter) + } }) } } @@ -217,7 +221,7 @@ func TestMultiplePlacements(t *testing.T) { labels map[string]string annotations map[string]string - wantPatch bool + expectStop bool expectedLabels map[string]string expectedAnnotations map[string]string }{ @@ -230,7 +234,7 @@ func TestMultiplePlacements(t *testing.T) { annotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, - wantPatch: true, + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -248,7 +252,7 @@ func TestMultiplePlacements(t *testing.T) { annotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, - wantPatch: true, + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -269,7 +273,6 @@ func TestMultiplePlacements(t *testing.T) { workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": string(workloadv1alpha1.ResourceStateSync), workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync), }, - wantPatch: false, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", }, @@ -293,7 +296,7 @@ func TestMultiplePlacements(t *testing.T) { workloadv1alpha1.ClusterResourceStateLabelPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": 
string(workloadv1alpha1.ResourceStateSync), workloadv1alpha1.ClusterResourceStateLabelPrefix + "aPkhvUbGK0xoZIjMnM2pA0AuV1g7i4tBwxu5m4": string(workloadv1alpha1.ResourceStateSync), }, - wantPatch: true, + expectStop: true, expectedAnnotations: map[string]string{ schedulingv1alpha1.PlacementAnnotationKey: "", workloadv1alpha1.InternalClusterDeletionTimestampAnnotationPrefix + "aQtdeEWVcqU7h7AKnYMm3KRQ96U4oU2W04yeOa": now3339, @@ -322,19 +325,17 @@ func TestMultiplePlacements(t *testing.T) { return testCase.placements, nil } - var patched bool - reconciler := &placementSchedulingReconciler{ - listPlacement: listPlacement, - patchNamespace: patchNamespaceFunc(&patched, ns), - enqueueAfter: func(*corev1.Namespace, time.Duration) {}, + c := &controller{ + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + listPlacements: listPlacement, now: func() time.Time { return now }, } - _, updated, err := reconciler.reconcile(context.TODO(), ns) + result, err := c.reconcileScheduling(context.Background(), "key", ns) require.NoError(t, err) - require.Equal(t, testCase.wantPatch, patched) - require.Equal(t, testCase.expectedAnnotations, updated.Annotations) - require.Equal(t, testCase.expectedLabels, updated.Labels) + require.Equal(t, testCase.expectStop, result.stop) + require.Equal(t, testCase.expectedAnnotations, ns.Annotations) + require.Equal(t, testCase.expectedLabels, ns.Labels) }) } } diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_status.go b/pkg/reconciler/workload/namespace/namespace_reconcile_status.go index 42219742467..21e3f730320 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile_status.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile_status.go @@ -18,17 +18,8 @@ package namespace import ( "context" - "encoding/json" - "fmt" - - jsonpatch "github.com/evanphx/json-patch" - "github.com/kcp-dev/logicalcluster/v3" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1" conditionsv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/apis/conditions/v1alpha1" @@ -51,59 +42,29 @@ const ( NamespaceReasonPlacementInvalid = "PlacementInvalid" ) -// statusReconciler updates conditions on the namespace. -type statusConditionReconciler struct { - patchNamespace func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) -} - -// ensureScheduledStatus ensures the status of the given namespace reflects the +// reconcileStatus ensures the status of the given namespace reflects the // namespace's scheduled state. -func (r *statusConditionReconciler) reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error) { - logger := klog.FromContext(ctx) - updatedNs := setScheduledCondition(ns) - - if equality.Semantic.DeepEqual(ns.Status, updatedNs.Status) { - return reconcileStatusContinue, ns, nil - } - - patchBytes, err := statusPatchBytes(ns, updatedNs) - if err != nil { - return reconcileStatusStop, ns, err - } - logger.WithValues("patch", string(patchBytes)).V(2).Info("updating status for namespace") - patchedNamespace, err := r.patchNamespace(ctx, logicalcluster.From(ns).Path(), ns.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - if err != nil { - return reconcileStatusStop, ns, fmt.Errorf("failed to patch status on namespace %s|%s: %w", logicalcluster.From(ns), ns.Name, err) - } - - return reconcileStatusContinue, patchedNamespace, nil -} +func (c *controller) reconcileStatus(_ context.Context, _ string, ns *corev1.Namespace) (reconcileResult, error) { + conditionsAdapter := &NamespaceConditionsAdapter{ns} -// statusPatchBytes returns the bytes required to patch status for the provided 
namespace from its old to new state. -func statusPatchBytes(old, new *corev1.Namespace) ([]byte, error) { - oldData, err := json.Marshal(corev1.Namespace{ - Status: old.Status, - }) - if err != nil { - return nil, fmt.Errorf("failed to marshal existing status for namespace %s|%s: %w", logicalcluster.From(new), new.Name, err) + _, found := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] + if !found { + conditions.MarkFalse(conditionsAdapter, NamespaceScheduled, NamespaceReasonUnschedulable, + conditionsv1alpha1.ConditionSeverityNone, // NamespaceCondition doesn't support severity + "No available placements") + return reconcileResult{}, nil } - newData, err := json.Marshal(corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - UID: new.UID, - ResourceVersion: new.ResourceVersion, - }, // to ensure they appear in the patch as preconditions - Status: new.Status, - }) - if err != nil { - return nil, fmt.Errorf("failed to marshal new status for namespace %s|%s: %w", logicalcluster.From(new), new.Name, err) + syncStatus := syncStatusFor(ns) + if len(syncStatus.active) == 0 { + conditions.MarkFalse(conditionsAdapter, NamespaceScheduled, NamespaceReasonUnschedulable, + conditionsv1alpha1.ConditionSeverityNone, // NamespaceCondition doesn't support severity + "No available sync targets") + return reconcileResult{}, nil } - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return nil, fmt.Errorf("failed to create status patch for namespace %s|%s: %w", logicalcluster.From(new), new.Name, err) - } - return patchBytes, nil + conditions.MarkTrue(conditionsAdapter, NamespaceScheduled) + return reconcileResult{}, nil } // NamespaceConditionsAdapter enables the use of the conditions helper @@ -142,27 +103,3 @@ func (ca *NamespaceConditionsAdapter) SetConditions(conditions conditionsv1alpha } ca.Status.Conditions = nsConditions } - -func setScheduledCondition(ns *corev1.Namespace) *corev1.Namespace { - updatedNs := ns.DeepCopy() - 
conditionsAdapter := &NamespaceConditionsAdapter{updatedNs} - - _, found := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] - if !found { - conditions.MarkFalse(conditionsAdapter, NamespaceScheduled, NamespaceReasonUnschedulable, - conditionsv1alpha1.ConditionSeverityNone, // NamespaceCondition doesn't support severity - "No available placements") - return updatedNs - } - - synced, _ := syncedRemovingCluster(ns) - if len(synced) == 0 { - conditions.MarkFalse(conditionsAdapter, NamespaceScheduled, NamespaceReasonUnschedulable, - conditionsv1alpha1.ConditionSeverityNone, // NamespaceCondition doesn't support severity - "No available sync targets") - return updatedNs - } - - conditions.MarkTrue(conditionsAdapter, NamespaceScheduled) - return updatedNs -} diff --git a/pkg/reconciler/workload/namespace/namespace_reconcile_status_test.go b/pkg/reconciler/workload/namespace/namespace_reconcile_status_test.go index 7772056f5ad..b06de43996d 100644 --- a/pkg/reconciler/workload/namespace/namespace_reconcile_status_test.go +++ b/pkg/reconciler/workload/namespace/namespace_reconcile_status_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package namespace import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -64,13 +65,18 @@ func TestSetScheduledCondition(t *testing.T) { Annotations: testCase.annotations, }, } - updatedNs := setScheduledCondition(ns) + + c := &controller{} + result, err := c.reconcileStatus(context.Background(), "key", ns) + require.NoError(t, err) + require.False(t, result.stop) + require.Zero(t, result.requeueAfter) if !testCase.scheduled && testCase.reason == "" { - c := conditions.Get(&NamespaceConditionsAdapter{updatedNs}, NamespaceScheduled) + c := conditions.Get(&NamespaceConditionsAdapter{ns}, NamespaceScheduled) require.Nil(t, c) } else { - c := conditions.Get(&NamespaceConditionsAdapter{updatedNs}, NamespaceScheduled) + c := conditions.Get(&NamespaceConditionsAdapter{ns}, NamespaceScheduled) require.NotNil(t, c) scheduled := c.Status == corev1.ConditionTrue require.Equal(t, testCase.scheduled, scheduled, "unexpected value for scheduled")