From 02d33997d8c71b462f2a522175b8643e21f75f87 Mon Sep 17 00:00:00 2001 From: tengu-alt Date: Tue, 22 Aug 2023 16:34:25 +0300 Subject: [PATCH] Kafka cluster CRD spec was updated with Kraft field --- apis/clusters/v1beta1/kafka_types.go | 28 ++++++++++++++++++- apis/clusters/v1beta1/kafka_webhook.go | 13 +++++++++ .../clusters/v1beta1/zz_generated.deepcopy.go | 26 +++++++++++++++++ .../clusters.instaclustr.com_kafkas.yaml | 9 ++++++ config/samples/clusters_v1beta1_kafka.yaml | 6 ++-- .../mock/server/go/model_kafka_cluster_v2.go | 3 ++ .../mock/server/go/model_kafka_kraft_v2.go | 15 ++++++++++ pkg/models/errors.go | 3 ++ pkg/models/kafka_apiv2.go | 5 ++++ 9 files changed, 105 insertions(+), 3 deletions(-) create mode 100644 pkg/instaclustr/mock/server/go/model_kafka_kraft_v2.go diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index e542c093f..30f5c09fb 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ b/apis/clusters/v1beta1/kafka_types.go @@ -27,7 +27,7 @@ import ( // +kubebuilder:object:generate:=false type KafkaAddons interface { - SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | KafkaPrivateLink + SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | KafkaPrivateLink | Kraft } type SchemaRegistry struct { @@ -83,12 +83,17 @@ type KafkaSpec struct { ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"` ClientAuthBrokerWithoutEncryption bool `json:"clientAuthBrokerWithoutEncryption,omitempty"` ClientAuthBrokerWithEncryption bool `json:"clientAuthBrokerWithEncryption,omitempty"` + Kraft []*Kraft `json:"kraft,omitempty"` KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"` KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` UserRefs []*UserReference `json:"userRefs,omitempty"` } +type Kraft struct { + ControllerNodeCount int `json:"controllerNodeCount"` +} + type KafkaDataCentre struct { DataCentre `json:",inline"` PrivateLink []*KafkaPrivateLink `json:"privateLink,omitempty"` @@ -159,6 +164,7 @@ func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster { ClientAuthBrokerWithoutEncryption: k.ClientAuthBrokerWithoutEncryption, ClientAuthBrokerWithEncryption: k.ClientAuthBrokerWithEncryption, BundledUseOnly: k.BundledUseOnly, + Kraft: k.kraftToInstAPI(), KarapaceRestProxy: k.karapaceRestProxyToInstAPI(), KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(), } @@ -210,6 +216,15 @@ func (k *KafkaSpec) karapaceRestProxyToInstAPI() (iRestProxies []*models.Karapac return } +func (k *KafkaSpec) kraftToInstAPI() (iKraft []*models.Kraft) { + for _, kraft := range k.Kraft { + iKraft = append(iKraft, &models.Kraft{ + ControllerNodeCount: kraft.ControllerNodeCount, + }) + } + return +} + func (k *KafkaSpec) dedicatedZookeeperToInstAPI() (iZookeepers []*models.DedicatedZookeeper) { for _, zookeeper := range k.DedicatedZookeeper { iZookeepers = append(iZookeepers, &models.DedicatedZookeeper{ @@ -296,6 +311,7 @@ func (ks *KafkaSpec) FromInstAPI(iKafka *models.KafkaCluster) KafkaSpec { ClientAuthBrokerWithoutEncryption: iKafka.ClientAuthBrokerWithoutEncryption, ClientAuthBrokerWithEncryption: iKafka.ClientAuthBrokerWithEncryption, KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy), + Kraft: ks.kraftFromInstAPI(iKafka.Kraft), KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry), BundledUseOnly: iKafka.BundledUseOnly, } @@ -375,6 +391,15 @@ func (ks *KafkaSpec) KarapaceRestProxyFromInstAPI(iKRPs []*models.KarapaceRestPr return } +func (ks *KafkaSpec) kraftFromInstAPI(iKraft []*models.Kraft) (kraft []*Kraft) { + for _, ikraft := range iKraft { + kraft = append(kraft, &Kraft{ + ControllerNodeCount: ikraft.ControllerNodeCount, + }) + } + return +} + func (ks *KafkaSpec) KarapaceSchemaRegistryFromInstAPI(iKSRs []*models.KarapaceSchemaRegistry) (ksrs []*KarapaceSchemaRegistry) { for _, iKSR := range iKSRs { ksrs = append(ksrs, &KarapaceSchemaRegistry{ @@ -405,6 +430,7 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool { isKafkaAddonsEqual[SchemaRegistry](a.SchemaRegistry, b.SchemaRegistry) && isKafkaAddonsEqual[RestProxy](a.RestProxy, b.RestProxy) && isKafkaAddonsEqual[KarapaceRestProxy](a.KarapaceRestProxy, b.KarapaceRestProxy) && + isKafkaAddonsEqual[Kraft](a.Kraft, b.Kraft) && isKafkaAddonsEqual[KarapaceSchemaRegistry](a.KarapaceSchemaRegistry, b.KarapaceSchemaRegistry) && isKafkaAddonsEqual[DedicatedZookeeper](a.DedicatedZookeeper, b.DedicatedZookeeper) && a.areDCsEqual(b.DataCentres) && diff --git a/apis/clusters/v1beta1/kafka_webhook.go b/apis/clusters/v1beta1/kafka_webhook.go index 7c3cc3806..196fe3d9b 100644 --- a/apis/clusters/v1beta1/kafka_webhook.go +++ b/apis/clusters/v1beta1/kafka_webhook.go @@ -122,6 +122,16 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object } } + if len(k.Spec.Kraft) > 1 { + return models.ErrMoreThanOneKraft + } + + for _, kraft := range k.Spec.Kraft { + if kraft.ControllerNodeCount > 3 { + return models.ErrMoreThanThreeControllerNodeCount + } + } + return nil } @@ -210,6 +220,9 @@ func (ks *KafkaSpec) validateUpdate(old *KafkaSpec) error { if !isKafkaAddonsEqual[RestProxy](ks.RestProxy, old.RestProxy) { return models.ErrImmutableRestProxy } + if !isKafkaAddonsEqual[Kraft](ks.Kraft, old.Kraft) { + return models.ErrImmutableKraft + } if !isKafkaAddonsEqual[KarapaceRestProxy](ks.KarapaceRestProxy, old.KarapaceRestProxy) { return models.ErrImmutableKarapaceRestProxy } diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 957868ae4..1f6b27750 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -1063,6 +1063,17 @@ func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { } } } + if in.Kraft != nil { + in, out := &in.Kraft, &out.Kraft + *out = make([]*Kraft, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(Kraft) + **out = **in + } + } + } if in.KarapaceRestProxy != nil { in, out := &in.KarapaceRestProxy, &out.KarapaceRestProxy *out = make([]*KarapaceRestProxy, len(*in)) @@ -1154,6 +1165,21 @@ func (in *KarapaceSchemaRegistry) DeepCopy() *KarapaceSchemaRegistry { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Kraft) DeepCopyInto(out *Kraft) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Kraft. +func (in *Kraft) DeepCopy() *Kraft { + if in == nil { + return nil + } + out := new(Kraft) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MaintenanceEvent) DeepCopyInto(out *MaintenanceEvent) { *out = *in diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index 97b39830e..8fa7a5d47 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -139,6 +139,15 @@ spec: - version type: object type: array + kraft: + items: + properties: + controllerNodeCount: + type: integer + required: + - controllerNodeCount + type: object + type: array name: description: Name [ 3 .. 32 ] characters. type: string diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index 51bfc9d21..4993fd89d 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -30,6 +30,8 @@ spec: # karapaceRestProxy: # - integrateRestProxyWithSchemaRegistry: true # version: "3.2.0" + # kraft: + # - controllerNodeCount: 3 # restProxy: # - integrateRestProxyWithSchemaRegistry: false # schemaRegistryPassword: "asdfasdf" @@ -59,5 +61,5 @@ spec: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 userRefs: - - name: kafkauser-sample - namespace: default \ No newline at end of file +# - name: kafkauser-sample +# namespace: default \ No newline at end of file diff --git a/pkg/instaclustr/mock/server/go/model_kafka_cluster_v2.go b/pkg/instaclustr/mock/server/go/model_kafka_cluster_v2.go index bc29a6f17..ff358400d 100644 --- a/pkg/instaclustr/mock/server/go/model_kafka_cluster_v2.go +++ b/pkg/instaclustr/mock/server/go/model_kafka_cluster_v2.go @@ -33,6 +33,9 @@ type KafkaClusterV2 struct { // TwoFactorDelete []TwoFactorDeleteSettingsV2 `json:"twoFactorDelete,omitempty"` + // + Kraft []KafkaKraftV2 `json:"kraft,omitempty"` + // Adds the specified version of Kafka Karapace REST Proxy to this Kafka cluster. KarapaceRestProxy []KafkaKarapaceRestProxyDetailsV2 `json:"karapaceRestProxy,omitempty"` diff --git a/pkg/instaclustr/mock/server/go/model_kafka_kraft_v2.go b/pkg/instaclustr/mock/server/go/model_kafka_kraft_v2.go new file mode 100644 index 000000000..6eaf52104 --- /dev/null +++ b/pkg/instaclustr/mock/server/go/model_kafka_kraft_v2.go @@ -0,0 +1,15 @@ +/* + * Instaclustr Cluster Management API + * + * Instaclustr Cluster Management API + * + * API version: 2.0.0 + * Generated by: OpenAPI Generator (https://openapi-generator.tech) + */ + +package openapi + +// KafkaKraftV2 - +type KafkaKraftV2 struct { + ControllerNodeCount int `json:"controllerNodeCount"` +} diff --git a/pkg/models/errors.go b/pkg/models/errors.go index dbf0cafb6..266da981a 100644 --- a/pkg/models/errors.go +++ b/pkg/models/errors.go @@ -20,6 +20,8 @@ import "errors" var ( ErrZeroDataCentres = errors.New("cluster spec doesn't have data centres") + ErrMoreThanOneKraft = errors.New("cluster spec spec does not support more than one Kraft") + ErrMoreThanThreeControllerNodeCount = errors.New("Kraft does not support more than three controller nodes") ErrNetworkOverlaps = errors.New("cluster network overlaps") ErrImmutableTwoFactorDelete = errors.New("twoFactorDelete field is immutable") ErrImmutableCloudProviderSettings = errors.New("cloudProviderSettings are immutable") @@ -33,6 +35,7 @@ var ( ErrTypeAssertion = errors.New("unable to assert type") ErrImmutableSchemaRegistry = errors.New("schema registry is immutable") ErrImmutableRestProxy = errors.New("rest proxy is immutable") + ErrImmutableKraft = errors.New("Kraft is immutable") ErrImmutableKarapaceSchemaRegistry = errors.New("karapace schema registry is immutable") ErrImmutableKarapaceRestProxy = errors.New("karapace rest proxy is immutable") ErrImmutableDedicatedZookeeper = errors.New("dedicated zookeeper nodes cannot be changed") diff --git a/pkg/models/kafka_apiv2.go b/pkg/models/kafka_apiv2.go index 17bd5ba59..18b9c8638 100644 --- a/pkg/models/kafka_apiv2.go +++ b/pkg/models/kafka_apiv2.go @@ -34,6 +34,7 @@ type KafkaCluster struct { DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper"` DefaultNumberOfPartitions int `json:"defaultNumberOfPartitions"` DefaultReplicationFactor int `json:"defaultReplicationFactor"` + Kraft []*Kraft `json:"kraft"` KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy"` KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry"` PCIComplianceMode bool `json:"pciComplianceMode"` @@ -68,6 +69,10 @@ type KarapaceRestProxy struct { Version string `json:"version"` } +type Kraft struct { + ControllerNodeCount int `json:"controllerNodeCount"` +} + type KarapaceSchemaRegistry struct { Version string `json:"version"` }