diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 90e9e66..3c5b4aa 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -84,7 +84,8 @@ jobs:
           sudo sealos run labring/minio:RELEASE.2024-01-11T07-46-16Z labring/kube-prometheus-stack:v0.63.0
           sudo sealos run labring/kafka-ui:v0.7.1
           sleep 10
-          sudo kubectl get pods -A --show-labels
+          sudo kubectl get pods -A
+          sudo kubectl get svc -A
       - name: build
         run: |
           sudo make e2e
diff --git a/e2e/automq_cluster_controller_test.go b/e2e/automq_cluster_controller_test.go
index a33ac36..9d5a257 100644
--- a/e2e/automq_cluster_controller_test.go
+++ b/e2e/automq_cluster_controller_test.go
@@ -18,9 +18,15 @@ package e2e
 
 import (
 	"context"
+	"fmt"
 	"os"
 	"time"
 
+	"github.com/cuisongliu/automq-operator/internal/controller"
+	v2 "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
 	infrav1beta1 "github.com/cuisongliu/automq-operator/api/v1beta1"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
@@ -33,7 +39,7 @@ import (
 var _ = Describe("automq_controller", func() {
 	Context("automq_controller cr tests", func() {
 		ctx := context.Background()
-		namespaceName := "automq-operator"
+		namespaceName := "automq-cr"
 		namespace := &v1.Namespace{
 			ObjectMeta: metav1.ObjectMeta{
 				Name: namespaceName,
@@ -44,22 +50,23 @@ var _ = Describe("automq_controller", func() {
 		automq.Name = "automq-s1"
 		automq.Namespace = namespaceName
 		automq.Spec.ClusterID = "rZdE0DjZSrqy96PXrMUZVw"
-
-		BeforeEach(func() {
+		It("create cr namespace", func() {
 			By("Creating the Namespace to perform the tests")
 			err := k8sClient.Create(ctx, namespace)
 			Expect(err).To(Not(HaveOccurred()))
-			By("Setting the NAMESPACE_NAME ENV VAR which stores the Operand image")
-			err = os.Setenv("NAMESPACE_NAME", namespaceName)
-			Expect(err).To(Not(HaveOccurred()))
 		})
-		It("Update Endpoint", func() {
+		It("create cr", func() {
+			By("get minio ip and port")
+			minioService := &v1.Service{}
+			err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "minio", Name: "minio"}, minioService)
+			Expect(err).To(Not(HaveOccurred()))
+			ip := minioService.Spec.ClusterIP
 			By("creating the custom resource for the automq")
-			err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
+			err = k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq)
 			if err != nil && errors.IsNotFound(err) {
 				// Let's mock our custom resource at the same way that we would
 				// apply on the cluster the manifest under config/samples
-				automq.Spec.S3.Endpoint = "http://minio.minio.svc.cluster.local:9000"
+				automq.Spec.S3.Endpoint = fmt.Sprintf("http://%s:9000", ip)
 				automq.Spec.S3.Bucket = "ko3"
 				automq.Spec.S3.AccessKeyID = "admin"
 				automq.Spec.S3.SecretAccessKey = "minio123"
@@ -72,7 +79,120 @@ var _ = Describe("automq_controller", func() {
 				Expect(err).To(Not(HaveOccurred()))
 			}
 		})
-		AfterEach(func() {
+		It("should successfully reconcile the resource", func() {
+			By("Reconciling the created resource")
+			controllerReconciler := &controller.AutoMQReconciler{
+				Client:    k8sClient,
+				Scheme:    k8sClient.Scheme(),
+				Finalizer: "apps.cuisongliu.com/automq.finalizer",
+				MountTZ:   true,
+			}
+			_, err := controllerReconciler.Reconcile(ctx, reconcile.Request{
+				NamespacedName: client.ObjectKeyFromObject(automq),
+			})
+			Expect(err).NotTo(HaveOccurred())
+		})
+		It("get automq deployment", func() {
+			ctx := context.Background()
+			Eventually(func() error {
+				deployment := &v2.DeploymentList{}
"automq", "app.kubernetes.io/instance": automq.Name}).AsSelector() + err := k8sClient.List(ctx, deployment, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(deployment.Items) != 4 { + return fmt.Errorf("expected 4 deploy, found %d", len(deployment.Items)) + } + for i, deploy := range deployment.Items { + if deploy.Status.ReadyReplicas != 1 { + return fmt.Errorf("expected deploy %d ready replicas to be 1, got '%d'", i, deploy.Status.ReadyReplicas) + } + } + return nil + }, "60s", "1s").Should(Succeed()) + }) + It("check controller status", func() { + ctx := context.Background() + Eventually(func() error { + podList := &v1.PodList{} + labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "controller"}).AsSelector() + err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(podList.Items) != 1 { + return fmt.Errorf("expected 3 pod, found %d", len(podList.Items)) + } + for i, pod := range podList.Items { + if pod.Status.Phase != v1.PodRunning { + return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase) + } + } + return nil + }, "60s", "1s").Should(Succeed()) + }) + + It("check broker status", func() { + ctx := context.Background() + Eventually(func() error { + podList := &v1.PodList{} + labelSelector := labels.Set(map[string]string{"app.kubernetes.io/owner-by": "automq", "app.kubernetes.io/instance": automq.Name, "app.kubernetes.io/role": "broker"}).AsSelector() + err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: automq.Namespace, LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(podList.Items) != 3 { + return fmt.Errorf("expected 1 pod, found %d", len(podList.Items)) + } + for i, pod := range podList.Items { + if pod.Status.Phase != v1.PodRunning { + return fmt.Errorf("expected pod %d phase to be 'Running', got '%s'", i, pod.Status.Phase) + } + } + return nil + }, "60s", "1s").Should(Succeed()) + }) + It("check automq status", func() { + ctx := context.Background() + Eventually(func() error { + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq) + if err != nil { + return err + } + if automq.Status.Phase != infrav1beta1.AutoMQReady { + return fmt.Errorf("expected automq phase to be 'Ready', got '%s'", automq.Status.Phase) + } + if automq.Status.ControllerReplicas != automq.Spec.Controller.Replicas { + return fmt.Errorf("expected automq controller replicas to be %d, got '%d'", automq.Spec.Controller.Replicas, automq.Status.ControllerReplicas) + } + if automq.Status.BrokerReplicas != automq.Spec.Broker.Replicas { + return fmt.Errorf("expected automq broker replicas to be %d, got '%d'", automq.Spec.Broker.Replicas, automq.Status.BrokerReplicas) + } + showReadyPods := automq.Spec.Controller.Replicas + automq.Spec.Broker.Replicas + if automq.Status.ReadyPods != showReadyPods { + return fmt.Errorf("expected automq ready pods to be %d, got '%d'", showReadyPods, automq.Status.ReadyPods) + } + if len(automq.Status.ControllerAddresses) != int(automq.Spec.Controller.Replicas) { + return fmt.Errorf("expected automq controller addresses to have %d elements, got '%d'", automq.Spec.Controller.Replicas, len(automq.Status.ControllerAddresses)) + } + if automq.Status.BootstrapInternalAddress == "" { + return fmt.Errorf("expected automq bootstrap 
internal address to be set") + } + bootstrapService := fmt.Sprintf("%s.%s.svc:%d", "automq-"+"broker-bootstrap", automq.Namespace, 9092) + if automq.Status.BootstrapInternalAddress != bootstrapService { + return fmt.Errorf("expected automq bootstrap internal address to be '%s', got '%s'", bootstrapService, automq.Status.BootstrapInternalAddress) + } + for i, address := range automq.Status.ControllerAddresses { + controllerService := fmt.Sprintf("%d@%s.%s.svc:%d", i, "automq-controller-"+fmt.Sprintf("%d", i), automq.Namespace, 9093) + if address != controllerService { + return fmt.Errorf("expected automq controller address %d to be '%s', got '%s'", i, controllerService, address) + } + } + return nil + }, "60s", "1s").Should(Succeed()) + }) + It("clean automq", func() { By("removing the custom resource for the automq") found := &infrav1beta1.AutoMQ{} err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), found) diff --git a/e2e/automq_cluster_test.go b/e2e/automq_cluster_test.go index 6bc741a..baa257d 100644 --- a/e2e/automq_cluster_test.go +++ b/e2e/automq_cluster_test.go @@ -212,6 +212,8 @@ var _ = BeforeSuite(func() { go func() { controller.APIRegistry(context.Background(), k8sClient) }() + err = os.Setenv("NAMESPACE_NAME", "default") + Expect(err).To(Not(HaveOccurred())) }) var _ = AfterSuite(func() { diff --git a/internal/controller/automq_apis.go b/internal/controller/automq_apis.go index 71f7171..2cb04ac 100644 --- a/internal/controller/automq_apis.go +++ b/internal/controller/automq_apis.go @@ -18,6 +18,7 @@ package controller import ( "context" + "github.com/gin-gonic/gin" v1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" diff --git a/internal/controller/automq_controller.go b/internal/controller/automq_controller.go index 6a4e079..98b90f1 100644 --- a/internal/controller/automq_controller.go +++ b/internal/controller/automq_controller.go @@ -172,8 +172,9 @@ func (r *AutoMQReconciler) reconcile(ctx context.Context, obj client.Object) (ct r.syncKafkaBootstrapService, } var ifRunning bool - for _, fn := range pipelines { + for index, fn := range pipelines { ifRunning = fn(ctx, automq) + log.V(1).Info("update reconcile controller automq", "ifRunning", ifRunning, "index", index) if !ifRunning { break } @@ -220,6 +221,7 @@ func (r *AutoMQReconciler) syncStatus(ctx context.Context, automq *infrav1beta1. 
 }
 
 func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
+	log := log.FromContext(ctx)
 	conditionType := "SyncS3ServiceReady"
 	sg, err := storage.NewBucket(storage.Config{
 		Type: "s3",
@@ -229,6 +231,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
 		Endpoint: obj.Spec.S3.Endpoint,
 	})
 	if err != nil {
+		log.Error(err, "Failed to create S3 Bucket interface for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
 		meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
 			Type:   conditionType,
 			Status: metav1.ConditionFalse,
@@ -240,6 +243,7 @@ func (r *AutoMQReconciler) s3Service(ctx context.Context, obj *infrav1beta1.Auto
 	}
 	err = sg.MkBucket(ctx, obj.Spec.S3.Bucket)
 	if err != nil && !strings.Contains(err.Error(), "BucketAlready") {
+		log.Error(err, "Failed to create S3 bucket for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
 		meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
 			Type:   conditionType,
 			Status: metav1.ConditionFalse,
@@ -264,6 +268,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
 	conditionType := "SyncConfigmapReady"
 	data, err := defaults.Asset("defaults/up.sh")
 	if err != nil {
+		log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
 		meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
 			Type:   conditionType,
 			Status: metav1.ConditionFalse,
@@ -291,6 +296,7 @@ func (r *AutoMQReconciler) scriptConfigmap(ctx context.Context, obj *infrav1beta
 		log.V(1).Info("create or update configmap by AutoMQ", "OperationResult", change)
 		return nil
 	}); err != nil {
+		log.Error(err, "Failed to create script configmap for the custom resource", "name", obj.Name, "namespace", obj.Namespace)
 		meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
 			Type:   conditionType,
 			Status: metav1.ConditionFalse,
diff --git a/internal/controller/automq_controller_b.go b/internal/controller/automq_controller_b.go
index f73fe45..1c12c2d 100644
--- a/internal/controller/automq_controller_b.go
+++ b/internal/controller/automq_controller_b.go
@@ -100,7 +100,7 @@ func (r *AutoMQReconciler) syncBrokerScale(ctx context.Context, obj *infrav1beta
 
 func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
 	conditionType := "SyncBrokerReady"
-
+	log := log.FromContext(ctx)
 	// 1. sync pvc
 	// 2. sync deploy
 	// 3. sync svc
@@ -115,6 +115,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
 				Reason:  "BrokerPVCReconciling",
 				Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
 			})
+			log.Error(err, "Failed to create pvc for the custom resource", "name", obj.Name, "role", brokerRole)
 			return true
 		}
 		if err := r.syncBrokerService(ctx, obj, int32(i)); err != nil {
@@ -125,6 +126,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
 				Reason:  "BrokerServiceReconciling",
 				Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
 			})
+			log.Error(err, "Failed to create service for the custom resource", "name", obj.Name, "role", brokerRole)
 			return true
 		}
 		if err := r.syncBrokerDeploy(ctx, obj, int32(i)); err != nil {
@@ -135,6 +137,7 @@ func (r *AutoMQReconciler) syncBrokers(ctx context.Context, obj *infrav1beta1.Au
 				Reason:  "BrokerSTSReconciling",
 				Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
 			})
+			log.Error(err, "Failed to create deploy for the custom resource", "name", obj.Name, "role", brokerRole)
 			return true
 		}
 	}
diff --git a/internal/controller/automq_controller_c.go b/internal/controller/automq_controller_c.go
index e7aa32d..7c183db 100644
--- a/internal/controller/automq_controller_c.go
+++ b/internal/controller/automq_controller_c.go
@@ -19,6 +19,7 @@ package controller
 import (
 	"context"
 	"fmt"
+	"sigs.k8s.io/controller-runtime/pkg/log"
 	"strings"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
@@ -90,7 +91,7 @@ func (r *AutoMQReconciler) syncControllersScale(ctx context.Context, obj *infrav
 
 func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta1.AutoMQ) bool {
 	conditionType := "SyncControllerReady"
-
+	log := log.FromContext(ctx)
 	// 1. sync pvc
 	// 2. sync deploy
 	// 3. sync svc
@@ -105,6 +106,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
 				Reason:  "ControllerPVCReconciling",
 				Message: fmt.Sprintf("Failed to create pvc for the custom resource (%s): (%s)", obj.Name, err),
 			})
+			log.Error(err, "Failed to create pvc for the custom resource", "name", obj.Name, "role", controllerRole)
 			return true
 		}
 		if err := r.syncControllerDeploy(ctx, obj, int32(i)); err != nil {
@@ -115,6 +117,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
 				Reason:  "ControllerSTSReconciling",
 				Message: fmt.Sprintf("Failed to create deploy for the custom resource (%s): (%s)", obj.Name, err),
 			})
+			log.Error(err, "Failed to create deploy for the custom resource", "name", obj.Name, "role", controllerRole)
 			return true
 		}
 		if err := r.syncControllerService(ctx, obj, int32(i)); err != nil {
@@ -125,6 +128,7 @@ func (r *AutoMQReconciler) syncControllers(ctx context.Context, obj *infrav1beta
 				Reason:  "ControllerServiceReconciling",
 				Message: fmt.Sprintf("Failed to create service for the custom resource (%s): (%s)", obj.Name, err),
 			})
+			log.Error(err, "Failed to create service for the custom resource", "name", obj.Name, "role", controllerRole)
 			return true
 		}
 	}