Skip to content

Commit

Permalink
fix: fix can not start new pod error
Browse files Browse the repository at this point in the history
cancel sync EMQX config from EMQX API

fix #983

Signed-off-by: Rory Z <[email protected]>
  • Loading branch information
Rory-Z committed Jan 4, 2024
1 parent 7cbdb12 commit 2c6c6b5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 63 deletions.
5 changes: 0 additions & 5 deletions apis/apps/v2beta1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ const (
LabelsPodTemplateHashKey string = "apps.emqx.io/pod-template-hash"
)

const (
// annotations
AnnotationsLastEMQXConfigKey string = "apps.emqx.io/last-emqx-configuration"
)

const (
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-readiness-gate
PodOnServing corev1.PodConditionType = "apps.emqx.io/on-serving"
Expand Down
81 changes: 25 additions & 56 deletions controllers/apps/v2beta1/sync_emqx_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v2beta1

import (
"context"
"fmt"
"net/http"
"strings"

Expand All @@ -10,6 +11,7 @@ import (
innerReq "github.com/emqx/emqx-operator/internal/requester"
"github.com/rory-z/go-hocon"
corev1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand All @@ -19,41 +21,35 @@ type syncConfig struct {
}

func (s *syncConfig) reconcile(ctx context.Context, instance *appsv2beta1.EMQX, r innerReq.RequesterInterface) subResult {
hoconConfig, _ := hocon.ParseString(instance.Spec.Config.Data)
defaultListenerConfig := ""
defaultListenerConfig += fmt.Sprintln("listeners.tcp.default.bind = 1883")
defaultListenerConfig += fmt.Sprintln("listeners.ssl.default.bind = 8883")
defaultListenerConfig += fmt.Sprintln("listeners.ws.default.bind = 8083")
defaultListenerConfig += fmt.Sprintln("listeners.wss.default.bind = 8084")

// If core nodes is nil, the EMQX is in the process of being created
if len(instance.Status.CoreNodes) == 0 {
configMap := generateConfigMap(instance, instance.Spec.Config.Data)
if err := s.Handler.CreateOrUpdateList(instance, s.Scheme, []client.Object{configMap}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create or update configMap")}
}
return subResult{}
hoconConfig, err := hocon.ParseString(defaultListenerConfig + instance.Spec.Config.Data)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to parse EMQX config")}
}
configMap := generateConfigMap(instance, hoconConfig.String())

lastConfig, ok := instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey]
if !ok {
// If it is the first time to start and Mode = Replace, update the EMQX configuration once.
if instance.Spec.Config.Mode == "Replace" {
// Delete readonly configs
hoconConfigObj := hoconConfig.GetRoot().(hocon.Object)
delete(hoconConfigObj, "node")
delete(hoconConfigObj, "cluster")
delete(hoconConfigObj, "dashboard")

if err := putEMQXConfigsByAPI(r, instance.Spec.Config.Mode, hoconConfigObj.String()); err != nil {
return subResult{err: emperror.Wrap(err, "failed to put emqx config")}
storageConfigMap := &corev1.ConfigMap{}
if err := s.Client.Get(ctx, client.ObjectKeyFromObject(configMap), storageConfigMap); err != nil {
if k8sErrors.IsNotFound(err) {
if err := s.Handler.Create(configMap); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create configMap")}
}
return subResult{}
}
if instance.Annotations == nil {
instance.Annotations = map[string]string{}
}
instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey] = instance.Spec.Config.Data
if err := s.Client.Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update emqx instance")}
}
return subResult{}
return subResult{err: emperror.Wrap(err, "failed to get configMap")}
}
if ok && instance.Spec.Config.Data != lastConfig {

patchResult, _ := s.Patcher.Calculate(
storageConfigMap.DeepCopy(),
configMap.DeepCopy(),
)

if !patchResult.IsEmpty() {
// Delete readonly configs
hoconConfigObj := hoconConfig.GetRoot().(hocon.Object)
delete(hoconConfigObj, "node")
Expand All @@ -64,21 +60,9 @@ func (s *syncConfig) reconcile(ctx context.Context, instance *appsv2beta1.EMQX,
return subResult{err: emperror.Wrap(err, "failed to put emqx config")}
}

instance.Annotations[appsv2beta1.AnnotationsLastEMQXConfigKey] = instance.Spec.Config.Data
if err := s.Client.Update(ctx, instance); err != nil {
return subResult{err: emperror.Wrap(err, "failed to update emqx instance")}
}
return subResult{}
}

config, err := getEMQXConfigsByAPI(r)
if err != nil {
return subResult{err: emperror.Wrap(err, "failed to get emqx config")}
}

configMap := generateConfigMap(instance, config)
if err := s.Handler.CreateOrUpdateList(instance, s.Scheme, []client.Object{configMap}); err != nil {
return subResult{err: emperror.Wrap(err, "failed to create or update configMap")}
}

return subResult{}
Expand All @@ -101,21 +85,6 @@ func generateConfigMap(instance *appsv2beta1.EMQX, data string) *corev1.ConfigMa
}
}

func getEMQXConfigsByAPI(r innerReq.RequesterInterface) (string, error) {
url := r.GetURL("api/v5/configs")

resp, body, err := r.Request("GET", url, nil, http.Header{
"Accept": []string{"text/plain"},
})
if err != nil {
return "", emperror.Wrapf(err, "failed to get API %s", url.String())
}
if resp.StatusCode != 200 {
return "", emperror.Errorf("failed to get API %s, status : %s, body: %s", url.String(), resp.Status, body)
}
return string(body), nil
}

func putEMQXConfigsByAPI(r innerReq.RequesterInterface, mode, config string) error {
url := r.GetURL("api/v5/configs", "mode="+strings.ToLower(mode))

Expand Down
4 changes: 2 additions & 2 deletions deploy/charts/emqx-operator/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 2.2.5
version: 2.2.6

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: 2.2.5
appVersion: 2.2.6

0 comments on commit 2c6c6b5

Please sign in to comment.