Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reduce concurrent reconciles of the same virtual resource #2246

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package syncer
import (
"context"

"github.com/loft-sh/vcluster/pkg/syncer/synccontext"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
)

type enqueueFunc func(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], isDelete, isPendingDelete bool)
type enqueueFunc func(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType)

func newEventHandler(enqueue enqueueFunc) handler.EventHandler {
return &eventHandler{enqueue: enqueue}
Expand All @@ -22,21 +23,21 @@ type eventHandler struct {

// Create is called in response to an create event - e.g. Pod Creation.
func (r *eventHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
r.enqueue(ctx, evt.Object, q, false, false)
r.enqueue(ctx, evt.Object, q, synccontext.SyncEventTypeCreate)
}

// Update is called in response to an update event - e.g. Pod Updated.
func (r *eventHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
r.enqueue(ctx, evt.ObjectNew, q, false, !evt.ObjectNew.GetDeletionTimestamp().IsZero())
r.enqueue(ctx, evt.ObjectNew, q, synccontext.SyncEventTypeUpdate)
}

// Delete is called in response to a delete event - e.g. Pod Deleted.
func (r *eventHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
r.enqueue(ctx, evt.Object, q, true, false)
r.enqueue(ctx, evt.Object, q, synccontext.SyncEventTypeDelete)
}

// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or
// external trigger request - e.g. reconcile Autoscaling, or a Webhook.
func (r *eventHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[ctrl.Request]) {
r.enqueue(ctx, evt.Object, q, false, !evt.Object.GetDeletionTimestamp().IsZero())
r.enqueue(ctx, evt.Object, q, synccontext.SyncEventTypeUnknown)
}
2 changes: 2 additions & 0 deletions pkg/syncer/synccontext/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ type SyncEventType string

const (
SyncEventTypeUnknown SyncEventType = ""
SyncEventTypeCreate SyncEventType = "Create"
SyncEventTypeUpdate SyncEventType = "Update"
SyncEventTypeDelete SyncEventType = "Delete"
SyncEventTypePendingDelete SyncEventType = "PendingDelete"
)
Expand Down
163 changes: 75 additions & 88 deletions pkg/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/loft-sh/vcluster/pkg/config"
Expand All @@ -31,9 +32,7 @@ import (
)

const (
hostObjectRequestPrefix = "host#"
deleteObjectRequestPrefix = "delete#"
pendingDeleteObjectRequestPrefix = "pendingdelete#"
hostObjectRequestPrefix = "host#"
)

func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Syncer) (*SyncController, error) {
Expand Down Expand Up @@ -63,6 +62,8 @@ func NewSyncController(ctx *synccontext.RegisterContext, syncer syncertypes.Sync
options: options,

locker: fifolocker.New(),

lastEvent: make(map[types.UID]synccontext.SyncEventType),
}, nil
}

Expand Down Expand Up @@ -96,13 +97,16 @@ type SyncController struct {
options *syncertypes.Options

locker *fifolocker.Locker

lastEventLock sync.RWMutex
lastEvent map[types.UID]synccontext.SyncEventType
}

func (r *SyncController) newSyncContext(ctx context.Context, logName string) *synccontext.SyncContext {
return &synccontext.SyncContext{
Context: ctx,
Config: r.config,
Log: loghelper.NewFromExisting(r.log.Base(), logName),
Log: loghelper.NewFromContext(ctx).WithName(logName),
PhysicalClient: r.physicalClient,
CurrentNamespace: r.currentNamespace,
CurrentNamespaceClient: r.currentNamespaceClient,
Expand All @@ -112,9 +116,6 @@ func (r *SyncController) newSyncContext(ctx context.Context, logName string) *sy
}

func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_ ctrl.Result, retErr error) {
// extract if this was a delete request
origReq, syncEventType := fromDeleteRequest(origReq)

// determine event source
syncEventSource := synccontext.SyncEventSourceVirtual
if isHostRequest(origReq) {
Expand Down Expand Up @@ -178,10 +179,17 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_
}
}

vUID := extractVirtualUID(vObj, pObj)
syncEventType := r.getSyncEvent(vUID)
if syncEventType == synccontext.SyncEventTypeDelete {
defer r.forgetSyncEvent(vUID)
}

// check what function we should call
if vObj != nil && pObj != nil {
// make sure the object uid matches
pAnnotations := pObj.GetAnnotations()

if !r.options.DisableUIDDeletion && pAnnotations[translate.UIDAnnotation] != "" && pAnnotations[translate.UIDAnnotation] != string(vObj.GetUID()) {
if pAnnotations[translate.KindAnnotation] == "" || pAnnotations[translate.KindAnnotation] == r.syncer.GroupVersionKind().String() {
// requeue if object is already being deleted
Expand Down Expand Up @@ -216,7 +224,7 @@ func (r *SyncController) Reconcile(ctx context.Context, origReq ctrl.Request) (_
}
}

if syncEventSource == synccontext.SyncEventSourceVirtual && syncEventType == synccontext.SyncEventTypePendingDelete {
if syncEventType == synccontext.SyncEventTypePendingDelete {
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -404,38 +412,19 @@ func (r *SyncController) extractRequest(ctx *synccontext.SyncContext, req ctrl.R
return req, pReq, nil
}

func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], isDelete, isPendingDelete bool) {
func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType) {
if obj == nil {
return
}

// add a new request for the host object as otherwise this information might be lost after a delete event
if isDelete {
// add a new request for the virtual object
q.Add(toDeleteRequest(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
},
}))

return
}

// add a new request for the host object as otherwise this information might be lost after update + delete event
isDelete := eventType == synccontext.SyncEventTypeDelete
isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero()
if isPendingDelete {
// add a new request for the virtual object
q.Add(toPendingDeleteRequest(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
},
}))

return
r.setSyncEvent(obj.GetUID(), synccontext.SyncEventTypePendingDelete)
} else {
r.setSyncEvent(obj.GetUID(), eventType)
}

// add a new request for the virtual object
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Expand All @@ -444,11 +433,14 @@ func (r *SyncController) enqueueVirtual(_ context.Context, obj client.Object, q
})
}

func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], isDelete, _ bool) {
func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object, q workqueue.TypedRateLimitingInterface[ctrl.Request], eventType synccontext.SyncEventType) {
if obj == nil {
return
}

isDelete := eventType == synccontext.SyncEventTypeDelete
isPendingDelete := !isDelete && !obj.GetDeletionTimestamp().IsZero()

// sync context
syncContext := r.newSyncContext(ctx, obj.GetName())

Expand All @@ -460,7 +452,7 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
} else if !managed {
// check if we should import
imported := false
if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete {
if importer, ok := r.syncer.(syncertypes.Importer); ok && !isDelete && !isPendingDelete {
imported, err = importer.Import(syncContext, obj)
if err != nil {
klog.Errorf("error importing object %v: %v", obj, err)
Expand All @@ -480,23 +472,18 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
// isDelete = false here to make sure the event is propagated and not missed and the syncer is recreating the
// object correctly as soon as its deleted. However, we don't want it to be a delete event as this will delete
// the virtual object so we need to set that to false here.
isDelete = false
eventType = synccontext.SyncEventTypeUpdate
}

// add a new request for the virtual object as otherwise this information might be lost after a delete event
if isDelete {
// add a new request for the host object
q.Add(toDeleteRequest(toHostRequest(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
},
})))

return
vUID := extractVirtualUID(nil, obj)
if vUID != "" {
if isPendingDelete {
r.setSyncEvent(vUID, synccontext.SyncEventTypePendingDelete)
} else {
r.setSyncEvent(vUID, eventType)
}
}

// add a new request for the host object
q.Add(toHostRequest(reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Expand All @@ -505,6 +492,31 @@ func (r *SyncController) enqueuePhysical(ctx context.Context, obj client.Object,
}))
}

func (r *SyncController) getSyncEvent(uid types.UID) synccontext.SyncEventType {
r.lastEventLock.RLock()
defer r.lastEventLock.RUnlock()

return r.lastEvent[uid]
}

func (r *SyncController) setSyncEvent(uid types.UID, eventType synccontext.SyncEventType) {
r.lastEventLock.Lock()
defer r.lastEventLock.Unlock()

if r.lastEvent[uid] == synccontext.SyncEventTypeDelete {
return
}

r.lastEvent[uid] = eventType
}

func (r *SyncController) forgetSyncEvent(uid types.UID) {
r.lastEventLock.Lock()
defer r.lastEventLock.Unlock()

delete(r.lastEvent, uid)
}

func (r *SyncController) Build(ctx *synccontext.RegisterContext) (controller.Controller, error) {
// build the basic controller
controllerBuilder := ctrl.NewControllerManagedBy(ctx.VirtualManager).
Expand Down Expand Up @@ -619,24 +631,6 @@ func deleteObject(ctx *synccontext.SyncContext, obj client.Object, reason string
return ctrl.Result{}, nil
}

func toDeleteRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: deleteObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toPendingDeleteRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: pendingDeleteObjectRequestPrefix + name.Namespace,
Name: name.Name,
},
}
}

func toHostRequest(name reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Expand All @@ -650,28 +644,6 @@ func isHostRequest(name reconcile.Request) bool {
return strings.HasPrefix(name.Namespace, hostObjectRequestPrefix)
}

func fromDeleteRequest(req reconcile.Request) (reconcile.Request, synccontext.SyncEventType) {
if strings.HasPrefix(req.Namespace, deleteObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, deleteObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypeDelete
}

if strings.HasPrefix(req.Namespace, pendingDeleteObjectRequestPrefix) {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: strings.TrimPrefix(req.Namespace, pendingDeleteObjectRequestPrefix),
Name: req.Name,
},
}, synccontext.SyncEventTypePendingDelete
}

return req, synccontext.SyncEventTypeUnknown
}

func fromHostRequest(req reconcile.Request) reconcile.Request {
return reconcile.Request{
NamespacedName: types.NamespacedName{
Expand All @@ -680,3 +652,18 @@ func fromHostRequest(req reconcile.Request) reconcile.Request {
},
}
}

func extractVirtualUID(vObj, pObj client.Object) types.UID {
if vObj != nil {
return vObj.GetUID()
}

if pObj != nil {
pAnnotations := pObj.GetAnnotations()
if pAnnotations != nil && pAnnotations[translate.UIDAnnotation] != "" {
return types.UID(pAnnotations[translate.UIDAnnotation])
}
}

return ""
}
15 changes: 15 additions & 0 deletions pkg/util/loghelper/loghelper.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package loghelper

import (
"context"
"fmt"

"github.com/go-logr/logr"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -24,6 +26,19 @@ func New(name string) Logger {
ctrl.Log.WithName(name).WithCallDepth(1),
}
}

func NewFromContext(ctx context.Context) Logger {
if ctxLogger, err := logr.FromContext(ctx); err == nil {
return &logger{
ctxLogger,
}
}

return &logger{
klog.Background(),
}
}

func NewFromExisting(log logr.Logger, name string) Logger {
return &logger{
log.WithName(name).WithCallDepth(1),
Expand Down
12 changes: 0 additions & 12 deletions test/conformanceValues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,11 @@ controlPlane:
etcd:
deploy:
enabled: true
statefulSet:
image:
tag: 3.5.14-0
distro:
k8s:
apiServer:
extraArgs:
- --service-account-jwks-uri=https://kubernetes.default.svc.cluster.local/openid/v1/jwks
image:
tag: v1.31.1
controllerManager:
image:
tag: v1.31.1
enabled: true
scheduler:
image:
tag: v1.31.1
statefulSet:
scheduling:
podManagementPolicy: OrderedReady
Expand Down
Loading