diff --git a/apis/apps/v2alpha2/emqx_webhook.go b/apis/apps/v2alpha2/emqx_webhook.go index 52a5cad6d..677702e94 100644 --- a/apis/apps/v2alpha2/emqx_webhook.go +++ b/apis/apps/v2alpha2/emqx_webhook.go @@ -53,6 +53,7 @@ func (r *EMQX) Default() { r.defaultLabels() r.defaultAnnotations() r.defaultConfiguration() + r.defaultListenersServiceTemplate() r.defaultDashboardServiceTemplate() r.defaultContainerPort() } @@ -201,6 +202,13 @@ func (r *EMQX) defaultConfiguration() { } } +func (r *EMQX) defaultListenersServiceTemplate() { + r.Spec.ListenersServiceTemplate.Spec.Selector = r.Spec.CoreTemplate.Labels + if IsExistReplicant(r) { + r.Spec.ListenersServiceTemplate.Spec.Selector = r.Spec.ReplicantTemplate.Labels + } +} + func (r *EMQX) defaultDashboardServiceTemplate() { r.Spec.DashboardServiceTemplate.Spec.Selector = r.Spec.CoreTemplate.Labels dashboardPort, err := GetDashboardServicePort(r.Spec.Config.Data) diff --git a/apis/apps/v2alpha2/emqx_webhook_test.go b/apis/apps/v2alpha2/emqx_webhook_test.go index 28cb58fd7..5c8c42886 100644 --- a/apis/apps/v2alpha2/emqx_webhook_test.go +++ b/apis/apps/v2alpha2/emqx_webhook_test.go @@ -242,6 +242,42 @@ func TestDefaultConfiguration(t *testing.T) { }) } +func TestDefaultListeneresServiceTemplate(t *testing.T) { + t.Run("check selector", func(t *testing.T) { + instance := &EMQX{ + Spec: EMQXSpec{ + CoreTemplate: EMQXCoreTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + DBRoleLabelKey: "core", + }, + }, + }, + ReplicantTemplate: nil, + }, + } + instance.defaultListenersServiceTemplate() + assert.Equal(t, "core", instance.Spec.ListenersServiceTemplate.Spec.Selector[DBRoleLabelKey]) + + instance.Spec.ReplicantTemplate = &EMQXReplicantTemplate{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + DBRoleLabelKey: "replicant", + }, + }, + Spec: EMQXReplicantTemplateSpec{ + Replicas: pointer.Int32(0), + }, + } + instance.defaultListenersServiceTemplate() + assert.Equal(t, "core", instance.Spec.ListenersServiceTemplate.Spec.Selector[DBRoleLabelKey]) + + instance.Spec.ReplicantTemplate.Spec.Replicas = pointer.Int32(1) + instance.defaultListenersServiceTemplate() + assert.Equal(t, "replicant", instance.Spec.ListenersServiceTemplate.Spec.Selector[DBRoleLabelKey]) + }) +} + func TestDefaultDashboardServiceTemplate(t *testing.T) { t.Run("failed to get dashboard listeners", func(t *testing.T) { instance := &EMQX{} diff --git a/apis/apps/v2alpha2/util.go b/apis/apps/v2alpha2/util.go index 0d1d463bd..4be375600 100644 --- a/apis/apps/v2alpha2/util.go +++ b/apis/apps/v2alpha2/util.go @@ -29,6 +29,10 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +func IsExistReplicant(instance *EMQX) bool { + return instance.Spec.ReplicantTemplate != nil && instance.Spec.ReplicantTemplate.Spec.Replicas != nil && *instance.Spec.ReplicantTemplate.Spec.Replicas > 0 +} + // Clones the given selector and returns a new selector with the given key and value added. // Returns the given selector, if labelKey is empty. func CloneSelectorAndAddLabel(selector *metav1.LabelSelector, labelKey, labelValue string) *metav1.LabelSelector { diff --git a/controllers/apps/v2alpha2/add_svc.go b/controllers/apps/v2alpha2/add_svc.go index 1f59e7af8..df62a5a01 100644 --- a/controllers/apps/v2alpha2/add_svc.go +++ b/controllers/apps/v2alpha2/add_svc.go @@ -10,7 +10,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -40,12 +39,6 @@ func (a *addSvc) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _ i listeners := generateListenerService(instance, ports) if listeners != nil { resources = append(resources, listeners) - if instance.Status.IsConditionTrue(appsv2alpha2.CoreNodesReady) { - pods := a.getPodList(ctx, instance) - if len(pods) > 0 { - resources = append(resources, generateEndpoints(listeners, pods)) - } - } } if err := a.CreateOrUpdateList(instance, a.Scheme, resources); err != nil { @@ -55,41 +48,6 @@ func (a *addSvc) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, _ i return subResult{} } -func (a *addSvc) getPodList(ctx context.Context, instance *appsv2alpha2.EMQX) []corev1.Pod { - labels := appsv2alpha2.CloneAndAddLabel( - instance.Spec.CoreTemplate.Labels, - appsv2alpha2.PodTemplateHashLabelKey, - instance.Status.CoreNodesStatus.CurrentRevision, - ) - if isExistReplicant(instance) { - labels = appsv2alpha2.CloneAndAddLabel( - instance.Spec.ReplicantTemplate.Labels, - appsv2alpha2.PodTemplateHashLabelKey, - instance.Status.ReplicantNodesStatus.CurrentRevision, - ) - } - - podList := &corev1.PodList{} - _ = a.Client.List(ctx, podList, - client.InNamespace(instance.Namespace), - client.MatchingLabels(labels), - ) - - list := []corev1.Pod{} - for _, pod := range podList.Items { - for _, condition := range pod.Status.Conditions { - // We also add readiness gate to the pod, so if pod is ready, the EMQX will definitely be in the cluster. - // More info: https://git.k8s.io/enhancements/keps/sig-network/580-pod-readiness-gates - if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue { - if pod.Status.PodIP != "" { - list = append(list, pod) - } - } - } - } - return list -} - func generateHeadlessService(instance *appsv2alpha2.EMQX) *corev1.Service { headlessSvc := &corev1.Service{ TypeMeta: metav1.TypeMeta{ @@ -99,6 +57,7 @@ func generateHeadlessService(instance *appsv2alpha2.EMQX) *corev1.Service { ObjectMeta: metav1.ObjectMeta{ Name: instance.HeadlessServiceNamespacedName().Name, Namespace: instance.Namespace, + Labels: instance.Labels, }, Spec: corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, @@ -143,10 +102,6 @@ func generateDashboardService(instance *appsv2alpha2.EMQX) *corev1.Service { func generateListenerService(instance *appsv2alpha2.EMQX, ports []corev1.ServicePort) *corev1.Service { listener := instance.Spec.ListenersServiceTemplate.DeepCopy() - // We don't need to set the selector for the service - // because the Operator will manager the endpoints - // please check https://kubernetes.io/docs/concepts/services-networking/service/#services-without-selectors - listener.Spec.Selector = nil listener.Spec.Ports = appsv2alpha2.MergeServicePorts( listener.Spec.Ports, ports, @@ -168,42 +123,3 @@ func generateListenerService(instance *appsv2alpha2.EMQX, ports []corev1.Service Spec: listener.Spec, } } - -func generateEndpoints(svc *corev1.Service, pods []corev1.Pod) *corev1.Endpoints { - subSet := corev1.EndpointSubset{} - for _, port := range svc.Spec.Ports { - subSet.Ports = append(subSet.Ports, corev1.EndpointPort{ - Name: port.Name, - Port: port.Port, - Protocol: port.Protocol, - }) - } - for _, p := range pods { - pod := p.DeepCopy() - subSet.Addresses = append(subSet.Addresses, corev1.EndpointAddress{ - IP: pod.Status.PodIP, - NodeName: pointer.String(pod.Spec.NodeName), - TargetRef: &corev1.ObjectReference{ - Kind: "Pod", - Namespace: pod.Namespace, - Name: pod.Name, - UID: pod.UID, - }, - }) - - } - - return &corev1.Endpoints{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Endpoints", - }, - ObjectMeta: metav1.ObjectMeta{ - Namespace: svc.Namespace, - Name: svc.Name, - Annotations: svc.Annotations, - Labels: svc.Labels, - }, - Subsets: []corev1.EndpointSubset{subSet}, - } -} diff --git a/controllers/apps/v2alpha2/status_machine.go b/controllers/apps/v2alpha2/status_machine.go index 3f766623e..cdee8d6b6 100644 --- a/controllers/apps/v2alpha2/status_machine.go +++ b/controllers/apps/v2alpha2/status_machine.go @@ -152,7 +152,7 @@ type codeNodesReadyStatus struct { func (s *codeNodesReadyStatus) nextStatus() { emqx := s.emqxStatusMachine.GetEMQX() - if isExistReplicant(emqx) { + if appsv2alpha2.IsExistReplicant(emqx) { emqx.Status.SetCondition(metav1.Condition{ Type: appsv2alpha2.ReplicantNodesProgressing, Status: metav1.ConditionTrue, @@ -179,7 +179,7 @@ type replicantNodesProgressingStatus struct { func (s *replicantNodesProgressingStatus) nextStatus() { emqx := s.emqxStatusMachine.GetEMQX() - if !isExistReplicant(emqx) { + if !appsv2alpha2.IsExistReplicant(emqx) { s.emqxStatusMachine.initialized.nextStatus() return } @@ -201,7 +201,7 @@ type replicantNodesReadyStatus struct { } func (s *replicantNodesReadyStatus) nextStatus() { - if !isExistReplicant(s.emqxStatusMachine.emqx) { + if !appsv2alpha2.IsExistReplicant(s.emqxStatusMachine.emqx) { s.emqxStatusMachine.initialized.nextStatus() return } @@ -227,7 +227,7 @@ func (s *availableStatus) nextStatus() { return } - if isExistReplicant(emqx) { + if appsv2alpha2.IsExistReplicant(emqx) { if emqx.Status.ReplicantNodesStatus.ReadyReplicas != emqx.Status.ReplicantNodesStatus.Replicas || emqx.Status.ReplicantNodesStatus.UpdateRevision != emqx.Status.ReplicantNodesStatus.CurrentRevision { return diff --git a/controllers/apps/v2alpha2/sync_pods.go b/controllers/apps/v2alpha2/sync_pods.go index b1b49b751..e5678ab51 100644 --- a/controllers/apps/v2alpha2/sync_pods.go +++ b/controllers/apps/v2alpha2/sync_pods.go @@ -29,7 +29,7 @@ func (s *syncPods) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, r updateRs, currentRs, _ := getReplicaSetList(ctx, s.Client, instance) targetedEMQXNodesName := []string{} - if isExistReplicant(instance) { + if appsv2alpha2.IsExistReplicant(instance) { for _, node := range instance.Status.ReplicantNodes { if node.ControllerUID == currentRs.UID { targetedEMQXNodesName = append(targetedEMQXNodesName, node.Node) @@ -135,7 +135,7 @@ func (s *syncPods) canBeScaleDownSts( oldSts *appsv1.StatefulSet, targetedEMQXNodesName []string, ) (bool, error) { - if isExistReplicant(instance) { + if appsv2alpha2.IsExistReplicant(instance) { if instance.Status.ReplicantNodesStatus.CurrentRevision != instance.Status.ReplicantNodesStatus.UpdateRevision { return false, nil } diff --git a/controllers/apps/v2alpha2/update_pod_conditions.go b/controllers/apps/v2alpha2/update_pod_conditions.go index 354472e5f..a297a7cf5 100644 --- a/controllers/apps/v2alpha2/update_pod_conditions.go +++ b/controllers/apps/v2alpha2/update_pod_conditions.go @@ -19,31 +19,37 @@ type updatePodConditions struct { } func (u *updatePodConditions) reconcile(ctx context.Context, instance *appsv2alpha2.EMQX, r innerReq.RequesterInterface) subResult { + updateRs, _, _ := getReplicaSetList(ctx, u.Client, instance) + updateSts, _, _ := getStateFulSetList(ctx, u.Client, instance) + pods := &corev1.PodList{} _ = u.Client.List(ctx, pods, client.InNamespace(instance.Namespace), client.MatchingLabels(instance.Labels), ) - for _, pod := range pods.Items { - hash := make(map[corev1.PodConditionType]int) - - for i, condition := range pod.Status.Conditions { - hash[condition.Type] = i - } - - if index, ok := hash[corev1.ContainersReady]; !ok || pod.Status.Conditions[index].Status != corev1.ConditionTrue { + for _, p := range pods.Items { + pod := p.DeepCopy() + controllerRef := metav1.GetControllerOf(pod) + if controllerRef == nil { continue } onServingCondition := corev1.PodCondition{ Type: appsv2alpha2.PodOnServing, - Status: u.checkInCluster(instance, r, pod.DeepCopy()), + Status: corev1.ConditionFalse, LastProbeTime: metav1.Now(), LastTransitionTime: metav1.Now(), } - if index, ok := hash[appsv2alpha2.PodOnServing]; ok { - onServingCondition.LastTransitionTime = pod.Status.Conditions[index].LastTransitionTime + + if (updateSts != nil && controllerRef.UID == updateSts.UID) || + (updateRs != nil && controllerRef.UID == updateRs.UID) { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.ContainersReady && condition.Status == corev1.ConditionTrue { + onServingCondition.Status = u.checkInCluster(instance, r, pod) + break + } + } } patchBytes, _ := json.Marshal(corev1.Pod{ @@ -51,14 +57,14 @@ func (u *updatePodConditions) reconcile(ctx context.Context, instance *appsv2alp Conditions: []corev1.PodCondition{onServingCondition}, }, }) - _ = u.Client.Status().Patch(ctx, &pod, client.RawPatch(types.StrategicMergePatchType, patchBytes)) + _ = u.Client.Status().Patch(ctx, pod.DeepCopy(), client.RawPatch(types.StrategicMergePatchType, patchBytes)) } return subResult{} } func (u *updatePodConditions) checkInCluster(instance *appsv2alpha2.EMQX, r innerReq.RequesterInterface, pod *corev1.Pod) corev1.ConditionStatus { nodes := instance.Status.CoreNodes - if isExistReplicant(instance) { + if appsv2alpha2.IsExistReplicant(instance) { nodes = append(nodes, instance.Status.ReplicantNodes...) } for _, node := range nodes { diff --git a/controllers/apps/v2alpha2/util.go b/controllers/apps/v2alpha2/util.go index a5d85b2ce..bee22f4a7 100644 --- a/controllers/apps/v2alpha2/util.go +++ b/controllers/apps/v2alpha2/util.go @@ -24,10 +24,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -func isExistReplicant(instance *appsv2alpha2.EMQX) bool { - return instance.Spec.ReplicantTemplate != nil && instance.Spec.ReplicantTemplate.Spec.Replicas != nil && *instance.Spec.ReplicantTemplate.Spec.Replicas > 0 -} - func getRsPodMap(ctx context.Context, k8sClient client.Client, instance *appsv2alpha2.EMQX) map[types.UID][]*corev1.Pod { podList := &corev1.PodList{} _ = k8sClient.List(ctx, podList,