From b56a0d11826ebf9b6055001bbd2242a601fd489f Mon Sep 17 00:00:00 2001 From: Russell Centanni Date: Mon, 28 Oct 2024 18:41:31 -0400 Subject: [PATCH] wip: reduce concurrent reconciles of the same resource, and use last event as additional reconciliation context --- pkg/syncer/synccontext/events.go | 1 - pkg/syncer/syncer.go | 239 ++++++++++--------------------- pkg/util/loghelper/loghelper.go | 15 ++ 3 files changed, 91 insertions(+), 164 deletions(-) diff --git a/pkg/syncer/synccontext/events.go b/pkg/syncer/synccontext/events.go index 204ef38c15..2d985af803 100644 --- a/pkg/syncer/synccontext/events.go +++ b/pkg/syncer/synccontext/events.go @@ -10,7 +10,6 @@ const ( SyncEventTypeDelete SyncEventType = "Delete" SyncEventTypePendingDelete SyncEventType = "PendingDelete" SyncEventTypeUpdate SyncEventType = "Update" - SyncEventTypeImport SyncEventType = "Import" ) type SyncEventSource string diff --git a/pkg/syncer/syncer.go b/pkg/syncer/syncer.go index ddf9bad1ba..4d6d516b10 100644 --- a/pkg/syncer/syncer.go +++ b/pkg/syncer/syncer.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/loft-sh/vcluster/pkg/config" @@ -31,12 +32,7 @@ import ( ) const ( - hostObjectRequestPrefix = "host#" - importObjectRequestPrefix = "import#" - createObjectRequestPrefix = "create#" - updateObjectRequestPrefix = "update#" - deleteObjectRequestPrefix = "delete#" - pendingDeleteObjectRequestPrefix = "pendingdelete#" + hostObjectRequestPrefix = "host#" ) func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Syncer) (*SyncController, error) { @@ -66,6 +62,8 @@ func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Sync options: options, locker: fifolocker.New(), + + lastEvent: make(map[types.UID]synccontext.SyncEventType), }, nil } @@ -99,13 +97,16 @@ type SyncController struct { options *syncertypes.Options locker *fifolocker.Locker + + lastEventLock sync.RWMutex + lastEvent map[types.UID]synccontext.SyncEventType } func (r *SyncController) newSyncContext(ctx context.Context, logName string) *synccontext.SyncContext { return &synccontext.SyncContext{ Context: ctx, Config: r.config, - Log: loghelper.NewFromExisting(r.log.Base(), logName), + Log: loghelper.NewFromContext(ctx).WithName(logName), PhysicalClient: r.physicalClient, CurrentNamespace: r.currentNamespace, CurrentNamespaceClient: r.currentNamespaceClient, @@ -115,9 +116,6 @@ func (r *SyncController) newSyncContext(ctx context.Context, logName string) *sy } func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ ctrl.Result, retErr error) { - // extract event type - origReq, syncEventType := extractSyncEventType(origReq) - // determine event source syncEventSource := synccontext.SyncEventSourceVirtual if isHostRequest(origReq) { @@ -148,10 +146,8 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ // This is FIFO, we use a special mutex for this (fifomu.Mutex) lockKey := vReq.String() r.locker.Lock(lockKey) - klog.FromContext(ctx).Info("!!!lock!!!", "lockKey", lockKey) defer func(lockKey string) { - err = r.locker.Unlock(lockKey) - klog.FromContext(ctx).Info("!!!unlock!!!", "lockKey", lockKey, "err", err) + _ = r.locker.Unlock(lockKey) }(lockKey) // check if we should skip reconcile @@ -183,10 +179,17 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ } } + vUID := extractVirtualUID(vObj, pObj) + syncEventType := r.getSyncEvent(vUID) + if syncEventType == synccontext.SyncEventTypeDelete { + defer r.forgetSyncEvent(vUID) + } + // check what function we should call if vObj != nil && pObj != nil { // make sure the object uid matches pAnnotations := pObj.GetAnnotations() + if !r.options.DisableUIDDeletion && pAnnotations[translate.UIDAnnotation] != "" && pAnnotations[translate.UIDAnnotation] != string(vObj.GetUID()) { if pAnnotations[translate.KindAnnotation] == "" || pAnnotations[translate.KindAnnotation] == r.syncer.GroupVersionKind().String() { // requeue if object is already being deleted @@ -221,15 +224,7 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ } } - _, isImporter := r.syncer.(syncertypes.Importer) - if isImporter && syncEventType == synccontext.SyncEventTypeCreate && syncEventSource == synccontext.SyncEventSourceHost { - // Skip syncing the object to the virtual cluster because the resource was capable of being imported - // but its importer ignored it. - return ctrl.Result{}, nil - } - if syncEventType == synccontext.SyncEventTypePendingDelete { - // Skip syncing the object to virtual cluster because it's about to be deleted return ctrl.Result{}, nil } @@ -422,28 +417,20 @@ func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q return } - request := reconcile.Request{ + isDelete := eventType == synccontext.SyncEventTypeDelete + isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero() + if isPendingDelete { + r.setSyncEvent(obj.GetUID(), synccontext.SyncEventTypePendingDelete) + } else { + r.setSyncEvent(obj.GetUID(), eventType) + } + + q.Add(reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: obj.GetNamespace(), Name: obj.GetName(), }, - } - - switch eventType { - case synccontext.SyncEventTypeCreate: - request = toCreateRequest(request) - case synccontext.SyncEventTypeUpdate: - if !obj.GetDeletionTimestamp().IsZero() { - request = toPendingDeleteRequest(request) - } else { - request = toUpdateRequest(request) - } - case synccontext.SyncEventTypeDelete: - request = toDeleteRequest(request) - default: - } - - q.Add(request) + }) } func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType) { @@ -452,19 +439,20 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, } isDelete := eventType == synccontext.SyncEventTypeDelete + isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero() // sync context syncContext := r.newSyncContext(ctx, obj.GetName()) // we have a physical object here - imported := false managed, err := r.syncer.IsManaged(syncContext, obj) if err != nil { klog.Errorf("error checking object %v if managed: %v", obj, err) return } else if !managed { // check if we should import - if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete { + imported := false + if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete && !isPendingDelete { imported, err = importer.Import(syncContext, obj) if err != nil { klog.Errorf("error importing object %v: %v", obj, err) @@ -484,43 +472,47 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, // isDelete = false here to make sure the event is propagated and not missed and the syncer is recreating the // object correctly as soon as its deleted. However, we don't want it to be a delete event as this will delete // the virtual object so we need to set that to false here. - isDelete = false + eventType = synccontext.SyncEventTypeUpdate } - hostRequest := toHostRequest(reconcile.Request{ + vUID := extractVirtualUID(nil, obj) + if vUID != "" { + if isPendingDelete { + r.setSyncEvent(vUID, synccontext.SyncEventTypePendingDelete) + } else { + r.setSyncEvent(vUID, eventType) + } + } + + q.Add(toHostRequest(reconcile.Request{ NamespacedName: types.NamespacedName{ Namespace: obj.GetNamespace(), Name: obj.GetName(), }, - }) + })) +} - switch eventType { - case synccontext.SyncEventTypeCreate: - if imported { - hostRequest = toImportRequest(hostRequest) - } else { - hostRequest = toCreateRequest(hostRequest) - } - case synccontext.SyncEventTypeUpdate: - if !obj.GetDeletionTimestamp().IsZero() { - hostRequest = toPendingDeleteRequest(hostRequest) - } else { - hostRequest = toUpdateRequest(hostRequest) - } - case synccontext.SyncEventTypeDelete: - if !isDelete { - if !obj.GetDeletionTimestamp().IsZero() { - hostRequest = toPendingDeleteRequest(hostRequest) - } else { - hostRequest = toUpdateRequest(hostRequest) - } - } else { - hostRequest = toDeleteRequest(hostRequest) - } - default: +func (r *SyncController) getSyncEvent(uid types.UID) synccontext.SyncEventType { + r.lastEventLock.RLock() + defer r.lastEventLock.RUnlock() + + return r.lastEvent[uid] +} + +func (r *SyncController) setSyncEvent(uid types.UID, eventType synccontext.SyncEventType) { + r.lastEventLock.Lock() + defer r.lastEventLock.Unlock() + + if r.lastEvent[uid] != synccontext.SyncEventTypeDelete { + r.lastEvent[uid] = eventType } +} - q.Add(hostRequest) +func (r *SyncController) forgetSyncEvent(uid types.UID) { + r.lastEventLock.Lock() + defer r.lastEventLock.Unlock() + + delete(r.lastEvent, uid) } func (r *SyncController) Build(ctx *synccontext.RegisterContext) (controller.Controller, error) { @@ -637,51 +629,6 @@ func deleteObject(ctx *synccontext.SyncContext, obj client.Object, reason string return ctrl.Result{}, nil } -func toCreateRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: createObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - -func toUpdateRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: updateObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - -func toDeleteRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: deleteObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - -func toPendingDeleteRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: pendingDeleteObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - -func toImportRequest(name reconcile.Request) reconcile.Request { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: importObjectRequestPrefix + name.Namespace, - Name: name.Name, - }, - } -} - func toHostRequest(name reconcile.Request) reconcile.Request { return reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -695,55 +642,6 @@ func isHostRequest(name reconcile.Request) bool { return strings.HasPrefix(name.Namespace, hostObjectRequestPrefix) } -func extractSyncEventType(req reconcile.Request) (reconcile.Request, synccontext.SyncEventType) { - if strings.HasPrefix(req.Namespace, createObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, createObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypeCreate - } - - if strings.HasPrefix(req.Namespace, updateObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, updateObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypeUpdate - } - - if strings.HasPrefix(req.Namespace, deleteObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, deleteObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypeDelete - } - - if strings.HasPrefix(req.Namespace, pendingDeleteObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, pendingDeleteObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypePendingDelete - } - - if strings.HasPrefix(req.Namespace, importObjectRequestPrefix) { - return reconcile.Request{ - NamespacedName: types.NamespacedName{ - Namespace: strings.TrimPrefix(req.Namespace, importObjectRequestPrefix), - Name: req.Name, - }, - }, synccontext.SyncEventTypeImport - } - - return req, synccontext.SyncEventTypeUnknown -} - func fromHostRequest(req reconcile.Request) reconcile.Request { return reconcile.Request{ NamespacedName: types.NamespacedName{ @@ -752,3 +650,18 @@ func fromHostRequest(req reconcile.Request) reconcile.Request { }, } } + +func extractVirtualUID(vObj, pObj client.Object) types.UID { + if vObj != nil { + return vObj.GetUID() + } + + if pObj != nil { + pAnnotations := pObj.GetAnnotations() + if pAnnotations != nil && pAnnotations[translate.UIDAnnotation] != "" { + return types.UID(pAnnotations[translate.UIDAnnotation]) + } + } + + return "" +} diff --git a/pkg/util/loghelper/loghelper.go b/pkg/util/loghelper/loghelper.go index 2e05c1fd4c..11cef0d705 100644 --- a/pkg/util/loghelper/loghelper.go +++ b/pkg/util/loghelper/loghelper.go @@ -1,9 +1,11 @@ package loghelper import ( + "context" "fmt" "github.com/go-logr/logr" + "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" ) @@ -24,6 +26,19 @@ func New(name string) Logger { ctrl.Log.WithName(name).WithCallDepth(1), } } + +func NewFromContext(ctx context.Context) Logger { + if ctxLogger, err := logr.FromContext(ctx); err == nil { + return &logger{ + ctxLogger, + } + } + + return &logger{ + klog.Background(), + } +} + func NewFromExisting(log logr.Logger, name string) Logger { return &logger{ log.WithName(name).WithCallDepth(1),