From 070e4e064fffa208ecf086527325ba0642395fe7 Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Wed, 7 Feb 2024 15:23:12 +0200 Subject: [PATCH] kafka codebase refactor --- .secrets.baseline | 25 +- apis/clusters/v1beta1/cassandra_webhook.go | 14 +- apis/clusters/v1beta1/kafka_types.go | 421 +++++++++++------- apis/clusters/v1beta1/kafka_webhook.go | 52 +-- apis/clusters/v1beta1/kafka_webhook_test.go | 8 +- apis/clusters/v1beta1/structs.go | 23 + .../clusters/v1beta1/zz_generated.deepcopy.go | 103 +++-- .../clusters.instaclustr.com_cassandras.yaml | 1 + .../clusters.instaclustr.com_kafkas.yaml | 21 +- config/samples/clusters_v1beta1_kafka.yaml | 18 +- controllers/clusters/cadence_controller.go | 20 +- controllers/clusters/cassandra_controller.go | 36 +- .../clusters/datatest/kafka_v1beta1.yaml | 2 +- controllers/clusters/helpers.go | 15 + controllers/clusters/kafka_controller.go | 361 +++++++-------- controllers/clusters/opensearch_controller.go | 32 +- pkg/instaclustr/client.go | 10 +- pkg/instaclustr/interfaces.go | 2 +- pkg/instaclustr/mock/client.go | 2 +- pkg/models/cassandra_apiv2.go | 2 +- pkg/models/kafka_apiv2.go | 49 +- 21 files changed, 684 insertions(+), 533 deletions(-) diff --git a/.secrets.baseline b/.secrets.baseline index 5a819ca17..411e7d1c7 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -75,6 +75,10 @@ { "path": "detect_secrets.filters.allowlist.is_line_allowlisted" }, + { + "path": "detect_secrets.filters.common.is_baseline_file", + "filename": ".secrets.baseline" + }, { "path": "detect_secrets.filters.common.is_ignored_due_to_verification_policies", "min_level": 2 @@ -114,15 +118,6 @@ } ], "results": { - ".git/config": [ - { - "type": "Base64 High Entropy String", - "filename": ".git/config", - "hashed_secret": "a16bf940eb9599e3a77ae599906a4e71e4e52243", - "is_verified": false, - "line_number": 23 - } - ], "apis/clusterresources/v1beta1/cassandrauser_types.go": [ { "type": "Secret Keyword", @@ -222,14 +217,14 @@ "filename": "apis/clusters/v1beta1/kafka_types.go", "hashed_secret": "964c67cddfe8e6707157152dcf319126502199dc", "is_verified": false, - "line_number": 206 + "line_number": 294 }, { "type": "Secret Keyword", "filename": "apis/clusters/v1beta1/kafka_types.go", - "hashed_secret": "75ba0f10db4ab09467225d44f7911a3f724a0917", + "hashed_secret": "589a0ad3cc6bc886a00c46a22e5065c48bd8e1b2", "is_verified": false, - "line_number": 374 + "line_number": 440 } ], "apis/clusters/v1beta1/kafkaconnect_types.go": [ @@ -395,7 +390,7 @@ "filename": "apis/clusters/v1beta1/zz_generated.deepcopy.go", "hashed_secret": "44e17306b837162269a410204daaa5ecee4ec22c", "is_verified": false, - "line_number": 2174 + "line_number": 2223 } ], "apis/kafkamanagement/v1beta1/kafkauser_types.go": [ @@ -744,7 +739,7 @@ "filename": "pkg/instaclustr/client.go", "hashed_secret": "5baa61e4c9b93f3f0682250b6cf8331b7ee68fd8", "is_verified": false, - "line_number": 2048 + "line_number": 2054 } ], "pkg/instaclustr/mock/client.go": [ @@ -1135,5 +1130,5 @@ } ] }, - "generated_at": "2024-02-06T13:59:12Z" + "generated_at": "2024-02-07T11:44:08Z" } diff --git a/apis/clusters/v1beta1/cassandra_webhook.go b/apis/clusters/v1beta1/cassandra_webhook.go index c0b60a4f0..1cd98551b 100644 --- a/apis/clusters/v1beta1/cassandra_webhook.go +++ b/apis/clusters/v1beta1/cassandra_webhook.go @@ -176,13 +176,17 @@ func (cv *cassandraValidator) ValidateUpdate(ctx context.Context, old runtime.Ob return fmt.Errorf("cannot assert object %v to cassandra", new.GetObjectKind()) } - cassandralog.Info("validate update", "name", c.Name) - + // skip validation when we receive cluster specification update from the Instaclustr Console. if c.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { return nil } - // skip validation when we receive cluster specification update from the Instaclustr Console. + if c.Status.ID == "" { + return cv.ValidateCreate(ctx, c) + } + + cassandralog.Info("validate update", "name", c.Name) + if c.Annotations[models.ExternalChangesAnnotation] == models.True { return nil } @@ -200,10 +204,6 @@ func (cv *cassandraValidator) ValidateUpdate(ctx context.Context, old runtime.Ob return nil } - if c.Status.ID == "" { - return cv.ValidateCreate(ctx, c) - } - err := c.Spec.validateUpdate(oldCluster.Spec) if err != nil { return fmt.Errorf("cannot update immutable fields: %v", err) diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index 6c1a5bdd3..6d62c7254 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ b/apis/clusters/v1beta1/kafka_types.go @@ -17,21 +17,15 @@ limitations under the License. package v1beta1 import ( - "encoding/json" - k8scorev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" ) -// +kubebuilder:object:generate:=false -type KafkaAddons interface { - SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | PrivateLink | Kraft -} - type SchemaRegistry struct { Version string `json:"version"` } @@ -64,35 +58,35 @@ type KarapaceSchemaRegistry struct { // KafkaSpec defines the desired state of Kafka type KafkaSpec struct { - Cluster `json:",inline"` - OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` - SchemaRegistry []*SchemaRegistry `json:"schemaRegistry,omitempty"` + GenericClusterSpec `json:",inline"` + OnPremisesSpec *OnPremisesSpec `json:"onPremisesSpec,omitempty"` // ReplicationFactor to use for new topic. // Also represents the number of racks to use when allocating nodes. ReplicationFactor int `json:"replicationFactor"` // PartitionsNumber number of partitions to use when created new topics. - PartitionsNumber int `json:"partitionsNumber"` - RestProxy []*RestProxy `json:"restProxy,omitempty"` - AllowDeleteTopics bool `json:"allowDeleteTopics"` - AutoCreateTopics bool `json:"autoCreateTopics"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - //+kubebuilder:validation:MinItems:=1 - //+kubebuilder:validation:MaxItems:=1 - DataCentres []*KafkaDataCentre `json:"dataCentres"` - //+kubebuilder:validation:MaxItems:=1 - ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` + PartitionsNumber int `json:"partitionsNumber"` + + AllowDeleteTopics bool `json:"allowDeleteTopics"` + AutoCreateTopics bool `json:"autoCreateTopics"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"` + BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + UserRefs References `json:"userRefs,omitempty"` // Provision additional dedicated nodes for Apache Zookeeper to run on. // Zookeeper nodes will be co-located with Kafka if this is not provided - DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"` - ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"` - KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"` - KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"` - BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - UserRefs References `json:"userRefs,omitempty"` - Kraft []*Kraft `json:"kraft,omitempty"` + DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"` + //+kubebuilder:validation:MinItems:=1 + //+kubebuilder:validation:MaxItems:=1 + DataCentres []*KafkaDataCentre `json:"dataCentres"` + SchemaRegistry []*SchemaRegistry `json:"schemaRegistry,omitempty"` + RestProxy []*RestProxy `json:"restProxy,omitempty"` + KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"` + KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"` + Kraft []*Kraft `json:"kraft,omitempty"` + ResizeSettings GenericResizeSettings `json:"resizeSettings,omitempty"` } type Kraft struct { @@ -100,14 +94,113 @@ type Kraft struct { } type KafkaDataCentre struct { - DataCentre `json:",inline"` + GenericDataCentreSpec `json:",inline"` + + NodesNumber int `json:"nodesNumber"` + NodeSize string `json:"nodeSize"` + PrivateLink []*PrivateLink `json:"privateLink,omitempty"` } +func (dc *KafkaDataCentre) Equals(o *KafkaDataCentre) bool { + return dc.GenericDataCentreSpec.Equals(&o.GenericDataCentreSpec) && + slices.EqualsPtr(dc.PrivateLink, o.PrivateLink) && + dc.NodeSize == o.NodeSize && + dc.NodesNumber == o.NodesNumber +} + +func (ksdc *KafkaDataCentre) FromInstAPI(instaModel *models.KafkaDataCentre) { + ksdc.GenericDataCentreSpec.FromInstAPI(&instaModel.GenericDataCentreFields) + ksdc.PrivateLinkFromInstAPI(instaModel.PrivateLink) + + ksdc.NodeSize = instaModel.NodeSize + ksdc.NodesNumber = instaModel.NumberOfNodes +} + // KafkaStatus defines the observed state of Kafka type KafkaStatus struct { - ClusterStatus `json:",inline"` - AvailableUsers References `json:"availableUsers,omitempty"` + GenericStatus `json:",inline"` + + AvailableUsers References `json:"availableUsers,omitempty"` + DataCentres []*KafkaDataCentreStatus `json:"dataCentres,omitempty"` +} + +type KafkaDataCentreStatus struct { + GenericDataCentreStatus `json:",inline"` + + Nodes []*Node `json:"nodes,omitempty"` + PrivateLink PrivateLinkStatuses `json:"privateLink,omitempty"` +} + +func (s *KafkaDataCentreStatus) Equals(o *KafkaDataCentreStatus) bool { + return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) && + s.nodesEqual(o.Nodes) && + slices.EqualsPtr(s.PrivateLink, o.PrivateLink) +} + +func (s *KafkaStatus) Equals(o *KafkaStatus) bool { + return s.GenericStatus.Equals(&o.GenericStatus) && + s.DCsEqual(o) +} + +func (s *KafkaStatus) DCsEqual(o *KafkaStatus) bool { + if len(s.DataCentres) != len(o.DataCentres) { + return false + } + + sMap := map[string]*KafkaDataCentreStatus{} + for _, dc := range s.DataCentres { + sMap[dc.Name] = dc + } + + for _, dc := range o.DataCentres { + sDC, ok := sMap[dc.Name] + if !ok { + return false + } + + if !sDC.Equals(dc) { + return false + } + } + + return true +} + +func (s *KafkaStatus) ToOnPremises() ClusterStatus { + dc := &DataCentreStatus{ + ID: s.DataCentres[0].ID, + Nodes: s.DataCentres[0].Nodes, + } + + return ClusterStatus{ + ID: s.ID, + DataCentres: []*DataCentreStatus{dc}, + } +} + +func (s *KafkaDataCentreStatus) nodesEqual(nodes []*Node) bool { + if len(s.Nodes) != len(nodes) { + return false + } + + sNodes := map[string]*Node{} + for _, node := range s.Nodes { + sNodes[node.ID] = node + } + + for _, node := range nodes { + sNode, ok := sNodes[node.ID] + if !ok { + return false + } + + if !sNode.Equals(node) { + return false + } + } + + return true } //+kubebuilder:object:root=true @@ -151,20 +244,15 @@ func (k *Kafka) NewPatch() client.Patch { func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster { return &models.KafkaCluster{ + GenericClusterFields: k.GenericClusterSpec.ToInstAPI(), SchemaRegistry: k.schemaRegistryToInstAPI(), RestProxy: k.restProxyToInstAPI(), - PCIComplianceMode: k.PCICompliance, DefaultReplicationFactor: k.ReplicationFactor, DefaultNumberOfPartitions: k.PartitionsNumber, - TwoFactorDelete: k.TwoFactorDeletesToInstAPI(), AllowDeleteTopics: k.AllowDeleteTopics, AutoCreateTopics: k.AutoCreateTopics, ClientToClusterEncryption: k.ClientToClusterEncryption, DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(), - PrivateNetworkCluster: k.PrivateNetworkCluster, - Name: k.Name, - Description: k.Description, - SLATier: k.SLATier, KafkaVersion: k.Version, DataCentres: k.dcToInstAPI(), ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS, @@ -172,7 +260,7 @@ func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster { Kraft: k.kraftToInstAPI(), KarapaceRestProxy: k.karapaceRestProxyToInstAPI(), KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(), - ResizeSettings: resizeSettingsToInstAPI(k.ResizeSettings), + ResizeSettings: k.ResizeSettings.ToInstAPI(), } } @@ -255,8 +343,10 @@ func (k *KafkaDataCentre) privateLinkToInstAPI() (iPrivateLink []*models.Private func (k *KafkaSpec) dcToInstAPI() (iDCs []*models.KafkaDataCentre) { for _, crdDC := range k.DataCentres { iDCs = append(iDCs, &models.KafkaDataCentre{ - DataCentre: crdDC.DataCentre.ToInstAPI(), - PrivateLink: crdDC.privateLinkToInstAPI(), + GenericDataCentreFields: crdDC.GenericDataCentreSpec.ToInstAPI(), + PrivateLink: crdDC.privateLinkToInstAPI(), + NumberOfNodes: crdDC.NodesNumber, + NodeSize: crdDC.NodeSize, }) } return @@ -266,7 +356,7 @@ func (k *KafkaSpec) ToInstAPIUpdate() *models.KafkaInstAPIUpdateRequest { return &models.KafkaInstAPIUpdateRequest{ DataCentre: k.dcToInstAPI(), DedicatedZookeeper: k.dedicatedZookeeperToInstAPIUpdate(), - ResizeSettings: resizeSettingsToInstAPI(k.ResizeSettings), + ResizeSettings: k.ResizeSettings.ToInstAPI(), } } @@ -280,153 +370,143 @@ func (k *KafkaSpec) dedicatedZookeeperToInstAPIUpdate() (iZookeepers []*models.D return } -func (k *Kafka) FromInstAPI(iData []byte) (*Kafka, error) { - iKafka := &models.KafkaCluster{} - err := json.Unmarshal(iData, iKafka) - if err != nil { - return nil, err - } +func (k *Kafka) FromInstAPI(instaModel *models.KafkaCluster) { + k.Spec.FromInstAPI(instaModel) + k.Status.FromInstAPI(instaModel) +} + +func (ks *KafkaSpec) FromInstAPI(instaModel *models.KafkaCluster) { + ks.GenericClusterSpec.FromInstAPI(&instaModel.GenericClusterFields) + ks.ResizeSettings.FromInstAPI(instaModel.ResizeSettings) + + ks.ReplicationFactor = instaModel.DefaultReplicationFactor + ks.PartitionsNumber = instaModel.DefaultNumberOfPartitions + ks.AllowDeleteTopics = instaModel.AllowDeleteTopics + ks.AutoCreateTopics = instaModel.AutoCreateTopics + ks.ClientToClusterEncryption = instaModel.ClientToClusterEncryption + ks.ClientBrokerAuthWithMTLS = instaModel.ClientBrokerAuthWithMtls + ks.BundledUseOnly = instaModel.BundledUseOnly + ks.Version = instaModel.KafkaVersion - return &Kafka{ - TypeMeta: k.TypeMeta, - ObjectMeta: k.ObjectMeta, - Spec: k.Spec.FromInstAPI(iKafka), - Status: k.Status.FromInstAPI(iKafka), - }, nil + ks.DCsFromInstAPI(instaModel.DataCentres) + ks.kraftFromInstAPI(instaModel.Kraft) + ks.restProxyFromInstAPI(instaModel.RestProxy) + ks.schemaRegistryFromInstAPI(instaModel.SchemaRegistry) + ks.karapaceRestProxyFromInstAPI(instaModel.KarapaceRestProxy) + ks.dedicatedZookeeperFromInstAPI(instaModel.DedicatedZookeeper) + ks.karapaceSchemaRegistryFromInstAPI(instaModel.KarapaceSchemaRegistry) } -func (ks *KafkaSpec) FromInstAPI(iKafka *models.KafkaCluster) KafkaSpec { - return KafkaSpec{ - Cluster: Cluster{ - Name: iKafka.Name, - Version: iKafka.KafkaVersion, - PCICompliance: iKafka.PCIComplianceMode, - PrivateNetworkCluster: iKafka.PrivateNetworkCluster, - SLATier: iKafka.SLATier, - Description: iKafka.Description, - TwoFactorDelete: ks.Cluster.TwoFactorDeleteFromInstAPI(iKafka.TwoFactorDelete), - }, - SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry), - ReplicationFactor: iKafka.DefaultReplicationFactor, - PartitionsNumber: iKafka.DefaultNumberOfPartitions, - RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy), - AllowDeleteTopics: iKafka.AllowDeleteTopics, - AutoCreateTopics: iKafka.AutoCreateTopics, - ClientToClusterEncryption: iKafka.ClientToClusterEncryption, - DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres), - DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper), - ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls, - KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy), - Kraft: ks.kraftFromInstAPI(iKafka.Kraft), - KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry), - BundledUseOnly: iKafka.BundledUseOnly, - ResizeSettings: resizeSettingsFromInstAPI(iKafka.ResizeSettings), - } -} - -func (ks *KafkaStatus) FromInstAPI(iKafka *models.KafkaCluster) KafkaStatus { - return KafkaStatus{ - ClusterStatus: ClusterStatus{ - ID: iKafka.ID, - State: iKafka.Status, - DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres), - CurrentClusterOperationStatus: iKafka.CurrentClusterOperationStatus, - MaintenanceEvents: ks.MaintenanceEvents, - }, +func (ks *KafkaStatus) FromInstAPI(instaModel *models.KafkaCluster) { + ks.GenericStatus.FromInstAPI(&instaModel.GenericClusterFields) + ks.DCsFromInstAPI(instaModel.DataCentres) +} + +func (ks *KafkaSpec) DCsFromInstAPI(instaModels []*models.KafkaDataCentre) { + ks.DataCentres = make([]*KafkaDataCentre, len(instaModels)) + for i, instaModel := range instaModels { + dc := KafkaDataCentre{} + dc.FromInstAPI(instaModel) + ks.DataCentres[i] = &dc } } -func (ks *KafkaSpec) DCsFromInstAPI(iDCs []*models.KafkaDataCentre) (dcs []*KafkaDataCentre) { - for _, iDC := range iDCs { - dcs = append(dcs, &KafkaDataCentre{ - DataCentre: ks.Cluster.DCFromInstAPI(iDC.DataCentre), - PrivateLink: ks.PrivateLinkFromInstAPI(iDC.PrivateLink), - }) +func (dc *KafkaDataCentre) PrivateLinkFromInstAPI(instaModels []*models.PrivateLink) { + dc.PrivateLink = make([]*PrivateLink, len(instaModels)) + for i, instaModel := range instaModels { + dc.PrivateLink[i] = &PrivateLink{ + AdvertisedHostname: instaModel.AdvertisedHostname, + } } - return } -func (ks *KafkaSpec) PrivateLinkFromInstAPI(iPLs []*models.PrivateLink) (pls []*PrivateLink) { - for _, iPL := range iPLs { - pls = append(pls, &PrivateLink{ - AdvertisedHostname: iPL.AdvertisedHostname, - }) +func (ks *KafkaSpec) schemaRegistryFromInstAPI(instaModels []*models.SchemaRegistry) { + ks.SchemaRegistry = make([]*SchemaRegistry, len(instaModels)) + for i, instaModel := range instaModels { + ks.SchemaRegistry[i] = &SchemaRegistry{ + Version: instaModel.Version, + } } - return } -func (ks *KafkaSpec) SchemaRegistryFromInstAPI(iSRs []*models.SchemaRegistry) (srs []*SchemaRegistry) { - for _, iSR := range iSRs { - srs = append(srs, &SchemaRegistry{ - Version: iSR.Version, - }) +func (ks *KafkaSpec) restProxyFromInstAPI(instaModels []*models.RestProxy) { + ks.RestProxy = make([]*RestProxy, len(instaModels)) + for i, instaModel := range instaModels { + ks.RestProxy[i] = &RestProxy{ + IntegrateRestProxyWithSchemaRegistry: instaModel.IntegrateRestProxyWithSchemaRegistry, + UseLocalSchemaRegistry: instaModel.UseLocalSchemaRegistry, + SchemaRegistryServerURL: instaModel.SchemaRegistryServerURL, + SchemaRegistryUsername: instaModel.SchemaRegistryUsername, + SchemaRegistryPassword: instaModel.SchemaRegistryPassword, + Version: instaModel.Version, + } } - return } -func (ks *KafkaSpec) RestProxyFromInstAPI(iRPs []*models.RestProxy) (rps []*RestProxy) { - for _, iRP := range iRPs { - rps = append(rps, &RestProxy{ - IntegrateRestProxyWithSchemaRegistry: iRP.IntegrateRestProxyWithSchemaRegistry, - UseLocalSchemaRegistry: iRP.UseLocalSchemaRegistry, - SchemaRegistryServerURL: iRP.SchemaRegistryServerURL, - SchemaRegistryUsername: iRP.SchemaRegistryUsername, - SchemaRegistryPassword: iRP.SchemaRegistryPassword, - Version: iRP.Version, - }) +func (ks *KafkaSpec) dedicatedZookeeperFromInstAPI(instaModels []*models.DedicatedZookeeper) { + ks.DedicatedZookeeper = make([]*DedicatedZookeeper, len(instaModels)) + for i, instaModel := range instaModels { + ks.DedicatedZookeeper[i] = &DedicatedZookeeper{ + NodeSize: instaModel.ZookeeperNodeSize, + NodesNumber: instaModel.ZookeeperNodeCount, + } } - return } -func (ks *KafkaSpec) DedicatedZookeeperFromInstAPI(iDZs []*models.DedicatedZookeeper) (dzs []*DedicatedZookeeper) { - for _, iDZ := range iDZs { - dzs = append(dzs, &DedicatedZookeeper{ - NodeSize: iDZ.ZookeeperNodeSize, - NodesNumber: iDZ.ZookeeperNodeCount, - }) +func (ks *KafkaSpec) karapaceRestProxyFromInstAPI(instaModels []*models.KarapaceRestProxy) { + ks.KarapaceRestProxy = make([]*KarapaceRestProxy, len(instaModels)) + for i, instaModel := range instaModels { + ks.KarapaceRestProxy[i] = &KarapaceRestProxy{ + IntegrateRestProxyWithSchemaRegistry: instaModel.IntegrateRestProxyWithSchemaRegistry, + Version: instaModel.Version, + } } - return } -func (ks *KafkaSpec) KarapaceRestProxyFromInstAPI(iKRPs []*models.KarapaceRestProxy) (krps []*KarapaceRestProxy) { - for _, iKRP := range iKRPs { - krps = append(krps, &KarapaceRestProxy{ - IntegrateRestProxyWithSchemaRegistry: iKRP.IntegrateRestProxyWithSchemaRegistry, - Version: iKRP.Version, - }) +func (ks *KafkaSpec) kraftFromInstAPI(instaModels []*models.Kraft) { + ks.Kraft = make([]*Kraft, len(instaModels)) + for i, instaModel := range instaModels { + ks.Kraft[i] = &Kraft{ + ControllerNodeCount: instaModel.ControllerNodeCount, + } } - return } -func (ks *KafkaSpec) kraftFromInstAPI(iKraft []*models.Kraft) (kraft []*Kraft) { - for _, ikraft := range iKraft { - kraft = append(kraft, &Kraft{ - ControllerNodeCount: ikraft.ControllerNodeCount, - }) +func (ks *KafkaSpec) karapaceSchemaRegistryFromInstAPI(instaModels []*models.KarapaceSchemaRegistry) { + ks.KarapaceSchemaRegistry = make([]*KarapaceSchemaRegistry, len(instaModels)) + for i, instaModel := range instaModels { + ks.KarapaceSchemaRegistry[i] = &KarapaceSchemaRegistry{ + Version: instaModel.Version, + } } - return } -func (ks *KafkaSpec) KarapaceSchemaRegistryFromInstAPI(iKSRs []*models.KarapaceSchemaRegistry) (ksrs []*KarapaceSchemaRegistry) { - for _, iKSR := range iKSRs { - ksrs = append(ksrs, &KarapaceSchemaRegistry{ - Version: iKSR.Version, - }) +func (ks *KafkaStatus) DCsFromInstAPI(instaModels []*models.KafkaDataCentre) { + ks.DataCentres = make([]*KafkaDataCentreStatus, len(instaModels)) + for i, instaModel := range instaModels { + dc := KafkaDataCentreStatus{} + dc.FromInstAPI(instaModel) + ks.DataCentres[i] = &dc } - return } -func (ks *KafkaStatus) DCsFromInstAPI(iDCs []*models.KafkaDataCentre) (dcs []*DataCentreStatus) { - for _, iDC := range iDCs { - dc := ks.DCFromInstAPI(iDC.DataCentre) - dc.PrivateLink = privateLinkStatusesFromInstAPI(iDC.PrivateLink) - dcs = append(dcs, dc) +func (s *KafkaDataCentreStatus) FromInstAPI(instaModel *models.KafkaDataCentre) { + s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields) + s.nodesFromInstAPI(instaModel.Nodes) + s.PrivateLink.FromInstAPI(instaModel.PrivateLink) +} + +func (s *KafkaDataCentreStatus) nodesFromInstAPI(instaModels []*models.Node) { + s.Nodes = make([]*Node, len(instaModels)) + for i, instaModel := range instaModels { + node := Node{} + node.FromInstAPI(instaModel) + s.Nodes[i] = &node } - return dcs } func (a *KafkaSpec) IsEqual(b KafkaSpec) bool { - return a.Cluster.IsEqual(b.Cluster) && + return a.GenericClusterSpec.Equals(&b.GenericClusterSpec) && a.ReplicationFactor == b.ReplicationFactor && a.PartitionsNumber == b.PartitionsNumber && a.AllowDeleteTopics == b.AllowDeleteTopics && @@ -434,14 +514,13 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool { a.ClientToClusterEncryption == b.ClientToClusterEncryption && a.ClientBrokerAuthWithMTLS == b.ClientBrokerAuthWithMTLS && a.BundledUseOnly == b.BundledUseOnly && - isKafkaAddonsEqual[SchemaRegistry](a.SchemaRegistry, b.SchemaRegistry) && - isKafkaAddonsEqual[RestProxy](a.RestProxy, b.RestProxy) && - isKafkaAddonsEqual[KarapaceRestProxy](a.KarapaceRestProxy, b.KarapaceRestProxy) && - isKafkaAddonsEqual[Kraft](a.Kraft, b.Kraft) && - isKafkaAddonsEqual[KarapaceSchemaRegistry](a.KarapaceSchemaRegistry, b.KarapaceSchemaRegistry) && - isKafkaAddonsEqual[DedicatedZookeeper](a.DedicatedZookeeper, b.DedicatedZookeeper) && - a.areDCsEqual(b.DataCentres) && - a.IsTwoFactorDeleteEqual(b.TwoFactorDelete) + slices.EqualsPtr(a.SchemaRegistry, b.SchemaRegistry) && + slices.EqualsPtr(a.RestProxy, b.RestProxy) && + slices.EqualsPtr(a.KarapaceRestProxy, b.KarapaceRestProxy) && + slices.EqualsPtr(a.Kraft, b.Kraft) && + slices.EqualsPtr(a.KarapaceSchemaRegistry, b.KarapaceSchemaRegistry) && + slices.EqualsPtr(a.DedicatedZookeeper, b.DedicatedZookeeper) && + a.areDCsEqual(b.DataCentres) } func (c *Kafka) GetSpec() KafkaSpec { return c.Spec } @@ -451,18 +530,22 @@ func (c *Kafka) IsSpecEqual(spec KafkaSpec) bool { } func (rs *KafkaSpec) areDCsEqual(b []*KafkaDataCentre) bool { - a := rs.DataCentres - if len(a) != len(b) { + if len(rs.DataCentres) != len(b) { return false } - for i := range b { - if a[i].Name != b[i].Name { - continue + sMap := map[string]*KafkaDataCentre{} + for _, dc := range rs.DataCentres { + sMap[dc.Name] = dc + } + + for _, dc := range b { + sDC, ok := sMap[dc.Name] + if !ok { + return false } - if !a[i].DataCentre.IsEqual(b[i].DataCentre) || - !isKafkaAddonsEqual[PrivateLink](a[i].PrivateLink, b[i].PrivateLink) { + if !sDC.Equals(dc) { return false } } @@ -508,7 +591,7 @@ func (k *Kafka) SetClusterID(id string) { func (k *Kafka) GetExposePorts() []k8scorev1.ServicePort { var exposePorts []k8scorev1.ServicePort - if !k.Spec.PrivateNetworkCluster { + if !k.Spec.PrivateNetwork { exposePorts = []k8scorev1.ServicePort{ { Name: models.KafkaClient, diff --git a/apis/clusters/v1beta1/kafka_webhook.go b/apis/clusters/v1beta1/kafka_webhook.go index 8eb717efa..3be0652f6 100644 --- a/apis/clusters/v1beta1/kafka_webhook.go +++ b/apis/clusters/v1beta1/kafka_webhook.go @@ -19,6 +19,7 @@ package v1beta1 import ( "context" "fmt" + "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -26,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" "github.com/instaclustr/operator/pkg/models" + "github.com/instaclustr/operator/pkg/utils/slices" "github.com/instaclustr/operator/pkg/validation" ) @@ -62,10 +64,6 @@ func (k *Kafka) Default() { models.ResourceStateAnnotation: "", }) } - - for _, dataCentre := range k.Spec.DataCentres { - dataCentre.SetDefaultValues() - } } // TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation. @@ -82,7 +80,7 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object kafkalog.Info("validate create", "name", k.Name) - err := k.Spec.Cluster.ValidateCreation() + err := k.Spec.GenericClusterSpec.ValidateCreation() if err != nil { return err } @@ -100,7 +98,7 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object if err != nil { return err } - if k.Spec.PrivateNetworkCluster { + if k.Spec.PrivateNetwork { err = k.Spec.OnPremisesSpec.ValidateSSHGatewayCreation() if err != nil { return err @@ -130,12 +128,12 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object for _, dc := range k.Spec.DataCentres { if k.Spec.OnPremisesSpec != nil { - err = dc.DataCentre.ValidateOnPremisesCreation() + err = dc.GenericDataCentreSpec.ValidateOnPremisesCreation() if err != nil { return err } } else { - err = dc.DataCentre.ValidateCreation() + err = dc.GenericDataCentreSpec.validateCreation() if err != nil { return err } @@ -160,7 +158,7 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", k.Spec.ReplicationFactor) } - if !k.Spec.PrivateNetworkCluster && dc.PrivateLink != nil { + if !k.Spec.PrivateNetwork && dc.PrivateLink != nil { return models.ErrPrivateLinkOnlyWithPrivateNetworkCluster } @@ -196,6 +194,10 @@ func (kv *kafkaValidator) ValidateUpdate(ctx context.Context, old runtime.Object return fmt.Errorf("cannot assert object %v to kafka", new.GetObjectKind()) } + if k.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return nil + } + if k.Status.ID == "" { return kv.ValidateCreate(ctx, k) } @@ -277,19 +279,19 @@ func (ks *KafkaSpec) validateUpdate(old *KafkaSpec) error { return err } - if !isKafkaAddonsEqual[SchemaRegistry](ks.SchemaRegistry, old.SchemaRegistry) { + if !slices.EqualsPtr(ks.SchemaRegistry, old.SchemaRegistry) { return models.ErrImmutableSchemaRegistry } - if !isKafkaAddonsEqual[KarapaceSchemaRegistry](ks.KarapaceSchemaRegistry, old.KarapaceSchemaRegistry) { + if !slices.EqualsPtr(ks.KarapaceSchemaRegistry, old.KarapaceSchemaRegistry) { return models.ErrImmutableKarapaceSchemaRegistry } - if !isKafkaAddonsEqual[RestProxy](ks.RestProxy, old.RestProxy) { + if !slices.EqualsPtr(ks.RestProxy, old.RestProxy) { return models.ErrImmutableRestProxy } - if !isKafkaAddonsEqual[Kraft](ks.Kraft, old.Kraft) { + if !slices.EqualsPtr(ks.Kraft, old.Kraft) { return models.ErrImmutableKraft } - if !isKafkaAddonsEqual[KarapaceRestProxy](ks.KarapaceRestProxy, old.KarapaceRestProxy) { + if !slices.EqualsPtr(ks.KarapaceRestProxy, old.KarapaceRestProxy) { return models.ErrImmutableKarapaceRestProxy } if ok := validateZookeeperUpdate(ks.DedicatedZookeeper, old.DedicatedZookeeper); !ok { @@ -299,24 +301,6 @@ func (ks *KafkaSpec) validateUpdate(old *KafkaSpec) error { return nil } -func isKafkaAddonsEqual[T KafkaAddons](new, old []*T) bool { - if new == nil && old == nil { - return true - } - - if len(new) != len(old) { - return false - } - - for i := range new { - if *new[i] != *old[i] { - return false - } - } - - return true -} - func validateZookeeperUpdate(new, old []*DedicatedZookeeper) bool { if new == nil && old == nil { return true @@ -422,10 +406,10 @@ func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields { autoCreateTopics: ks.AutoCreateTopics, clientToClusterEncryption: ks.ClientToClusterEncryption, bundledUseOnly: ks.BundledUseOnly, - privateNetworkCluster: ks.PrivateNetworkCluster, + privateNetworkCluster: ks.PrivateNetwork, clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS, }, - cluster: ks.Cluster.newImmutableFields(), + cluster: ks.GenericClusterSpec.immutableFields(), } } diff --git a/apis/clusters/v1beta1/kafka_webhook_test.go b/apis/clusters/v1beta1/kafka_webhook_test.go index 191eb2840..ecf1a74d8 100644 --- a/apis/clusters/v1beta1/kafka_webhook_test.go +++ b/apis/clusters/v1beta1/kafka_webhook_test.go @@ -72,9 +72,9 @@ var _ = Describe("Kafka Controller", Ordered, func() { Expect(k8sClient.Patch(ctx, &testKafkaManifest, patch)).ShouldNot(Succeed()) testKafkaManifest.Spec.PCICompliance = kafkaManifest.Spec.PCICompliance - testKafkaManifest.Spec.PrivateNetworkCluster = !kafkaManifest.Spec.PrivateNetworkCluster + testKafkaManifest.Spec.PrivateNetwork = !kafkaManifest.Spec.PrivateNetwork Expect(k8sClient.Patch(ctx, &testKafkaManifest, patch)).ShouldNot(Succeed()) - testKafkaManifest.Spec.PrivateNetworkCluster = kafkaManifest.Spec.PrivateNetworkCluster + testKafkaManifest.Spec.PrivateNetwork = kafkaManifest.Spec.PrivateNetwork testKafkaManifest.Spec.PartitionsNumber++ Expect(k8sClient.Patch(ctx, &testKafkaManifest, patch)).ShouldNot(Succeed()) @@ -96,9 +96,9 @@ var _ = Describe("Kafka Controller", Ordered, func() { Expect(k8sClient.Patch(ctx, &testKafkaManifest, patch)).ShouldNot(Succeed()) testKafkaManifest.Spec.BundledUseOnly = kafkaManifest.Spec.BundledUseOnly - testKafkaManifest.Spec.PrivateNetworkCluster = !kafkaManifest.Spec.PrivateNetworkCluster + testKafkaManifest.Spec.PrivateNetwork = !kafkaManifest.Spec.PrivateNetwork Expect(k8sClient.Patch(ctx, &testKafkaManifest, patch)).ShouldNot(Succeed()) - testKafkaManifest.Spec.PrivateNetworkCluster = kafkaManifest.Spec.PrivateNetworkCluster + testKafkaManifest.Spec.PrivateNetwork = kafkaManifest.Spec.PrivateNetwork testKafkaManifest.Spec.ClientBrokerAuthWithMTLS = !kafkaManifest.Spec.ClientBrokerAuthWithMTLS Expect(k8sClient.Patch(ctx, &testKafkaManifest, patch)).ShouldNot(Succeed()) diff --git a/apis/clusters/v1beta1/structs.go b/apis/clusters/v1beta1/structs.go index a9daa97ff..28b94e6d0 100644 --- a/apis/clusters/v1beta1/structs.go +++ b/apis/clusters/v1beta1/structs.go @@ -167,6 +167,28 @@ func (p1 PrivateLinkStatuses) Equal(p2 PrivateLinkStatuses) bool { return true } +func (s PrivateLinkStatuses) ToInstAPI() []*models.PrivateLink { + instaModels := make([]*models.PrivateLink, len(s)) + for i, link := range s { + instaModels[i] = &models.PrivateLink{ + AdvertisedHostname: link.AdvertisedHostname, + } + } + + return instaModels +} + +func (p *PrivateLinkStatuses) FromInstAPI(instaModels []*models.PrivateLink) { + *p = make(PrivateLinkStatuses, len(instaModels)) + for i, instaModel := range instaModels { + (*p)[i] = &privateLinkStatus{ + AdvertisedHostname: instaModel.AdvertisedHostname, + EndPointServiceID: instaModel.EndPointServiceID, + EndPointServiceName: instaModel.EndPointServiceName, + } + } +} + func privateLinksToInstAPI(p []*PrivateLink) []*models.PrivateLink { links := make([]*models.PrivateLink, 0, len(p)) for _, link := range p { @@ -762,6 +784,7 @@ func (old References) Diff(new References) (added, deleted References) { return added, deleted } +// +kubebuilder:validation:MinItems:=1 type GenericResizeSettings []*ResizeSettings func (g *GenericResizeSettings) FromInstAPI(instModels []*models.ResizeSettings) { diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 8b798e270..2f3ac5970 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -1282,7 +1282,7 @@ func (in *KafkaConnectStatus) DeepCopy() *KafkaConnectStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaDataCentre) DeepCopyInto(out *KafkaDataCentre) { *out = *in - in.DataCentre.DeepCopyInto(&out.DataCentre) + in.GenericDataCentreSpec.DeepCopyInto(&out.GenericDataCentreSpec) if in.PrivateLink != nil { in, out := &in.PrivateLink, &out.PrivateLink *out = make([]*PrivateLink, len(*in)) @@ -1306,6 +1306,44 @@ func (in *KafkaDataCentre) DeepCopy() *KafkaDataCentre { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KafkaDataCentreStatus) DeepCopyInto(out *KafkaDataCentreStatus) { + *out = *in + in.GenericDataCentreStatus.DeepCopyInto(&out.GenericDataCentreStatus) + if in.Nodes != nil { + in, out := &in.Nodes, &out.Nodes + *out = make([]*Node, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Node) + (*in).DeepCopyInto(*out) + } + } + } + if in.PrivateLink != nil { + in, out := &in.PrivateLink, &out.PrivateLink + *out = make(PrivateLinkStatuses, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(privateLinkStatus) + **out = **in + } + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaDataCentreStatus. +func (in *KafkaDataCentreStatus) DeepCopy() *KafkaDataCentreStatus { + if in == nil { + return nil + } + out := new(KafkaDataCentreStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaList) DeepCopyInto(out *KafkaList) { *out = *in @@ -1341,30 +1379,30 @@ func (in *KafkaList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { *out = *in - in.Cluster.DeepCopyInto(&out.Cluster) + in.GenericClusterSpec.DeepCopyInto(&out.GenericClusterSpec) if in.OnPremisesSpec != nil { in, out := &in.OnPremisesSpec, &out.OnPremisesSpec *out = new(OnPremisesSpec) (*in).DeepCopyInto(*out) } - if in.SchemaRegistry != nil { - in, out := &in.SchemaRegistry, &out.SchemaRegistry - *out = make([]*SchemaRegistry, len(*in)) + if in.UserRefs != nil { + in, out := &in.UserRefs, &out.UserRefs + *out = make(References, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(SchemaRegistry) + *out = new(apiextensions.ObjectReference) **out = **in } } } - if in.RestProxy != nil { - in, out := &in.RestProxy, &out.RestProxy - *out = make([]*RestProxy, len(*in)) + if in.DedicatedZookeeper != nil { + in, out := &in.DedicatedZookeeper, &out.DedicatedZookeeper + *out = make([]*DedicatedZookeeper, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(RestProxy) + *out = new(DedicatedZookeeper) **out = **in } } @@ -1380,24 +1418,24 @@ func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { } } } - if in.ResizeSettings != nil { - in, out := &in.ResizeSettings, &out.ResizeSettings - *out = make([]*ResizeSettings, len(*in)) + if in.SchemaRegistry != nil { + in, out := &in.SchemaRegistry, &out.SchemaRegistry + *out = make([]*SchemaRegistry, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(ResizeSettings) + *out = new(SchemaRegistry) **out = **in } } } - if in.DedicatedZookeeper != nil { - in, out := &in.DedicatedZookeeper, &out.DedicatedZookeeper - *out = make([]*DedicatedZookeeper, len(*in)) + if in.RestProxy != nil { + in, out := &in.RestProxy, &out.RestProxy + *out = make([]*RestProxy, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(DedicatedZookeeper) + *out = new(RestProxy) **out = **in } } @@ -1424,24 +1462,24 @@ func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { } } } - if in.UserRefs != nil { - in, out := &in.UserRefs, &out.UserRefs - *out = make(References, len(*in)) + if in.Kraft != nil { + in, out := &in.Kraft, &out.Kraft + *out = make([]*Kraft, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(apiextensions.ObjectReference) + *out = new(Kraft) **out = **in } } } - if in.Kraft != nil { - in, out := &in.Kraft, &out.Kraft - *out = make([]*Kraft, len(*in)) + if in.ResizeSettings != nil { + in, out := &in.ResizeSettings, &out.ResizeSettings + *out = make(GenericResizeSettings, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(Kraft) + *out = new(ResizeSettings) **out = **in } } @@ -1461,7 +1499,7 @@ func (in *KafkaSpec) DeepCopy() *KafkaSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaStatus) DeepCopyInto(out *KafkaStatus) { *out = *in - in.ClusterStatus.DeepCopyInto(&out.ClusterStatus) + in.GenericStatus.DeepCopyInto(&out.GenericStatus) if in.AvailableUsers != nil { in, out := &in.AvailableUsers, &out.AvailableUsers *out = make(References, len(*in)) @@ -1473,6 +1511,17 @@ func (in *KafkaStatus) DeepCopyInto(out *KafkaStatus) { } } } + if in.DataCentres != nil { + in, out := &in.DataCentres, &out.DataCentres + *out = make([]*KafkaDataCentreStatus, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(KafkaDataCentreStatus) + (*in).DeepCopyInto(*out) + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaStatus. diff --git a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml index 7818cfc5b..b38223a7f 100644 --- a/config/crd/bases/clusters.instaclustr.com_cassandras.yaml +++ b/config/crd/bases/clusters.instaclustr.com_cassandras.yaml @@ -217,6 +217,7 @@ spec: notifySupportContacts: type: boolean type: object + minItems: 1 type: array restoreFrom: properties: diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index 010684d89..2da1ee95f 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -222,7 +222,7 @@ spec: provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis. type: boolean - privateNetworkCluster: + privateNetwork: type: boolean replicationFactor: description: ReplicationFactor to use for new topic. Also represents @@ -236,7 +236,7 @@ spec: notifySupportContacts: type: boolean type: object - maxItems: 1 + minItems: 1 type: array restProxy: items: @@ -325,15 +325,11 @@ spec: - namespace type: object type: array - cdcid: - type: string currentClusterOperationStatus: type: string dataCentres: items: properties: - encryptionKeyId: - type: string id: type: string name: @@ -359,8 +355,6 @@ spec: type: string type: object type: array - nodesNumber: - type: integer privateLink: items: properties: @@ -520,19 +514,8 @@ spec: type: array type: object type: array - options: - properties: - dataNodeSize: - type: string - masterNodeSize: - type: string - openSearchDashboardsNodeSize: - type: string - type: object state: type: string - twoFactorDeleteEnabled: - type: boolean type: object type: object served: true diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index c7527894f..514dc1c6e 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -11,7 +11,7 @@ spec: allowDeleteTopics: true autoCreateTopics: true clientToClusterEncryption: false - privateNetworkCluster: false + privateNetwork: false slaTier: "NON_PRODUCTION" # bundledUseOnly: true # clientBrokerAuthWithMtls: true @@ -21,13 +21,13 @@ spec: # twoFactorDelete: # - email: "asdfadfsdsf" # phone: "ddsafasdf" - # karapaceSchemaRegistry: - # - version: "3.2.0" - # schemaRegistry: - # - version: "5.0.0" - # karapaceRestProxy: - # - integrateRestProxyWithSchemaRegistry: true - # version: "3.2.0" +# karapaceSchemaRegistry: +# - version: "3.6.2" +# schemaRegistry: +# - version: "3.0.0" +# karapaceRestProxy: +# - integrateRestProxyWithSchemaRegistry: true +# version: "3.6.2" # kraft: # - controllerNodeCount: 3 # restProxy: @@ -44,7 +44,7 @@ spec: tag: "oneTag" tag2: "twoTags" nodeSize: "KFK-DEV-t4g.medium-80" -# nodeSize: "KFK-PRD-r6g.large-250" +# nodeSize: "KFK-PRD-r6g.large-400" # nodeSize: "KFK-DEV-t4g.medium-80" network: "10.0.0.0/16" region: "US_EAST_1" diff --git a/controllers/clusters/cadence_controller.go b/controllers/clusters/cadence_controller.go index 02da4df81..a1789154d 100644 --- a/controllers/clusters/cadence_controller.go +++ b/controllers/clusters/cadence_controller.go @@ -1057,15 +1057,15 @@ func (r *CadenceReconciler) newKafkaSpec(c *v1beta1.Cadence, latestKafkaVersion providerAccountName := c.Spec.DataCentres[0].ProviderAccountName kafkaDataCentres := []*v1beta1.KafkaDataCentre{ { - DataCentre: v1beta1.DataCentre{ + GenericDataCentreSpec: v1beta1.GenericDataCentreSpec{ Name: dcName, Region: dcRegion, CloudProvider: cloudProvider, ProviderAccountName: providerAccountName, - NodeSize: kafkaNodeSize, - NodesNumber: kafkaNodesNumber, Network: kafkaNetwork, }, + NodeSize: kafkaNodeSize, + NodesNumber: kafkaNodesNumber, }, } @@ -1074,13 +1074,13 @@ func (r *CadenceReconciler) newKafkaSpec(c *v1beta1.Cadence, latestKafkaVersion pciCompliance := c.Spec.PCICompliance clientEncryption := c.Spec.DataCentres[0].ClientEncryption spec := v1beta1.KafkaSpec{ - Cluster: v1beta1.Cluster{ - Name: models.KafkaChildPrefix + c.Name, - Version: latestKafkaVersion, - SLATier: slaTier, - PrivateNetworkCluster: privateClusterNetwork, - TwoFactorDelete: kafkaTFD, - PCICompliance: pciCompliance, + GenericClusterSpec: v1beta1.GenericClusterSpec{ + Name: models.KafkaChildPrefix + c.Name, + Version: latestKafkaVersion, + SLATier: slaTier, + PrivateNetwork: privateClusterNetwork, + TwoFactorDelete: kafkaTFD, + PCICompliance: pciCompliance, }, DataCentres: kafkaDataCentres, ReplicationFactor: bundledKafkaSpec.ReplicationFactor, diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index ade4ab49a..7a49b94c1 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -238,13 +238,6 @@ func (r *CassandraReconciler) createCluster(ctx context.Context, c *v1beta1.Cass return fmt.Errorf("failed to update cassandra status, err: %w", err) } - controllerutil.AddFinalizer(c, models.DeletionFinalizer) - c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent - err = r.Update(ctx, c) - if err != nil { - return err - } - l.Info( "Cluster has been created", "cluster name", c.Spec.Name, @@ -391,7 +384,17 @@ func (r *CassandraReconciler) handleCreateCluster( } if c.Status.State != models.DeletedStatus { - err := r.startClusterJobs(c, l) + c.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent + controllerutil.AddFinalizer(c, models.DeletionFinalizer) + err := r.Update(ctx, c) + if err != nil { + r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, + "Failed to update resource metadata. Reason: %v", err, + ) + return reconcile.Result{}, err + } + + err = r.startClusterJobs(c, l) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to start cluster jobs, err: %w", err) } @@ -501,6 +504,8 @@ func (r *CassandraReconciler) handleUpdateCluster( "data centres", c.Spec.DataCentres, ) + r.EventRecorder.Event(c, models.Normal, models.UpdatedEvent, "Cluster has been updated") + return models.ExitReconcile, nil } @@ -679,7 +684,7 @@ func (r *CassandraReconciler) handleDeleteCluster( } func (r *CassandraReconciler) startSyncJob(c *v1beta1.Cassandra) error { - job := r.newWatchStatusJob(c) + job := r.newSyncJob(c) err := r.Scheduler.ScheduleJob(c.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) if err != nil { @@ -722,8 +727,9 @@ func (r *CassandraReconciler) startClusterOnPremisesIPsJob(c *v1beta1.Cassandra, return nil } -func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler.Job { - l := log.Log.WithValues("component", "CassandraStatusClusterJob") +func (r *CassandraReconciler) newSyncJob(c *v1beta1.Cassandra) scheduler.Job { + l := log.Log.WithValues("syncJob", c.GetJobID(scheduler.StatusChecker), "clusterID", c.Status.ID) + return func() error { namespacedName := client.ObjectKeyFromObject(c) err := r.Get(context.Background(), namespacedName, c) @@ -789,16 +795,10 @@ func (r *CassandraReconciler) newWatchStatusJob(c *v1beta1.Cassandra) scheduler. equals := c.Spec.IsEqual(&iCassandra.Spec) if equals && c.Annotations[models.ExternalChangesAnnotation] == models.True { - patch := c.NewPatch() - delete(c.Annotations, models.ExternalChangesAnnotation) - err := r.Patch(context.Background(), c, patch) + err = reconcileExternalChanges(r.Client, r.EventRecorder, c) if err != nil { return err } - - r.EventRecorder.Event(c, models.Normal, models.ExternalChanges, - "External changes were automatically reconciled", - ) } else if c.Status.CurrentClusterOperationStatus == models.NoOperation && c.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && !equals { diff --git a/controllers/clusters/datatest/kafka_v1beta1.yaml b/controllers/clusters/datatest/kafka_v1beta1.yaml index 4d73b5344..d8ffe680a 100644 --- a/controllers/clusters/datatest/kafka_v1beta1.yaml +++ b/controllers/clusters/datatest/kafka_v1beta1.yaml @@ -14,7 +14,7 @@ spec: allowDeleteTopics: true autoCreateTopics: true clientToClusterEncryption: true - privateNetworkCluster: true + privateNetwork: true slaTier: "NON_PRODUCTION" # bundledUseOnly: true # clientBrokerAuthWithMtls: true diff --git a/controllers/clusters/helpers.go b/controllers/clusters/helpers.go index adaa4d946..d093b990a 100644 --- a/controllers/clusters/helpers.go +++ b/controllers/clusters/helpers.go @@ -336,3 +336,18 @@ func handleExternalChanges[T any]( return models.ExitReconcile, nil } + +func reconcileExternalChanges(c client.Client, r record.EventRecorder, obj Object) error { + patch := obj.NewPatch() + obj.GetAnnotations()[models.ResourceStateAnnotation] = "" + err := c.Patch(context.Background(), obj, patch) + if err != nil { + return fmt.Errorf("failed to automaticly handle external changes, err: %w", err) + } + + r.Event(obj, models.Normal, models.ExternalChanges, + "External changes were automatically reconciled", + ) + + return nil +} diff --git a/controllers/clusters/kafka_controller.go b/controllers/clusters/kafka_controller.go index e5d49d1fb..750915428 100644 --- a/controllers/clusters/kafka_controller.go +++ b/controllers/clusters/kafka_controller.go @@ -18,7 +18,9 @@ package clusters import ( "context" + "encoding/json" "errors" + "fmt" "github.com/go-logr/logr" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -104,177 +106,182 @@ func (r *KafkaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return models.ExitReconcile, nil } -func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, k *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) { - l = l.WithName("Kafka creation Event") +func (r *KafkaReconciler) createCluster(ctx context.Context, k *v1beta1.Kafka, l logr.Logger) error { + l.Info("Creating cluster", + "cluster name", k.Spec.Name, + "data centres", k.Spec.DataCentres) - var err error - if k.Status.ID == "" { - l.Info("Creating cluster", - "cluster name", k.Spec.Name, - "data centres", k.Spec.DataCentres) + b, err := r.API.CreateClusterRaw(instaclustr.KafkaEndpoint, k.Spec.ToInstAPI()) + if err != nil { + return fmt.Errorf("failed to create kafka cluster, err: %w", err) + } - patch := k.NewPatch() - k.Status.ID, err = r.API.CreateCluster(instaclustr.KafkaEndpoint, k.Spec.ToInstAPI()) + instaModel := models.KafkaCluster{} + err = json.Unmarshal(b, &instaModel) + if err != nil { + return fmt.Errorf("failed to unmarshal json to kafka model, err: %w", err) + } + + r.EventRecorder.Eventf( + k, models.Normal, models.Created, + "Cluster creation request is sent. Cluster ID: %s", + instaModel.ID, + ) + + k.Spec.FromInstAPI(&instaModel) + err = r.Update(ctx, k) + if err != nil { + return fmt.Errorf("failed to update kafka spec, err: %w", err) + } + + k.Status.FromInstAPI(&instaModel) + err = r.Status().Update(ctx, k) + if err != nil { + return fmt.Errorf("failed to update kafka status, err: %w", err) + } + + l.Info("Cluster has been created", + "cluster ID", k.Status.ID, + ) + + return nil +} + +func (r *KafkaReconciler) startJobs(k *v1beta1.Kafka) error { + err := r.startSyncJob(k) + if err != nil { + return fmt.Errorf("failed to start cluster synchronize, err: %w", err) + } + + r.EventRecorder.Eventf( + k, models.Normal, models.Created, + "Cluster status check job is started", + ) + + if k.Spec.UserRefs != nil && k.Status.AvailableUsers == nil { + err = r.startUsersCreationJob(k) if err != nil { - l.Error(err, "Cannot create cluster", - "spec", k.Spec, - ) - r.EventRecorder.Eventf( - k, models.Warning, models.CreationFailed, - "Cluster creation on the Instaclustr is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err + return fmt.Errorf("failed to start user creation job, err: %w", err) } + r.EventRecorder.Event(k, models.Normal, models.Created, + "Cluster user creation job is started", + ) + } + + return nil +} + +func (r *KafkaReconciler) handleOnPremisesCreation(ctx context.Context, k *v1beta1.Kafka, l logr.Logger) error { + instaModel, err := r.API.GetKafka(k.Status.ID) + if err != nil { + l.Error(err, + "Cannot get cluster from the Instaclustr API", + "cluster name", k.Spec.Name, + "data centres", k.Spec.DataCentres, + "cluster ID", k.Status.ID, + ) r.EventRecorder.Eventf( - k, models.Normal, models.Created, - "Cluster creation request is sent. Cluster ID: %s", - k.Status.ID, + k, models.Warning, models.FetchFailed, + "Cluster fetch from the Instaclustr API is failed. Reason: %v", + err, ) + return err + } - err = r.Status().Patch(ctx, k, patch) + iKafka := v1beta1.Kafka{} + iKafka.FromInstAPI(instaModel) + + bootstrap := newOnPremisesBootstrap( + r.Client, + k, + r.EventRecorder, + iKafka.Status.ToOnPremises(), + k.Spec.OnPremisesSpec, + newExposePorts(k.GetExposePorts()), + k.GetHeadlessPorts(), + k.Spec.PrivateNetwork, + ) + + err = handleCreateOnPremisesClusterResources(ctx, bootstrap) + if err != nil { + l.Error(err, + "Cannot create resources for on-premises cluster", + "cluster spec", k.Spec.OnPremisesSpec, + ) + r.EventRecorder.Eventf( + k, models.Warning, models.CreationFailed, + "Resources creation for on-premises cluster is failed. Reason: %v", + err, + ) + return err + } + + err = r.startClusterOnPremisesIPsJob(k, bootstrap) + if err != nil { + l.Error(err, + "Cannot start on-premises cluster IPs check job", + "cluster ID", k.Status.ID, + ) + + r.EventRecorder.Eventf( + k, models.Warning, models.CreationFailed, + "On-premises cluster IPs check job is failed. Reason: %v", + err, + ) + return err + } + + l.Info( + "On-premises resources have been created", + "cluster name", k.Spec.Name, + "on-premises Spec", k.Spec.OnPremisesSpec, + "cluster ID", k.Status.ID, + ) + + return nil +} + +func (r *KafkaReconciler) handleCreateCluster(ctx context.Context, k *v1beta1.Kafka, l logr.Logger) (reconcile.Result, error) { + l = l.WithName("Kafka creation Event") + + if k.Status.ID == "" { + err := r.createCluster(ctx, k, l) if err != nil { - l.Error(err, "Cannot patch cluster status", - "spec", k.Spec, - ) - r.EventRecorder.Eventf( - k, models.Warning, models.PatchFailed, - "Cluster resource status patch is failed. Reason: %v", - err, + r.EventRecorder.Eventf(k, models.Warning, models.CreationFailed, + "Failed to create Kafka cluster. Reason: %v", err, ) return reconcile.Result{}, err } + } + if k.Status.State != models.DeletedStatus { k.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent controllerutil.AddFinalizer(k, models.DeletionFinalizer) - err = r.Patch(ctx, k, patch) + err := r.Update(ctx, k) if err != nil { - l.Error(err, "Cannot patch cluster resource", - "name", k.Spec.Name, - ) - r.EventRecorder.Eventf( - k, models.Warning, models.PatchFailed, - "Cluster resource patch is failed. Reason: %v", - err, + r.EventRecorder.Eventf(k, models.Warning, models.CreationFailed, + "Failed to update resource metadata. Reason: %v", err, ) return reconcile.Result{}, err } - l.Info("Cluster has been created", - "cluster ID", k.Status.ID, - ) - } - - if k.Status.State != models.DeletedStatus { - err = r.startClusterStatusJob(k) + err = r.startJobs(k) if err != nil { - l.Error(err, "Cannot start cluster status job", - "cluster ID", k.Status.ID) - r.EventRecorder.Eventf( - k, models.Warning, models.CreationFailed, - "Cluster status check job creation is failed. Reason: %v", - err, + r.EventRecorder.Eventf(k, models.Warning, models.CreationFailed, + "Failed to start cluster jobs. Reason: %v", err, ) return reconcile.Result{}, err } - r.EventRecorder.Eventf( - k, models.Normal, models.Created, - "Cluster status check job is started", - ) - - if k.Spec.UserRefs != nil && k.Status.AvailableUsers == nil { - err = r.startUsersCreationJob(k) - if err != nil { - l.Error(err, "Failed to start user creation job") - r.EventRecorder.Eventf(k, models.Warning, models.CreationFailed, - "User creation job is failed. Reason: %v", err, - ) - return reconcile.Result{}, err - } - - r.EventRecorder.Event(k, models.Normal, models.Created, - "Cluster user creation job is started", - ) - } - if k.Spec.OnPremisesSpec != nil && k.Spec.OnPremisesSpec.EnableAutomation { - iData, err := r.API.GetKafka(k.Status.ID) + err = r.handleOnPremisesCreation(ctx, k, l) if err != nil { - l.Error(err, "Cannot get cluster from the Instaclustr API", - "cluster name", k.Spec.Name, - "data centres", k.Spec.DataCentres, - "cluster ID", k.Status.ID, - ) - r.EventRecorder.Eventf( - k, models.Warning, models.FetchFailed, - "Cluster fetch from the Instaclustr API is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - iKafka, err := k.FromInstAPI(iData) - if err != nil { - l.Error( - err, "Cannot convert cluster from the Instaclustr API", - "cluster name", k.Spec.Name, - "cluster ID", k.Status.ID, - ) - r.EventRecorder.Eventf( - k, models.Warning, models.ConversionFailed, - "Cluster convertion from the Instaclustr API to k8s resource is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - bootstrap := newOnPremisesBootstrap( - r.Client, - k, - r.EventRecorder, - iKafka.Status.ClusterStatus, - k.Spec.OnPremisesSpec, - newExposePorts(k.GetExposePorts()), - k.GetHeadlessPorts(), - k.Spec.PrivateNetworkCluster, - ) - - err = handleCreateOnPremisesClusterResources(ctx, bootstrap) - if err != nil { - l.Error( - err, "Cannot create resources for on-premises cluster", - "cluster spec", k.Spec.OnPremisesSpec, - ) - r.EventRecorder.Eventf( - k, models.Warning, models.CreationFailed, - "Resources creation for on-premises cluster is failed. Reason: %v", - err, - ) - return reconcile.Result{}, err - } - - err = r.startClusterOnPremisesIPsJob(k, bootstrap) - if err != nil { - l.Error(err, "Cannot start on-premises cluster IPs check job", - "cluster ID", k.Status.ID, - ) - - r.EventRecorder.Eventf( - k, models.Warning, models.CreationFailed, - "On-premises cluster IPs check job is failed. Reason: %v", - err, + r.EventRecorder.Eventf(k, models.Warning, models.CreationFailed, + "Failed to handle OnPremises cluster creation. Reason: %v", err, ) return reconcile.Result{}, err } - - l.Info( - "On-premises resources have been created", - "cluster name", k.Spec.Name, - "on-premises Spec", k.Spec.OnPremisesSpec, - "cluster ID", k.Status.ID, - ) - return models.ExitReconcile, nil } } @@ -289,24 +296,21 @@ func (r *KafkaReconciler) handleUpdateCluster( ) (reconcile.Result, error) { l = l.WithName("Kafka update Event") - iData, err := r.API.GetKafka(k.Status.ID) + instaModel, err := r.API.GetKafka(k.Status.ID) if err != nil { l.Error(err, "Cannot get cluster from the Instaclustr", "cluster ID", k.Status.ID) return reconcile.Result{}, err } - iKafka, err := k.FromInstAPI(iData) - if err != nil { - l.Error(err, "Cannot convert cluster from the Instaclustr API", "cluster ID", k.Status.ID) - return reconcile.Result{}, err - } + iKafka := v1beta1.Kafka{} + iKafka.FromInstAPI(instaModel) if k.Annotations[models.ExternalChangesAnnotation] == models.True || r.RateLimiter.NumRequeues(req) == rlimiter.DefaultMaxTries { - return handleExternalChanges[v1beta1.KafkaSpec](r.EventRecorder, r.Client, k, iKafka, l) + return handleExternalChanges[v1beta1.KafkaSpec](r.EventRecorder, r.Client, k, &iKafka, l) } - if k.Spec.ClusterSettingsNeedUpdate(iKafka.Spec.Cluster) { + if k.Spec.ClusterSettingsNeedUpdate(&iKafka.Spec.GenericClusterSpec) { l.Info("Updating cluster settings", "instaclustr description", iKafka.Spec.Description, "instaclustr two factor delete", iKafka.Spec.TwoFactorDelete) @@ -343,7 +347,7 @@ func (r *KafkaReconciler) handleUpdateCluster( err = r.API.UpdateCluster(k.Status.ID, instaclustr.KafkaEndpoint, k.Spec.ToInstAPIUpdate()) if err != nil { l.Error(err, "Unable to update cluster on Instaclustr", - "cluster name", k.Spec.Name, "cluster state", k.Status.ClusterStatus.State) + "cluster name", k.Spec.Name, "cluster state", k.Status.State) r.EventRecorder.Eventf(k, models.Warning, models.UpdateFailed, "Cluster update on the Instaclustr API is failed. Reason: %v", err) @@ -371,6 +375,8 @@ func (r *KafkaReconciler) handleUpdateCluster( "data centres", k.Spec.DataCentres, ) + r.EventRecorder.Event(k, models.Normal, models.UpdatedEvent, "Cluster has been updated") + return models.ExitReconcile, nil } @@ -381,7 +387,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, k *v1beta1.Ka if err != nil && !errors.Is(err, instaclustr.NotFound) { l.Error(err, "Cannot get cluster from the Instaclustr API", "cluster name", k.Spec.Name, - "cluster state", k.Status.ClusterStatus.State) + "cluster state", k.Status.State) r.EventRecorder.Eventf( k, models.Warning, models.FetchFailed, "Cluster resource fetch from the Instaclustr API is failed. Reason: %v", @@ -400,7 +406,7 @@ func (r *KafkaReconciler) handleDeleteCluster(ctx context.Context, k *v1beta1.Ka if err != nil { l.Error(err, "Cannot delete cluster", "cluster name", k.Spec.Name, - "cluster state", k.Status.ClusterStatus.State) + "cluster state", k.Status.State) r.EventRecorder.Eventf( k, models.Warning, models.DeletionFailed, "Cluster deletion is failed on the Instaclustr. Reason: %v", @@ -514,8 +520,8 @@ func (r *KafkaReconciler) startClusterOnPremisesIPsJob(k *v1beta1.Kafka, b *onPr return nil } -func (r *KafkaReconciler) startClusterStatusJob(kafka *v1beta1.Kafka) error { - job := r.newWatchStatusJob(kafka) +func (r *KafkaReconciler) startSyncJob(kafka *v1beta1.Kafka) error { + job := r.newSyncJob(kafka) err := r.Scheduler.ScheduleJob(kafka.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) if err != nil { @@ -525,8 +531,9 @@ func (r *KafkaReconciler) startClusterStatusJob(kafka *v1beta1.Kafka) error { return nil } -func (r *KafkaReconciler) newWatchStatusJob(k *v1beta1.Kafka) scheduler.Job { - l := log.Log.WithValues("component", "kafkaStatusClusterJob") +func (r *KafkaReconciler) newSyncJob(k *v1beta1.Kafka) scheduler.Job { + l := log.Log.WithValues("syncJob", k.GetJobID(scheduler.StatusChecker), "clusterID", k.Status.ID) + return func() error { namespacedName := client.ObjectKeyFromObject(k) err := r.Get(context.Background(), namespacedName, k) @@ -544,7 +551,7 @@ func (r *KafkaReconciler) newWatchStatusJob(k *v1beta1.Kafka) scheduler.Job { return err } - iData, err := r.API.GetKafka(k.Status.ID) + instaModel, err := r.API.GetKafka(k.Status.ID) if err != nil { if errors.Is(err, instaclustr.NotFound) { if k.DeletionTimestamp != nil { @@ -559,23 +566,16 @@ func (r *KafkaReconciler) newWatchStatusJob(k *v1beta1.Kafka) scheduler.Job { return err } - iKafka, err := k.FromInstAPI(iData) - if err != nil { - l.Error(err, "Cannot convert cluster from the Instaclustr API", - "cluster ID", k.Status.ID, - ) - return err - } + iKafka := v1beta1.Kafka{} + iKafka.FromInstAPI(instaModel) - if !areStatusesEqual(&k.Status.ClusterStatus, &iKafka.Status.ClusterStatus) { - l.Info("Kafka status of k8s is different from Instaclustr. Reconcile k8s resource status..", - "instacluster status", iKafka.Status, - "k8s status", k.Status.ClusterStatus) + if !k.Status.Equals(&iKafka.Status) { + l.Info("Updating cluster status") - areDCsEqual := areDataCentresEqual(iKafka.Status.ClusterStatus.DataCentres, k.Status.ClusterStatus.DataCentres) + areDCsEqual := k.Status.DCsEqual(&iKafka.Status) patch := k.NewPatch() - k.Status.ClusterStatus = iKafka.Status.ClusterStatus + k.Status.FromInstAPI(instaModel) err = r.Status().Patch(context.Background(), k, patch) if err != nil { l.Error(err, "Cannot patch cluster cluster", @@ -586,14 +586,14 @@ func (r *KafkaReconciler) newWatchStatusJob(k *v1beta1.Kafka) scheduler.Job { if !areDCsEqual { var nodes []*v1beta1.Node - for _, dc := range iKafka.Status.ClusterStatus.DataCentres { + for _, dc := range iKafka.Status.DataCentres { nodes = append(nodes, dc.Nodes...) } err = exposeservice.Create(r.Client, k.Name, k.Namespace, - k.Spec.PrivateNetworkCluster, + k.Spec.PrivateNetwork, nodes, models.KafkaConnectionPort) if err != nil { @@ -602,9 +602,17 @@ func (r *KafkaReconciler) newWatchStatusJob(k *v1beta1.Kafka) scheduler.Job { } } + equals := k.Spec.IsEqual(iKafka.Spec) + + if equals && k.Annotations[models.ExternalChangesAnnotation] == models.True { + err = reconcileExternalChanges(r.Client, r.EventRecorder, k) + if err != nil { + return err + } + } if iKafka.Status.CurrentClusterOperationStatus == models.NoOperation && k.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && - !k.Spec.IsEqual(iKafka.Spec) { + !equals { patch := k.NewPatch() k.Annotations[models.ExternalChangesAnnotation] = models.True @@ -778,6 +786,11 @@ func (r *KafkaReconciler) SetupWithManager(mgr ctrl.Manager) error { } newObj := event.ObjectNew.(*v1beta1.Kafka) + + if newObj.Status.ID == "" && newObj.Annotations[models.ResourceStateAnnotation] == models.CreatingEvent { + return true + } + if newObj.Status.ID == "" { newObj.Annotations[models.ResourceStateAnnotation] = models.CreatingEvent return true @@ -808,7 +821,7 @@ func (r *KafkaReconciler) reconcileMaintenanceEvents(ctx context.Context, k *v1b return err } - if !k.Status.AreMaintenanceEventStatusesEqual(iMEStatuses) { + if !k.Status.MaintenanceEventsEqual(iMEStatuses) { patch := k.NewPatch() k.Status.MaintenanceEvents = iMEStatuses err = r.Status().Patch(ctx, k, patch) diff --git a/controllers/clusters/opensearch_controller.go b/controllers/clusters/opensearch_controller.go index 89bb81f54..bf5ea6d5a 100644 --- a/controllers/clusters/opensearch_controller.go +++ b/controllers/clusters/opensearch_controller.go @@ -206,13 +206,6 @@ func (r *OpenSearchReconciler) createCluster(ctx context.Context, o *v1beta1.Ope return fmt.Errorf("failed to update cluster status, err : %w", err) } - controllerutil.AddFinalizer(o, models.DeletionFinalizer) - o.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent - err = r.Update(ctx, o) - if err != nil { - return err - } - logger.Info( "OpenSearch resource has been created", "cluster name", o.Spec.Name, "cluster ID", o.Status.ID, @@ -266,6 +259,16 @@ func (r *OpenSearchReconciler) HandleCreateCluster( } if o.Status.State != models.DeletedStatus { + o.Annotations[models.ResourceStateAnnotation] = models.CreatedEvent + controllerutil.AddFinalizer(o, models.DeletionFinalizer) + err := r.Update(ctx, o) + if err != nil { + r.EventRecorder.Eventf(o, models.Warning, models.CreationFailed, + "Failed to update resource metadata. Reason: %v", err, + ) + return reconcile.Result{}, err + } + err = r.startClusterJobs(o) if err != nil { return reconcile.Result{}, fmt.Errorf("failed to run cluster jobs, err: %w", err) @@ -512,7 +515,7 @@ func (r *OpenSearchReconciler) HandleDeleteCluster( } func (r *OpenSearchReconciler) startClusterSyncJob(cluster *v1beta1.OpenSearch) error { - job := r.newWatchStatusJob(cluster) + job := r.newSyncJob(cluster) err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.StatusChecker), scheduler.ClusterStatusInterval, job) if err != nil { @@ -544,8 +547,9 @@ func (r *OpenSearchReconciler) startUsersCreationJob(cluster *v1beta1.OpenSearch return nil } -func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) scheduler.Job { - l := log.Log.WithValues("statusJob", o.GetJobID(scheduler.StatusChecker)) +func (r *OpenSearchReconciler) newSyncJob(o *v1beta1.OpenSearch) scheduler.Job { + l := log.Log.WithValues("syncJob", o.GetJobID(scheduler.StatusChecker), "clusterID", o.Status.ID) + return func() error { namespacedName := client.ObjectKeyFromObject(o) err := r.Get(context.Background(), namespacedName, o) @@ -626,16 +630,10 @@ func (r *OpenSearchReconciler) newWatchStatusJob(o *v1beta1.OpenSearch) schedule equals := o.Spec.IsEqual(iO.Spec) if equals && o.Annotations[models.ExternalChangesAnnotation] == models.True { - patch := o.NewPatch() - delete(o.Annotations, models.ExternalChangesAnnotation) - err := r.Patch(context.Background(), o, patch) + err = reconcileExternalChanges(r.Client, r.EventRecorder, o) if err != nil { return err } - - r.EventRecorder.Event(o, models.Normal, models.ExternalChanges, - "External changes were automatically reconciled", - ) } else if o.Status.CurrentClusterOperationStatus == models.NoOperation && o.Annotations[models.ResourceStateAnnotation] != models.UpdatingEvent && !equals { diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index 27acd4645..42aaa45fa 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -411,7 +411,7 @@ func (c *Client) UpdateCassandra(id string, cassandra models.CassandraClusterAPI return nil } -func (c *Client) GetKafka(id string) ([]byte, error) { +func (c *Client) GetKafka(id string) (*models.KafkaCluster, error) { url := c.serverHostname + KafkaEndpoint + id resp, err := c.DoRequest(url, http.MethodGet, nil) @@ -433,7 +433,13 @@ func (c *Client) GetKafka(id string) ([]byte, error) { return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) } - return body, nil + var instaModel models.KafkaCluster + err = json.Unmarshal(body, &instaModel) + if err != nil { + return nil, err + } + + return &instaModel, nil } func (c *Client) GetKafkaConnect(id string) ([]byte, error) { diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 8257d79b3..b1dba4831 100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -81,7 +81,7 @@ type API interface { UpdateKafkaACL(kafkaACLID, kafkaACLEndpoint string, kafkaACLSpec any) error GetCassandra(id string) (*models.CassandraCluster, error) UpdateCassandra(id string, cassandra models.CassandraClusterAPIUpdate) error - GetKafka(id string) ([]byte, error) + GetKafka(id string) (*models.KafkaCluster, error) GetKafkaConnect(id string) ([]byte, error) UpdateKafkaConnect(id string, kc models.KafkaConnectAPIUpdate) error GetZookeeper(id string) ([]byte, error) diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index 14d79d4de..9054993cd 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -306,7 +306,7 @@ func (c *mockClient) UpdateRedis(id string, r *models.RedisDataCentreUpdate) err panic("UpdateRedis: is not implemented") } -func (c *mockClient) GetKafka(id string) ([]byte, error) { +func (c *mockClient) GetKafka(id string) (*models.KafkaCluster, error) { panic("GetKafka: is not implemented") } diff --git a/pkg/models/cassandra_apiv2.go b/pkg/models/cassandra_apiv2.go index 48a6f37ec..981bc56c8 100644 --- a/pkg/models/cassandra_apiv2.go +++ b/pkg/models/cassandra_apiv2.go @@ -32,7 +32,7 @@ type CassandraDataCentre struct { GenericDataCentreFields `json:",inline"` ContinuousBackup bool `json:"continuousBackup"` - PrivateLink bool `json:"privateLink,omitempty"` + PrivateLink bool `json:"privateLink"` PrivateIPBroadcastForDiscovery bool `json:"privateIpBroadcastForDiscovery"` ClientToClusterEncryption bool `json:"clientToClusterEncryption"` ReplicationFactor int `json:"replicationFactor"` diff --git a/pkg/models/kafka_apiv2.go b/pkg/models/kafka_apiv2.go index b01138a9e..843c78576 100644 --- a/pkg/models/kafka_apiv2.go +++ b/pkg/models/kafka_apiv2.go @@ -17,34 +17,35 @@ limitations under the License. package models type KafkaCluster struct { - ClusterStatus `json:",inline"` - Name string `json:"name"` - KafkaVersion string `json:"kafkaVersion"` - PrivateNetworkCluster bool `json:"privateNetworkCluster"` - SLATier string `json:"slaTier"` - TwoFactorDelete []*TwoFactorDelete `json:"twoFactorDelete"` - AllowDeleteTopics bool `json:"allowDeleteTopics"` - AutoCreateTopics bool `json:"autoCreateTopics"` - BundledUseOnly bool `json:"bundledUseOnly"` - ClientBrokerAuthWithMtls bool `json:"clientBrokerAuthWithMtls"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - DataCentres []*KafkaDataCentre `json:"dataCentres"` - DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper"` - DefaultNumberOfPartitions int `json:"defaultNumberOfPartitions"` - DefaultReplicationFactor int `json:"defaultReplicationFactor"` - Kraft []*Kraft `json:"kraft,omitempty"` - KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy"` - KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry"` - PCIComplianceMode bool `json:"pciComplianceMode"` - RestProxy []*RestProxy `json:"restProxy"` - SchemaRegistry []*SchemaRegistry `json:"schemaRegistry"` - ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` - Description string `json:"description,omitempty"` + GenericClusterFields `json:",inline"` + + KafkaVersion string `json:"kafkaVersion"` + AllowDeleteTopics bool `json:"allowDeleteTopics"` + AutoCreateTopics bool `json:"autoCreateTopics"` + BundledUseOnly bool `json:"bundledUseOnly"` + ClientBrokerAuthWithMtls bool `json:"clientBrokerAuthWithMtls"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + DefaultNumberOfPartitions int `json:"defaultNumberOfPartitions"` + DefaultReplicationFactor int `json:"defaultReplicationFactor"` + + DataCentres []*KafkaDataCentre `json:"dataCentres"` + DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper"` + Kraft []*Kraft `json:"kraft,omitempty"` + KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy"` + KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry"` + RestProxy []*RestProxy `json:"restProxy"` + SchemaRegistry []*SchemaRegistry `json:"schemaRegistry"` + ResizeSettings []*ResizeSettings `json:"resizeSettings,omitempty"` } type KafkaDataCentre struct { - DataCentre `json:",inline"` + GenericDataCentreFields `json:",inline"` + + NumberOfNodes int `json:"numberOfNodes"` + NodeSize string `json:"nodeSize"` + PrivateLink []*PrivateLink `json:"privateLink,omitempty"` + Nodes []*Node `json:"nodes"` } type SchemaRegistry struct {