Optimize expectations for Advanced DaemonSet (#940)
Signed-off-by: FillZpp <[email protected]>
FillZpp committed Mar 24, 2022
1 parent ccbdf0d commit ca19925
Showing 2 changed files with 29 additions and 20 deletions.
29 changes: 13 additions & 16 deletions pkg/controller/daemonset/daemonset_controller.go
```diff
@@ -689,15 +689,12 @@ func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelet
 
 			err = dsc.podControl.CreatePods(ds.Namespace, podTemplate, ds, metav1.NewControllerRef(ds, controllerKind))
 
-			if err != nil && errors.IsTimeout(err) {
-				// Pod is created but its initialization has timed out.
-				// If the initialization is successful eventually, the
-				// controller will observe the creation via the informer.
-				// If the initialization fails, or if the pod keeps
-				// uninitialized for a long time, the informer will not
-				// receive any update, and the controller will create a new
-				// pod when the expectation expires.
-				return
+			if err != nil {
+				if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
+					// If the namespace is being torn down, we can safely ignore
+					// this error since all subsequent creations will fail.
+					return
+				}
 			}
 			if err != nil {
 				klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
```
```diff
@@ -709,12 +706,10 @@ func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelet
 			}
 			createWait.Wait()
 			// any skipped pods that we never attempted to start shouldn't be expected.
-			skippedPods := createDiff - batchSize
+			skippedPods := createDiff - (batchSize + pos)
 			if errorCount < len(errCh) && skippedPods > 0 {
 				klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
-				for i := 0; i < skippedPods; i++ {
-					dsc.expectations.CreationObserved(dsKey)
-				}
+				dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
 				// The skipped pods will be retried later. The next controller resync will
 				// retry the slow start process.
 				break
```
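The corrected formula matters once slow start is past its first batch: `pos` pods were already attempted in earlier rounds, so subtracting only `batchSize` overcounts the skipped pods and lowers expectations for creates that were actually issued. A worked example with illustrative numbers (not from the source); note also that upstream's `CreationObserved(key)` is shorthand for `LowerExpectations(key, 1, 0)`, so the new single call is equivalent to the old loop but takes the lock once:

```go
package main

import "fmt"

func main() {
	// Illustrative slow-start failure: createDiff=10, batches of 1, 2, then 4
	// pods; the error surfaces while the third batch (pos=3, batchSize=4) runs.
	createDiff, pos, batchSize := 10, 3, 4

	fmt.Println(createDiff - (batchSize + pos)) // 3: pods never attempted
	fmt.Println(createDiff - batchSize)         // 6: old formula, overcounts by pos
}
```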
```diff
@@ -728,10 +723,12 @@ func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelet
 		go func(ix int) {
 			defer deleteWait.Done()
 			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
-				klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.DeletionObserved(dsKey)
-				errCh <- err
-				utilruntime.HandleError(err)
+				if !errors.IsNotFound(err) {
+					klog.V(2).Infof("Failed deletion, decremented expectations for set %q/%q", ds.Namespace, ds.Name)
+					errCh <- err
+					utilruntime.HandleError(err)
+				}
 			}
 		}(i)
 	}
```
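A sketch of the rationale behind the new `IsNotFound` guard, assuming only apimachinery and client-go (the helper is illustrative, not Kruise code): a NotFound on delete means the pod is already gone, which is the state the controller wanted, so reporting it as a failure would only trigger a spurious requeue.

```go
package example

import (
	"context"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// deletePodIgnoringNotFound is an illustrative helper, not part of Kruise.
func deletePodIgnoringNotFound(ctx context.Context, c kubernetes.Interface, ns, name string) error {
	err := c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("delete pod %s/%s: %w", ns, name, err)
	}
	return nil // deleted, or already gone; either way the desired state holds
}
```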
20 changes: 16 additions & 4 deletions pkg/controller/daemonset/daemonset_event_handler.go
```diff
@@ -20,6 +20,7 @@ import (
 	"context"
 	"reflect"
 	"strings"
+	"sync"
 
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
```
```diff
@@ -40,7 +41,8 @@ var _ handler.EventHandler = &podEventHandler{}
 
 type podEventHandler struct {
 	client.Reader
-	expectations kubecontroller.ControllerExpectationsInterface
+	expectations     kubecontroller.ControllerExpectationsInterface
+	deletionUIDCache sync.Map
 }
 
 func enqueueDaemonSet(q workqueue.RateLimitingInterface, ds *appsv1alpha1.DaemonSet) {
```
```diff
@@ -117,7 +119,7 @@ func (e *podEventHandler) Update(evt event.UpdateEvent, q workqueue.RateLimiting
 		// and after such time has passed, the kubelet actually deletes it from the store. We receive an update
 		// for modification of the deletion timestamp and expect an ds to create more replicas asap, not wait
 		// until the kubelet actually deletes the pod.
-		e.Delete(event.DeleteEvent{Object: evt.ObjectNew}, q)
+		e.deletePod(curPod, q, false)
 		return
 	}
 
```
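The redirect to `deletePod(curPod, q, false)` fires on the update that first sets the deletion timestamp. A sketch of that trigger condition under standard metav1 semantics (the predicate is illustrative):

```go
package example

import v1 "k8s.io/api/core/v1"

// becameTerminating is an illustrative predicate: a pod turns "terminating"
// when its DeletionTimestamp flips from nil to non-nil between two updates.
func becameTerminating(oldPod, curPod *v1.Pod) bool {
	return oldPod.DeletionTimestamp == nil && curPod.DeletionTimestamp != nil
}
```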
```diff
@@ -153,7 +155,10 @@ func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting
 		klog.Errorf("DeleteEvent parse pod failed, DeleteStateUnknown: %#v, obj: %#v", evt.DeleteStateUnknown, evt.Object)
 		return
 	}
+	e.deletePod(pod, q, true)
+}
 
+func (e *podEventHandler) deletePod(pod *v1.Pod, q workqueue.RateLimitingInterface, isDeleted bool) {
 	controllerRef := metav1.GetControllerOf(pod)
 	if controllerRef == nil {
 		// No controller should care about orphans being deleted.
```
```diff
@@ -164,8 +169,15 @@ func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting
 		return
 	}
 
-	klog.V(4).Infof("Pod %s/%s deleted, owner: %s", pod.Namespace, pod.Name, ds.Name)
-	e.expectations.DeletionObserved(keyFunc(ds))
+	if _, loaded := e.deletionUIDCache.LoadOrStore(pod.UID, struct{}{}); !loaded {
+		e.expectations.DeletionObserved(keyFunc(ds))
+	}
+	if isDeleted {
+		e.deletionUIDCache.Delete(pod.UID)
+		klog.V(4).Infof("Pod %s/%s deleted, owner: %s", pod.Namespace, pod.Name, ds.Name)
+	} else {
+		klog.V(4).Infof("Pod %s/%s terminating, owner: %s", pod.Namespace, pod.Name, ds.Name)
+	}
 	enqueueDaemonSet(q, ds)
 }
```
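The `deletionUIDCache` is what keeps the two paths from double-counting: the terminating update (`isDeleted=false`) records the pod UID and decrements expectations once, while the later real delete event (`isDeleted=true`) finds the UID already stored, skips the second decrement, and clears the entry. A standalone sketch of that `LoadOrStore` pattern (all names illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// oncePerUID observes an event at most once per UID across multiple callbacks.
type oncePerUID struct{ seen sync.Map }

func (o *oncePerUID) observe(uid string, fn func()) {
	// LoadOrStore reports loaded=false only for the first caller of a UID.
	if _, loaded := o.seen.LoadOrStore(uid, struct{}{}); !loaded {
		fn()
	}
}

func (o *oncePerUID) forget(uid string) { o.seen.Delete(uid) }

func main() {
	var decrements int
	o := &oncePerUID{}

	o.observe("pod-uid-1", func() { decrements++ }) // terminating update: counts
	o.observe("pod-uid-1", func() { decrements++ }) // final delete event: skipped
	o.forget("pod-uid-1")                           // drop the entry once truly deleted

	fmt.Println(decrements) // 1
}
```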

