Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 placement_controller uses committer #2648

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 28 additions & 35 deletions pkg/reconciler/scheduling/placement/placement_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,18 @@ package placement

import (
"context"
"encoding/json"
"fmt"
"reflect"
"time"

jsonpatch "github.com/evanphx/json-patch"
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
kcpcorev1informers "github.com/kcp-dev/client-go/informers/core/v1"
corev1listers "github.com/kcp-dev/client-go/listers/core/v1"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -43,10 +39,12 @@ import (

schedulingv1alpha1 "github.com/kcp-dev/kcp/pkg/apis/scheduling/v1alpha1"
kcpclientset "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/cluster"
schedulingv1alpha1client "github.com/kcp-dev/kcp/pkg/client/clientset/versioned/typed/scheduling/v1alpha1"
schedulingv1alpha1informers "github.com/kcp-dev/kcp/pkg/client/informers/externalversions/scheduling/v1alpha1"
schedulingv1alpha1listers "github.com/kcp-dev/kcp/pkg/client/listers/scheduling/v1alpha1"
"github.com/kcp-dev/kcp/pkg/indexers"
"github.com/kcp-dev/kcp/pkg/logging"
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
)

const (
Expand Down Expand Up @@ -83,6 +81,8 @@ func NewController(

placementLister: placementInformer.Lister(),
placementIndexer: placementInformer.Informer().GetIndexer(),

commit: committer.NewCommitter[*Placement, Patcher, *PlacementSpec, *PlacementStatus](kcpClusterClient.SchedulingV1alpha1().Placements()),
}

if err := placementInformer.Informer().AddIndexers(cache.Indexers{
Expand Down Expand Up @@ -145,6 +145,13 @@ func NewController(
return c, nil
}

type Placement = schedulingv1alpha1.Placement
type PlacementSpec = schedulingv1alpha1.PlacementSpec
type PlacementStatus = schedulingv1alpha1.PlacementStatus
type Patcher = schedulingv1alpha1client.PlacementInterface
type Resource = committer.Resource[*PlacementSpec, *PlacementStatus]
type CommitFunc = func(context.Context, *Resource, *Resource) error

// controller.
type controller struct {
queue workqueue.RateLimitingInterface
Expand All @@ -159,6 +166,8 @@ type controller struct {

placementLister schedulingv1alpha1listers.PlacementClusterLister
placementIndexer cache.Indexer

commit CommitFunc
}

func (c *controller) enqueuePlacement(obj interface{}) {
Expand Down Expand Up @@ -294,7 +303,7 @@ func (c *controller) process(ctx context.Context, key string) error {

obj, err := c.placementLister.Cluster(clusterName).Get(name)
if err != nil {
if errors.IsNotFound(err) {
if apierrors.IsNotFound(err) {
return nil // object deleted before we handled it
}
return err
Expand All @@ -305,36 +314,20 @@ func (c *controller) process(ctx context.Context, key string) error {
logger = logging.WithObject(logger, obj)
ctx = klog.NewContext(ctx, logger)

reconcileErr := c.reconcile(ctx, obj)

// If the object being reconciled changed as a result, update it.
if !equality.Semantic.DeepEqual(old.Status, obj.Status) {
oldData, err := json.Marshal(schedulingv1alpha1.Placement{
Status: old.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for placement %s|%s: %w", clusterName, name, err)
}
var errs []error
if err := c.reconcile(ctx, obj); err != nil {
errs = append(errs, err)
}

newData, err := json.Marshal(schedulingv1alpha1.Placement{
ObjectMeta: metav1.ObjectMeta{
UID: old.UID,
ResourceVersion: old.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: obj.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for LocationDomain %s|%s: %w", clusterName, name, err)
}
// Regardless of whether reconcile returned an error or not, always try to patch status if needed. Return the
// reconciliation error at the end.

patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for LocationDomain %s|%s: %w", clusterName, name, err)
}
logger.V(2).Info("patching placement", "patch", string(patchBytes))
_, uerr := c.kcpClusterClient.Cluster(clusterName.Path()).SchedulingV1alpha1().Placements().Patch(ctx, obj.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return uerr
// If the object being reconciled changed as a result, update it.
oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
newResource := &Resource{ObjectMeta: obj.ObjectMeta, Spec: &obj.Spec, Status: &obj.Status}
if err := c.commit(ctx, oldResource, newResource); err != nil {
errs = append(errs, err)
}

return reconcileErr
return utilerrors.NewAggregate(errs)
}