Skip to content

Commit

Permalink
KafkaConnect code base refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Feb 19, 2024
1 parent 6987800 commit 49d0b34
Show file tree
Hide file tree
Showing 38 changed files with 661 additions and 554 deletions.
6 changes: 3 additions & 3 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@
"filename": "apis/clusters/v1beta1/cassandra_types.go",
"hashed_secret": "331cc743251c3b9504229de4d139c539da121a33",
"is_verified": false,
"line_number": 261
"line_number": 238
},
{
"type": "Secret Keyword",
"filename": "apis/clusters/v1beta1/cassandra_types.go",
"hashed_secret": "0ad8d7005e084d4f028a4277b73c6fab24269c17",
"is_verified": false,
"line_number": 347
"line_number": 324
},
{
"type": "Secret Keyword",
Expand Down Expand Up @@ -1146,5 +1146,5 @@
}
]
},
"generated_at": "2024-02-19T13:01:03Z"
"generated_at": "2024-02-19T13:43:58Z"
}
59 changes: 30 additions & 29 deletions apis/clusters/v1beta1/cassandra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type CassandraSpec struct {
LuceneEnabled bool `json:"luceneEnabled,omitempty"`
PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
PCICompliance bool `json:"pciCompliance,omitempty"`
UserRefs References `json:"userRefs,omitempty"`
ResizeSettings GenericResizeSettings `json:"resizeSettings,omitempty"`
}
Expand Down Expand Up @@ -147,31 +148,7 @@ type CassandraDataCentreStatus struct {

func (s *CassandraDataCentreStatus) Equals(o *CassandraDataCentreStatus) bool {
return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) &&
s.nodesEqual(o.Nodes)
}

func (s *CassandraDataCentreStatus) nodesEqual(nodes []*Node) bool {
if len(s.Nodes) != len(nodes) {
return false
}

sNodes := map[string]*Node{}
for _, node := range s.Nodes {
sNodes[node.ID] = node
}

for _, node := range nodes {
sNode, ok := sNodes[node.ID]
if !ok {
return false
}

if !sNode.Equals(node) {
return false
}
}

return true
nodesEqual(s.Nodes, o.Nodes)
}

func (s *CassandraDataCentreStatus) FromInstAPI(instModel *models.CassandraDataCentre) {
Expand Down Expand Up @@ -347,18 +324,36 @@ func (cs *CassandraSpec) FromInstAPI(instModel *models.CassandraCluster) {
cs.PasswordAndUserAuth = instModel.PasswordAndUserAuth
cs.BundledUseOnly = instModel.BundledUseOnly
cs.Version = instModel.CassandraVersion
cs.PCICompliance = instModel.PCIComplianceMode
cs.ResizeSettings.FromInstAPI(instModel.ResizeSettings)

cs.dcsFromInstAPI(instModel.DataCentres)
}

func (cs *CassandraSpec) dcsFromInstAPI(instModels []*models.CassandraDataCentre) {
cs.DataCentres = make([]*CassandraDataCentre, len(instModels))
dcs := make([]*CassandraDataCentre, len(instModels))
for i, instModel := range instModels {
dc := &CassandraDataCentre{}
dcs[i] = dc

if index := cs.getDC(instModel.Name); index > -1 {
dc.Debezium = cs.DataCentres[index].Debezium
}

dc.FromInstAPI(instModel)
cs.DataCentres[i] = dc
}

cs.DataCentres = dcs
}

func (c *CassandraSpec) getDC(name string) int {
for i, dc := range c.DataCentres {
if dc.Name == name {
return i
}
}

return -1
}

func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre) {
Expand All @@ -377,15 +372,20 @@ func (d *CassandraDataCentre) FromInstAPI(instModel *models.CassandraDataCentre)
}

func (cs *CassandraDataCentre) debeziumFromInstAPI(instModels []*models.Debezium) {
cs.Debezium = make([]*DebeziumCassandraSpec, len(instModels))
debezium := make([]*DebeziumCassandraSpec, len(instModels))
for i, instModel := range instModels {
cs.Debezium[i] = &DebeziumCassandraSpec{
debezium[i] = &DebeziumCassandraSpec{
KafkaVPCType: instModel.KafkaVPCType,
KafkaTopicPrefix: instModel.KafkaTopicPrefix,
KafkaDataCentreID: instModel.KafkaDataCentreID,
Version: instModel.Version,
}

if len(cs.Debezium)-1 >= i {
debezium[i].ClusterRef = cs.Debezium[i].ClusterRef
}
}
cs.Debezium = debezium
}

func (cs *CassandraDataCentre) shotoverProxyFromInstAPI(instModels []*models.ShotoverProxy) {
Expand All @@ -411,6 +411,7 @@ func (cs *CassandraSpec) ToInstAPI() *models.CassandraCluster {
LuceneEnabled: cs.LuceneEnabled,
PasswordAndUserAuth: cs.PasswordAndUserAuth,
BundledUseOnly: cs.BundledUseOnly,
PCIComplianceMode: cs.PCICompliance,
DataCentres: cs.DCsToInstAPI(),
ResizeSettings: cs.ResizeSettings.ToInstAPI(),
}
Expand Down
2 changes: 2 additions & 0 deletions apis/clusters/v1beta1/cassandra_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ type immutableCassandraFields struct {
type specificCassandra struct {
LuceneEnabled bool
PasswordAndUserAuth bool
PCICompliance bool
}

type immutableCassandraDCFields struct {
Expand All @@ -233,6 +234,7 @@ func (cs *CassandraSpec) newImmutableFields() *immutableCassandraFields {
specificCassandra: specificCassandra{
LuceneEnabled: cs.LuceneEnabled,
PasswordAndUserAuth: cs.PasswordAndUserAuth,
PCICompliance: cs.PCICompliance,
},
immutableCluster: cs.GenericClusterSpec.immutableFields(),
}
Expand Down
7 changes: 0 additions & 7 deletions apis/clusters/v1beta1/generic_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ type GenericClusterSpec struct {

Version string `json:"version,omitempty"`

// The PCI compliance standards relate to the security of user data and transactional information.
// Can only be applied clusters provisioned on AWS_VPC, running Cassandra, Kafka, Elasticsearch and Redis.
PCICompliance bool `json:"pciCompliance,omitempty"`

PrivateNetwork bool `json:"privateNetwork,omitempty"`

// Non-production clusters may receive lower priority support and reduced SLAs.
Expand All @@ -30,7 +26,6 @@ type GenericClusterSpec struct {
func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool {
return s.Name == o.Name &&
s.Version == o.Version &&
s.PCICompliance == o.PCICompliance &&
s.PrivateNetwork == o.PrivateNetwork &&
s.SLATier == o.SLATier &&
s.Description == o.Description &&
Expand All @@ -39,7 +34,6 @@ func (s *GenericClusterSpec) Equals(o *GenericClusterSpec) bool {

func (s *GenericClusterSpec) FromInstAPI(model *models.GenericClusterFields) {
s.Name = model.Name
s.PCICompliance = model.PCIComplianceMode
s.PrivateNetwork = model.PrivateNetworkCluster
s.SLATier = model.SLATier
s.Description = model.Description
Expand All @@ -60,7 +54,6 @@ func (s *GenericClusterSpec) ToInstAPI() models.GenericClusterFields {
return models.GenericClusterFields{
Name: s.Name,
Description: s.Description,
PCIComplianceMode: s.PCICompliance,
PrivateNetworkCluster: s.PrivateNetwork,
SLATier: s.SLATier,
TwoFactorDelete: s.TwoFactorDeleteToInstAPI(),
Expand Down
40 changes: 5 additions & 35 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type KafkaSpec struct {
ClientToClusterEncryption bool `json:"clientToClusterEncryption"`
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
PCICompliance bool `json:"pciCompliance,omitempty"`
UserRefs References `json:"userRefs,omitempty"`

// Provision additional dedicated nodes for Apache Zookeeper to run on.
Expand Down Expand Up @@ -133,7 +134,7 @@ type KafkaDataCentreStatus struct {

func (s *KafkaDataCentreStatus) Equals(o *KafkaDataCentreStatus) bool {
return s.GenericDataCentreStatus.Equals(&o.GenericDataCentreStatus) &&
s.nodesEqual(o.Nodes) &&
nodesEqual(s.Nodes, o.Nodes) &&
slices.EqualsPtr(s.PrivateLink, o.PrivateLink)
}

Expand Down Expand Up @@ -178,30 +179,6 @@ func (s *KafkaStatus) ToOnPremises() ClusterStatus {
}
}

func (s *KafkaDataCentreStatus) nodesEqual(nodes []*Node) bool {
if len(s.Nodes) != len(nodes) {
return false
}

sNodes := map[string]*Node{}
for _, node := range s.Nodes {
sNodes[node.ID] = node
}

for _, node := range nodes {
sNode, ok := sNodes[node.ID]
if !ok {
return false
}

if !sNode.Equals(node) {
return false
}
}

return true
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
Expand Down Expand Up @@ -260,6 +237,7 @@ func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster {
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
ResizeSettings: k.ResizeSettings.ToInstAPI(),
PCIComplianceMode: k.PCICompliance,
}
}

Expand Down Expand Up @@ -386,6 +364,7 @@ func (ks *KafkaSpec) FromInstAPI(instaModel *models.KafkaCluster) {
ks.ClientBrokerAuthWithMTLS = instaModel.ClientBrokerAuthWithMtls
ks.BundledUseOnly = instaModel.BundledUseOnly
ks.Version = instaModel.KafkaVersion
ks.PCICompliance = instaModel.PCIComplianceMode

ks.DCsFromInstAPI(instaModel.DataCentres)
ks.kraftFromInstAPI(instaModel.Kraft)
Expand Down Expand Up @@ -491,17 +470,8 @@ func (ks *KafkaStatus) DCsFromInstAPI(instaModels []*models.KafkaDataCentre) {

func (s *KafkaDataCentreStatus) FromInstAPI(instaModel *models.KafkaDataCentre) {
s.GenericDataCentreStatus.FromInstAPI(&instaModel.GenericDataCentreFields)
s.nodesFromInstAPI(instaModel.Nodes)
s.PrivateLink.FromInstAPI(instaModel.PrivateLink)
}

func (s *KafkaDataCentreStatus) nodesFromInstAPI(instaModels []*models.Node) {
s.Nodes = make([]*Node, len(instaModels))
for i, instaModel := range instaModels {
node := Node{}
node.FromInstAPI(instaModel)
s.Nodes[i] = &node
}
s.Nodes = nodesFromInstAPI(instaModel.Nodes)
}

func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
Expand Down
2 changes: 2 additions & 0 deletions apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ type specificKafkaFields struct {
bundledUseOnly bool
privateNetworkCluster bool
clientBrokerAuthWithMtls bool
pciCompliance bool
}

func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields {
Expand All @@ -385,6 +386,7 @@ func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields {
bundledUseOnly: ks.BundledUseOnly,
privateNetworkCluster: ks.PrivateNetwork,
clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS,
pciCompliance: ks.PCICompliance,
},
cluster: ks.GenericClusterSpec.immutableFields(),
}
Expand Down
Loading

0 comments on commit 49d0b34

Please sign in to comment.