diff --git a/controllers/apps/v2beta1/sync_pods.go b/controllers/apps/v2beta1/sync_pods.go index ed7e4dee2..5b90d4643 100644 --- a/controllers/apps/v2beta1/sync_pods.go +++ b/controllers/apps/v2beta1/sync_pods.go @@ -98,6 +98,10 @@ func (s *syncPods) canBeScaleDownRs( return nil, nil } + if len(instance.Status.NodeEvacuationsStatus) > 0 { + return nil, nil + } + oldRsPods := getRsPodMap(ctx, s.Client, instance)[oldRs.UID] sort.Sort(PodsByNameOlder(oldRsPods)) if len(oldRsPods) == 0 { @@ -105,6 +109,13 @@ func (s *syncPods) canBeScaleDownRs( } shouldDeletePod := oldRsPods[0].DeepCopy() + for _, pod := range oldRsPods { + if _, ok := pod.Annotations["controller.kubernetes.io/pod-deletion-cost"]; ok { + shouldDeletePod = pod.DeepCopy() + break + } + } + shouldDeletePodInfo, err := getEMQXNodeInfoByAPI(r, fmt.Sprintf("emqx@%s", shouldDeletePod.Status.PodIP)) if err != nil { return nil, emperror.Wrap(err, "failed to get node info by API") @@ -115,12 +126,10 @@ func (s *syncPods) canBeScaleDownRs( } if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 { - if len(instance.Status.NodeEvacuationsStatus) == 0 { - if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { - return nil, emperror.Wrap(err, "failed to start node evacuation") - } - s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) + if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { + return nil, emperror.Wrap(err, "failed to start node evacuation") } + s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) return nil, nil } @@ -148,6 +157,10 @@ func (s *syncPods) canBeScaleDownSts( return false, nil } + if len(instance.Status.NodeEvacuationsStatus) > 0 { + return false, nil + } + shouldDeletePod := &corev1.Pod{} _ = s.Client.Get(ctx, types.NamespacedName{ Namespace: instance.Namespace, @@ -164,12 +177,10 @@ func (s *syncPods) canBeScaleDownSts( } if shouldDeletePodInfo.Edition == "Enterprise" && shouldDeletePodInfo.Session > 0 { - if len(instance.Status.NodeEvacuationsStatus) == 0 { - if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { - return false, emperror.Wrap(err, "failed to start node evacuation") - } - s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) + if err := startEvacuationByAPI(r, instance, targetedEMQXNodesName, shouldDeletePodInfo.Node); err != nil { + return false, emperror.Wrap(err, "failed to start node evacuation") } + s.EventRecorder.Event(instance, corev1.EventTypeNormal, "NodeEvacuation", fmt.Sprintf("Node %s is being evacuated", shouldDeletePodInfo.Node)) return false, nil } // Open Source or Enterprise with no session diff --git a/controllers/apps/v2beta1/sync_pods_suite_test.go b/controllers/apps/v2beta1/sync_pods_suite_test.go index 28b68cbb8..d742ca5dc 100644 --- a/controllers/apps/v2beta1/sync_pods_suite_test.go +++ b/controllers/apps/v2beta1/sync_pods_suite_test.go @@ -455,6 +455,17 @@ var _ = Describe("check can be scale down", func() { Expect(canBeScaledDown).Should(BeNil()) }) + It("emqx is in node evacuations", func() { + instance.Status.NodeEvacuationsStatus = []appsv2beta1.NodeEvacuationStatus{ + { + State: "fake", + }, + } + canBeScaledDown, err := s.canBeScaleDownRs(ctx, instance, fakeR, oldRs, []string{}) + Expect(err).ShouldNot(HaveOccurred()) + Expect(canBeScaledDown).Should(BeNil()) + }) + It("emqx is enterprise, and node session more than 0", func() { fakeR.ReqFunc = func(method string, url url.URL, body []byte, header http.Header) (resp *http.Response, respBody []byte, err error) { resp = &http.Response{