Skip to content

Commit

Permalink
deprecated auth fields were removed & Mtls cerctificate creation was …
Browse files Browse the repository at this point in the history
…implemented
  • Loading branch information
tengu-alt committed Sep 1, 2023
1 parent 05bd37a commit 882c732
Show file tree
Hide file tree
Showing 19 changed files with 798 additions and 80 deletions.
1 change: 1 addition & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ resources:
version: v1beta1
webhooks:
defaulting: true
validation: true
webhookVersion: v1
- api:
crdVersion: v1
Expand Down
84 changes: 38 additions & 46 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,13 @@ type KafkaSpec struct {

// Provision additional dedicated nodes for Apache Zookeeper to run on.
// Zookeeper nodes will be co-located with Kafka if this is not provided
DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"`
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
ClientAuthBrokerWithoutEncryption bool `json:"clientAuthBrokerWithoutEncryption,omitempty"`
ClientAuthBrokerWithEncryption bool `json:"clientAuthBrokerWithEncryption,omitempty"`
DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"`
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"`
KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
UserRefs []*UserReference `json:"userRefs,omitempty"`
Kraft []*Kraft `json:"kraft,omitempty"`
KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"`
KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
UserRefs []*UserReference `json:"userRefs,omitempty"`
}

type Kraft struct {
Expand Down Expand Up @@ -145,28 +143,26 @@ func (k *Kafka) NewPatch() client.Patch {

func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster {
return &models.KafkaCluster{
SchemaRegistry: k.schemaRegistryToInstAPI(),
RestProxy: k.restProxyToInstAPI(),
PCIComplianceMode: k.PCICompliance,
DefaultReplicationFactor: k.ReplicationFactor,
DefaultNumberOfPartitions: k.PartitionsNumber,
TwoFactorDelete: k.TwoFactorDeletesToInstAPI(),
AllowDeleteTopics: k.AllowDeleteTopics,
AutoCreateTopics: k.AutoCreateTopics,
ClientToClusterEncryption: k.ClientToClusterEncryption,
DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(),
PrivateNetworkCluster: k.PrivateNetworkCluster,
Name: k.Name,
SLATier: k.SLATier,
KafkaVersion: k.Version,
DataCentres: k.dcToInstAPI(),
ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS,
ClientAuthBrokerWithoutEncryption: k.ClientAuthBrokerWithoutEncryption,
ClientAuthBrokerWithEncryption: k.ClientAuthBrokerWithEncryption,
BundledUseOnly: k.BundledUseOnly,
SchemaRegistry: k.schemaRegistryToInstAPI(),
RestProxy: k.restProxyToInstAPI(),
PCIComplianceMode: k.PCICompliance,
DefaultReplicationFactor: k.ReplicationFactor,
DefaultNumberOfPartitions: k.PartitionsNumber,
TwoFactorDelete: k.TwoFactorDeletesToInstAPI(),
AllowDeleteTopics: k.AllowDeleteTopics,
AutoCreateTopics: k.AutoCreateTopics,
ClientToClusterEncryption: k.ClientToClusterEncryption,
DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(),
PrivateNetworkCluster: k.PrivateNetworkCluster,
Name: k.Name,
SLATier: k.SLATier,
KafkaVersion: k.Version,
DataCentres: k.dcToInstAPI(),
ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS,
BundledUseOnly: k.BundledUseOnly,
Kraft: k.kraftToInstAPI(),
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
}
}

Expand Down Expand Up @@ -298,22 +294,20 @@ func (ks *KafkaSpec) FromInstAPI(iKafka *models.KafkaCluster) KafkaSpec {
SLATier: iKafka.SLATier,
TwoFactorDelete: ks.Cluster.TwoFactorDeleteFromInstAPI(iKafka.TwoFactorDelete),
},
SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry),
ReplicationFactor: iKafka.DefaultReplicationFactor,
PartitionsNumber: iKafka.DefaultNumberOfPartitions,
RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy),
AllowDeleteTopics: iKafka.AllowDeleteTopics,
AutoCreateTopics: iKafka.AutoCreateTopics,
ClientToClusterEncryption: iKafka.ClientToClusterEncryption,
DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres),
DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper),
ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls,
ClientAuthBrokerWithoutEncryption: iKafka.ClientAuthBrokerWithoutEncryption,
ClientAuthBrokerWithEncryption: iKafka.ClientAuthBrokerWithEncryption,
KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy),
SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry),
ReplicationFactor: iKafka.DefaultReplicationFactor,
PartitionsNumber: iKafka.DefaultNumberOfPartitions,
RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy),
AllowDeleteTopics: iKafka.AllowDeleteTopics,
AutoCreateTopics: iKafka.AutoCreateTopics,
ClientToClusterEncryption: iKafka.ClientToClusterEncryption,
DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres),
DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper),
ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls,
KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy),
Kraft: ks.kraftFromInstAPI(iKafka.Kraft),
KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry),
BundledUseOnly: iKafka.BundledUseOnly,
KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry),
BundledUseOnly: iKafka.BundledUseOnly,
}
}

Expand Down Expand Up @@ -424,8 +418,6 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
a.AutoCreateTopics == b.AutoCreateTopics &&
a.ClientToClusterEncryption == b.ClientToClusterEncryption &&
a.ClientBrokerAuthWithMTLS == b.ClientBrokerAuthWithMTLS &&
a.ClientAuthBrokerWithoutEncryption == b.ClientAuthBrokerWithoutEncryption &&
a.ClientAuthBrokerWithEncryption == b.ClientAuthBrokerWithEncryption &&
a.BundledUseOnly == b.BundledUseOnly &&
isKafkaAddonsEqual[SchemaRegistry](a.SchemaRegistry, b.SchemaRegistry) &&
isKafkaAddonsEqual[RestProxy](a.RestProxy, b.RestProxy) &&
Expand Down
36 changes: 16 additions & 20 deletions apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,31 +331,27 @@ type immutableKafkaFields struct {
}

type specificKafkaFields struct {
replicationFactor int
partitionsNumber int
allowDeleteTopics bool
autoCreateTopics bool
clientToClusterEncryption bool
bundledUseOnly bool
privateNetworkCluster bool
clientAuthBrokerWithEncryption bool
clientAuthBrokerWithoutEncryption bool
clientBrokerAuthWithMtls bool
replicationFactor int
partitionsNumber int
allowDeleteTopics bool
autoCreateTopics bool
clientToClusterEncryption bool
bundledUseOnly bool
privateNetworkCluster bool
clientBrokerAuthWithMtls bool
}

func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields {
return &immutableKafkaFields{
specificFields: specificKafkaFields{
replicationFactor: ks.ReplicationFactor,
partitionsNumber: ks.PartitionsNumber,
allowDeleteTopics: ks.AllowDeleteTopics,
autoCreateTopics: ks.AutoCreateTopics,
clientToClusterEncryption: ks.ClientToClusterEncryption,
bundledUseOnly: ks.BundledUseOnly,
privateNetworkCluster: ks.PrivateNetworkCluster,
clientAuthBrokerWithEncryption: ks.ClientAuthBrokerWithEncryption,
clientAuthBrokerWithoutEncryption: ks.ClientAuthBrokerWithoutEncryption,
clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS,
replicationFactor: ks.ReplicationFactor,
partitionsNumber: ks.PartitionsNumber,
allowDeleteTopics: ks.AllowDeleteTopics,
autoCreateTopics: ks.AutoCreateTopics,
clientToClusterEncryption: ks.ClientToClusterEncryption,
bundledUseOnly: ks.BundledUseOnly,
privateNetworkCluster: ks.PrivateNetworkCluster,
clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS,
},
cluster: ks.Cluster.newImmutableFields(),
}
Expand Down
51 changes: 48 additions & 3 deletions apis/kafkamanagement/v1beta1/kafkauser_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1

import (
k8sCore "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -26,9 +27,10 @@ import (

// KafkaUserSpec defines the desired state of KafkaUser
type KafkaUserSpec struct {
Options *KafkaUserOptions `json:"options"`
SecretRef *v1beta1.SecretReference `json:"secretRef"`
InitialPermissions string `json:"initialPermissions"`
Options *KafkaUserOptions `json:"options"`
SecretRef *v1beta1.SecretReference `json:"secretRef"`
CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"`
InitialPermissions string `json:"initialPermissions"`
}

type KafkaUserOptions struct {
Expand All @@ -41,6 +43,25 @@ type KafkaUserStatus struct {
ClustersEvents map[string]string `json:"clustersEvents,omitempty"`
}

type Certificate struct {
ID string `json:"id,omitempty"`
ExpiryDate string `json:"expiryDate,omitempty"`
SignedCertificate string `json:"signedCertificate,omitempty"`
}

type CertificateRequest struct {
SecretName string `json:"secretName"`
SecretNamespace string `json:"secretNamespace"`
ClusterID string `json:"clusterId"`
CSR string `json:"csr,omitempty"`
ValidPeriod int `json:"validPeriod"`
CommonName string `json:"commonName,omitempty"`
Country string `json:"country,omitempty"`
Organization string `json:"organization,omitempty"`
OrganizationalUnit string `json:"organizationalUnit,omitempty"`
AutoRenew bool `json:"autoRenew"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

Expand Down Expand Up @@ -79,6 +100,21 @@ func (ku *KafkaUser) GetID(clusterID, name string) string {
return clusterID + "_" + name
}

func (ku *KafkaUser) NewCertificateSecret(name, namespace string) *k8sCore.Secret {
return &k8sCore.Secret{
TypeMeta: metav1.TypeMeta{
Kind: models.SecretKind,
APIVersion: models.K8sAPIVersionV1,
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},

StringData: map[string]string{},
}
}

func init() {
SchemeBuilder.Register(&KafkaUser{}, &KafkaUserList{})
}
Expand All @@ -100,3 +136,12 @@ func (ko *KafkaUserOptions) ToInstAPI() *models.KafkaUserOptions {
}

}

func (cr *CertificateRequest) ToInstAPI(username string) *models.CertificateRequest {
return &models.CertificateRequest{
ClusterID: cr.ClusterID,
CSR: cr.CSR,
KafkaUsername: username,
ValidPeriod: cr.ValidPeriod,
}
}
40 changes: 40 additions & 0 deletions apis/kafkamanagement/v1beta1/kafkauser_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1

import (
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -47,3 +48,42 @@ func (r *KafkaUser) Default() {
})
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-kafkamanagement-instaclustr-com-v1beta1-kafkauser,mutating=false,failurePolicy=fail,sideEffects=None,groups=kafkamanagement.instaclustr.com,resources=kafkausers,verbs=create;update,versions=v1beta1,name=vkafkauser.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &KafkaUser{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *KafkaUser) ValidateCreate() error {
kafkauserlog.Info("validate create", "name", r.Name)

if len(r.Spec.CertificateRequests) != 0 {
return models.ErrNotEmptyCSRs
}

return nil
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *KafkaUser) ValidateUpdate(old runtime.Object) error {
kafkauserlog.Info("validate update", "name", r.Name)

for _, request := range r.Spec.CertificateRequests {
if request.CSR == "" {
if request.Organization == "" || request.OrganizationalUnit == "" || request.Country == "" || request.CommonName == "" {
return models.ErrEmptyCertGeneratingFields
}
}
}

return nil
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *KafkaUser) ValidateDelete() error {
kafkauserlog.Info("validate delete", "name", r.Name)

// TODO(user): fill in your validation logic upon object deletion.
return nil
}
41 changes: 41 additions & 0 deletions apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions config/crd/bases/clusters.instaclustr.com_kafkas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ spec:
type: boolean
bundledUseOnly:
type: boolean
clientAuthBrokerWithEncryption:
type: boolean
clientAuthBrokerWithoutEncryption:
type: boolean
clientBrokerAuthWithMtls:
type: boolean
clientToClusterEncryption:
Expand Down
Loading

0 comments on commit 882c732

Please sign in to comment.