From 6960aa800e4dde62859d261601fb5894b42a4d4c Mon Sep 17 00:00:00 2001 From: Russell Centanni Date: Fri, 25 Oct 2024 18:33:53 -0400 Subject: [PATCH] fix: reduce concurrent reconciles of the same resource, and use last event as additional reconciliation context --- pkg/syncer/handler.go | 11 ++- pkg/syncer/synccontext/events.go | 2 + pkg/syncer/syncer.go | 163 ++++++++++++++----------------- pkg/util/loghelper/loghelper.go | 15 +++ test/conformanceValues.yaml | 12 --- 5 files changed, 98 insertions(+), 105 deletions(-) diff --git a/pkg/syncer/handler.go b/pkg/syncer/handler.go index 989df250d2..b4a7dd2fce 100644 --- a/pkg/syncer/handler.go +++ b/pkg/syncer/handler.go @@ -3,6 +3,7 @@ package syncer import ( "context" + "github.com/loft-sh/vcluster/pkg/syncer/synccontext" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -10,7 +11,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/handler" ) -type enqueueFunc func(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], isDelete, isPendingDelete bool) +type enqueueFunc func(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType) func newEventHandler(enqueue enqueueFunc) handler.EventHandler { return &eventHandler{enqueue: enqueue} @@ -22,21 +23,21 @@ type eventHandler struct { // Create is called in response to an create event - e.g. Pod Creation. func (r *eventHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) { - r.enqueue(ctx, evt.Object, q, false, false) + r.enqueue(ctx, evt.Object, q, synccontext.SyncEventTypeCreate) } // Update is called in response to an update event - e.g. Pod Updated. func (r *eventHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) { - r.enqueue(ctx, evt.ObjectNew, q, false, !evt.ObjectNew.GetDeletionTimestamp().IsZero()) + r.enqueue(ctx, evt.ObjectNew, q, synccontext.SyncEventTypeUpdate) } // Delete is called in response to a delete event - e.g. Pod Deleted. func (r *eventHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) { - r.enqueue(ctx, evt.Object, q, true, false) + r.enqueue(ctx, evt.Object, q, synccontext.SyncEventTypeDelete) } // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request - e.g. reconcile Autoscaling, or a Webhook. func (r *eventHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) { - r.enqueue(ctx, evt.Object, q, false, !evt.Object.GetDeletionTimestamp().IsZero()) + r.enqueue(ctx, evt.Object, q, synccontext.SyncEventTypeUnknown) } diff --git a/pkg/syncer/synccontext/events.go b/pkg/syncer/synccontext/events.go index a30da79a26..3e3dc62899 100644 --- a/pkg/syncer/synccontext/events.go +++ b/pkg/syncer/synccontext/events.go @@ -6,6 +6,8 @@ type SyncEventType string const ( SyncEventTypeUnknown SyncEventType = "" + SyncEventTypeCreate SyncEventType = "Create" + SyncEventTypeUpdate SyncEventType = "Update" SyncEventTypeDelete SyncEventType = "Delete" SyncEventTypePendingDelete SyncEventType = "PendingDelete" ) diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index 944114b465..8675ccc8f7 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/loft-sh/vcluster/pkg/config" @@ -31,9 +32,7 @@ import ( ) const ( - hostObjectRequestPrefix = "host#" - deleteObjectRequestPrefix = "delete#" - pendingDeleteObjectRequestPrefix = "pendingdelete#" + hostObjectRequestPrefix = "host#" ) func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Syncer) (*SyncController, error) { @@ -63,6 +62,8 @@ func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Sync options: options, locker: fifolocker.New(), + + lastEvent: make(map[types.UID]synccontext.SyncEventType), }, nil } @@ -96,13 +97,16 @@ type SyncController struct { options *syncertypes.Options locker *fifolocker.Locker + + lastEventLock sync.RWMutex + lastEvent map[types.UID]synccontext.SyncEventType } func (r *SyncController) newSyncContext(ctx context.Context, logName string) *synccontext.SyncContext { return &synccontext.SyncContext{ Context: ctx, Config: r.config, - Log: loghelper.NewFromExisting(r.log.Base(), logName), + Log: loghelper.NewFromContext(ctx).WithName(logName), PhysicalClient: r.physicalClient, CurrentNamespace: r.currentNamespace, CurrentNamespaceClient: r.currentNamespaceClient, @@ -112,9 +116,6 @@ func (r *SyncController) newSyncContext(ctx context.Context, logName string) *sy } func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ ctrl.Result, retErr error) { - // extract if this was a delete request - origReq, syncEventType := fromDeleteRequest(origReq) - // determine event source syncEventSource := synccontext.SyncEventSourceVirtual if isHostRequest(origReq) { @@ -178,10 +179,17 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ } } + vUID := extractVirtualUID(vObj, pObj) + syncEventType := r.getSyncEvent(vUID) + if syncEventType == synccontext.SyncEventTypeDelete { + defer r.forgetSyncEvent(vUID) + } + // check what function we should call if vObj != nil && pObj != nil { // make sure the object uid matches pAnnotations := pObj.GetAnnotations() + if !r.options.DisableUIDDeletion && pAnnotations[translate.UIDAnnotation] != "" && pAnnotations[translate.UIDAnnotation] != string(vObj.GetUID()) { if pAnnotations[translate.KindAnnotation] == "" || pAnnotations[translate.KindAnnotation] == r.syncer.GroupVersionKind().String() { // requeue if object is already being deleted @@ -216,7 +224,7 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ } } - if syncEventSource == synccontext.SyncEventSourceVirtual && syncEventType == synccontext.SyncEventTypePendingDelete { + if syncEventType == synccontext.SyncEventTypePendingDelete { return ctrl.Result{}, nil } @@ -404,38 +412,19 @@ func (r *SyncController) extractRequest(ctx *synccontext.SyncContext, req ctrl.R return req, pReq, nil } -func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], isDelete, isPendingDelete bool) { +func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType) { if obj == nil { return } - // add a new request for the host object as otherwise this information might be lost after a delete event - if isDelete { - // add a new request for the virtual object - q.Add(toDeleteRequest(reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - })) - - return - } - - // add a new request for the host object as otherwise this information might be lost after update + delete event + isDelete := eventType == synccontext.SyncEventTypeDelete + isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero() if isPendingDelete { - // add a new request for the virtual object - q.Add(toPendingDeleteRequest(reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - })) - - return + r.setSyncEvent(obj.GetUID(), synccontext.SyncEventTypePendingDelete) + } else { + r.setSyncEvent(obj.GetUID(), eventType) } - // add a new request for the virtual object q.Add(reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: obj.GetNamespace(), @@ -444,11 +433,14 @@ func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q }) } -func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], isDelete, _ bool) { +func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType) { if obj == nil { return } + isDelete := eventType == synccontext.SyncEventTypeDelete + isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero() + // sync context syncContext := r.newSyncContext(ctx, obj.GetName()) @@ -460,7 +452,7 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, } else if !managed { // check if we should import imported := false - if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete { + if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete && !isPendingDelete { imported, err = importer.Import(syncContext, obj) if err != nil { klog.Errorf("error importing object %v: %v", obj, err) @@ -480,23 +472,18 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, // isDelete = false here to make sure the event is propagated and not missed and the syncer is recreating the // object correctly as soon as its deleted. However, we don't want it to be a delete event as this will delete // the virtual object so we need to set that to false here. - isDelete = false + eventType = synccontext.SyncEventTypeUpdate } - // add a new request for the virtual object as otherwise this information might be lost after a delete event - if isDelete { - // add a new request for the host object - q.Add(toDeleteRequest(toHostRequest(reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), - }, - }))) - - return + vUID := extractVirtualUID(nil, obj) + if vUID != "" { + if isPendingDelete { + r.setSyncEvent(vUID, synccontext.SyncEventTypePendingDelete) + } else { + r.setSyncEvent(vUID, eventType) + } } - // add a new request for the host object q.Add(toHostRequest(reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: obj.GetNamespace(), @@ -505,6 +492,31 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, })) } +func (r *SyncController) getSyncEvent(uid types.UID) synccontext.SyncEventType { + r.lastEventLock.RLock() + defer r.lastEventLock.RUnlock() + + return r.lastEvent[uid] +} + +func (r *SyncController) setSyncEvent(uid types.UID, eventType synccontext.SyncEventType) { + r.lastEventLock.Lock() + defer r.lastEventLock.Unlock() + + if r.lastEvent[uid] == synccontext.SyncEventTypeDelete { + return + } + + r.lastEvent[uid] = eventType +} + +func (r *SyncController) forgetSyncEvent(uid types.UID) { + r.lastEventLock.Lock() + defer r.lastEventLock.Unlock() + + delete(r.lastEvent, uid) +} + func (r *SyncController) Build(ctx *synccontext.RegisterContext) (controller.Controller, error) { // build the basic controller controllerBuilder := ctrl.NewControllerManagedBy(ctx.VirtualManager). @@ -619,24 +631,6 @@ func deleteObject(ctx *synccontext.SyncContext, obj client.Object, reason string return ctrl.Result{}, nil } -func toDeleteRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: deleteObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - -func toPendingDeleteRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: pendingDeleteObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - func toHostRequest(name reconcile.Request) reconcile.Request { return reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -650,28 +644,6 @@ func isHostRequest(name reconcile.Request) bool { return strings.HasPrefix(name.Namespace, hostObjectRequestPrefix) } -func fromDeleteRequest(req reconcile.Request) (reconcile.Request, synccontext.SyncEventType) { - if strings.HasPrefix(req.Namespace, deleteObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, deleteObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypeDelete - } - - if strings.HasPrefix(req.Namespace, pendingDeleteObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, pendingDeleteObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypePendingDelete - } - - return req, synccontext.SyncEventTypeUnknown -} - func fromHostRequest(req reconcile.Request) reconcile.Request { return reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -680,3 +652,18 @@ func fromHostRequest(req reconcile.Request) reconcile.Request { }, } } + +func extractVirtualUID(vObj, pObj client.Object) types.UID { + if vObj != nil { + return vObj.GetUID() + } + + if pObj != nil { + pAnnotations := pObj.GetAnnotations() + if pAnnotations != nil && pAnnotations[translate.UIDAnnotation] != "" { + return types.UID(pAnnotations[translate.UIDAnnotation]) + } + } + + return "" +} diff --git a/pkg/util/loghelper/loghelper.go b/pkg/util/loghelper/loghelper.go index 2e05c1fd4c..11cef0d705 100644 --- a/pkg/util/loghelper/loghelper.go +++ b/pkg/util/loghelper/loghelper.go @@ -1,9 +1,11 @@ package loghelper import ( + "context" "fmt" "github.com/go-logr/logr" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" ) @@ -24,6 +26,19 @@ func New(name string) Logger { ctrl.Log.WithName(name).WithCallDepth(1), } } + +func NewFromContext(ctx context.Context) Logger { + if ctxLogger, err := logr.FromContext(ctx); err == nil { + return &logger{ + ctxLogger, + } + } + + return &logger{ + klog.Background(), + } +} + func NewFromExisting(log logr.Logger, name string) Logger { return &logger{ log.WithName(name).WithCallDepth(1), diff --git a/test/conformanceValues.yaml b/test/conformanceValues.yaml index ee51fe8c9b..4edb1f3813 100644 --- a/test/conformanceValues.yaml +++ b/test/conformanceValues.yaml @@ -6,23 +6,11 @@ controlPlane: etcd: deploy: enabled: true - statefulSet: - image: - tag: 3.5.14-0 distro: k8s: apiServer: extraArgs: - --service-account-jwks-uri=https://kubernetes.default.svc.cluster.local/openid/v1/jwks - image: - tag: v1.31.1 - controllerManager: - image: - tag: v1.31.1 - enabled: true - scheduler: - image: - tag: v1.31.1 statefulSet: scheduling: podManagementPolicy: OrderedReady