Skip to content

Commit

Permalink
wip: reduce concurrent reconciles of the same resource, and use last …
Browse files Browse the repository at this point in the history
…event as additional reconciliation context
  • Loading branch information
lizardruss committed Oct 29, 2024
1 parent b6aad95 commit 66a01af
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 154 deletions.
1 change: 0 additions & 1 deletion pkg/syncer/synccontext/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ const (
SyncEventTypeDelete SyncEventType = "Delete"
SyncEventTypePendingDelete SyncEventType = "PendingDelete"
SyncEventTypeUpdate SyncEventType = "Update"
SyncEventTypeImport SyncEventType = "Import"
)

type SyncEventSource string
Expand Down
224 changes: 71 additions & 153 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/loft-sh/vcluster/pkg/config"
Expand All @@ -31,12 +32,7 @@ import (
)

const (
hostObjectRequestPrefix = "host#"
importObjectRequestPrefix = "import#"
createObjectRequestPrefix = "create#"
updateObjectRequestPrefix = "update#"
deleteObjectRequestPrefix = "delete#"
pendingDeleteObjectRequestPrefix = "pendingdelete#"
hostObjectRequestPrefix = "host#"
)

func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Syncer) (*SyncController, error) {
Expand Down Expand Up @@ -66,6 +62,8 @@ func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Sync
options: options,

locker: fifolocker.New(),

lastEvent: make(map[types.UID]synccontext.SyncEventType),
}, nil
}

Expand Down Expand Up @@ -99,13 +97,16 @@ type SyncController struct {
options *syncertypes.Options

locker *fifolocker.Locker

lastEventLock sync.RWMutex
lastEvent map[types.UID]synccontext.SyncEventType
}

func (r *SyncController) newSyncContext(ctx context.Context, logName string) *synccontext.SyncContext {
return &synccontext.SyncContext{
Context: ctx,
Config: r.config,
Log: loghelper.NewFromExisting(r.log.Base(), logName),
Log: loghelper.NewFromContext(ctx).WithName(logName),
PhysicalClient: r.physicalClient,
CurrentNamespace: r.currentNamespace,
CurrentNamespaceClient: r.currentNamespaceClient,
Expand All @@ -115,9 +116,6 @@ func (r *SyncController) newSyncContext(ctx context.Context, logName string) *sy
}

func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ ctrl.Result, retErr error) {
// extract event type
origReq, syncEventType := extractSyncEventType(origReq)

// determine event source
syncEventSource := synccontext.SyncEventSourceVirtual
if isHostRequest(origReq) {
Expand Down Expand Up @@ -148,10 +146,8 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_
// This is FIFO, we use a special mutex for this (fifomu.Mutex)
lockKey := vReq.String()
r.locker.Lock(lockKey)
klog.FromContext(ctx).Info("!!!lock!!!", "lockKey", lockKey)
defer func(lockKey string) {
err = r.locker.Unlock(lockKey)
klog.FromContext(ctx).Info("!!!unlock!!!", "lockKey", lockKey, "err", err)
_ = r.locker.Unlock(lockKey)
}(lockKey)

// check if we should skip reconcile
Expand Down Expand Up @@ -183,10 +179,17 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_
}
}

vUID := extractVirtualUID(vObj, pObj)
syncEventType := r.getSyncEvent(vUID)
if syncEventType == synccontext.SyncEventTypeDelete {
defer r.forgetSyncEvent(vUID)
}

// check what function we should call
if vObj != nil && pObj != nil {
// make sure the object uid matches
pAnnotations := pObj.GetAnnotations()

if !r.options.DisableUIDDeletion && pAnnotations[translate.UIDAnnotation] != "" && pAnnotations[translate.UIDAnnotation] != string(vObj.GetUID()) {
if pAnnotations[translate.KindAnnotation] == "" || pAnnotations[translate.KindAnnotation] == r.syncer.GroupVersionKind().String() {
// requeue if object is already being deleted
Expand Down Expand Up @@ -214,22 +217,15 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_
Virtual: vObj,
})
} else if pObj != nil {
if pObj.GetAnnotations() != nil {
if shouldSkip, ok := pObj.GetAnnotations()[translate.SkipBackSyncInMultiNamespaceMode]; ok && shouldSkip == "true" {
pAnnotations := pObj.GetAnnotations()
if pAnnotations != nil {
if shouldSkip, ok := pAnnotations[translate.SkipBackSyncInMultiNamespaceMode]; ok && shouldSkip == "true" {
// do not delete
return ctrl.Result{}, nil
}
}

_, isImporter := r.syncer.(syncertypes.Importer)
if isImporter && syncEventType == synccontext.SyncEventTypeCreate && syncEventSource == synccontext.SyncEventSourceHost {
// Skip syncing the object to the virtual cluster because the resource was capable of being imported
// but its importer ignored it.
return ctrl.Result{}, nil
}

if syncEventType == synccontext.SyncEventTypePendingDelete {
// Skip syncing the object to virtual cluster because it's about to be deleted
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -429,18 +425,12 @@ func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q
},
}

switch eventType {
case synccontext.SyncEventTypeCreate:
request = toCreateRequest(request)
case synccontext.SyncEventTypeUpdate:
if !obj.GetDeletionTimestamp().IsZero() {
request = toPendingDeleteRequest(request)
} else {
request = toUpdateRequest(request)
}
case synccontext.SyncEventTypeDelete:
request = toDeleteRequest(request)
default:
isDelete := eventType == synccontext.SyncEventTypeDelete
isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero()
if isPendingDelete {
r.setSyncEvent(obj.GetUID(), synccontext.SyncEventTypePendingDelete)
} else {
r.setSyncEvent(obj.GetUID(), eventType)
}

q.Add(request)
Expand All @@ -452,6 +442,7 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
}

isDelete := eventType == synccontext.SyncEventTypeDelete
isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero()

// sync context
syncContext := r.newSyncContext(ctx, obj.GetName())
Expand All @@ -464,7 +455,7 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
return
} else if !managed {
// check if we should import
if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete {
if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete && !isPendingDelete {
imported, err = importer.Import(syncContext, obj)
if err != nil {
klog.Errorf("error importing object %v: %v", obj, err)
Expand All @@ -484,7 +475,7 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
// isDelete = false here to make sure the event is propagated and not missed and the syncer is recreating the
// object correctly as soon as its deleted. However, we don't want it to be a delete event as this will delete
// the virtual object so we need to set that to false here.
isDelete = false
eventType = synccontext.SyncEventTypeUpdate
}

hostRequest := toHostRequest(reconcile.Request{
Expand All @@ -494,35 +485,41 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
},
})

switch eventType {
case synccontext.SyncEventTypeCreate:
if imported {
hostRequest = toImportRequest(hostRequest)
} else {
hostRequest = toCreateRequest(hostRequest)
}
case synccontext.SyncEventTypeUpdate:
if !obj.GetDeletionTimestamp().IsZero() {
hostRequest = toPendingDeleteRequest(hostRequest)
} else {
hostRequest = toUpdateRequest(hostRequest)
}
case synccontext.SyncEventTypeDelete:
if !isDelete {
if !obj.GetDeletionTimestamp().IsZero() {
hostRequest = toPendingDeleteRequest(hostRequest)
} else {
hostRequest = toUpdateRequest(hostRequest)
}
vUID := extractVirtualUID(nil, obj)
if vUID != "" {
if isPendingDelete {
r.setSyncEvent(vUID, synccontext.SyncEventTypePendingDelete)
} else {
hostRequest = toDeleteRequest(hostRequest)
r.setSyncEvent(vUID, eventType)
}
default:
}

q.Add(hostRequest)
}

func (r *SyncController) getSyncEvent(uid types.UID) synccontext.SyncEventType {
r.lastEventLock.RLock()
defer r.lastEventLock.RUnlock()

return r.lastEvent[uid]
}

func (r *SyncController) setSyncEvent(uid types.UID, eventType synccontext.SyncEventType) {
r.lastEventLock.Lock()
defer r.lastEventLock.Unlock()

if r.lastEvent[uid] != synccontext.SyncEventTypeDelete {
r.lastEvent[uid] = eventType
}
}

func (r *SyncController) forgetSyncEvent(uid types.UID) {
r.lastEventLock.Lock()
defer r.lastEventLock.Unlock()

delete(r.lastEvent, uid)
}

func (r *SyncController) Build(ctx *synccontext.RegisterContext) (controller.Controller, error) {
// build the basic controller
controllerBuilder := ctrl.NewControllerManagedBy(ctx.VirtualManager).
Expand Down Expand Up @@ -637,51 +634,6 @@ func deleteObject(ctx *synccontext.SyncContext, obj client.Object, reason string
return ctrl.Result{}, nil
}

func toCreateRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: createObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toUpdateRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: updateObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toDeleteRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: deleteObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toPendingDeleteRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: pendingDeleteObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toImportRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: importObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toHostRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Expand All @@ -695,55 +647,6 @@ func isHostRequest(name reconcile.Request) bool {
return strings.HasPrefix(name.Namespace, hostObjectRequestPrefix)
}

func extractSyncEventType(req reconcile.Request) (reconcile.Request, synccontext.SyncEventType) {
if strings.HasPrefix(req.Namespace, createObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, createObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypeCreate
}

if strings.HasPrefix(req.Namespace, updateObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, updateObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypeUpdate
}

if strings.HasPrefix(req.Namespace, deleteObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, deleteObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypeDelete
}

if strings.HasPrefix(req.Namespace, pendingDeleteObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, pendingDeleteObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypePendingDelete
}

if strings.HasPrefix(req.Namespace, importObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, importObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypeImport
}

return req, synccontext.SyncEventTypeUnknown
}

func fromHostRequest(req reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Expand All @@ -752,3 +655,18 @@ func fromHostRequest(req reconcile.Request) reconcile.Request {
},
}
}

func extractVirtualUID(vObj, pObj client.Object) types.UID {
if vObj != nil {
return vObj.GetUID()
}

if pObj != nil {
pAnnotations := pObj.GetAnnotations()
if pAnnotations != nil && pAnnotations[translate.UIDAnnotation] != "" {
return types.UID(pAnnotations[translate.UIDAnnotation])
}
}

return ""
}
15 changes: 15 additions & 0 deletions pkg/util/loghelper/loghelper.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package loghelper

import (
"context"
"fmt"

"github.com/go-logr/logr"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -24,6 +26,19 @@ func New(name string) Logger {
ctrl.Log.WithName(name).WithCallDepth(1),
}
}

func NewFromContext(ctx context.Context) Logger {
if ctxLogger, err := logr.FromContext(ctx); err == nil {
return &logger{
ctxLogger,
}
}

return &logger{
klog.Background(),
}
}

func NewFromExisting(log logr.Logger, name string) Logger {
return &logger{
log.WithName(name).WithCallDepth(1),
Expand Down

0 comments on commit 66a01af

Please sign in to comment.