From fee0242783c4cc9cb11faa29737d2998856b6180 Mon Sep 17 00:00:00 2001 From: Bychkov Date: Wed, 30 Aug 2023 18:31:22 +0300 Subject: [PATCH] fix failed job. refresh integration tests --- .../clusters/cassandra_controller_test.go | 28 +--- .../clusters/datatest/kafka_v1beta1.yaml | 3 + .../datatest/kafkaconnect_v1beta1.yaml | 3 + .../clusters/datatest/opensearch_v1beta1.yaml | 3 + .../clusters/datatest/postgresql_v1beta1.yaml | 1 + .../clusters/datatest/redis_v1beta1.yaml | 4 +- .../clusters/datatest/zookeeper_v1beta1.yaml | 3 + controllers/clusters/kafka_controller_test.go | 62 +++----- .../clusters/kafkaconnect_controller_test.go | 62 +++----- .../clusters/opensearch_controller_test.go | 59 ++----- .../clusters/postgresql_controller_test.go | 62 +++----- controllers/clusters/redis_controller_test.go | 63 +++----- controllers/clusters/suite_test.go | 13 +- .../clusters/zookeeper_controller_test.go | 53 ++----- .../datatest/kafkaacl_v1beta1.yaml | 3 + .../datatest/mirror_v1beta1.yaml | 3 + .../datatest/topic_v1beta1.yaml | 3 + .../kafkaacl_controller_test.go | 59 ++----- .../kafkamanagement/mirror_controller.go | 147 +++++++----------- .../kafkamanagement/mirror_controller_test.go | 59 ++----- controllers/kafkamanagement/suite_test.go | 8 +- .../kafkamanagement/topic_controller_test.go | 61 ++------ pkg/models/operator.go | 1 + 23 files changed, 255 insertions(+), 508 deletions(-) diff --git a/controllers/clusters/cassandra_controller_test.go b/controllers/clusters/cassandra_controller_test.go index 0abd9f8b4..269a2bb14 100644 --- a/controllers/clusters/cassandra_controller_test.go +++ b/controllers/clusters/cassandra_controller_test.go @@ -19,7 +19,6 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -29,21 +28,13 @@ import ( "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const newCassandraNodeSize = "CAS-DEV-t4g.small-30" var _ = Describe("Cassandra Controller", func() { - var ( - ns = "default" - - cassandra v1beta1.Cassandra - cassandraManifest v1beta1.Cassandra - - timeout = time.Second * 40 - interval = time.Second * 2 - ) + cassandra := v1beta1.Cassandra{} + cassandraManifest := v1beta1.Cassandra{} yfile, err := os.ReadFile("datatest/cassandra_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) @@ -54,15 +45,13 @@ var _ = Describe("Cassandra Controller", func() { ctx := context.Background() clusterID := cassandraManifest.Spec.Name + openapi.CreatedID - cassandraNamespacedName := types.NamespacedName{Name: cassandraManifest.ObjectMeta.Name, Namespace: ns} + cassandraNamespacedName := types.NamespacedName{Name: cassandraManifest.ObjectMeta.Name, Namespace: defaultNS} When("apply a cassandra manifest", func() { It("should create a cassandra resources", func() { Expect(k8sClient.Create(ctx, &cassandraManifest)).Should(Succeed()) By("sending cassandra specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, cassandraNamespacedName, &cassandra); err != nil { return false } @@ -75,16 +64,13 @@ var _ = Describe("Cassandra Controller", func() { When("changing a node size", func() { It("should update a cassandra resources", func() { Expect(k8sClient.Get(ctx, cassandraNamespacedName, &cassandra)).Should(Succeed()) - patch := cassandra.NewPatch() + patch := cassandra.NewPatch() cassandra.Spec.DataCentres[0].NodeSize = newCassandraNodeSize - - cassandra.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} Expect(k8sClient.Patch(ctx, &cassandra, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + "the status job get new data from the InstAPI and update it in k8s cassandra resource") - Eventually(func() bool { if err := k8sClient.Get(ctx, cassandraNamespacedName, &cassandra); err != nil { return false @@ -102,11 +88,7 @@ var _ = Describe("Cassandra Controller", func() { When("delete the cassandra resource", func() { It("should send delete request to the Instaclustr API", func() { Expect(k8sClient.Get(ctx, cassandraNamespacedName, &cassandra)).Should(Succeed()) - - cassandra.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - Expect(k8sClient.Delete(ctx, &cassandra)).Should(Succeed()) - By("sending delete request to Instaclustr API") Eventually(func() bool { err := k8sClient.Get(ctx, cassandraNamespacedName, &cassandra) @@ -114,7 +96,7 @@ var _ = Describe("Cassandra Controller", func() { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/clusters/datatest/kafka_v1beta1.yaml b/controllers/clusters/datatest/kafka_v1beta1.yaml index 557c1d047..400c5cab0 100644 --- a/controllers/clusters/datatest/kafka_v1beta1.yaml +++ b/controllers/clusters/datatest/kafka_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Kafka metadata: name: kafka + namespace: default + annotations: + defaulter: webhook spec: name: "kafka" version: "2.8.2" diff --git a/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml b/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml index d5d0c00f2..97419dcc3 100644 --- a/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml +++ b/controllers/clusters/datatest/kafkaconnect_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: KafkaConnect metadata: name: kafkaconnect-sample + namespace: default + annotations: + defaulter: webhook spec: dataCentres: - name: "US_EAST_1_DC_KAFKA" diff --git a/controllers/clusters/datatest/opensearch_v1beta1.yaml b/controllers/clusters/datatest/opensearch_v1beta1.yaml index 6b9b284b3..2d861038d 100644 --- a/controllers/clusters/datatest/opensearch_v1beta1.yaml +++ b/controllers/clusters/datatest/opensearch_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: OpenSearch metadata: name: opensearch + namespace: default + annotations: + defaulter: webhook spec: alertingPlugin: false anomalyDetectionPlugin: false diff --git a/controllers/clusters/datatest/postgresql_v1beta1.yaml b/controllers/clusters/datatest/postgresql_v1beta1.yaml index 85edf43f3..b96b86dd2 100644 --- a/controllers/clusters/datatest/postgresql_v1beta1.yaml +++ b/controllers/clusters/datatest/postgresql_v1beta1.yaml @@ -2,6 +2,7 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: PostgreSQL metadata: name: postgresql-sample + namespace: default annotations: testAnnotation: test spec: diff --git a/controllers/clusters/datatest/redis_v1beta1.yaml b/controllers/clusters/datatest/redis_v1beta1.yaml index 398d3f052..0477c9c4c 100644 --- a/controllers/clusters/datatest/redis_v1beta1.yaml +++ b/controllers/clusters/datatest/redis_v1beta1.yaml @@ -2,12 +2,14 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Redis metadata: name: redis-sample + namespace: default + annotations: + defaulter: webhook spec: name: "testRedis" version: "7.0.5" slaTier: "NON_PRODUCTION" pciCompliance: true - concurrentResizes: 1 clientToNodeEncryption: true privateNetworkCluster: true notifySupportContacts: true diff --git a/controllers/clusters/datatest/zookeeper_v1beta1.yaml b/controllers/clusters/datatest/zookeeper_v1beta1.yaml index ba8136ed0..b82202173 100644 --- a/controllers/clusters/datatest/zookeeper_v1beta1.yaml +++ b/controllers/clusters/datatest/zookeeper_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Zookeeper metadata: name: zookeeper-sample + namespace: default + annotations: + defaulter: webhook spec: dataCentres: - clientToServerEncryption: true diff --git a/controllers/clusters/kafka_controller_test.go b/controllers/clusters/kafka_controller_test.go index 4a2dbc3ba..2658a301c 100644 --- a/controllers/clusters/kafka_controller_test.go +++ b/controllers/clusters/kafka_controller_test.go @@ -19,109 +19,83 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const newKafkaNodeSize = "KFK-DEV-t4g.medium-80" var _ = Describe("Kafka Controller", func() { - var ( - kafkaResource v1beta1.Kafka - kafkaYAML v1beta1.Kafka - k = "kafka" - ns = "default" - kafkaNS = types.NamespacedName{Name: k, Namespace: ns} - timeout = time.Second * 40 - interval = time.Second * 2 - ) + kafka := v1beta1.Kafka{} + kafkaManifest := v1beta1.Kafka{} yfile, err := os.ReadFile("datatest/kafka_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &kafkaYAML) + err = yaml.Unmarshal(yfile, &kafkaManifest) Expect(err).NotTo(HaveOccurred()) - kafkaObjMeta := metav1.ObjectMeta{ - Name: k, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - kafkaYAML.ObjectMeta = kafkaObjMeta + kafkaNamespacedName := types.NamespacedName{Name: kafkaManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a Kafka manifest", func() { It("should create a Kafka resources", func() { - Expect(k8sClient.Create(ctx, &kafkaYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &kafkaManifest)).Should(Succeed()) By("sending Kafka specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, kafkaNS, &kafkaResource); err != nil { + if err := k8sClient.Get(ctx, kafkaNamespacedName, &kafka); err != nil { return false } - return kafkaResource.Status.ID == openapi.CreatedID + return kafka.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a node size", func() { It("should update a Kafka resources", func() { - Expect(k8sClient.Get(ctx, kafkaNS, &kafkaResource)).Should(Succeed()) - patch := kafkaResource.NewPatch() + Expect(k8sClient.Get(ctx, kafkaNamespacedName, &kafka)).Should(Succeed()) - kafkaResource.Spec.DataCentres[0].NodeSize = newKafkaNodeSize - - kafkaResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - Expect(k8sClient.Patch(ctx, &kafkaResource, patch)).Should(Succeed()) + patch := kafka.NewPatch() + kafka.Spec.DataCentres[0].NodeSize = newKafkaNodeSize + Expect(k8sClient.Patch(ctx, &kafka, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + "the status job get new data from the InstAPI and update it in k8s Kafka resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, kafkaNS, &kafkaResource); err != nil { + if err := k8sClient.Get(ctx, kafkaNamespacedName, &kafka); err != nil { return false } - if len(kafkaResource.Status.DataCentres) == 0 || len(kafkaResource.Status.DataCentres[0].Nodes) == 0 { + if len(kafka.Status.DataCentres) == 0 || len(kafka.Status.DataCentres[0].Nodes) == 0 { return false } - return kafkaResource.Status.DataCentres[0].Nodes[0].Size == newKafkaNodeSize + return kafka.Status.DataCentres[0].Nodes[0].Size == newKafkaNodeSize }, timeout, interval).Should(BeTrue()) }) }) When("delete the Kafka resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, kafkaNS, &kafkaResource)).Should(Succeed()) - - kafkaResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &kafkaResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, kafkaNamespacedName, &kafka)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &kafka)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, kafkaNS, &kafkaResource) + err := k8sClient.Get(ctx, kafkaNamespacedName, &kafka) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/clusters/kafkaconnect_controller_test.go b/controllers/clusters/kafkaconnect_controller_test.go index 3eddf2bd0..0dacc808b 100644 --- a/controllers/clusters/kafkaconnect_controller_test.go +++ b/controllers/clusters/kafkaconnect_controller_test.go @@ -19,109 +19,83 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const newKafkaConnectNodeNumbers = 6 var _ = Describe("Kafka Connect Controller", func() { - var ( - kafkaResource v1beta1.KafkaConnect - kafkaYAML v1beta1.KafkaConnect - kc = "kafkaconnect" - ns = "default" - kafkaNS = types.NamespacedName{Name: kc, Namespace: ns} - timeout = time.Second * 40 - interval = time.Second * 2 - ) + kafkaConnect := v1beta1.KafkaConnect{} + kafkaConnectManifest := v1beta1.KafkaConnect{} yfile, err := os.ReadFile("datatest/kafkaconnect_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &kafkaYAML) + err = yaml.Unmarshal(yfile, &kafkaConnectManifest) Expect(err).NotTo(HaveOccurred()) - kafkaObjMeta := metav1.ObjectMeta{ - Name: kc, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - kafkaYAML.ObjectMeta = kafkaObjMeta + kafkaConnectNamespacedName := types.NamespacedName{Name: kafkaConnectManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a Kafka Connect manifest", func() { It("should create a Kafka Connect resources", func() { - Expect(k8sClient.Create(ctx, &kafkaYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &kafkaConnectManifest)).Should(Succeed()) By("sending Kafka Connect specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, kafkaNS, &kafkaResource); err != nil { + if err := k8sClient.Get(ctx, kafkaConnectNamespacedName, &kafkaConnect); err != nil { return false } - return kafkaResource.Status.ID == openapi.CreatedID + return kafkaConnect.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a node size", func() { It("should update a Kafka Connect resources", func() { - Expect(k8sClient.Get(ctx, kafkaNS, &kafkaResource)).Should(Succeed()) - patch := kafkaResource.NewPatch() + Expect(k8sClient.Get(ctx, kafkaConnectNamespacedName, &kafkaConnect)).Should(Succeed()) - kafkaResource.Spec.DataCentres[0].NodesNumber = newKafkaConnectNodeNumbers - - kafkaResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - Expect(k8sClient.Patch(ctx, &kafkaResource, patch)).Should(Succeed()) + patch := kafkaConnect.NewPatch() + kafkaConnect.Spec.DataCentres[0].NodesNumber = newKafkaConnectNodeNumbers + Expect(k8sClient.Patch(ctx, &kafkaConnect, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + "the status job get new data from the InstAPI and update it in k8s Kafka resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, kafkaNS, &kafkaResource); err != nil { + if err := k8sClient.Get(ctx, kafkaConnectNamespacedName, &kafkaConnect); err != nil { return false } - if len(kafkaResource.Status.DataCentres) == 0 { + if len(kafkaConnect.Status.DataCentres) == 0 { return false } - return kafkaResource.Status.DataCentres[0].NodeNumber == newKafkaConnectNodeNumbers + return kafkaConnect.Status.DataCentres[0].NodeNumber == newKafkaConnectNodeNumbers }, timeout, interval).Should(BeTrue()) }) }) When("delete the Kafka Connect resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, kafkaNS, &kafkaResource)).Should(Succeed()) - - kafkaResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &kafkaResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, kafkaConnectNamespacedName, &kafkaConnect)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &kafkaConnect)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, kafkaNS, &kafkaResource) + err := k8sClient.Get(ctx, kafkaConnectNamespacedName, &kafkaConnect) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/clusters/opensearch_controller_test.go b/controllers/clusters/opensearch_controller_test.go index 0cc465216..4d5f59a9b 100644 --- a/controllers/clusters/opensearch_controller_test.go +++ b/controllers/clusters/opensearch_controller_test.go @@ -19,105 +19,80 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const newOpenSearchNodeSize = "SRH-DEV-t4g.small-30" var _ = Describe("OpenSearch Controller", func() { - var ( - openSearchResource v1beta1.OpenSearch - openSearchYAML v1beta1.OpenSearch - o = "opensearch" - ns = "default" - openSearchNS = types.NamespacedName{Name: o, Namespace: ns} - timeout = time.Second * 40 - interval = time.Second * 2 - ) + openSearch := v1beta1.OpenSearch{} + openSearchManifest := v1beta1.OpenSearch{} yfile, err := os.ReadFile("datatest/opensearch_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &openSearchYAML) + err = yaml.Unmarshal(yfile, &openSearchManifest) Expect(err).NotTo(HaveOccurred()) - openSearchObjMeta := metav1.ObjectMeta{ - Name: o, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - openSearchYAML.ObjectMeta = openSearchObjMeta + openSearchNamespacedName := types.NamespacedName{Name: openSearchManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a OpenSearch manifest", func() { It("should create a OpenSearch resources", func() { - Expect(k8sClient.Create(ctx, &openSearchYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &openSearchManifest)).Should(Succeed()) By("sending OpenSearch specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, openSearchNS, &openSearchResource); err != nil { + if err := k8sClient.Get(ctx, openSearchNamespacedName, &openSearch); err != nil { return false } - return openSearchResource.Status.ID == openapi.CreatedID + return openSearch.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a node size", func() { It("should update a OpenSearch resources", func() { - Expect(k8sClient.Get(ctx, openSearchNS, &openSearchResource)).Should(Succeed()) - patch := openSearchResource.NewPatch() - - openSearchResource.Spec.ClusterManagerNodes[0].NodeSize = newOpenSearchNodeSize + Expect(k8sClient.Get(ctx, openSearchNamespacedName, &openSearch)).Should(Succeed()) - openSearchResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - Expect(k8sClient.Patch(ctx, &openSearchResource, patch)).Should(Succeed()) + patch := openSearch.NewPatch() + openSearch.Spec.ClusterManagerNodes[0].NodeSize = newOpenSearchNodeSize + Expect(k8sClient.Patch(ctx, &openSearch, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + "the status job get new data from the InstAPI and update it in k8s OpenSearch resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, openSearchNS, &openSearchResource); err != nil { + if err := k8sClient.Get(ctx, openSearchNamespacedName, &openSearch); err != nil { return false } - return openSearchResource.Spec.ClusterManagerNodes[0].NodeSize == newOpenSearchNodeSize + return openSearch.Spec.ClusterManagerNodes[0].NodeSize == newOpenSearchNodeSize }, timeout, interval).Should(BeTrue()) }) }) When("delete the OpenSearch resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, openSearchNS, &openSearchResource)).Should(Succeed()) - - openSearchResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &openSearchResource)).Should(Succeed()) + Expect(k8sClient.Get(ctx, openSearchNamespacedName, &openSearch)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &openSearch)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, openSearchNS, &openSearchResource) + err := k8sClient.Get(ctx, openSearchNamespacedName, &openSearch) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/clusters/postgresql_controller_test.go b/controllers/clusters/postgresql_controller_test.go index 393389d7c..a049f7849 100644 --- a/controllers/clusters/postgresql_controller_test.go +++ b/controllers/clusters/postgresql_controller_test.go @@ -19,109 +19,83 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const newPostgreSQLNodeSize = "PGS-DEV-t4g.medium-30" var _ = Describe("PostgreSQL Controller", func() { - var ( - postgresqlResource v1beta1.PostgreSQL - postgresqlYAML v1beta1.PostgreSQL - p = "postgresql" - ns = "default" - postgresqlNS = types.NamespacedName{Name: p, Namespace: ns} - timeout = time.Second * 60 - interval = time.Second * 2 - ) + postgresql := v1beta1.PostgreSQL{} + postgresqlManifest := v1beta1.PostgreSQL{} yfile, err := os.ReadFile("datatest/postgresql_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &postgresqlYAML) + err = yaml.Unmarshal(yfile, &postgresqlManifest) Expect(err).NotTo(HaveOccurred()) - postgresqlObjMeta := metav1.ObjectMeta{ - Name: p, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - postgresqlYAML.ObjectMeta = postgresqlObjMeta + postgresqlNamespacedName := types.NamespacedName{Name: postgresqlManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a PostgreSQL manifest", func() { It("should create a PostgreSQL resources", func() { - Expect(k8sClient.Create(ctx, &postgresqlYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &postgresqlManifest)).Should(Succeed()) By("sending PostgreSQL specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, postgresqlNS, &postgresqlResource); err != nil { + if err := k8sClient.Get(ctx, postgresqlNamespacedName, &postgresql); err != nil { return false } - return postgresqlResource.Status.ID == openapi.CreatedID + return postgresql.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a node size", func() { It("should update a PostgreSQL resources", func() { - Expect(k8sClient.Get(ctx, postgresqlNS, &postgresqlResource)).Should(Succeed()) - patch := postgresqlResource.NewPatch() + Expect(k8sClient.Get(ctx, postgresqlNamespacedName, &postgresql)).Should(Succeed()) - postgresqlResource.Spec.DataCentres[0].NodeSize = newPostgreSQLNodeSize - - postgresqlResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - Expect(k8sClient.Patch(ctx, &postgresqlResource, patch)).Should(Succeed()) + patch := postgresql.NewPatch() + postgresql.Spec.DataCentres[0].NodeSize = newPostgreSQLNodeSize + Expect(k8sClient.Patch(ctx, &postgresql, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + "the status job get new data from the InstAPI and update it in k8s PostgreSQL resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, postgresqlNS, &postgresqlResource); err != nil { + if err := k8sClient.Get(ctx, postgresqlNamespacedName, &postgresql); err != nil { return false } - if len(postgresqlResource.Status.DataCentres) == 0 || len(postgresqlResource.Status.DataCentres[0].Nodes) == 0 { + if len(postgresql.Status.DataCentres) == 0 || len(postgresql.Status.DataCentres[0].Nodes) == 0 { return false } - return postgresqlResource.Status.DataCentres[0].Nodes[0].Size == newPostgreSQLNodeSize + return postgresql.Status.DataCentres[0].Nodes[0].Size == newPostgreSQLNodeSize }, timeout, interval).Should(BeTrue()) }) }) When("delete the PostgreSQL resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, postgresqlNS, &postgresqlResource)).Should(Succeed()) - - postgresqlResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &postgresqlResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, postgresqlNamespacedName, &postgresql)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &postgresql)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, postgresqlNS, &postgresqlResource) + err := k8sClient.Get(ctx, postgresqlNamespacedName, &postgresql) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/clusters/redis_controller_test.go b/controllers/clusters/redis_controller_test.go index 67936c22c..4f90b3465 100644 --- a/controllers/clusters/redis_controller_test.go +++ b/controllers/clusters/redis_controller_test.go @@ -19,109 +19,82 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const newRedisNodeSize = "RDS-DEV-t4g.medium-80" var _ = Describe("Redis Controller", func() { - var ( - redisResource v1beta1.Redis - redisYAML v1beta1.Redis - k = "redis" - ns = "default" - redisNS = types.NamespacedName{Name: k, Namespace: ns} - timeout = time.Second * 40 - interval = time.Second * 2 - ) + redis := v1beta1.Redis{} + redisManifest := v1beta1.Redis{} yfile, err := os.ReadFile("datatest/redis_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &redisYAML) + err = yaml.Unmarshal(yfile, &redisManifest) Expect(err).NotTo(HaveOccurred()) - redisObjMeta := metav1.ObjectMeta{ - Name: k, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - redisYAML.ObjectMeta = redisObjMeta + redisNamespacedName := types.NamespacedName{Name: redisManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a redis manifest", func() { It("should create a redis resources", func() { - Expect(k8sClient.Create(ctx, &redisYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &redisManifest)).Should(Succeed()) By("sending redis specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, redisNS, &redisResource); err != nil { + if err := k8sClient.Get(ctx, redisNamespacedName, &redis); err != nil { return false } - return redisResource.Status.ID == openapi.CreatedID + return redis.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a node size", func() { It("should update a redis resources", func() { - Expect(k8sClient.Get(ctx, redisNS, &redisResource)).Should(Succeed()) - patch := redisResource.NewPatch() - - redisResource.Spec.DataCentres[0].NodeSize = newRedisNodeSize - - redisResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - Expect(k8sClient.Patch(ctx, &redisResource, patch)).Should(Succeed()) + Expect(k8sClient.Get(ctx, redisNamespacedName, &redis)).Should(Succeed()) + patch := redis.NewPatch() + redis.Spec.DataCentres[0].NodeSize = newRedisNodeSize + Expect(k8sClient.Patch(ctx, &redis, patch)).Should(Succeed()) By("sending a resize request to the Instaclustr API. And when the resize is completed, " + "the status job get new data from the InstAPI and update it in k8s redis resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, redisNS, &redisResource); err != nil { + if err := k8sClient.Get(ctx, redisNamespacedName, &redis); err != nil { return false } - if len(redisResource.Status.DataCentres) == 0 || len(redisResource.Status.DataCentres[0].Nodes) == 0 { + if len(redis.Status.DataCentres) == 0 || len(redis.Status.DataCentres[0].Nodes) == 0 { return false } - return redisResource.Status.DataCentres[0].Nodes[0].Size == newRedisNodeSize + return redis.Status.DataCentres[0].Nodes[0].Size == newRedisNodeSize }, timeout, interval).Should(BeTrue()) }) }) When("delete the redis resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, redisNS, &redisResource)).Should(Succeed()) - - redisResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &redisResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, redisNamespacedName, &redis)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &redis)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, redisNS, &redisResource) + err := k8sClient.Get(ctx, redisNamespacedName, &redis) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/clusters/suite_test.go b/controllers/clusters/suite_test.go index 6d785ba6a..74c62e8f6 100644 --- a/controllers/clusters/suite_test.go +++ b/controllers/clusters/suite_test.go @@ -48,6 +48,11 @@ var ( testEnv *envtest.Environment ctx context.Context cancel context.CancelFunc + + defaultNS = "default" + + timeout = time.Millisecond * 2000 + interval = time.Millisecond * 20 ) func TestAPIs(t *testing.T) { @@ -93,9 +98,11 @@ var _ = BeforeSuite(func() { eRecorder := k8sManager.GetEventRecorderFor("instaclustr-operator-tests") - scheduler.ClusterStatusInterval = 1 * time.Second - scheduler.ClusterBackupsInterval = 30 * time.Second - models.ReconcileRequeue = reconcile.Result{RequeueAfter: time.Second * 3} + scheduler.ClusterStatusInterval = interval + scheduler.ClusterBackupsInterval = time.Second * 30 + scheduler.UserCreationInterval = interval + + models.ReconcileRequeue = reconcile.Result{RequeueAfter: interval} err = (&KafkaReconciler{ Client: k8sManager.GetClient(), diff --git a/controllers/clusters/zookeeper_controller_test.go b/controllers/clusters/zookeeper_controller_test.go index 039490bab..565c3137d 100644 --- a/controllers/clusters/zookeeper_controller_test.go +++ b/controllers/clusters/zookeeper_controller_test.go @@ -19,99 +19,76 @@ package clusters import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/clusters/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) const zookeeperNodeSizeFromYAML = "zookeeper-developer-t3.small-20" var _ = Describe("Zookeeper Controller", func() { - var ( - zookeeperResource v1beta1.Zookeeper - zookeeperYAML v1beta1.Zookeeper - zook = "zookeeper" - ns = "default" - zookeeperNS = types.NamespacedName{Name: zook, Namespace: ns} - timeout = time.Second * 40 - interval = time.Second * 2 - ) + zookeeper := v1beta1.Zookeeper{} + zookeeperManifest := v1beta1.Zookeeper{} yfile, err := os.ReadFile("datatest/zookeeper_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &zookeeperYAML) + err = yaml.Unmarshal(yfile, &zookeeperManifest) Expect(err).NotTo(HaveOccurred()) - zookeeperObjMeta := metav1.ObjectMeta{ - Name: zook, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - zookeeperYAML.ObjectMeta = zookeeperObjMeta + zookeeperNamespacedName := types.NamespacedName{Name: zookeeperManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a zookeeper manifest", func() { It("should create a zookeeper resources", func() { - Expect(k8sClient.Create(ctx, &zookeeperYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &zookeeperManifest)).Should(Succeed()) By("sending zookeeper specification to the Instaclustr API and get ID of created cluster.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, zookeeperNS, &zookeeperResource); err != nil { + if err := k8sClient.Get(ctx, zookeeperNamespacedName, &zookeeper); err != nil { return false } - return zookeeperResource.Status.ID == openapi.CreatedID + return zookeeper.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) When("zookeeper is created, the status job get new data from the InstAPI.", func() { It("updates a k8s zookeeper status", func() { - Expect(k8sClient.Get(ctx, zookeeperNS, &zookeeperResource)).Should(Succeed()) + Expect(k8sClient.Get(ctx, zookeeperNamespacedName, &zookeeper)).Should(Succeed()) Eventually(func() bool { - if err := k8sClient.Get(ctx, zookeeperNS, &zookeeperResource); err != nil { + if err := k8sClient.Get(ctx, zookeeperNamespacedName, &zookeeper); err != nil { return false } - if len(zookeeperResource.Status.DataCentres) == 0 || len(zookeeperResource.Status.DataCentres[0].Nodes) == 0 { + if len(zookeeper.Status.DataCentres) == 0 || len(zookeeper.Status.DataCentres[0].Nodes) == 0 { return false } - return zookeeperResource.Status.DataCentres[0].Nodes[0].Size == zookeeperNodeSizeFromYAML + return zookeeper.Status.DataCentres[0].Nodes[0].Size == zookeeperNodeSizeFromYAML }, timeout, interval).Should(BeTrue()) }) }) When("delete the zookeeper resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, zookeeperNS, &zookeeperResource)).Should(Succeed()) - - zookeeperResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &zookeeperResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, zookeeperNamespacedName, &zookeeper)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &zookeeper)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, zookeeperNS, &zookeeperResource) + err := k8sClient.Get(ctx, zookeeperNamespacedName, &zookeeper) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/kafkamanagement/datatest/kafkaacl_v1beta1.yaml b/controllers/kafkamanagement/datatest/kafkaacl_v1beta1.yaml index c1e2aa391..2f6007b03 100644 --- a/controllers/kafkamanagement/datatest/kafkaacl_v1beta1.yaml +++ b/controllers/kafkamanagement/datatest/kafkaacl_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: kafkamanagement.instaclustr.com/v1beta1 kind: KafkaACL metadata: name: kafkaacl-sample + namespace: default + annotations: + defaulter: webhook spec: acls: - host: "*" diff --git a/controllers/kafkamanagement/datatest/mirror_v1beta1.yaml b/controllers/kafkamanagement/datatest/mirror_v1beta1.yaml index a59d70c54..0f07b3dcc 100644 --- a/controllers/kafkamanagement/datatest/mirror_v1beta1.yaml +++ b/controllers/kafkamanagement/datatest/mirror_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: kafkamanagement.instaclustr.com/v1beta1 kind: Mirror metadata: name: mirror-sample + namespace: default + annotations: + defaulter: webhook spec: kafkaConnectClusterId: "897da890-a212-4771-ac13-b08d85e32ad6" maxTasks: 3 diff --git a/controllers/kafkamanagement/datatest/topic_v1beta1.yaml b/controllers/kafkamanagement/datatest/topic_v1beta1.yaml index a0dea9399..fb73877f5 100644 --- a/controllers/kafkamanagement/datatest/topic_v1beta1.yaml +++ b/controllers/kafkamanagement/datatest/topic_v1beta1.yaml @@ -2,6 +2,9 @@ apiVersion: kafkamanagement.instaclustr.com/v1beta1 kind: Topic metadata: name: topic-sample + namespace: default + annotations: + defaulter: webhook spec: partitions: 3 replicationFactor: 3 diff --git a/controllers/kafkamanagement/kafkaacl_controller_test.go b/controllers/kafkamanagement/kafkaacl_controller_test.go index 8c47fab68..e1e747881 100644 --- a/controllers/kafkamanagement/kafkaacl_controller_test.go +++ b/controllers/kafkamanagement/kafkaacl_controller_test.go @@ -19,12 +19,10 @@ package kafkamanagement import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" @@ -36,88 +34,65 @@ import ( var newRecourseType = "GROUP" var _ = Describe("Kafka ACL Controller", func() { - var ( - aclResource v1beta1.KafkaACL - aclYAML v1beta1.KafkaACL - a = "acl" - ns = "default" - aclNS = types.NamespacedName{Name: a, Namespace: ns} - timeout = time.Second * 15 - interval = time.Second * 2 - ) + acl := v1beta1.KafkaACL{} + aclManifest := v1beta1.KafkaACL{} yfile, err := os.ReadFile("datatest/kafkaacl_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &aclYAML) + err = yaml.Unmarshal(yfile, &aclManifest) Expect(err).NotTo(HaveOccurred()) - aclObjMeta := metav1.ObjectMeta{ - Name: a, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - aclYAML.ObjectMeta = aclObjMeta + aclNamespacedName := types.NamespacedName{Name: aclManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a Kafka ACL manifest", func() { It("should create a ACL resources", func() { - Expect(k8sClient.Create(ctx, &aclYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &aclManifest)).Should(Succeed()) By("sending ACL specification to the Instaclustr API and get ID of created resource.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, aclNS, &aclResource); err != nil { + if err := k8sClient.Get(ctx, aclNamespacedName, &acl); err != nil { return false } - return aclResource.Status.ID == openapi.CreatedID + return acl.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a ACL ", func() { It("should update the ACL resources", func() { - Expect(k8sClient.Get(ctx, aclNS, &aclResource)).Should(Succeed()) - patch := aclResource.NewPatch() + Expect(k8sClient.Get(ctx, aclNamespacedName, &acl)).Should(Succeed()) - aclResource.Spec.ACLs[0].ResourceType = newRecourseType - aclResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - - Expect(k8sClient.Patch(ctx, &aclResource, patch)).Should(Succeed()) + patch := acl.NewPatch() + acl.Spec.ACLs[0].ResourceType = newRecourseType + Expect(k8sClient.Patch(ctx, &acl, patch)).Should(Succeed()) By("sending a new ACL configs request to the Instaclustr API, it" + "gets a new data from the InstAPI and update it in k8s ACL resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, aclNS, &aclResource); err != nil { + if err := k8sClient.Get(ctx, aclNamespacedName, &acl); err != nil { return false } - return aclResource.GetAnnotations()[models.ResourceStateAnnotation] == models.UpdatedEvent + return acl.GetAnnotations()[models.ResourceStateAnnotation] == models.UpdatedEvent }, timeout, interval).Should(BeTrue()) }) }) When("delete the ACL resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, aclNS, &aclResource)).Should(Succeed()) - - aclResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &aclResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, aclNamespacedName, &acl)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &acl)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, aclNS, &aclResource) + err := k8sClient.Get(ctx, aclNamespacedName, &acl) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }, timeout, interval).Should(BeTrue()) }) }) diff --git a/controllers/kafkamanagement/mirror_controller.go b/controllers/kafkamanagement/mirror_controller.go index 3fbdaa4cf..c9d8c7c4c 100644 --- a/controllers/kafkamanagement/mirror_controller.go +++ b/controllers/kafkamanagement/mirror_controller.go @@ -76,13 +76,10 @@ func (r *MirrorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr switch mirror.Annotations[models.ResourceStateAnnotation] { case models.CreatingEvent: return r.handleCreateCluster(ctx, mirror, l), nil - case models.UpdatingEvent: return r.handleUpdateCluster(ctx, mirror, l), nil - case models.DeletingEvent: return r.handleDeleteCluster(ctx, mirror, l), nil - case models.GenericEvent: l.Info("Event isn't handled", "kafka Connect ID to mirror", mirror.Spec.KafkaConnectClusterID, @@ -105,48 +102,34 @@ func (r *MirrorReconciler) handleCreateCluster( iStatus, err := r.API.CreateKafkaMirror(&mirror.Spec) if err != nil { l.Error(err, "Cannot create Kafka mirror", "spec", mirror.Spec) - r.EventRecorder.Eventf( - mirror, models.Warning, models.CreationFailed, - "Resource creation on the Instaclustr is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.CreationFailed, + "Resource creation on the Instaclustr is failed. Reason: %v", err) return models.ReconcileRequeue } - l.Info("Kafka mirror has been created", "mirror ID", mirror.Status.ID) - - r.EventRecorder.Eventf( - mirror, models.Normal, models.Created, - "Resource creation request is sent. Mirror ID: %s", - mirror.Status.ID, - ) + l.Info("Kafka mirror has been created", "mirror ID", iStatus.ID) + r.EventRecorder.Eventf(mirror, models.Normal, models.Created, + "Kafka mirror creation request is sent to Instaclustr API. Mirror ID: %s", mirror.Status.ID) patch := mirror.NewPatch() - mirror.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent controllerutil.AddFinalizer(mirror, models.DeletionFinalizer) - err = r.Patch(ctx, mirror, patch) if err != nil { l.Error(err, "Cannot patch kafka mirror after create", "kafka mirror connector name", iStatus.ConnectorName) - r.EventRecorder.Eventf( - mirror, models.Warning, models.PatchFailed, - "Resource patch is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, + "Resource patch is failed after request to Instaclustr was sent. Reason: %v", err) return models.ReconcileRequeue } + patch = mirror.NewPatch() mirror.Status = *iStatus err = r.Status().Patch(ctx, mirror, patch) if err != nil { l.Error(err, "Cannot patch Kafka mirror status from the Instaclustr API", - "spec", mirror.Spec) - r.EventRecorder.Eventf( - mirror, models.Warning, models.PatchFailed, - "Resource status patch is failed. Reason: %v", - err, - ) + "spec", mirror.Spec, "status", iStatus) + r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, + "Resource status patch is failed. Reason: %v", err) return models.ReconcileRequeue } } @@ -155,17 +138,12 @@ func (r *MirrorReconciler) handleCreateCluster( if err != nil { l.Error(err, "Cannot start cluster status job", "mirror cluster ID", mirror.Status.ID) - r.EventRecorder.Eventf( - mirror, models.Warning, models.CreationFailed, - "Resource status job creation is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.CreationFailed, + "Mirror status job creation is failed. Reason: %v", err) return models.ReconcileRequeue } - r.EventRecorder.Eventf( - mirror, models.Normal, models.Created, - "Resource status check job is started", - ) + r.EventRecorder.Event(mirror, models.Normal, models.Created, + "mirror status check job is started") return models.ExitReconcile } @@ -180,12 +158,8 @@ func (r *MirrorReconciler) handleUpdateCluster( iMirror, err := r.API.GetMirrorStatus(mirror.Status.ID) if err != nil { l.Error(err, "Cannot get Kafka mirror from Instaclustr", "mirror ID", mirror.Status.ID) - - r.EventRecorder.Eventf( - mirror, models.Warning, models.UpdateFailed, - "Resource update on the Instaclustr API is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.UpdateFailed, + "Resource update on the Instaclustr API is failed. Reason: %v", err) return models.ReconcileRequeue } @@ -194,11 +168,8 @@ func (r *MirrorReconciler) handleUpdateCluster( if err != nil { l.Error(err, "Unable to update kafka mirror", "kafka connect ID", mirror.Spec.KafkaConnectClusterID, "mirror ID", mirror.Status.ID) - r.EventRecorder.Eventf( - mirror, models.Warning, models.UpdateFailed, - "Resource update on the Instaclustr API is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.UpdateFailed, + "Failed to sent a update request to the Instaclustr API. Reason: %v", err) return models.ReconcileRequeue } @@ -215,14 +186,11 @@ func (r *MirrorReconciler) handleUpdateCluster( mirror.Annotations[models.ResourceStateAnnotation] = models.UpdatedEvent err = r.Patch(ctx, mirror, patch) if err != nil { - l.Error(err, "Cannot patch Kafka mirror management after update op", + l.Error(err, "Cannot patch Kafka mirror management after update", "mirror ID", mirror.Status.ID, "kafka connect", mirror.Spec.KafkaConnectClusterID) - r.EventRecorder.Eventf( - mirror, models.Warning, models.PatchFailed, - "Resource status patch is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, + "Resource status patch is failed. Reason: %v", err) return models.ReconcileRequeue } @@ -241,11 +209,8 @@ func (r *MirrorReconciler) handleDeleteCluster( l.Error(err, "Cannot get Kafka mirror", "kafka connect ID", mirror.Spec.KafkaConnectClusterID, "mirror id", mirror.Status.ID) - r.EventRecorder.Eventf( - mirror, models.Warning, models.FetchFailed, - "Fetch resource from the Instaclustr API is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.FetchFailed, + "Fetch resource from the Instaclustr API is failed. Reason: %v", err) return models.ReconcileRequeue } @@ -255,35 +220,25 @@ func (r *MirrorReconciler) handleDeleteCluster( l.Error(err, "Cannot delete kafka mirror", "mirror name", mirror.Spec.KafkaConnectClusterID, "mirror ID", mirror.Status.ID) - r.EventRecorder.Eventf( - mirror, models.Warning, models.DeletionFailed, - "Resource deletion on the Instaclustr is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.DeletionFailed, + "Resource deletion on the Instaclustr is failed. Reason: %v", err) return models.ReconcileRequeue } - r.EventRecorder.Eventf( - mirror, models.Normal, models.DeletionStarted, - "Resource deletion request is sent to the Instaclustr API.", - ) + r.EventRecorder.Eventf(mirror, models.Normal, models.DeletionStarted, + "Mirror deletion request is sent to the Instaclustr API.") } patch := mirror.NewPatch() - r.Scheduler.RemoveJob(mirror.GetJobID(scheduler.StatusChecker)) mirror.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent controllerutil.RemoveFinalizer(mirror, models.DeletionFinalizer) - err = r.Patch(ctx, mirror, patch) if err != nil { l.Error(err, "Cannot patch remove finalizer from kafka", "cluster name", mirror.Spec.KafkaConnectClusterID) - r.EventRecorder.Eventf( - mirror, models.Warning, models.PatchFailed, - "Resource patch is failed. Reason: %v", - err, - ) + r.EventRecorder.Eventf(mirror, models.Warning, models.PatchFailed, + "Mirror patch is failed after deletion. Reason: %v", err) return models.ReconcileRequeue } @@ -291,10 +246,8 @@ func (r *MirrorReconciler) handleDeleteCluster( "kafka connect ID", mirror.Spec.KafkaConnectClusterID, "mirror ID", mirror.Status.ID) - r.EventRecorder.Eventf( - mirror, models.Normal, models.Deleted, - "Resource is deleted", - ) + r.EventRecorder.Eventf(mirror, models.Normal, models.Deleted, + "Resource is deleted") return models.ExitReconcile } @@ -316,13 +269,18 @@ func (r *MirrorReconciler) newWatchStatusJob(mirror *v1beta1.Mirror) scheduler.J namespacedName := client.ObjectKeyFromObject(mirror) err := r.Get(context.Background(), namespacedName, mirror) if k8serrors.IsNotFound(err) { - l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.", + l.Info("Mirror is not found in the k8s cluster. Closing Instaclustr status sync.", "namespaced name", namespacedName) + r.EventRecorder.Eventf(mirror, models.Normal, models.Deleted, + "Mirror is not found in the k8s cluster. Closing Instaclustr status sync.") + r.Scheduler.RemoveJob(mirror.GetJobID(scheduler.StatusChecker)) return nil } if err != nil { l.Error(err, "Cannot get mirror resource", "resource name", mirror.Name) + r.EventRecorder.Eventf(mirror, models.Warning, models.GetFailed, + "Cannot get mirror resource, request: %v", namespacedName) return err } @@ -333,12 +291,16 @@ func (r *MirrorReconciler) newWatchStatusJob(mirror *v1beta1.Mirror) scheduler.J "mirror ID", mirror.Status.ID, "kafka connect ID", mirror.Spec.KafkaConnectClusterID, "namespaced name", namespacedName) + r.EventRecorder.Event(mirror, models.Normal, models.Deleted, + "Mirror is not found in Instaclustr. Deleting resource.") err = r.Delete(context.Background(), mirror) if err != nil { - l.Error(err, "Cannot delete mirror resource", - "resource name", mirror.Name, - "resource ID", mirror.Status.ID) + l.Error(err, "Cannot delete mirror resource after not found in the Instaclustr", + "mirror namespacedName", namespacedName, "resource ID", mirror.Status.ID) + r.EventRecorder.Event(mirror, models.Normal, models.Deleted, + "Cannot delete mirror resource after not found in the Instaclustr") + return err } @@ -349,21 +311,17 @@ func (r *MirrorReconciler) newWatchStatusJob(mirror *v1beta1.Mirror) scheduler.J return err } - patch := mirror.NewPatch() - - if mirror.Spec.TargetLatency != iMirror.TargetLatency { + if mirror.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && + mirror.Spec.TargetLatency != iMirror.TargetLatency { l.Info("k8s Kafka Mirror target latency is different from Instaclustr. Reconcile target latency..", "mirror ID", mirror.Status.ID, "kafka connect ID", mirror.Spec.KafkaConnectClusterID, "Instaclustr target latency", iMirror.TargetLatency, "k8s target latency", mirror.Spec.TargetLatency) - - mirror.Spec.TargetLatency = iMirror.TargetLatency - err = r.Patch(context.Background(), mirror, patch) - if err != nil { - l.Error(err, "Cannot patch mirror target latency", "mirror ID name", mirror.Status.ID) - return err - } + r.EventRecorder.Eventf(mirror, models.Warning, models.ExternalChanges, + "There are external changes on the Instaclustr console. Please reconcile the specification manually. "+ + "targetLatency from k8s: %v, targetLatency from instaclustr: %v", + mirror.Spec.TargetLatency, iMirror.TargetLatency) } if !mirror.Status.IsEqual(iMirror) { @@ -371,10 +329,11 @@ func (r *MirrorReconciler) newWatchStatusJob(mirror *v1beta1.Mirror) scheduler.J "Instaclustr data", iMirror, "k8s operator data", mirror.Status) + patch := mirror.NewPatch() mirror.Status = *iMirror err = r.Status().Patch(context.Background(), mirror, patch) if err != nil { - l.Error(err, "Cannot patch Kafka cluster", + l.Error(err, "Cannot patch Mirror status", "mirror ID name", mirror.Status.ID) return err } diff --git a/controllers/kafkamanagement/mirror_controller_test.go b/controllers/kafkamanagement/mirror_controller_test.go index 01f4ff7d9..e22076f1d 100644 --- a/controllers/kafkamanagement/mirror_controller_test.go +++ b/controllers/kafkamanagement/mirror_controller_test.go @@ -19,85 +19,63 @@ package kafkamanagement import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) var newMirrorLatency int32 = 3000 var _ = Describe("Kafka Mirror Controller", func() { - var ( - mirrorResource v1beta1.Mirror - mirrorYAML v1beta1.Mirror - m = "mirror" - ns = "default" - mirrorNS = types.NamespacedName{Name: m, Namespace: ns} - timeout = time.Second * 40 - interval = time.Second * 2 - ) + mirror := v1beta1.Mirror{} + mirrorManifest := v1beta1.Mirror{} yfile, err := os.ReadFile("datatest/mirror_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &mirrorYAML) + err = yaml.Unmarshal(yfile, &mirrorManifest) Expect(err).NotTo(HaveOccurred()) - mirrorObjMeta := metav1.ObjectMeta{ - Name: m, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - mirrorYAML.ObjectMeta = mirrorObjMeta + mirrorNamespacedName := types.NamespacedName{Name: mirrorManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a Mirror manifest", func() { It("should create a Mirror resources", func() { - Expect(k8sClient.Create(ctx, &mirrorYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &mirrorManifest)).Should(Succeed()) By("sending a Mirror specification to the Instaclustr API and get an ID of created resource.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, mirrorNS, &mirrorResource); err != nil { + if err := k8sClient.Get(ctx, mirrorNamespacedName, &mirror); err != nil { return false } - return mirrorResource.Status.ID != openapi.CreatedID + return mirror.Status.ID != openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a Mirror latency", func() { It("should update the Mirror resources", func() { - Expect(k8sClient.Get(ctx, mirrorNS, &mirrorResource)).Should(Succeed()) - patch := mirrorResource.NewPatch() + Expect(k8sClient.Get(ctx, mirrorNamespacedName, &mirror)).Should(Succeed()) - mirrorResource.Spec.TargetLatency = newMirrorLatency - mirrorResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - - Expect(k8sClient.Patch(ctx, &mirrorResource, patch)).Should(Succeed()) + patch := mirror.NewPatch() + mirror.Spec.TargetLatency = newMirrorLatency + Expect(k8sClient.Patch(ctx, &mirror, patch)).Should(Succeed()) By("sending a new Mirror configs request to the Instaclustr API, it" + "gets a new data from the InstAPI and update it in k8s Mirror resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, mirrorNS, &mirrorResource); err != nil { + if err := k8sClient.Get(ctx, mirrorNamespacedName, &mirror); err != nil { return false } - if mirrorResource.Status.TargetLatency != newMirrorLatency { + if mirror.Status.TargetLatency != newMirrorLatency { return false } @@ -108,19 +86,16 @@ var _ = Describe("Kafka Mirror Controller", func() { When("delete the Kafka resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, mirrorNS, &mirrorResource)).Should(Succeed()) - - mirrorResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - Expect(k8sClient.Delete(ctx, &mirrorResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, mirrorNamespacedName, &mirror)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &mirror)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, mirrorNS, &mirrorResource) + err := k8sClient.Get(ctx, mirrorNamespacedName, &mirror) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }).Should(BeTrue()) }) }) diff --git a/controllers/kafkamanagement/suite_test.go b/controllers/kafkamanagement/suite_test.go index 0e9148e1e..63a5fcad8 100644 --- a/controllers/kafkamanagement/suite_test.go +++ b/controllers/kafkamanagement/suite_test.go @@ -46,6 +46,10 @@ var ( testEnv *envtest.Environment ctx context.Context cancel context.CancelFunc + + defaultNS = "default" + timeout = time.Millisecond * 300 + interval = time.Millisecond * 30 ) func TestAPIs(t *testing.T) { @@ -86,8 +90,8 @@ var _ = BeforeSuite(func() { }) Expect(err).ToNot(HaveOccurred()) - scheduler.ClusterStatusInterval = 1 * time.Second - models.ReconcileRequeue = reconcile.Result{RequeueAfter: time.Second * 2} + scheduler.ClusterStatusInterval = interval + models.ReconcileRequeue = reconcile.Result{RequeueAfter: interval} eRecorder := k8sManager.GetEventRecorderFor("instaclustr-operator-tests") diff --git a/controllers/kafkamanagement/topic_controller_test.go b/controllers/kafkamanagement/topic_controller_test.go index 495246bef..8a4348368 100644 --- a/controllers/kafkamanagement/topic_controller_test.go +++ b/controllers/kafkamanagement/topic_controller_test.go @@ -19,18 +19,15 @@ package kafkamanagement import ( "context" "os" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" k8serrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" "github.com/instaclustr/operator/apis/kafkamanagement/v1beta1" openapi "github.com/instaclustr/operator/pkg/instaclustr/mock/server/go" - "github.com/instaclustr/operator/pkg/models" ) var newTopicConfig = map[string]string{ @@ -39,69 +36,49 @@ var newTopicConfig = map[string]string{ } var _ = Describe("Kafka Topic Controller", func() { - var ( - topicResource v1beta1.Topic - topicYAML v1beta1.Topic - t = "topic" - ns = "default" - topicNS = types.NamespacedName{Name: t, Namespace: ns} - timeout = time.Second * 20 - interval = time.Second * 2 - ) + topic := v1beta1.Topic{} + topicManifest := v1beta1.Topic{} yfile, err := os.ReadFile("datatest/topic_v1beta1.yaml") Expect(err).NotTo(HaveOccurred()) - err = yaml.Unmarshal(yfile, &topicYAML) + err = yaml.Unmarshal(yfile, &topicManifest) Expect(err).NotTo(HaveOccurred()) - topicObjMeta := metav1.ObjectMeta{ - Name: t, - Namespace: ns, - Annotations: map[string]string{ - models.ResourceStateAnnotation: models.CreatingEvent, - }, - } - - topicYAML.ObjectMeta = topicObjMeta + topicNamespacedName := types.NamespacedName{Name: topicManifest.ObjectMeta.Name, Namespace: defaultNS} ctx := context.Background() When("apply a Kafka topic manifest", func() { It("should create a topic resources", func() { - Expect(k8sClient.Create(ctx, &topicYAML)).Should(Succeed()) + Expect(k8sClient.Create(ctx, &topicManifest)).Should(Succeed()) By("sending topic specification to the Instaclustr API and get ID of created resource.") - Eventually(func() bool { - if err := k8sClient.Get(ctx, topicNS, &topicResource); err != nil { + if err := k8sClient.Get(ctx, topicNamespacedName, &topic); err != nil { return false } - return topicResource.Status.ID == openapi.CreatedID + return topic.Status.ID == openapi.CreatedID }).Should(BeTrue()) }) }) When("changing a topic configs", func() { It("should update the topic resources", func() { - Expect(k8sClient.Get(ctx, topicNS, &topicResource)).Should(Succeed()) - patch := topicResource.NewPatch() - - topicResource.Spec.TopicConfigs = newTopicConfig - topicResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.UpdatingEvent} - - Expect(k8sClient.Patch(ctx, &topicResource, patch)).Should(Succeed()) + Expect(k8sClient.Get(ctx, topicNamespacedName, &topic)).Should(Succeed()) + patch := topic.NewPatch() + topic.Spec.TopicConfigs = newTopicConfig + Expect(k8sClient.Patch(ctx, &topic, patch)).Should(Succeed()) By("sending a new topic configs request to the Instaclustr API, it" + "gets a new data from the InstAPI and update it in k8s Topic resource") - Eventually(func() bool { - if err := k8sClient.Get(ctx, topicNS, &topicResource); err != nil { + if err := k8sClient.Get(ctx, topicNamespacedName, &topic); err != nil { return false } for newK, newV := range newTopicConfig { - if oldV, ok := topicResource.Status.TopicConfigs[newK]; !ok || oldV != newV { + if oldV, ok := topic.Status.TopicConfigs[newK]; !ok || oldV != newV { return false } } @@ -112,20 +89,16 @@ var _ = Describe("Kafka Topic Controller", func() { When("delete the Kafka resource", func() { It("should send delete request to the Instaclustr API", func() { - Expect(k8sClient.Get(ctx, topicNS, &topicResource)).Should(Succeed()) - - topicResource.Annotations = map[string]string{models.ResourceStateAnnotation: models.DeletingEvent} - - Expect(k8sClient.Delete(ctx, &topicResource)).Should(Succeed()) - + Expect(k8sClient.Get(ctx, topicNamespacedName, &topic)).Should(Succeed()) + Expect(k8sClient.Delete(ctx, &topic)).Should(Succeed()) By("sending delete request to Instaclustr API") Eventually(func() bool { - err := k8sClient.Get(ctx, topicNS, &topicResource) + err := k8sClient.Get(ctx, topicNamespacedName, &topic) if err != nil && !k8serrors.IsNotFound(err) { return false } - return true + return k8serrors.IsNotFound(err) }).Should(BeTrue()) }) }) diff --git a/pkg/models/operator.go b/pkg/models/operator.go index b58db4f59..b81d4eb16 100644 --- a/pkg/models/operator.go +++ b/pkg/models/operator.go @@ -121,6 +121,7 @@ const ( Warning = "Warning" Created = "Created" PatchFailed = "PatchFailed" + GetFailed = "GetFailed" NotFound = "NotFound" CreationFailed = "CreationFailed" FetchFailed = "FetchFailed"