From 5bc76aa2cf5e69573a5af718c99d855d586800fe Mon Sep 17 00:00:00 2001 From: Bohdan Siryk Date: Tue, 29 Aug 2023 16:20:44 +0300 Subject: [PATCH] kafka and cadence updates --- apis/clusters/v1beta1/cadence_types.go | 8 +++-- apis/clusters/v1beta1/cadence_webhook.go | 8 +++++ apis/clusters/v1beta1/kafka_types.go | 36 +++++++++---------- apis/clusters/v1beta1/kafka_webhook.go | 10 +++++- .../clusters/v1beta1/zz_generated.deepcopy.go | 19 ++-------- .../clusters.instaclustr.com_kafkas.yaml | 2 ++ config/samples/clusters_v1beta1_cadence.yaml | 2 ++ .../samples/clusters_v1beta1_cassandra.yaml | 2 ++ config/samples/clusters_v1beta1_kafka.yaml | 8 ++--- pkg/models/kafka_apiv2.go | 6 +--- 10 files changed, 54 insertions(+), 47 deletions(-) diff --git a/apis/clusters/v1beta1/cadence_types.go b/apis/clusters/v1beta1/cadence_types.go index f1b3711c2..857c8b26d 100644 --- a/apis/clusters/v1beta1/cadence_types.go +++ b/apis/clusters/v1beta1/cadence_types.go @@ -62,7 +62,9 @@ type BundledOpenSearchSpec struct { // CadenceSpec defines the desired state of Cadence type CadenceSpec struct { - Cluster `json:",inline"` + Cluster `json:",inline"` + //+kubebuilder:validation:MinItems:=1 + //+kubebuilder:validation:MaxItems:=1 DataCentres []*CadenceDataCentre `json:"dataCentres"` Description string `json:"description,omitempty"` UseCadenceWebAuth bool `json:"useCadenceWebAuth"` @@ -423,7 +425,9 @@ func (cs *CadenceStatus) SecondaryTargetsFromInstAPI(iCad *models.CadenceCluster func (cs *CadenceStatus) DCsFromInstAPI(iDCs []*models.CadenceDataCentre) (dcs []*DataCentreStatus) { for _, iDC := range iDCs { - dcs = append(dcs, cs.ClusterStatus.DCFromInstAPI(iDC.DataCentre)) + dc := cs.ClusterStatus.DCFromInstAPI(iDC.DataCentre) + dc.PrivateLink = privateLinkStatusesFromInstAPI(iDC.PrivateLink) + dcs = append(dcs, dc) } return diff --git a/apis/clusters/v1beta1/cadence_webhook.go b/apis/clusters/v1beta1/cadence_webhook.go index 7adbd7b5f..7a2e55e12 100644 --- a/apis/clusters/v1beta1/cadence_webhook.go +++ b/apis/clusters/v1beta1/cadence_webhook.go @@ -174,6 +174,14 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje if err != nil { return err } + + if !c.Spec.PrivateNetworkCluster && dc.PrivateLink != nil { + return models.ErrPrivateLinkOnlyWithPrivateNetworkCluster + } + + if dc.CloudProvider != models.AWSVPC && dc.PrivateLink != nil { + return models.ErrPrivateLinkSupportedOnlyForAWS + } } return nil diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index 30f5c09fb..dc9717f6d 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ b/apis/clusters/v1beta1/kafka_types.go @@ -27,7 +27,7 @@ import ( // +kubebuilder:object:generate:=false type KafkaAddons interface { - SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | KafkaPrivateLink | Kraft + SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | PrivateLink | Kraft } type SchemaRegistry struct { @@ -70,12 +70,14 @@ type KafkaSpec struct { ReplicationFactor int `json:"replicationFactor"` // PartitionsNumber number of partitions to use when created new topics. - PartitionsNumber int `json:"partitionsNumber"` - RestProxy []*RestProxy `json:"restProxy,omitempty"` - AllowDeleteTopics bool `json:"allowDeleteTopics"` - AutoCreateTopics bool `json:"autoCreateTopics"` - ClientToClusterEncryption bool `json:"clientToClusterEncryption"` - DataCentres []*KafkaDataCentre `json:"dataCentres"` + PartitionsNumber int `json:"partitionsNumber"` + RestProxy []*RestProxy `json:"restProxy,omitempty"` + AllowDeleteTopics bool `json:"allowDeleteTopics"` + AutoCreateTopics bool `json:"autoCreateTopics"` + ClientToClusterEncryption bool `json:"clientToClusterEncryption"` + //+kubebuilder:validation:MinItems:=1 + //+kubebuilder:validation:MaxItems:=1 + DataCentres []*KafkaDataCentre `json:"dataCentres"` // Provision additional dedicated nodes for Apache Zookeeper to run on. // Zookeeper nodes will be co-located with Kafka if this is not provided @@ -96,11 +98,7 @@ type Kraft struct { type KafkaDataCentre struct { DataCentre `json:",inline"` - PrivateLink []*KafkaPrivateLink `json:"privateLink,omitempty"` -} - -type KafkaPrivateLink struct { - AdvertisedHostname string `json:"advertisedHostname"` + PrivateLink []*PrivateLink `json:"privateLink,omitempty"` } // KafkaStatus defines the observed state of Kafka @@ -236,9 +234,9 @@ func (k *KafkaSpec) dedicatedZookeeperToInstAPI() (iZookeepers []*models.Dedicat return } -func (k *KafkaDataCentre) privateLinkToInstAPI() (iPrivateLink []*models.KafkaPrivateLink) { +func (k *KafkaDataCentre) privateLinkToInstAPI() (iPrivateLink []*models.PrivateLink) { for _, link := range k.PrivateLink { - iPrivateLink = append(iPrivateLink, &models.KafkaPrivateLink{ + iPrivateLink = append(iPrivateLink, &models.PrivateLink{ AdvertisedHostname: link.AdvertisedHostname, }) } @@ -339,9 +337,9 @@ func (ks *KafkaSpec) DCsFromInstAPI(iDCs []*models.KafkaDataCentre) (dcs []*Kafk return } -func (ks *KafkaSpec) PrivateLinkFromInstAPI(iPLs []*models.KafkaPrivateLink) (dcs []*KafkaPrivateLink) { +func (ks *KafkaSpec) PrivateLinkFromInstAPI(iPLs []*models.PrivateLink) (pls []*PrivateLink) { for _, iPL := range iPLs { - dcs = append(dcs, &KafkaPrivateLink{ + pls = append(pls, &PrivateLink{ AdvertisedHostname: iPL.AdvertisedHostname, }) } @@ -411,7 +409,9 @@ func (ks *KafkaSpec) KarapaceSchemaRegistryFromInstAPI(iKSRs []*models.KarapaceS func (ks *KafkaStatus) DCsFromInstAPI(iDCs []*models.KafkaDataCentre) (dcs []*DataCentreStatus) { for _, iDC := range iDCs { - dcs = append(dcs, ks.ClusterStatus.DCFromInstAPI(iDC.DataCentre)) + dc := ks.DCFromInstAPI(iDC.DataCentre) + dc.PrivateLink = privateLinkStatusesFromInstAPI(iDC.PrivateLink) + dcs = append(dcs, dc) } return dcs } @@ -445,7 +445,7 @@ func (rs *KafkaSpec) areDCsEqual(b []*KafkaDataCentre) bool { for i := range b { if !a[i].DataCentre.IsEqual(b[i].DataCentre) || - !isKafkaAddonsEqual[KafkaPrivateLink](a[i].PrivateLink, b[i].PrivateLink) { + !isKafkaAddonsEqual[PrivateLink](a[i].PrivateLink, b[i].PrivateLink) { return false } } diff --git a/apis/clusters/v1beta1/kafka_webhook.go b/apis/clusters/v1beta1/kafka_webhook.go index 196fe3d9b..c34c870e5 100644 --- a/apis/clusters/v1beta1/kafka_webhook.go +++ b/apis/clusters/v1beta1/kafka_webhook.go @@ -120,6 +120,14 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object if ((dc.NodesNumber*k.Spec.ReplicationFactor)/k.Spec.ReplicationFactor)%k.Spec.ReplicationFactor != 0 { return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", k.Spec.ReplicationFactor) } + + if !k.Spec.PrivateNetworkCluster && dc.PrivateLink != nil { + return models.ErrPrivateLinkOnlyWithPrivateNetworkCluster + } + + if dc.CloudProvider != models.AWSVPC && dc.PrivateLink != nil { + return models.ErrPrivateLinkSupportedOnlyForAWS + } } if len(k.Spec.Kraft) > 1 { @@ -269,7 +277,7 @@ func validateZookeeperUpdate(new, old []*DedicatedZookeeper) bool { return true } -func isPrivateLinkValid(new, old []*KafkaPrivateLink) bool { +func isPrivateLinkValid(new, old []*PrivateLink) bool { if new == nil && old == nil { return true } diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 458aa70d0..6b0d07e80 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -980,11 +980,11 @@ func (in *KafkaDataCentre) DeepCopyInto(out *KafkaDataCentre) { in.DataCentre.DeepCopyInto(&out.DataCentre) if in.PrivateLink != nil { in, out := &in.PrivateLink, &out.PrivateLink - *out = make([]*KafkaPrivateLink, len(*in)) + *out = make([]*PrivateLink, len(*in)) for i := range *in { if (*in)[i] != nil { in, out := &(*in)[i], &(*out)[i] - *out = new(KafkaPrivateLink) + *out = new(PrivateLink) **out = **in } } @@ -1033,21 +1033,6 @@ func (in *KafkaList) DeepCopyObject() runtime.Object { return nil } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *KafkaPrivateLink) DeepCopyInto(out *KafkaPrivateLink) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaPrivateLink. -func (in *KafkaPrivateLink) DeepCopy() *KafkaPrivateLink { - if in == nil { - return nil - } - out := new(KafkaPrivateLink) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KafkaSpec) DeepCopyInto(out *KafkaSpec) { *out = *in diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index e2983ae31..2ab7a5f3f 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -79,6 +79,7 @@ spec: items: properties: advertisedHostname: + minLength: 3 type: string required: - advertisedHostname @@ -97,6 +98,7 @@ spec: - nodesNumber - region type: object + maxItems: 1 type: array dedicatedZookeeper: description: Provision additional dedicated nodes for Apache Zookeeper diff --git a/config/samples/clusters_v1beta1_cadence.yaml b/config/samples/clusters_v1beta1_cadence.yaml index 9baa23444..618be60c0 100644 --- a/config/samples/clusters_v1beta1_cadence.yaml +++ b/config/samples/clusters_v1beta1_cadence.yaml @@ -51,6 +51,8 @@ spec: nodeSize: "CAD-DEV-t3.small-5" nodesNumber: 2 clientEncryption: false +# privateLink: +# - advertisedHostname: bohdan-cadence-test.com slaTier: "NON_PRODUCTION" useCadenceWebAuth: false # targetPrimaryCadence: diff --git a/config/samples/clusters_v1beta1_cassandra.yaml b/config/samples/clusters_v1beta1_cassandra.yaml index 6d57d7936..0ef9fee3f 100644 --- a/config/samples/clusters_v1beta1_cassandra.yaml +++ b/config/samples/clusters_v1beta1_cassandra.yaml @@ -5,6 +5,7 @@ metadata: spec: name: "username-Cassandra" version: "4.0.10" + privateNetworkCluster: true dataCentres: - name: "AWS_cassandra" region: "US_EAST_1" @@ -18,6 +19,7 @@ spec: "tag": "testTag" clientToClusterEncryption: false nodeSize: "CAS-DEV-t4g.small-5" + # nodeSize: "CAS-PRD-m6g.large-120" # - name: "AWS_cassandra2" # region: "US_EAST_1" # cloudProvider: "AWS_VPC" diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index 4993fd89d..482ec3867 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -46,8 +46,8 @@ spec: tag: "oneTag" tag2: "twoTags" nodeSize: "KFK-DEV-t4g.small-5" - # nodeSize: "KFK-PRD-r6g.large-250" - # nodeSize: "KFK-DEV-t4g.medium-80" +# nodeSize: "KFK-PRD-r6g.large-250" +# nodeSize: "KFK-DEV-t4g.medium-80" network: "10.0.0.0/16" region: "US_EAST_1" # accountName: "Custrom" @@ -56,10 +56,10 @@ spec: # diskEncryptionKey: "123e4567-e89b-12d3-a456-426614174000" # resourceGroup: "asdfadfsdfas" # privateLink: -# - advertisedHostname: "asdfadsf" +# - advertisedHostname: "bohdan-kafka-test.com" # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" # nodesNumber: 3 - userRefs: +# userRefs: # - name: kafkauser-sample # namespace: default \ No newline at end of file diff --git a/pkg/models/kafka_apiv2.go b/pkg/models/kafka_apiv2.go index bfc9e9aa0..e91f9713d 100644 --- a/pkg/models/kafka_apiv2.go +++ b/pkg/models/kafka_apiv2.go @@ -44,11 +44,7 @@ type KafkaCluster struct { type KafkaDataCentre struct { DataCentre `json:",inline"` - PrivateLink []*KafkaPrivateLink `json:"privateLink,omitempty"` -} - -type KafkaPrivateLink struct { - AdvertisedHostname string `json:"advertisedHostname"` + PrivateLink []*PrivateLink `json:"privateLink,omitempty"` } type SchemaRegistry struct {