Skip to content

Commit

Permalink
workload/namespace: use committer
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Goldstein <[email protected]>
  • Loading branch information
ncdc committed Mar 31, 2023
1 parent 8eacb11 commit 35e42f5
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 375 deletions.
108 changes: 42 additions & 66 deletions pkg/reconciler/workload/namespace/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,31 @@ package namespace

import (
"context"
"encoding/json"
"fmt"
"time"

jsonpatch "github.com/evanphx/json-patch"
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpcorev1informers "github.com/kcp-dev/client-go/informers/core/v1"
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
corev1listers "github.com/kcp-dev/client-go/listers/core/v1"
"github.com/kcp-dev/logicalcluster/v3"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/kube-openapi/pkg/util/sets"

"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexport"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
schedulingv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/scheduling/v1alpha1"
schedulingv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/scheduling/v1alpha1"
)

const (
Expand All @@ -64,21 +60,20 @@ func NewController(

c := &controller{
queue: queue,
enqueueAfter: func(ns *corev1.Namespace, duration time.Duration) {
key, err := kcpcache.MetaClusterNamespaceKeyFunc(ns)
if err != nil {
runtime.HandleError(err)
return
}
queue.AddAfter(key, duration)
},

kubeClusterClient: kubeClusterClient,

namespaceLister: namespaceInformer.Lister(),

placementLister: placementInformer.Lister(),
placementIndexer: placementInformer.Informer().GetIndexer(),
listNamespaces: func(clusterName logicalcluster.Name) ([]*corev1.Namespace, error) {
return namespaceInformer.Cluster(clusterName).Lister().List(labels.Everything())
},
getNamespace: func(clusterName logicalcluster.Name, name string) (*corev1.Namespace, error) {
return namespaceInformer.Cluster(clusterName).Lister().Get(name)
},
listPlacements: func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) {
return placementInformer.Cluster(clusterName).Lister().List(labels.Everything())
},
commit: committer.NewCommitter[*Namespace, Patcher, *NamespaceSpec, *NamespaceStatus](kubeClusterClient.CoreV1().Namespaces()),
now: time.Now,
}

// namespaceBlocklist holds a set of namespaces that should never be synced from kcp to physical clusters.
Expand Down Expand Up @@ -112,17 +107,24 @@ func NewController(

// controller.
type controller struct {
queue workqueue.RateLimitingInterface
enqueueAfter func(*corev1.Namespace, time.Duration)
queue workqueue.RateLimitingInterface

kubeClusterClient kcpkubernetesclientset.ClusterInterface

namespaceLister corev1listers.NamespaceClusterLister

placementLister schedulingv1alpha1listers.PlacementClusterLister
placementIndexer cache.Indexer
listNamespaces func(clusterName logicalcluster.Name) ([]*corev1.Namespace, error)
getNamespace func(clusterName logicalcluster.Name, name string) (*corev1.Namespace, error)
listPlacements func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error)
commit CommitFunc
now func() time.Time
}

type Namespace = corev1.Namespace
type NamespaceSpec = corev1.NamespaceSpec
type NamespaceStatus = corev1.NamespaceStatus
type Patcher = corev1client.NamespaceInterface
type Resource = committer.Resource[*NamespaceSpec, *NamespaceStatus]
type CommitFunc = func(ctx context.Context, original, updated *Resource) error

func (c *controller) enqueueNamespace(obj interface{}) {
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
if err != nil {
Expand All @@ -147,14 +149,14 @@ func (c *controller) enqueuePlacement(obj interface{}) {
return
}

nss, err := c.namespaceLister.Cluster(clusterName).List(labels.Everything())
namespaces, err := c.listNamespaces(clusterName)
if err != nil {
runtime.HandleError(err)
return
}

logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*schedulingv1alpha1.Placement))
for _, ns := range nss {
for _, ns := range namespaces {
logger = logging.WithObject(logger, ns)

nsKey, err := kcpcache.MetaClusterNamespaceKeyFunc(ns)
Expand Down Expand Up @@ -222,55 +224,29 @@ func (c *controller) process(ctx context.Context, key string) error {
return nil
}

obj, err := c.namespaceLister.Cluster(clusterName).Get(name)
ns, err := c.getNamespace(clusterName, name)
if err != nil {
if errors.IsNotFound(err) {
return nil // object deleted before we handled it
}
return err
}
old := obj
obj = obj.DeepCopy()
old := ns
ns = ns.DeepCopy()

logger = logging.WithObject(logger, obj)
logger = logging.WithObject(logger, ns)
ctx = klog.NewContext(ctx, logger)

reconcileErr := c.reconcile(ctx, obj)

// If the object being reconciled changed as a result, update it.
if !equality.Semantic.DeepEqual(old.Status, obj.Status) {
oldData, err := json.Marshal(corev1.Namespace{
Status: old.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for placement %s|%s: %w", clusterName, name, err)
}

newData, err := json.Marshal(corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
UID: old.UID,
ResourceVersion: old.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: obj.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for LocationDomain %s|%s: %w", clusterName, name, err)
}

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for LocationDomain %s|%s: %w", clusterName, name, err)
}
logger.WithValues("patch", string(patchBytes)).V(2).Info("patching Namespace")
_, uerr := c.kubeClusterClient.Cluster(clusterName.Path()).CoreV1().Namespaces().Patch(ctx, obj.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return uerr
var errs []error
if err := c.reconcile(ctx, key, ns); err != nil {
errs = append(errs, err)
}

return reconcileErr
}
oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
newResource := &Resource{ObjectMeta: ns.ObjectMeta, Spec: &ns.Spec, Status: &ns.Status}
if err := c.commit(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
}

func (c *controller) patchNamespace(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) {
logger := klog.FromContext(ctx)
logger.WithValues("patch", string(data)).V(2).Info("patching Namespace")
return c.kubeClusterClient.Cluster(clusterName).CoreV1().Namespaces().Patch(ctx, name, pt, data, opts, subresources...)
return utilerrors.NewAggregate(errs)
}
61 changes: 19 additions & 42 deletions pkg/reconciler/workload/namespace/namespace_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,60 +20,37 @@ import (
"context"
"time"

"github.com/kcp-dev/logicalcluster/v3"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
utilserrors "k8s.io/apimachinery/pkg/util/errors"

schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
)

type reconcileStatus int

const (
reconcileStatusStop reconcileStatus = iota
reconcileStatusContinue
)

type reconciler interface {
reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error)
type reconcileResult struct {
stop bool
requeueAfter time.Duration
}

func (c *controller) reconcile(ctx context.Context, ns *corev1.Namespace) error {
reconcilers := []reconciler{
&bindNamespaceReconciler{
listPlacement: c.listPlacement,
patchNamespace: c.patchNamespace,
},
&placementSchedulingReconciler{
listPlacement: c.listPlacement,
enqueueAfter: c.enqueueAfter,
patchNamespace: c.patchNamespace,
now: time.Now,
},
&statusConditionReconciler{
patchNamespace: c.patchNamespace,
},
}
type reconcileFunc func(ctx context.Context, key string, ns *corev1.Namespace) (reconcileResult, error)

var errs []error
func (c *controller) reconcile(ctx context.Context, key string, ns *corev1.Namespace) error {
reconcilers := []reconcileFunc{
c.reconcilePlacementBind,
c.reconcileScheduling,
c.reconcileStatus,
}

for _, r := range reconcilers {
var err error
var status reconcileStatus
status, ns, err = r.reconcile(ctx, ns)
result, err := r(ctx, key, ns)
if err != nil {
errs = append(errs, err)
return err
}
if status == reconcileStatusStop {

if result.stop {
break
}
}

return utilserrors.NewAggregate(errs)
}
if result.requeueAfter > 0 {
c.queue.AddAfter(key, result.requeueAfter)
}
}

func (c *controller) listPlacement(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) {
return c.placementLister.Cluster(clusterName).List(labels.Everything())
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,73 +18,54 @@ package namespace

import (
"context"
"encoding/json"

"github.com/kcp-dev/logicalcluster/v3"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
"github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions"
)

// bindNamespaceReconciler updates the existing annotation and creates an empty one if
// at least one placement matches and there is no annotation. It delete the annotation
// reconcilePlacementBind updates the existing scheduling.kcp.io/placement annotation and creates an
// empty one if at least one placement matches and there is no annotation. It deletes the annotation
// if there is no matched placement.
//
// TODO this should be reconsidered when we want lazy binding.
type bindNamespaceReconciler struct {
listPlacement func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error)

patchNamespace func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error)
}

func (r *bindNamespaceReconciler) reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error) {
logger := klog.FromContext(ctx)
func (c *controller) reconcilePlacementBind(
_ context.Context,
_ string,
ns *corev1.Namespace,
) (reconcileResult, error) {
clusterName := logicalcluster.From(ns)

_, foundPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey]

validPlacements, err := r.validPlacements(clusterName, ns)
validPlacements, err := c.validPlacements(clusterName, ns)
if err != nil {
return reconcileStatusContinue, ns, err
return reconcileResult{stop: true}, err
}

expectedAnnotations := map[string]interface{}{} // nil means to remove the key
if len(validPlacements) > 0 && !foundPlacement {
expectedAnnotations[schedulingv1alpha1.PlacementAnnotationKey] = ""
} else if len(validPlacements) == 0 && foundPlacement {
expectedAnnotations[schedulingv1alpha1.PlacementAnnotationKey] = nil
}

if len(expectedAnnotations) == 0 {
return reconcileStatusContinue, ns, nil
}
_, hasPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey]
shouldHavePlacement := len(validPlacements) > 0

patch := map[string]interface{}{}
if err := unstructured.SetNestedField(patch, expectedAnnotations, "metadata", "annotations"); err != nil {
return reconcileStatusStop, ns, err
}

patchBytes, err := json.Marshal(patch)
if err != nil {
return reconcileStatusStop, ns, err
}
logger.WithValues("patch", string(patchBytes)).V(3).Info("patching Namespace to update placement annotation")
updated, err := r.patchNamespace(ctx, clusterName.Path(), ns.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return reconcileStatusStop, ns, err
switch {
case shouldHavePlacement && hasPlacement, !shouldHavePlacement && !hasPlacement:
return reconcileResult{}, nil
case shouldHavePlacement && !hasPlacement:
if ns.Annotations == nil {
ns.Annotations = make(map[string]string)
}
ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] = ""
case !shouldHavePlacement && hasPlacement:
delete(ns.Annotations, schedulingv1alpha1.PlacementAnnotationKey)
}

return reconcileStatusContinue, updated, nil
return reconcileResult{stop: true}, nil
}

func (r *bindNamespaceReconciler) validPlacements(clusterName logicalcluster.Name, ns *corev1.Namespace) ([]*schedulingv1alpha1.Placement, error) {
placements, err := r.listPlacement(clusterName)
func (c *controller) validPlacements(clusterName logicalcluster.Name, ns *corev1.Namespace) ([]*schedulingv1alpha1.Placement, error) {
placements, err := c.listPlacements(clusterName)

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 35e42f5

Please sign in to comment.