Skip to content

Commit

Permalink
Kafka cluster CRD spec was updated with Kraft field
Browse files Browse the repository at this point in the history
  • Loading branch information
tengu-alt committed Aug 22, 2023
1 parent 4f54849 commit 02d3399
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 3 deletions.
28 changes: 27 additions & 1 deletion apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// +kubebuilder:object:generate:=false
type KafkaAddons interface {
SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | KafkaPrivateLink
SchemaRegistry | RestProxy | KarapaceSchemaRegistry | KarapaceRestProxy | DedicatedZookeeper | KafkaPrivateLink | Kraft
}

type SchemaRegistry struct {
Expand Down Expand Up @@ -83,12 +83,17 @@ type KafkaSpec struct {
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
ClientAuthBrokerWithoutEncryption bool `json:"clientAuthBrokerWithoutEncryption,omitempty"`
ClientAuthBrokerWithEncryption bool `json:"clientAuthBrokerWithEncryption,omitempty"`
Kraft []*Kraft `json:"kraft,omitempty"`
KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"`
KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
UserRefs []*UserReference `json:"userRefs,omitempty"`
}

type Kraft struct {
ControllerNodeCount int `json:"controllerNodeCount"`
}

type KafkaDataCentre struct {
DataCentre `json:",inline"`
PrivateLink []*KafkaPrivateLink `json:"privateLink,omitempty"`
Expand Down Expand Up @@ -159,6 +164,7 @@ func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster {
ClientAuthBrokerWithoutEncryption: k.ClientAuthBrokerWithoutEncryption,
ClientAuthBrokerWithEncryption: k.ClientAuthBrokerWithEncryption,
BundledUseOnly: k.BundledUseOnly,
Kraft: k.kraftToInstAPI(),
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
}
Expand Down Expand Up @@ -210,6 +216,15 @@ func (k *KafkaSpec) karapaceRestProxyToInstAPI() (iRestProxies []*models.Karapac
return
}

func (k *KafkaSpec) kraftToInstAPI() (iKraft []*models.Kraft) {
for _, kraft := range k.Kraft {
iKraft = append(iKraft, &models.Kraft{
ControllerNodeCount: kraft.ControllerNodeCount,
})
}
return
}

func (k *KafkaSpec) dedicatedZookeeperToInstAPI() (iZookeepers []*models.DedicatedZookeeper) {
for _, zookeeper := range k.DedicatedZookeeper {
iZookeepers = append(iZookeepers, &models.DedicatedZookeeper{
Expand Down Expand Up @@ -296,6 +311,7 @@ func (ks *KafkaSpec) FromInstAPI(iKafka *models.KafkaCluster) KafkaSpec {
ClientAuthBrokerWithoutEncryption: iKafka.ClientAuthBrokerWithoutEncryption,
ClientAuthBrokerWithEncryption: iKafka.ClientAuthBrokerWithEncryption,
KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy),
Kraft: ks.kraftFromInstAPI(iKafka.Kraft),
KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry),
BundledUseOnly: iKafka.BundledUseOnly,
}
Expand Down Expand Up @@ -375,6 +391,15 @@ func (ks *KafkaSpec) KarapaceRestProxyFromInstAPI(iKRPs []*models.KarapaceRestPr
return
}

func (ks *KafkaSpec) kraftFromInstAPI(iKraft []*models.Kraft) (kraft []*Kraft) {
for _, ikraft := range iKraft {
kraft = append(kraft, &Kraft{
ControllerNodeCount: ikraft.ControllerNodeCount,
})
}
return
}

func (ks *KafkaSpec) KarapaceSchemaRegistryFromInstAPI(iKSRs []*models.KarapaceSchemaRegistry) (ksrs []*KarapaceSchemaRegistry) {
for _, iKSR := range iKSRs {
ksrs = append(ksrs, &KarapaceSchemaRegistry{
Expand Down Expand Up @@ -405,6 +430,7 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
isKafkaAddonsEqual[SchemaRegistry](a.SchemaRegistry, b.SchemaRegistry) &&
isKafkaAddonsEqual[RestProxy](a.RestProxy, b.RestProxy) &&
isKafkaAddonsEqual[KarapaceRestProxy](a.KarapaceRestProxy, b.KarapaceRestProxy) &&
isKafkaAddonsEqual[Kraft](a.Kraft, b.Kraft) &&
isKafkaAddonsEqual[KarapaceSchemaRegistry](a.KarapaceSchemaRegistry, b.KarapaceSchemaRegistry) &&
isKafkaAddonsEqual[DedicatedZookeeper](a.DedicatedZookeeper, b.DedicatedZookeeper) &&
a.areDCsEqual(b.DataCentres) &&
Expand Down
13 changes: 13 additions & 0 deletions apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ func (kv *kafkaValidator) ValidateCreate(ctx context.Context, obj runtime.Object
}
}

if len(k.Spec.Kraft) > 1 {
return models.ErrMoreThanOneKraft
}

for _, kraft := range k.Spec.Kraft {
if kraft.ControllerNodeCount > 3 {
return models.ErrMoreThanThreeControllerNodeCount
}
}

return nil
}

Expand Down Expand Up @@ -210,6 +220,9 @@ func (ks *KafkaSpec) validateUpdate(old *KafkaSpec) error {
if !isKafkaAddonsEqual[RestProxy](ks.RestProxy, old.RestProxy) {
return models.ErrImmutableRestProxy
}
if !isKafkaAddonsEqual[Kraft](ks.Kraft, old.Kraft) {
return models.ErrImmutableKraft
}
if !isKafkaAddonsEqual[KarapaceRestProxy](ks.KarapaceRestProxy, old.KarapaceRestProxy) {
return models.ErrImmutableKarapaceRestProxy
}
Expand Down
26 changes: 26 additions & 0 deletions apis/clusters/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/crd/bases/clusters.instaclustr.com_kafkas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ spec:
- version
type: object
type: array
kraft:
items:
properties:
controllerNodeCount:
type: integer
required:
- controllerNodeCount
type: object
type: array
name:
description: Name [ 3 .. 32 ] characters.
type: string
Expand Down
6 changes: 4 additions & 2 deletions config/samples/clusters_v1beta1_kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ spec:
# karapaceRestProxy:
# - integrateRestProxyWithSchemaRegistry: true
# version: "3.2.0"
# kraft:
# - controllerNodeCount: 3
# restProxy:
# - integrateRestProxyWithSchemaRegistry: false
# schemaRegistryPassword: "asdfasdf"
Expand Down Expand Up @@ -59,5 +61,5 @@ spec:
# - nodeSize: "KDZ-DEV-t4g.small-30"
# nodesNumber: 3
userRefs:
- name: kafkauser-sample
namespace: default
# - name: kafkauser-sample
# namespace: default
3 changes: 3 additions & 0 deletions pkg/instaclustr/mock/server/go/model_kafka_cluster_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type KafkaClusterV2 struct {
//
TwoFactorDelete []TwoFactorDeleteSettingsV2 `json:"twoFactorDelete,omitempty"`

//
Kraft []KafkaKraftV2 `json:"kraft,omitempty"`

// Adds the specified version of Kafka Karapace REST Proxy to this Kafka cluster.
KarapaceRestProxy []KafkaKarapaceRestProxyDetailsV2 `json:"karapaceRestProxy,omitempty"`

Expand Down
15 changes: 15 additions & 0 deletions pkg/instaclustr/mock/server/go/model_kafka_kraft_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Instaclustr Cluster Management API
*
* Instaclustr Cluster Management API
*
* API version: 2.0.0
* Generated by: OpenAPI Generator (https://openapi-generator.tech)
*/

package openapi

// KafkaKraftV2 -
type KafkaKraftV2 struct {
ControllerNodeCount int `json:"controllerNodeCount"`
}
3 changes: 3 additions & 0 deletions pkg/models/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import "errors"

var (
ErrZeroDataCentres = errors.New("cluster spec doesn't have data centres")
ErrMoreThanOneKraft = errors.New("cluster spec spec does not support more than one Kraft")
ErrMoreThanThreeControllerNodeCount = errors.New("Kraft does not support more than three controller nodes")
ErrNetworkOverlaps = errors.New("cluster network overlaps")
ErrImmutableTwoFactorDelete = errors.New("twoFactorDelete field is immutable")
ErrImmutableCloudProviderSettings = errors.New("cloudProviderSettings are immutable")
Expand All @@ -33,6 +35,7 @@ var (
ErrTypeAssertion = errors.New("unable to assert type")
ErrImmutableSchemaRegistry = errors.New("schema registry is immutable")
ErrImmutableRestProxy = errors.New("rest proxy is immutable")
ErrImmutableKraft = errors.New("Kraft is immutable")
ErrImmutableKarapaceSchemaRegistry = errors.New("karapace schema registry is immutable")
ErrImmutableKarapaceRestProxy = errors.New("karapace rest proxy is immutable")
ErrImmutableDedicatedZookeeper = errors.New("dedicated zookeeper nodes cannot be changed")
Expand Down
5 changes: 5 additions & 0 deletions pkg/models/kafka_apiv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type KafkaCluster struct {
DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper"`
DefaultNumberOfPartitions int `json:"defaultNumberOfPartitions"`
DefaultReplicationFactor int `json:"defaultReplicationFactor"`
Kraft []*Kraft `json:"kraft"`
KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy"`
KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry"`
PCIComplianceMode bool `json:"pciComplianceMode"`
Expand Down Expand Up @@ -68,6 +69,10 @@ type KarapaceRestProxy struct {
Version string `json:"version"`
}

type Kraft struct {
ControllerNodeCount int `json:"controllerNodeCount"`
}

type KarapaceSchemaRegistry struct {
Version string `json:"version"`
}
Expand Down

0 comments on commit 02d3399

Please sign in to comment.