Skip to content

Commit

Permalink
feat(main): using deploy (#34)
Browse files Browse the repository at this point in the history
Signed-off-by: cuisongliu <[email protected]>
  • Loading branch information
cuisongliu authored Oct 10, 2024
1 parent 4ff5ef1 commit 9bc2e7d
Showing 1 changed file with 31 additions and 34 deletions.
65 changes: 31 additions & 34 deletions internal/controller/automq_controller_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controller
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -108,7 +107,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
})
return ctx
}
if err := r.syncControllerSTS(ctx, obj, int32(i)); err != nil {
if err := r.syncControllerDeploy(ctx, obj, int32(i)); err != nil {
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand Down Expand Up @@ -171,18 +170,18 @@ func (r *AutoMQReconciler) controllerVoters(obj *infrav1beta1.AutoMQ) []string {
var voters []string
for i := 0; i < int(obj.Spec.Controller.Replicas); i++ {
index := int32(i)
voters = append(voters, fmt.Sprintf("%d@%s.%s.%s.svc.cluster.local:%d", i, getAutoMQName(controllerRole, &index)+"-"+strconv.Itoa(i), getAutoMQName(controllerRole, &index), obj.Namespace, 9093))
voters = append(voters, fmt.Sprintf("%d@%s.%s.svc.cluster.local:%d", i, getAutoMQName(controllerRole, &index), obj.Namespace, 9093))
}
return voters
}

func (r *AutoMQReconciler) syncControllerSTS(ctx context.Context, obj *infrav1beta1.AutoMQ, index int32) error {
sts := &appsv1.StatefulSet{}
sts.Namespace = obj.Namespace
sts.Name = getAutoMQName(controllerRole, &index)
func (r *AutoMQReconciler) syncControllerDeploy(ctx context.Context, obj *infrav1beta1.AutoMQ, index int32) error {
deploy := &appsv1.Deployment{}
deploy.Namespace = obj.Namespace
deploy.Name = getAutoMQName(controllerRole, &index)
labelMap := getAutoMQLabelMap(obj.GetName(), controllerRole)
labelMap[autoMQIndexKey] = fmt.Sprintf("%d", index)
sts.Spec.Selector = &metav1.LabelSelector{
deploy.Spec.Selector = &metav1.LabelSelector{
MatchLabels: labelMap,
}
sysctl := sysctlContainer()
Expand Down Expand Up @@ -244,21 +243,20 @@ func (r *AutoMQReconciler) syncControllerSTS(ctx context.Context, obj *infrav1be
"--s3.path.style",
fmt.Sprintf("%t", obj.Spec.S3.EnablePathStyle),
}
sts.Spec.ServiceName = sts.Name
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, sts, func() error {
sts.Labels = getAutoMQLabelMap(obj.GetName(), controllerRole)
sts.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, deploy, func() error {
deploy.Labels = getAutoMQLabelMap(obj.GetName(), controllerRole)
deploy.Spec.Strategy = appsv1.DeploymentStrategy{
Type: appsv1.RollingUpdateDeploymentStrategyType,
}
sts.Spec.Template.Labels = labelMap
sts.Spec.Template.Spec.HostNetwork = false
sts.Spec.Template.Spec.TerminationGracePeriodSeconds = aws.Int64(60 * 2)
sts.Spec.Template.Spec.InitContainers = []v1.Container{
deploy.Spec.Template.Labels = labelMap
deploy.Spec.Template.Spec.HostNetwork = false
deploy.Spec.Template.Spec.TerminationGracePeriodSeconds = aws.Int64(60 * 2)
deploy.Spec.Template.Spec.InitContainers = []v1.Container{
sysctl,
}
sts.Spec.Template.Spec.Affinity = obj.Spec.Controller.Affinity.ToK8sAffinity()
sts.Spec.Template.Spec.Volumes = []v1.Volume{
deploy.Spec.Template.Spec.Affinity = obj.Spec.Controller.Affinity.ToK8sAffinity()
deploy.Spec.Template.Spec.Volumes = []v1.Volume{
{
Name: "script",
VolumeSource: v1.VolumeSource{
Expand All @@ -271,22 +269,22 @@ func (r *AutoMQReconciler) syncControllerSTS(ctx context.Context, obj *infrav1be
},
},
{
Name: sts.Name,
Name: deploy.Name,
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: sts.Name,
ClaimName: deploy.Name,
},
},
},
}
sts.Spec.Template.Spec.Containers = []v1.Container{
deploy.Spec.Template.Spec.Containers = []v1.Container{
{
Name: controllerRole,
Image: defaults.DefaultImageName,
Env: envs,
VolumeMounts: []v1.VolumeMount{
{
Name: sts.Name,
Name: deploy.Name,
MountPath: "/data/kafka",
},
{
Expand Down Expand Up @@ -339,41 +337,41 @@ func (r *AutoMQReconciler) syncControllerSTS(ctx context.Context, obj *infrav1be
if !ok {
hash = ""
}
sts.Spec.Template.Annotations = map[string]string{
deploy.Spec.Template.Annotations = map[string]string{
"configmap/script-hash": hash,
}

if obj.Spec.Metrics.Enable {
sts.Spec.Template.Spec.Containers[0].Env = append(sts.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{
deploy.Spec.Template.Spec.Containers[0].Env = append(deploy.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{
Name: "KAFKA_CFG_S3_TELEMETRY_METRICS_EXPORTER_URI",
Value: "prometheus://?host=0.0.0.0&port=9090",
})
sts.Spec.Template.Annotations["prometheus.io/scrape"] = "true"
sts.Spec.Template.Annotations["prometheus.io/port"] = "9090"
sts.Spec.Template.Annotations["prometheus.io/path"] = "/metrics"
deploy.Spec.Template.Annotations["prometheus.io/scrape"] = "true"
deploy.Spec.Template.Annotations["prometheus.io/port"] = "9090"
deploy.Spec.Template.Annotations["prometheus.io/path"] = "/metrics"
}
if r.MountTZ {
sts.Spec.Template.Spec.Volumes = append(sts.Spec.Template.Spec.Volumes, v1.Volume{
deploy.Spec.Template.Spec.Volumes = append(deploy.Spec.Template.Spec.Volumes, v1.Volume{
Name: "k8tz",
VolumeSource: v1.VolumeSource{
HostPath: &v1.HostPathVolumeSource{
Path: "/etc/localtime",
},
},
})
sts.Spec.Template.Spec.Containers[0].VolumeMounts = append(sts.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{
deploy.Spec.Template.Spec.Containers[0].VolumeMounts = append(deploy.Spec.Template.Spec.Containers[0].VolumeMounts, v1.VolumeMount{
Name: "k8tz",
MountPath: "/etc/localtime",
})
}
if obj.Spec.Controller.Resource.Requests != nil {
sts.Spec.Template.Spec.Containers[0].Resources.Requests = obj.Spec.Controller.Resource.Requests
deploy.Spec.Template.Spec.Containers[0].Resources.Requests = obj.Spec.Controller.Resource.Requests
}
if obj.Spec.Controller.Resource.Limits != nil {
sts.Spec.Template.Spec.Containers[0].Resources.Limits = obj.Spec.Controller.Resource.Limits
deploy.Spec.Template.Spec.Containers[0].Resources.Limits = obj.Spec.Controller.Resource.Limits
}
if obj.Spec.Controller.Envs != nil && len(obj.Spec.Controller.Envs) > 0 {
sts.Spec.Template.Spec.Containers[0].Env = append(sts.Spec.Template.Spec.Containers[0].Env, obj.Spec.Controller.Envs...)
deploy.Spec.Template.Spec.Containers[0].Env = append(deploy.Spec.Template.Spec.Containers[0].Env, obj.Spec.Controller.Envs...)
}
return nil
})
Expand All @@ -390,7 +388,6 @@ func (r *AutoMQReconciler) syncControllerService(ctx context.Context, obj *infra
labelMap := getAutoMQLabelMap(obj.GetName(), controllerRole)
labelMap[autoMQIndexKey] = fmt.Sprintf("%d", index)
svc.Spec.Selector = labelMap
svc.Spec.ClusterIP = v1.ClusterIPNone
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
_, err := controllerutil.CreateOrUpdate(ctx, r.Client, svc, func() error {
svc.Labels = labelMap
Expand Down

0 comments on commit 9bc2e7d

Please sign in to comment.