Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(main): add test for automq #58

Merged
merged 1 commit into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ jobs:
sudo sealos run labring/minio:RELEASE.2024-01-11T07-46-16Z labring/kube-prometheus-stack:v0.63.0
sudo sealos run labring/kafka-ui:v0.7.1
sleep 10
sudo kubectl get pods -A --show-labels
sudo kubectl get pods -A
sudo kubectl get svc -A
- name: build
run: |
sudo make e2e
Expand Down
140 changes: 130 additions & 10 deletions e2e/automq_cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ package e2e

import (
"context"
"fmt"
"os"
"time"

"github.com/cuisongliu/automq-operator/internal/controller"
v2 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

infrav1beta1 "github.com/cuisongliu/automq-operator/api/v1beta1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
Expand All @@ -33,7 +39,7 @@ import (
var _ = Describe("automq_controller", func() {
Context("automq_controller cr tests", func() {
ctx := context.Background()
namespaceName := "automq-operator"
namespaceName := "automq-cr"
namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespaceName,
Expand All @@ -44,22 +50,23 @@ var _ = Describe("automq_controller", func() {
automq.Name = "automq-s1"
automq.Namespace = namespaceName
automq.Spec.ClusterID = "rZdE0DjZSrqy96PXrMUZVw"

BeforeEach(func() {
It("create cr namespace", func() {
By("Creating the Namespace to perform the tests")
err := k8sClient.Create(ctx, namespace)
Expect(err).To(Not(HaveOccurred()))
By("Setting the NAMESPACE_NAME ENV VAR which stores the Operand image")
err = os.Setenv("NAMESPACE_NAME", namespaceName)
Expect(err).To(Not(HaveOccurred()))
})
It("Update Endpoint", func() {
It("create cr", func() {
By("get minio ip and port")
minioService := &v1.Service{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "minio", Name: "minio"}, minioService)
Expect(err).To(Not(HaveOccurred()))
ip := minioService.Spec.ClusterIP
By("creating the custom resource for the automq")
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
err = k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
if err != nil && errors.IsNotFound(err) {
// Let's mock our custom resource at the same way that we would
// apply on the cluster the manifest under config/samples
automq.Spec.S3.Endpoint = "http://minio.minio.svc.cluster.local:9000"
automq.Spec.S3.Endpoint = fmt.Sprintf("http://%s:9000", ip)
automq.Spec.S3.Bucket = "ko3"
automq.Spec.S3.AccessKeyID = "admin"
automq.Spec.S3.SecretAccessKey = "minio123"
Expand All @@ -72,7 +79,120 @@ var _ = Describe("automq_controller", func() {
Expect(err).To(Not(HaveOccurred()))
}
})
AfterEach(func() {
It("should successfully reconcile the resource", func() {
By("Reconciling the created resource")
controllerReconciler := &controller.AutoMQReconciler{
Client: k8sClient,
Scheme: k8sClient.Scheme(),
Finalizer: "apps.cuisongliu.com/automq.finalizer",
MountTZ: true,
}
_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
NamespacedName: client.ObjectKeyFromObject(automq),
})
Expect(err).NotTo(HaveOccurred())
})
It("get automq deployment", func() {
ctx := context.Background()
Eventually(func() error {
deployment := &v2.DeploymentList{}
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name}).AsSelector()
err := k8sClient.List(ctx, deployment, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
if err != nil {
return err
}
if len(deployment.Items) != 4 {
return fmt.Errorf("expected 4 deploy, found %d", len(deployment.Items))
}
for i, deploy := range deployment.Items {
if deploy.Status.ReadyReplicas != 1 {
return fmt.Errorf("expected deploy %d ready replicas to be 1, got '%d'", i, deploy.Status.ReadyReplicas)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})
It("check controller status", func() {
ctx := context.Background()
Eventually(func() error {
podList := &v1.PodList{}
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "controller"}).AsSelector()
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
if err != nil {
return err
}
if len(podList.Items) != 1 {
return fmt.Errorf("expected 3 pod, found %d", len(podList.Items))
}
for i, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})

It("check broker status", func() {
ctx := context.Background()
Eventually(func() error {
podList := &v1.PodList{}
labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "broker"}).AsSelector()
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector})
if err != nil {
return err
}
if len(podList.Items) != 3 {
return fmt.Errorf("expected 1 pod, found %d", len(podList.Items))
}
for i, pod := range podList.Items {
if pod.Status.Phase != v1.PodRunning {
return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})
It("check automq status", func() {
ctx := context.Background()
Eventually(func() error {
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
if err != nil {
return err
}
if automq.Status.Phase != infrav1beta1.AutoMQReady {
return fmt.Errorf("expected automq phase to be 'Ready', got '%s'", automq.Status.Phase)
}
if automq.Status.ControllerReplicas != automq.Spec.Controller.Replicas {
return fmt.Errorf("expected automq controller replicas to be %d, got '%d'", automq.Spec.Controller.Replicas, automq.Status.ControllerReplicas)
}
if automq.Status.BrokerReplicas != automq.Spec.Broker.Replicas {
return fmt.Errorf("expected automq broker replicas to be %d, got '%d'", automq.Spec.Broker.Replicas, automq.Status.BrokerReplicas)
}
showReadyPods := automq.Spec.Controller.Replicas + automq.Spec.Broker.Replicas
if automq.Status.ReadyPods != showReadyPods {
return fmt.Errorf("expected automq ready pods to be %d, got '%d'", showReadyPods, automq.Status.ReadyPods)
}
if len(automq.Status.ControllerAddresses) != int(automq.Spec.Controller.Replicas) {
return fmt.Errorf("expected automq controller addresses to have %d elements, got '%d'", automq.Spec.Controller.Replicas, len(automq.Status.ControllerAddresses))
}
if automq.Status.BootstrapInternalAddress == "" {
return fmt.Errorf("expected automq bootstrap internal address to be set")
}
bootstrapService := fmt.Sprintf("%s.%s.svc:%d", "automq-"+"broker-bootstrap", automq.Namespace, 9092)
if automq.Status.BootstrapInternalAddress != bootstrapService {
return fmt.Errorf("expected automq bootstrap internal address to be '%s', got '%s'", bootstrapService, automq.Status.BootstrapInternalAddress)
}
for i, address := range automq.Status.ControllerAddresses {
controllerService := fmt.Sprintf("%d@%s.%s.svc:%d", i, "automq-controller-"+fmt.Sprintf("%d", i), automq.Namespace, 9093)
if address != controllerService {
return fmt.Errorf("expected automq controller address %d to be '%s', got '%s'", i, controllerService, address)
}
}
return nil
}, "60s", "1s").Should(Succeed())
})
It("clean automq", func() {
By("removing the custom resource for the automq")
found := &infrav1beta1.AutoMQ{}
err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), found)
Expand Down
2 changes: 2 additions & 0 deletions e2e/automq_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ var _ = BeforeSuite(func() {
go func() {
controller.APIRegistry(context.Background(), k8sClient)
}()
err = os.Setenv("NAMESPACE_NAME", "default")
Expect(err).To(Not(HaveOccurred()))
})

var _ = AfterSuite(func() {
Expand Down
1 change: 1 addition & 0 deletions internal/controller/automq_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"

"github.com/gin-gonic/gin"
v1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down
8 changes: 7 additions & 1 deletion internal/controller/automq_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ func (r *AutoMQReconciler) reconcile(ctx context.Context, obj client.Object) (ct
r.syncKafkaBootstrapService,
}
var ifRunning bool
for _, fn := range pipelines {
for index, fn := range pipelines {
ifRunning = fn(ctx, automq)
log.V(1).Info("update reconcile controller automq", "ifRunning", ifRunning, "index", index)
if !ifRunning {
break
}
Expand Down Expand Up @@ -220,6 +221,7 @@ func (r *AutoMQReconciler) syncStatus(ctx context.Context, automq *infrav1beta1.
}

func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
log := log.FromContext(ctx)
conditionType := "SyncS3ServiceReady"
sg, err := storage.NewBucket(storage.Config{
Type: "s3",
Expand All @@ -229,6 +231,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
Endpoint: obj.Spec.S3.Endpoint,
})
if err != nil {
log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand All @@ -240,6 +243,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
}
err = sg.MkBucket(ctx, obj.Spec.S3.Bucket)
if err != nil && !strings.Contains(err.Error(), "BucketAlready") {
log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand All @@ -264,6 +268,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
conditionType := "SyncConfigmapReady"
data, err := defaults.Asset("defaults/up.sh")
if err != nil {
log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand Down Expand Up @@ -291,6 +296,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
log.V(1).Info("create or update configmap by AutoMQ", "OperationResult", change)
return nil
}); err != nil {
log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: conditionType,
Status: metav1.ConditionFalse,
Expand Down
5 changes: 4 additions & 1 deletion internal/controller/automq_controller_b.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *AutoMQReconciler) syncBrokerScale(ctx context.Context, obj *infrav1beta

func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
conditionType := "SyncBrokerReady"

log := log.FromContext(ctx)
// 1. sync pvc
// 2. sync deploy
// 3. sync svc
Expand All @@ -115,6 +115,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
Reason: "BrokerPVCReconciling",
Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create pvc for the custom resource", "name", obj.Name, "role", brokerRole)
return true
}
if err := r.syncBrokerService(ctx, obj, int32(i)); err != nil {
Expand All @@ -125,6 +126,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
Reason: "BrokerServiceReconciling",
Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create service for the custom resource", "name", obj.Name, "role", brokerRole)
return true
}
if err := r.syncBrokerDeploy(ctx, obj, int32(i)); err != nil {
Expand All @@ -135,6 +137,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
Reason: "BrokerSTSReconciling",
Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create deploy for the custom resource", "name", obj.Name, "role", brokerRole)
return true
}
}
Expand Down
6 changes: 5 additions & 1 deletion internal/controller/automq_controller_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/log"
"strings"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (r *AutoMQReconciler) syncControllersScale(ctx context.Context, obj *infrav

func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
conditionType := "SyncControllerReady"

log := log.FromContext(ctx)
// 1. sync pvc
// 2. sync deploy
// 3. sync svc
Expand All @@ -105,6 +106,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
Reason: "ControllerPVCReconciling",
Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create pvc for the custom resource (%s)", obj.Name, "role", controllerRole)
return true
}
if err := r.syncControllerDeploy(ctx, obj, int32(i)); err != nil {
Expand All @@ -115,6 +117,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
Reason: "ControllerSTSReconciling",
Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create deploy for the custom resource (%s)", obj.Name, "role", controllerRole)
return true
}
if err := r.syncControllerService(ctx, obj, int32(i)); err != nil {
Expand All @@ -125,6 +128,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
Reason: "ControllerServiceReconciling",
Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
})
log.Error(err, "Failed to create service for the custom resource (%s)", obj.Name, "role", controllerRole)
return true
}
}
Expand Down