diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6b27fe7..55f49ee 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,7 +28,7 @@ jobs: uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} - job2: + job1: runs-on: ubuntu-20.04 steps: - name: Checkout @@ -48,9 +48,16 @@ jobs: cd deploy sudo sealos build -t ghcr.io/${{ github.repository_owner }}/automq-operator-sealos:latest . - job1: + job2: runs-on: ubuntu-20.04 steps: + - name: Before freeing up disk space + run: | + echo "Before freeing up disk space" + echo "==============================================================================" + df -hT + echo "==============================================================================" + - name: Checkout uses: actions/checkout@master @@ -62,7 +69,7 @@ jobs: - name: Verify sealos run: | curl -sfL https://raw.githubusercontent.com/labring/sealos/v5.0.0/scripts/install.sh | sh -s v5.0.0 labring/sealos - - name: prune os + - name: install k8s and apps run: | sudo systemctl unmask containerd sudo systemctl unmask docker @@ -75,6 +82,8 @@ jobs: sudo sealos run labring/kubernetes:v1.27.7 sudo sealos run labring/helm:v3.9.4 labring/calico:v3.26.5 labring/openebs:v3.9.0 labring/cert-manager:v1.14.6 sudo sealos run labring/minio:RELEASE.2024-01-11T07-46-16Z labring/kube-prometheus-stack:v0.63.0 labring/kafka-ui:v0.7.1 + sleep 10 + sudo kubectl get pods -A --show-labels - name: build run: | sudo make e2e diff --git a/cmd/main.go b/cmd/main.go index b2c45dd..1c18d32 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,17 +18,13 @@ package main import ( "flag" - v1 "k8s.io/api/core/v1" "os" - "sigs.k8s.io/controller-runtime/pkg/client" - // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. _ "k8s.io/client-go/plugin/pkg/client/auth" infrav1beta1 "github.com/cuisongliu/automq-operator/api/v1beta1" "github.com/cuisongliu/automq-operator/internal/controller" - "github.com/gin-gonic/gin" promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -130,30 +126,7 @@ func main() { go func() { if mgr.GetCache().WaitForCacheSync(ctx) { - setupLog.Info("cache sync success") - router := gin.Default() - router.GET("/api/v1/nodes/:name", func(c *gin.Context) { - name := c.Param("name") - node := &v1.Node{} - node.Name = name - if noe := mgr.GetClient().Get(ctx, client.ObjectKeyFromObject(node), node); noe != nil { - c.JSON(500, gin.H{"message": noe.Error()}) - return - } - nodeIP := "" - for _, addr := range node.Status.Addresses { - if addr.Type == v1.NodeInternalIP { - nodeIP = addr.Address - break - } - } - if nodeIP == "" { - c.JSON(500, gin.H{"message": "node ip not found"}) - return - } - c.String(200, nodeIP) - }) - router.Run(":9090") + controller.APIRegistry(ctx, mgr.GetClient()) } }() diff --git a/e2e/automq_cluster_controller_test.go b/e2e/automq_cluster_controller_test.go new file mode 100644 index 0000000..a33ac36 --- /dev/null +++ b/e2e/automq_cluster_controller_test.go @@ -0,0 +1,96 @@ +/* +Copyright 2024 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "os" + "time" + + infrav1beta1 "github.com/cuisongliu/automq-operator/api/v1beta1" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("automq_controller", func() { + Context("automq_controller cr tests", func() { + ctx := context.Background() + namespaceName := "automq-operator" + namespace := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceName, + Namespace: namespaceName, + }, + } + automq := &infrav1beta1.AutoMQ{} + automq.Name = "automq-s1" + automq.Namespace = namespaceName + automq.Spec.ClusterID = "rZdE0DjZSrqy96PXrMUZVw" + + BeforeEach(func() { + By("Creating the Namespace to perform the tests") + err := k8sClient.Create(ctx, namespace) + Expect(err).To(Not(HaveOccurred())) + By("Setting the NAMESPACE_NAME ENV VAR which stores the Operand image") + err = os.Setenv("NAMESPACE_NAME", namespaceName) + Expect(err).To(Not(HaveOccurred())) + }) + It("Update Endpoint", func() { + By("creating the custom resource for the automq") + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq) + if err != nil && errors.IsNotFound(err) { + // Let's mock our custom resource at the same way that we would + // apply on the cluster the manifest under config/samples + automq.Spec.S3.Endpoint = "http://minio.minio.svc.cluster.local:9000" + automq.Spec.S3.Bucket = "ko3" + automq.Spec.S3.AccessKeyID = "admin" + automq.Spec.S3.SecretAccessKey = "minio123" + automq.Spec.S3.Region = "us-east-1" + automq.Spec.S3.EnablePathStyle = true + automq.Spec.Controller.Replicas = 1 + automq.Spec.Broker.Replicas = 3 + automq.Spec.NodePort = 32009 + err = k8sClient.Create(ctx, automq) + Expect(err).To(Not(HaveOccurred())) + } + }) + AfterEach(func() { + By("removing the custom resource for the automq") + found := &infrav1beta1.AutoMQ{} + err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), found) + Expect(err).To(Not(HaveOccurred())) + + Eventually(func() error { + return k8sClient.Delete(context.TODO(), found) + }, 2*time.Minute, time.Second).Should(Succeed()) + + // TODO(user): Attention if you improve this code by adding other context test you MUST + // be aware of the current delete namespace limitations. + // More info: https://book.kubebuilder.io/reference/envtest.html#testing-considerations + By("Deleting the Namespace to perform the tests") + _ = k8sClient.Delete(ctx, namespace) + + By("Removing the Image ENV VAR which stores the Operand image") + _ = os.Unsetenv("NAMESPACE_NAME") + }) + }) + +}) diff --git a/e2e/automq_cluster_test.go b/e2e/automq_cluster_test.go index 9fa8ea5..6bc741a 100644 --- a/e2e/automq_cluster_test.go +++ b/e2e/automq_cluster_test.go @@ -19,17 +19,18 @@ package e2e import ( "context" "fmt" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "path/filepath" "runtime" "testing" - "time" + "github.com/cuisongliu/automq-operator/internal/controller" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + promv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -57,65 +58,111 @@ func TestControllers(t *testing.T) { } var _ = Describe("automq_controller", func() { - Context("automq_controller tests", func() { + Context("automq_controller apis tests", func() { ctx := context.Background() - namespaceName := "automq-operator" - namespace := &v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: namespaceName, - Namespace: namespaceName, - }, - } - automq := &infrav1beta1.AutoMQ{} - automq.Name = "automq-s1" - automq.Namespace = namespaceName - automq.Spec.ClusterID = "rZdE0DjZSrqy96PXrMUZVw" - - BeforeEach(func() { - By("Creating the Namespace to perform the tests") - err := k8sClient.Create(ctx, namespace) - Expect(err).To(Not(HaveOccurred())) - By("Setting the NAMESPACE_NAME ENV VAR which stores the Operand image") - err = os.Setenv("NAMESPACE_NAME", namespaceName) - Expect(err).To(Not(HaveOccurred())) + It("check api", func() { + Eventually(func() error { + nodes := &v1.NodeList{} + err := k8sClient.List(ctx, nodes) + if err != nil { + return fmt.Errorf("list node error %s", err.Error()) + } + if len(nodes.Items) == 0 { + return fmt.Errorf("expected 1 node, found %d", len(nodes.Items)) + } + nodeName := nodes.Items[0].Name + nodeIp := nodes.Items[0].Status.Addresses[0].Address + if nodeIp == "" { + return fmt.Errorf("node ip not found") + } + if nodeName == "" { + return fmt.Errorf("node name not found") + } + ip := os.Getenv("OPERATOR_APIS_IP") + if ip == "" { + return fmt.Errorf("OPERATOR_APIS_IP is empty") + } + apiAddr := fmt.Sprintf("http://%s:9090/api/v1/nodes/%s", ip, nodeName) + out := RestHttpApi(ctx, apiAddr, "GET", nil, 0) + if out.Code != 200 { + return fmt.Errorf("api response code %d", out.Code) + } + if string(out.Data) != nodeIp { + return fmt.Errorf("api response %s", string(out.Data)) + } + return nil + }, "60s", "1s").Should(Succeed()) }) - It("Update Endpoint", func() { - By("creating the custom resource for the automq") - err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), automq) - if err != nil && errors.IsNotFound(err) { - // Let's mock our custom resource at the same way that we would - // apply on the cluster the manifest under config/samples - automq.Spec.S3.Endpoint = "http://minio.minio.svc.cluster.local:9000" - automq.Spec.S3.Bucket = "ko3" - automq.Spec.S3.AccessKeyID = "admin" - automq.Spec.S3.SecretAccessKey = "minio123" - automq.Spec.S3.Region = "us-east-1" - automq.Spec.S3.EnablePathStyle = true - automq.Spec.Controller.Replicas = 1 - automq.Spec.Broker.Replicas = 3 - automq.Spec.NodePort = 32009 - err = k8sClient.Create(ctx, automq) - Expect(err).To(Not(HaveOccurred())) - } + }) + Context("automq_controller component tests", func() { + ctx := context.Background() + It("check minio status", func() { + Eventually(func() error { + podList := &v1.PodList{} + labelSelector := labels.Set(map[string]string{"release": "minio"}).AsSelector() + err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: "minio", LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(podList.Items) == 0 { + return fmt.Errorf("expected 1 pod, found %d", len(podList.Items)) + } + if podList.Items[0].Status.Phase != v1.PodRunning { + return fmt.Errorf("expected pod phase to be 'Running', got '%s'", podList.Items[0].Status.Phase) + } + return nil + }, "60s", "1s").Should(Succeed()) }) - AfterEach(func() { - By("removing the custom resource for the automq") - found := &infrav1beta1.AutoMQ{} - err := k8sClient.Get(ctx, client.ObjectKeyFromObject(automq), found) - Expect(err).To(Not(HaveOccurred())) - + It("check cert-manager status", func() { Eventually(func() error { - return k8sClient.Delete(context.TODO(), found) - }, 2*time.Minute, time.Second).Should(Succeed()) - - // TODO(user): Attention if you improve this code by adding other context test you MUST - // be aware of the current delete namespace limitations. - // More info: https://book.kubebuilder.io/reference/envtest.html#testing-considerations - By("Deleting the Namespace to perform the tests") - _ = k8sClient.Delete(ctx, namespace) - - By("Removing the Image ENV VAR which stores the Operand image") - _ = os.Unsetenv("NAMESPACE_NAME") + podList := &v1.PodList{} + labelSelector := labels.Set(map[string]string{"app.kubernetes.io/instance": "cert-manager"}).AsSelector() + err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: "cert-manager", LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(podList.Items) == 0 { + return fmt.Errorf("expected 1 pod, found %d", len(podList.Items)) + } + if podList.Items[0].Status.Phase != v1.PodRunning { + return fmt.Errorf("expected pod phase to be 'Running', got '%s'", podList.Items[0].Status.Phase) + } + return nil + }, "60s", "1s").Should(Succeed()) + }) + It("check prometheus status", func() { + Eventually(func() error { + podList := &v1.PodList{} + labelSelector := labels.Set(map[string]string{"release": "prometheus"}).AsSelector() + err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: "monitoring", LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(podList.Items) == 0 { + return fmt.Errorf("expected 1 pod, found %d", len(podList.Items)) + } + if podList.Items[0].Status.Phase != v1.PodRunning { + return fmt.Errorf("expected pod phase to be 'Running', got '%s'", podList.Items[0].Status.Phase) + } + return nil + }, "60s", "1s").Should(Succeed()) + }) + It("check kafka-ui status", func() { + Eventually(func() error { + podList := &v1.PodList{} + labelSelector := labels.Set(map[string]string{"release": "minio"}).AsSelector() + err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: "minio", LabelSelector: labelSelector}) + if err != nil { + return err + } + if len(podList.Items) == 0 { + return fmt.Errorf("expected 1 pod, found %d", len(podList.Items)) + } + if podList.Items[0].Status.Phase != v1.PodRunning { + return fmt.Errorf("expected pod phase to be 'Running', got '%s'", podList.Items[0].Status.Phase) + } + return nil + }, "60s", "1s").Should(Succeed()) }) }) @@ -145,6 +192,10 @@ var _ = BeforeSuite(func() { err = infrav1beta1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) + err = clientgoscheme.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) + err = promv1.AddToScheme(scheme.Scheme) + Expect(err).NotTo(HaveOccurred()) //+kubebuilder:scaffold:scheme @@ -157,6 +208,10 @@ var _ = BeforeSuite(func() { err = os.Setenv("OPERATOR_APIS_IP", GetLocalIpv4()) Expect(err).To(Not(HaveOccurred())) + + go func() { + controller.APIRegistry(context.Background(), k8sClient) + }() }) var _ = AfterSuite(func() { diff --git a/e2e/httplib.go b/e2e/httplib.go new file mode 100644 index 0000000..1844b1e --- /dev/null +++ b/e2e/httplib.go @@ -0,0 +1,99 @@ +/* +Copyright 2023 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "context" + "crypto/tls" + "io" + "net" + httpl "net/http" + "time" +) + +func httpTransport() httpl.Transport { + return httpl.Transport{ + Proxy: httpl.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + MaxIdleConns: 200, + MaxIdleConnsPerHost: 200, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 15 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + ResponseHeaderTimeout: 2 * time.Minute, + DisableCompression: false, + DisableKeepAlives: false, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } +} + +type RespReturn struct { + Data []byte + Code int32 + Error error + Header httpl.Header +} + +func RestHttpApi(ctx context.Context, url, method string, body io.Reader, timeout int32, fns ...func(h httpl.Header)) RespReturn { + if timeout == 0 { + timeout = 60 + } + trans := httpTransport() + // https://github.com/golang/go/issues/13801 + client := &httpl.Client{ + Transport: &trans, + } + defer client.CloseIdleConnections() + ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(timeout)*int64(time.Second))) + defer cancel() + req, _ := httpl.NewRequestWithContext(ctx, method, url, body) + req.Header.Add("Content-Type", "application/json") + req.Header.Set("User-agent", "RealHttpConnector") + req.Header.Set("Connection", "keep-alive") + for _, f := range fns { + f(req.Header) + } + resp, err := client.Do(req) + if err != nil { + return RespReturn{ + Data: nil, + Code: 500, + Error: err, + Header: nil, + } + } + defer resp.Body.Close() + defer io.Copy(io.Discard, resp.Body) + data, err := io.ReadAll(resp.Body) + if err != nil { + return RespReturn{ + Data: nil, + Code: 500, + Error: err, + Header: nil, + } + } + return RespReturn{ + Data: data, + Code: int32(resp.StatusCode), + Error: err, + Header: resp.Header, + } +} diff --git a/e2e/utils.go b/e2e/utils.go index 0b93f0b..0859b4e 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -17,9 +17,10 @@ limitations under the License. package e2e import ( - "github.com/cuisongliu/logger" "net" "sort" + + "github.com/cuisongliu/logger" ) func LocalIP(addrs *[]net.Addr) string { diff --git a/internal/controller/automq_apis.go b/internal/controller/automq_apis.go new file mode 100644 index 0000000..71f7171 --- /dev/null +++ b/internal/controller/automq_apis.go @@ -0,0 +1,53 @@ +/* +Copyright 2024 cuisongliu@qq.com. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "github.com/gin-gonic/gin" + v1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func APIRegistry(ctx context.Context, k8sClient client.Client) { + setupLog := ctrl.Log.WithName("setup") + setupLog.Info("cache sync success") + router := gin.Default() + router.GET("/api/v1/nodes/:name", func(c *gin.Context) { + name := c.Param("name") + node := &v1.Node{} + node.Name = name + if noe := k8sClient.Get(ctx, client.ObjectKeyFromObject(node), node); noe != nil { + c.JSON(500, gin.H{"message": noe.Error()}) + return + } + nodeIP := "" + for _, addr := range node.Status.Addresses { + if addr.Type == v1.NodeInternalIP { + nodeIP = addr.Address + break + } + } + if nodeIP == "" { + c.JSON(500, gin.H{"message": "node ip not found"}) + return + } + c.String(200, nodeIP) + }) + router.Run(":9090") +}