Skip to content

Commit

Permalink
kafka and cadence updates
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Aug 31, 2023
1 parent 05bd37a commit 5bc76aa
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 47 deletions.
8 changes: 6 additions & 2 deletions apis/clusters/v1beta1/cadence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ type BundledOpenSearchSpec struct {

// CadenceSpec defines the desired state of Cadence
type CadenceSpec struct {
Cluster `json:",inline"`
Cluster `json:",inline"`
//+kubebuilder:validation:MinItems:=1
//+kubebuilder:validation:MaxItems:=1
DataCentres []*CadenceDataCentre `json:"dataCentres"`
Description string `json:"description,omitempty"`
UseCadenceWebAuth bool `json:"useCadenceWebAuth"`
Expand Down Expand Up @@ -423,7 +425,9 @@ func (cs *CadenceStatus) SecondaryTargetsFromInstAPI(iCad *models.CadenceCluster

func (cs *CadenceStatus) DCsFromInstAPI(iDCs []*models.CadenceDataCentre) (dcs []*DataCentreStatus) {
for _, iDC := range iDCs {
dcs = append(dcs, cs.ClusterStatus.DCFromInstAPI(iDC.DataCentre))
dc := cs.ClusterStatus.DCFromInstAPI(iDC.DataCentre)
dc.PrivateLink = privateLinkStatusesFromInstAPI(iDC.PrivateLink)
dcs = append(dcs, dc)
}

return
Expand Down
8 changes: 8 additions & 0 deletions apis/clusters/v1beta1/cadence_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,14 @@ func (cv *cadenceValidator) ValidateCreate(ctx context.Context, obj runtime.Obje
if err != nil {
return err
}

if !c.Spec.PrivateNetworkCluster && dc.PrivateLink != nil {
return models.ErrPrivateLinkOnlyWithPrivateNetworkCluster
}

if dc.CloudProvider != models.AWSVPC && dc.PrivateLink != nil {
return models.ErrPrivateLinkSupportedOnlyForAWS
}
}

return nil
Expand Down
36 changes: 18 additions & 18 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// +kubebuilder:object:generate:=false
type KafkaAddons interface {
SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | KafkaPrivateLink | Kraft
SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | PrivateLink | Kraft
}

type SchemaRegistry struct {
Expand Down Expand Up @@ -70,12 +70,14 @@ type KafkaSpec struct {
ReplicationFactor int `json:"replicationFactor"`

// PartitionsNumber number of partitions to use when created new topics.
PartitionsNumber int `json:"partitionsNumber"`
RestProxy []*RestProxy `json:"restProxy,omitempty"`
AllowDeleteTopics bool `json:"allowDeleteTopics"`
AutoCreateTopics bool `json:"autoCreateTopics"`
ClientToClusterEncryption bool `json:"clientToClusterEncryption"`
DataCentres []*KafkaDataCentre `json:"dataCentres"`
PartitionsNumber int `json:"partitionsNumber"`
RestProxy []*RestProxy `json:"restProxy,omitempty"`
AllowDeleteTopics bool `json:"allowDeleteTopics"`
AutoCreateTopics bool `json:"autoCreateTopics"`
ClientToClusterEncryption bool `json:"clientToClusterEncryption"`
//+kubebuilder:validation:MinItems:=1
//+kubebuilder:validation:MaxItems:=1
DataCentres []*KafkaDataCentre `json:"dataCentres"`

// Provision additional dedicated nodes for Apache Zookeeper to run on.
// Zookeeper nodes will be co-located with Kafka if this is not provided
Expand All @@ -96,11 +98,7 @@ type Kraft struct {

type KafkaDataCentre struct {
DataCentre `json:",inline"`
PrivateLink []*KafkaPrivateLink `json:"privateLink,omitempty"`
}

type KafkaPrivateLink struct {
AdvertisedHostname string `json:"advertisedHostname"`
PrivateLink []*PrivateLink `json:"privateLink,omitempty"`
}

// KafkaStatus defines the observed state of Kafka
Expand Down Expand Up @@ -236,9 +234,9 @@ func (k *KafkaSpec) dedicatedZookeeperToInstAPI() (iZookeepers []*models.Dedicat
return
}

func (k *KafkaDataCentre) privateLinkToInstAPI() (iPrivateLink []*models.KafkaPrivateLink) {
func (k *KafkaDataCentre) privateLinkToInstAPI() (iPrivateLink []*models.PrivateLink) {
for _, link := range k.PrivateLink {
iPrivateLink = append(iPrivateLink, &models.KafkaPrivateLink{
iPrivateLink = append(iPrivateLink, &models.PrivateLink{
AdvertisedHostname: link.AdvertisedHostname,
})
}
Expand Down Expand Up @@ -339,9 +337,9 @@ func (ks *KafkaSpec) DCsFromInstAPI(iDCs []*models.KafkaDataCentre) (dcs []*Kafk
return
}

func (ks *KafkaSpec) PrivateLinkFromInstAPI(iPLs []*models.KafkaPrivateLink) (dcs []*KafkaPrivateLink) {
func (ks *KafkaSpec) PrivateLinkFromInstAPI(iPLs []*models.PrivateLink) (pls []*PrivateLink) {
for _, iPL := range iPLs {
dcs = append(dcs, &KafkaPrivateLink{
pls = append(pls, &PrivateLink{
AdvertisedHostname: iPL.AdvertisedHostname,
})
}
Expand Down Expand Up @@ -411,7 +409,9 @@ func (ks *KafkaSpec) KarapaceSchemaRegistryFromInstAPI(iKSRs []*models.KarapaceS

func (ks *KafkaStatus) DCsFromInstAPI(iDCs []*models.KafkaDataCentre) (dcs []*DataCentreStatus) {
for _, iDC := range iDCs {
dcs = append(dcs, ks.ClusterStatus.DCFromInstAPI(iDC.DataCentre))
dc := ks.DCFromInstAPI(iDC.DataCentre)
dc.PrivateLink = privateLinkStatusesFromInstAPI(iDC.PrivateLink)
dcs = append(dcs, dc)
}
return dcs
}
Expand Down Expand Up @@ -445,7 +445,7 @@ func (rs *KafkaSpec) areDCsEqual(b []*KafkaDataCentre) bool {

for i := range b {
if !a[i].DataCentre.IsEqual(b[i].DataCentre) ||
!isKafkaAddonsEqual[KafkaPrivateLink](a[i].PrivateLink, b[i].PrivateLink) {
!isKafkaAddonsEqual[PrivateLink](a[i].PrivateLink, b[i].PrivateLink) {
return false
}
}
Expand Down
10 changes: 9 additions & 1 deletion apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object
if ((dc.NodesNumber*k.Spec.ReplicationFactor)/k.Spec.ReplicationFactor)%k.Spec.ReplicationFactor != 0 {
return fmt.Errorf("number of nodes must be a multiple of replication factor: %v", k.Spec.ReplicationFactor)
}

if !k.Spec.PrivateNetworkCluster && dc.PrivateLink != nil {
return models.ErrPrivateLinkOnlyWithPrivateNetworkCluster
}

if dc.CloudProvider != models.AWSVPC && dc.PrivateLink != nil {
return models.ErrPrivateLinkSupportedOnlyForAWS
}
}

if len(k.Spec.Kraft) > 1 {
Expand Down Expand Up @@ -269,7 +277,7 @@ func validateZookeeperUpdate(new, old []*DedicatedZookeeper) bool {
return true
}

func isPrivateLinkValid(new, old []*KafkaPrivateLink) bool {
func isPrivateLinkValid(new, old []*PrivateLink) bool {
if new == nil && old == nil {
return true
}
Expand Down
19 changes: 2 additions & 17 deletions apis/clusters/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/crd/bases/clusters.instaclustr.com_kafkas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ spec:
items:
properties:
advertisedHostname:
minLength: 3
type: string
required:
- advertisedHostname
Expand All @@ -97,6 +98,7 @@ spec:
- nodesNumber
- region
type: object
maxItems: 1
type: array
dedicatedZookeeper:
description: Provision additional dedicated nodes for Apache Zookeeper
Expand Down
2 changes: 2 additions & 0 deletions config/samples/clusters_v1beta1_cadence.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ spec:
nodeSize: "CAD-DEV-t3.small-5"
nodesNumber: 2
clientEncryption: false
# privateLink:
# - advertisedHostname: bohdan-cadence-test.com
slaTier: "NON_PRODUCTION"
useCadenceWebAuth: false
# targetPrimaryCadence:
Expand Down
2 changes: 2 additions & 0 deletions config/samples/clusters_v1beta1_cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ metadata:
spec:
name: "username-Cassandra"
version: "4.0.10"
privateNetworkCluster: true
dataCentres:
- name: "AWS_cassandra"
region: "US_EAST_1"
Expand All @@ -18,6 +19,7 @@ spec:
"tag": "testTag"
clientToClusterEncryption: false
nodeSize: "CAS-DEV-t4g.small-5"
# nodeSize: "CAS-PRD-m6g.large-120"
# - name: "AWS_cassandra2"
# region: "US_EAST_1"
# cloudProvider: "AWS_VPC"
Expand Down
8 changes: 4 additions & 4 deletions config/samples/clusters_v1beta1_kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ spec:
tag: "oneTag"
tag2: "twoTags"
nodeSize: "KFK-DEV-t4g.small-5"
# nodeSize: "KFK-PRD-r6g.large-250"
# nodeSize: "KFK-DEV-t4g.medium-80"
# nodeSize: "KFK-PRD-r6g.large-250"
# nodeSize: "KFK-DEV-t4g.medium-80"
network: "10.0.0.0/16"
region: "US_EAST_1"
# accountName: "Custrom"
Expand All @@ -56,10 +56,10 @@ spec:
# diskEncryptionKey: "123e4567-e89b-12d3-a456-426614174000"
# resourceGroup: "asdfadfsdfas"
# privateLink:
# - advertisedHostname: "asdfadsf"
# - advertisedHostname: "bohdan-kafka-test.com"
# dedicatedZookeeper:
# - nodeSize: "KDZ-DEV-t4g.small-30"
# nodesNumber: 3
userRefs:
# userRefs:
# - name: kafkauser-sample
# namespace: default
6 changes: 1 addition & 5 deletions pkg/models/kafka_apiv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ type KafkaCluster struct {

type KafkaDataCentre struct {
DataCentre `json:",inline"`
PrivateLink []*KafkaPrivateLink `json:"privateLink,omitempty"`
}

type KafkaPrivateLink struct {
AdvertisedHostname string `json:"advertisedHostname"`
PrivateLink []*PrivateLink `json:"privateLink,omitempty"`
}

type SchemaRegistry struct {
Expand Down

0 comments on commit 5bc76aa

Please sign in to comment.