Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Make node ready only after static pods are registered #2078

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 92 additions & 31 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,14 @@ type Kubelet struct {

// Track node startup latencies
nodeStartupLatencyTracker util.NodeStartupLatencyTracker

// initialStaticPodsRegistered records whether the initial registration of
// static pods completed before the node became ready.
initialStaticPodsRegistered bool

// initialStaticPodsRegisteredLock guards concurrent access to the
// initialStaticPodsRegistered field.
initialStaticPodsRegisteredLock sync.Mutex
}

// ListPodStats is delegated to StatsProvider, which implements stats.Provider interface
Expand Down Expand Up @@ -1685,6 +1693,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

// start syncing lease
go kl.nodeLeaseController.Run(context.Background())

// During node startup, it is possible that a static pod is running but its mirror pod was not created because
// either the node was not registered fast enough or the node informers were not synced. The mirror pods will be
// created eventually when the static pods are resynced 1-1.5 mins later. However, this also affects node
// readiness latency, as we gate node readiness on the initial static pods being registered. To improve the node
// readiness latency, we want to ensure mirror pods exist as soon as the node is registered.
go kl.fastStaticPodsRegistration()
}
go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

Expand Down Expand Up @@ -1900,37 +1915,7 @@ func (kl *Kubelet) SyncPod(ctx context.Context, updateType kubetypes.SyncPodType
}

// Create Mirror Pod for Static Pod if it doesn't already exist
if kubetypes.IsStaticPod(pod) {
deleted := false
if mirrorPod != nil {
if mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
// it. The mirror pod will get recreated later.
klog.InfoS("Trying to delete pod", "pod", klog.KObj(pod), "podUID", mirrorPod.ObjectMeta.UID)
podFullName := kubecontainer.GetPodFullName(pod)
var err error
deleted, err = kl.mirrorPodClient.DeleteMirrorPod(podFullName, &mirrorPod.ObjectMeta.UID)
if deleted {
klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
} else if err != nil {
klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
}
}
}
if mirrorPod == nil || deleted {
node, err := kl.GetNode()
if err != nil {
klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else if node.DeletionTimestamp != nil {
klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
} else {
klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(pod))
if err := kl.mirrorPodClient.CreateMirrorPod(pod); err != nil {
klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(pod))
}
}
}
}
kl.ensureMirrorPodExists(pod, mirrorPod)

// Make data directories for the pod
if err := kl.makePodDataDirs(pod); err != nil {
Expand Down Expand Up @@ -3090,3 +3075,79 @@ func (kl *Kubelet) warnCgroupV1Usage() {
}
metrics.CgroupVersion.Set(float64(cgroupVersion))
}

// initialStaticPodsRegistration reports whether the initial registration of
// static pods with the apiserver has completed.
//
// It returns nil when the kubelet has no API client (standalone mode), when
// completion was already recorded, or when every static pod returned by the
// pod manager has a non-nil mirror pod. In the last case the result is latched
// in kl.initialStaticPodsRegistered, so later calls return nil without
// re-checking. Otherwise it returns an error.
func (kl *Kubelet) initialStaticPodsRegistration() error {
	kl.initialStaticPodsRegisteredLock.Lock()
	defer kl.initialStaticPodsRegisteredLock.Unlock()

	// Nothing to wait for: a standalone kubelet does not register static pods
	// to the apiserver, and a completed registration is never re-evaluated.
	if kl.kubeClient == nil || kl.initialStaticPodsRegistered {
		return nil
	}

	// A nil mirror pod means the corresponding static pod is not registered yet.
	for _, mirror := range kl.podManager.GetStaticPodToMirrorPodMap() {
		if mirror == nil {
			return fmt.Errorf("not all static pods are registered")
		}
	}

	kl.initialStaticPodsRegistered = true
	return nil
}

// ensureMirrorPodExists makes sure staticPod has a matching mirror pod.
//
// It is a no-op for pods that are not static pods. When mirrorPod is non-nil
// but is being deleted or is no longer a mirror of staticPod, it is deleted
// first. A mirror pod is then created if there was none, or if the stale one
// was deleted, provided the node object can be fetched and is not itself
// being deleted. All failures are logged rather than returned.
func (kl *Kubelet) ensureMirrorPodExists(staticPod, mirrorPod *v1.Pod) {
	if !kubetypes.IsStaticPod(staticPod) {
		return
	}

	// With no mirror pod at all, one must be created. With a stale mirror pod,
	// creation only happens if the stale one was actually deleted.
	needsCreate := mirrorPod == nil
	if mirrorPod != nil && (mirrorPod.DeletionTimestamp != nil || !kubepod.IsMirrorPodOf(mirrorPod, staticPod)) {
		// The mirror pod is semantically different from the static pod. Remove
		// it. The mirror pod will get recreated later.
		klog.InfoS("Trying to delete pod", "pod", klog.KObj(staticPod), "podUID", mirrorPod.ObjectMeta.UID)
		deleted, err := kl.mirrorPodClient.DeleteMirrorPod(kubecontainer.GetPodFullName(staticPod), &mirrorPod.ObjectMeta.UID)
		switch {
		case deleted:
			klog.InfoS("Deleted mirror pod because it is outdated", "pod", klog.KObj(mirrorPod))
		case err != nil:
			klog.ErrorS(err, "Failed deleting mirror pod", "pod", klog.KObj(mirrorPod))
		}
		needsCreate = deleted
	}
	if !needsCreate {
		return
	}

	node, err := kl.GetNode()
	switch {
	case err != nil:
		klog.V(4).ErrorS(err, "No need to create a mirror pod, since failed to get node info from the cluster", "node", klog.KRef("", string(kl.nodeName)))
	case node.DeletionTimestamp != nil:
		klog.V(4).InfoS("No need to create a mirror pod, since node has been removed from the cluster", "node", klog.KRef("", string(kl.nodeName)))
	default:
		klog.V(4).InfoS("Creating a mirror pod for static pod", "pod", klog.KObj(staticPod))
		if err := kl.mirrorPodClient.CreateMirrorPod(staticPod); err != nil {
			klog.ErrorS(err, "Failed creating a mirror pod for", "pod", klog.KObj(staticPod))
		}
	}
}

// fastStaticPodsRegistration blocks until kl.GetNode succeeds, retrying once
// per second, and then calls ensureMirrorPodExists for every static pod
// currently returned by the pod manager.
//
// NOTE(review): the wait loop has no timeout or cancellation; it only ends
// once GetNode returns without error. Confirm that is acceptable for callers
// that start it in a goroutine.
func (kl *Kubelet) fastStaticPodsRegistration() {
	_, err := kl.GetNode()
	for err != nil {
		klog.V(5).ErrorS(err, "unable to ensure mirror pod because node is not registered yet", "node", klog.KRef("", string(kl.nodeName)))
		time.Sleep(time.Second)
		_, err = kl.GetNode()
	}

	// The node is available now; reconcile a mirror pod for each static pod.
	for static, mirror := range kl.podManager.GetStaticPodToMirrorPodMap() {
		kl.ensureMirrorPodExists(static, mirror)
	}
}
3 changes: 2 additions & 1 deletion pkg/kubelet/kubelet_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,8 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(context.Context, *v1.Node) er
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors,
kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent, kl.supportLocalStorageCapacityIsolation()),
kl.containerManager.Status, kl.shutdownManager.ShutdownStatus, kl.recordNodeStatusEvent, kl.initialStaticPodsRegistration,
kl.supportLocalStorageCapacityIsolation()),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
Expand Down
59 changes: 59 additions & 0 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3280,3 +3280,62 @@ func TestSyncPodSpans(t *testing.T) {
assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
}
}

// TestInitialStaticPodsRegistration checks that initialStaticPodsRegistration
// returns an error only while some static pod held by the pod manager has no
// mirror pod, and that it returns nil once registration is already recorded
// as complete.
func TestInitialStaticPodsRegistration(t *testing.T) {
	normalPod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{})
	staticPod := podWithUIDNameNsSpec("123456789", "kube-system", "new", v1.PodSpec{})
	staticPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
	// The mirror pod is built with the same name and namespace arguments as
	// the static pod.
	mirrorPod := podWithUIDNameNsSpec("111111", "kube-system", "new", v1.PodSpec{})
	mirrorPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "api"
	mirrorPod.Annotations[kubetypes.ConfigMirrorAnnotationKey] = "mirror"
	tests := []struct {
		desc              string
		pods              []*v1.Pod
		alreadyRegistered bool
		wantErr           bool
	}{
		{
			desc: "a normal pod",
			pods: []*v1.Pod{normalPod},
		},
		{
			desc:    "a static pod",
			pods:    []*v1.Pod{staticPod},
			wantErr: true,
		},
		{
			desc: "a static pod and a corresponding mirror pod",
			pods: []*v1.Pod{staticPod, mirrorPod},
		},
		{
			desc:    "a normal pod and a static pod",
			pods:    []*v1.Pod{normalPod, staticPod},
			wantErr: true,
		},
		{
			desc: "a normal pod, a static pod and a corresponding mirror pod",
			pods: []*v1.Pod{normalPod, staticPod, mirrorPod},
		},
		{
			desc:              "static pods already registered and new static pod added",
			alreadyRegistered: true,
			pods:              []*v1.Pod{staticPod},
		},
	}
	for _, tc := range tests {
		t.Run(tc.desc, func(t *testing.T) {
			testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
			defer testKubelet.Cleanup()
			kubelet := testKubelet.kubelet
			kubelet.podManager.SetPods(tc.pods)
			kubelet.initialStaticPodsRegistered = tc.alreadyRegistered
			err := kubelet.initialStaticPodsRegistration()
			if tc.wantErr && err == nil {
				t.Fatal("initialStaticPodsRegistration() did not return any error, want error")
			}
			if !tc.wantErr && err != nil {
				// Include the error itself so a failure can be diagnosed from the log.
				t.Fatalf("initialStaticPodsRegistration() returned error %v, want nil", err)
			}
		})
	}
}
3 changes: 2 additions & 1 deletion pkg/kubelet/nodestatus/setters.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func ReadyCondition(
cmStatusFunc func() cm.Status, // typically Kubelet.containerManager.Status
nodeShutdownManagerErrorsFunc func() error, // typically kubelet.shutdownManager.errors.
recordEventFunc func(eventType, event string), // typically Kubelet.recordNodeStatusEvent
initialStaticPodsRegistrationFunc func() error, // typically Kubelet.initialStaticPodsRegistration
localStorageCapacityIsolation bool,
) Setter {
return func(ctx context.Context, node *v1.Node) error {
Expand All @@ -546,7 +547,7 @@ func ReadyCondition(
Message: "kubelet is posting ready status",
LastHeartbeatTime: currentTime,
}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc()}
errs := []error{runtimeErrorsFunc(), networkErrorsFunc(), storageErrorsFunc(), nodeShutdownManagerErrorsFunc(), initialStaticPodsRegistrationFunc()}
requiredCapacities := []v1.ResourceName{v1.ResourceCPU, v1.ResourceMemory, v1.ResourcePods}
if localStorageCapacityIsolation {
requiredCapacities = append(requiredCapacities, v1.ResourceEphemeralStorage)
Expand Down
12 changes: 11 additions & 1 deletion pkg/kubelet/nodestatus/setters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,6 +1638,7 @@ func TestReadyCondition(t *testing.T) {
storageErrors error
cmStatus cm.Status
nodeShutdownManagerErrors error
initialStaticPodsRegistrationErrors error
expectConditions []v1.NodeCondition
expectEvents []testEvent
disableLocalStorageCapacityIsolation bool
Expand Down Expand Up @@ -1688,6 +1689,12 @@ func TestReadyCondition(t *testing.T) {
disableLocalStorageCapacityIsolation: true,
expectConditions: []v1.NodeCondition{*makeReadyCondition(true, "kubelet is posting ready status", now, now)},
},
{
desc: "new, not ready: initial static pods not registered",
node: withCapacity.DeepCopy(),
initialStaticPodsRegistrationErrors: errors.New("all static pods are not registered"),
expectConditions: []v1.NodeCondition{*makeReadyCondition(false, "all static pods are not registered", now, now)},
},
// the transition tests ensure timestamps are set correctly, no need to test the entire condition matrix in this section
{
desc: "transition to ready",
Expand Down Expand Up @@ -1760,6 +1767,9 @@ func TestReadyCondition(t *testing.T) {
nodeShutdownErrorsFunc := func() error {
return tc.nodeShutdownManagerErrors
}
initialStaticPodsRegistrationErrorsFunc := func() error {
return tc.initialStaticPodsRegistrationErrors
}
events := []testEvent{}
recordEventFunc := func(eventType, event string) {
events = append(events, testEvent{
Expand All @@ -1768,7 +1778,7 @@ func TestReadyCondition(t *testing.T) {
})
}
// construct setter
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, cmStatusFunc, nodeShutdownErrorsFunc, recordEventFunc, !tc.disableLocalStorageCapacityIsolation)
setter := ReadyCondition(nowFunc, runtimeErrorsFunc, networkErrorsFunc, storageErrorsFunc, cmStatusFunc, nodeShutdownErrorsFunc, recordEventFunc, initialStaticPodsRegistrationErrorsFunc, !tc.disableLocalStorageCapacityIsolation)
// call setter on node
if err := setter(ctx, tc.node); err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down
18 changes: 18 additions & 0 deletions pkg/kubelet/pod/pod_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ type Manager interface {
// the pod fullnames of any orphaned mirror pods.
GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods []*v1.Pod, orphanedMirrorPodFullnames []string)

// GetStaticPodToMirrorPodMap returns a map from each static pod to its corresponding
// mirror pod. It is possible that there is no mirror pod for a static pod
// if kubelet is running in standalone mode or is in the process of creating
// the mirror pod; in that case, the mirror pod is nil.
GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod

// SetPods replaces the internal pods with the new pods.
// It is currently only used for testing.
SetPods(pods []*v1.Pod)
Expand Down Expand Up @@ -226,6 +232,18 @@ func (pm *basicManager) GetPodsAndMirrorPods() (allPods []*v1.Pod, allMirrorPods
return allPods, allMirrorPods, orphanedMirrorPodFullnames
}

// GetStaticPodToMirrorPodMap returns a newly built map keyed by every static
// pod the manager holds. Each value is the entry stored in
// mirrorPodByFullName under that pod's full name, which is nil when no mirror
// pod is recorded for it. The manager's read lock is held while the map is
// built.
func (pm *basicManager) GetStaticPodToMirrorPodMap() map[*v1.Pod]*v1.Pod {
	pm.lock.RLock()
	defer pm.lock.RUnlock()

	result := make(map[*v1.Pod]*v1.Pod)
	for _, p := range podsMapToPods(pm.podByUID) {
		if !kubetypes.IsStaticPod(p) {
			continue
		}
		fullName := kubecontainer.GetPodFullName(p)
		result[p] = pm.mirrorPodByFullName[fullName]
	}
	return result
}

func (pm *basicManager) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
pm.lock.RLock()
defer pm.lock.RUnlock()
Expand Down
47 changes: 47 additions & 0 deletions pkg/kubelet/pod/testing/mock_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.