Skip to content

Commit

Permalink
deprecated auth fields were removed & Mtls cerctificate creation was …
Browse files Browse the repository at this point in the history
…implemented
  • Loading branch information
tengu-alt committed Aug 23, 2023
1 parent 4f54849 commit 400197d
Show file tree
Hide file tree
Showing 20 changed files with 808 additions and 86 deletions.
1 change: 1 addition & 0 deletions PROJECT
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ resources:
version: v1beta1
webhooks:
defaulting: true
validation: true
webhookVersion: v1
- api:
crdVersion: v1
Expand Down
84 changes: 38 additions & 46 deletions apis/clusters/v1beta1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,12 @@ type KafkaSpec struct {

// Provision additional dedicated nodes for Apache Zookeeper to run on.
// Zookeeper nodes will be co-located with Kafka if this is not provided
DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"`
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
ClientAuthBrokerWithoutEncryption bool `json:"clientAuthBrokerWithoutEncryption,omitempty"`
ClientAuthBrokerWithEncryption bool `json:"clientAuthBrokerWithEncryption,omitempty"`
KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"`
KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
UserRefs []*UserReference `json:"userRefs,omitempty"`
DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"`
ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"`
KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"`
KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"`
BundledUseOnly bool `json:"bundledUseOnly,omitempty"`
UserRefs []*UserReference `json:"userRefs,omitempty"`
}

type KafkaDataCentre struct {
Expand Down Expand Up @@ -140,27 +138,25 @@ func (k *Kafka) NewPatch() client.Patch {

func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster {
return &models.KafkaCluster{
SchemaRegistry: k.schemaRegistryToInstAPI(),
RestProxy: k.restProxyToInstAPI(),
PCIComplianceMode: k.PCICompliance,
DefaultReplicationFactor: k.ReplicationFactor,
DefaultNumberOfPartitions: k.PartitionsNumber,
TwoFactorDelete: k.TwoFactorDeletesToInstAPI(),
AllowDeleteTopics: k.AllowDeleteTopics,
AutoCreateTopics: k.AutoCreateTopics,
ClientToClusterEncryption: k.ClientToClusterEncryption,
DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(),
PrivateNetworkCluster: k.PrivateNetworkCluster,
Name: k.Name,
SLATier: k.SLATier,
KafkaVersion: k.Version,
DataCentres: k.dcToInstAPI(),
ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS,
ClientAuthBrokerWithoutEncryption: k.ClientAuthBrokerWithoutEncryption,
ClientAuthBrokerWithEncryption: k.ClientAuthBrokerWithEncryption,
BundledUseOnly: k.BundledUseOnly,
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
SchemaRegistry: k.schemaRegistryToInstAPI(),
RestProxy: k.restProxyToInstAPI(),
PCIComplianceMode: k.PCICompliance,
DefaultReplicationFactor: k.ReplicationFactor,
DefaultNumberOfPartitions: k.PartitionsNumber,
TwoFactorDelete: k.TwoFactorDeletesToInstAPI(),
AllowDeleteTopics: k.AllowDeleteTopics,
AutoCreateTopics: k.AutoCreateTopics,
ClientToClusterEncryption: k.ClientToClusterEncryption,
DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(),
PrivateNetworkCluster: k.PrivateNetworkCluster,
Name: k.Name,
SLATier: k.SLATier,
KafkaVersion: k.Version,
DataCentres: k.dcToInstAPI(),
ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS,
BundledUseOnly: k.BundledUseOnly,
KarapaceRestProxy: k.karapaceRestProxyToInstAPI(),
KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(),
}
}

Expand Down Expand Up @@ -283,21 +279,19 @@ func (ks *KafkaSpec) FromInstAPI(iKafka *models.KafkaCluster) KafkaSpec {
SLATier: iKafka.SLATier,
TwoFactorDelete: ks.Cluster.TwoFactorDeleteFromInstAPI(iKafka.TwoFactorDelete),
},
SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry),
ReplicationFactor: iKafka.DefaultReplicationFactor,
PartitionsNumber: iKafka.DefaultNumberOfPartitions,
RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy),
AllowDeleteTopics: iKafka.AllowDeleteTopics,
AutoCreateTopics: iKafka.AutoCreateTopics,
ClientToClusterEncryption: iKafka.ClientToClusterEncryption,
DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres),
DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper),
ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls,
ClientAuthBrokerWithoutEncryption: iKafka.ClientAuthBrokerWithoutEncryption,
ClientAuthBrokerWithEncryption: iKafka.ClientAuthBrokerWithEncryption,
KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy),
KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry),
BundledUseOnly: iKafka.BundledUseOnly,
SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry),
ReplicationFactor: iKafka.DefaultReplicationFactor,
PartitionsNumber: iKafka.DefaultNumberOfPartitions,
RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy),
AllowDeleteTopics: iKafka.AllowDeleteTopics,
AutoCreateTopics: iKafka.AutoCreateTopics,
ClientToClusterEncryption: iKafka.ClientToClusterEncryption,
DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres),
DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper),
ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls,
KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy),
KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry),
BundledUseOnly: iKafka.BundledUseOnly,
}
}

Expand Down Expand Up @@ -399,8 +393,6 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool {
a.AutoCreateTopics == b.AutoCreateTopics &&
a.ClientToClusterEncryption == b.ClientToClusterEncryption &&
a.ClientBrokerAuthWithMTLS == b.ClientBrokerAuthWithMTLS &&
a.ClientAuthBrokerWithoutEncryption == b.ClientAuthBrokerWithoutEncryption &&
a.ClientAuthBrokerWithEncryption == b.ClientAuthBrokerWithEncryption &&
a.BundledUseOnly == b.BundledUseOnly &&
isKafkaAddonsEqual[SchemaRegistry](a.SchemaRegistry, b.SchemaRegistry) &&
isKafkaAddonsEqual[RestProxy](a.RestProxy, b.RestProxy) &&
Expand Down
36 changes: 16 additions & 20 deletions apis/clusters/v1beta1/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,31 +318,27 @@ type immutableKafkaFields struct {
}

type specificKafkaFields struct {
replicationFactor int
partitionsNumber int
allowDeleteTopics bool
autoCreateTopics bool
clientToClusterEncryption bool
bundledUseOnly bool
privateNetworkCluster bool
clientAuthBrokerWithEncryption bool
clientAuthBrokerWithoutEncryption bool
clientBrokerAuthWithMtls bool
replicationFactor int
partitionsNumber int
allowDeleteTopics bool
autoCreateTopics bool
clientToClusterEncryption bool
bundledUseOnly bool
privateNetworkCluster bool
clientBrokerAuthWithMtls bool
}

func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields {
return &immutableKafkaFields{
specificFields: specificKafkaFields{
replicationFactor: ks.ReplicationFactor,
partitionsNumber: ks.PartitionsNumber,
allowDeleteTopics: ks.AllowDeleteTopics,
autoCreateTopics: ks.AutoCreateTopics,
clientToClusterEncryption: ks.ClientToClusterEncryption,
bundledUseOnly: ks.BundledUseOnly,
privateNetworkCluster: ks.PrivateNetworkCluster,
clientAuthBrokerWithEncryption: ks.ClientAuthBrokerWithEncryption,
clientAuthBrokerWithoutEncryption: ks.ClientAuthBrokerWithoutEncryption,
clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS,
replicationFactor: ks.ReplicationFactor,
partitionsNumber: ks.PartitionsNumber,
allowDeleteTopics: ks.AllowDeleteTopics,
autoCreateTopics: ks.AutoCreateTopics,
clientToClusterEncryption: ks.ClientToClusterEncryption,
bundledUseOnly: ks.BundledUseOnly,
privateNetworkCluster: ks.PrivateNetworkCluster,
clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS,
},
cluster: ks.Cluster.newImmutableFields(),
}
Expand Down
51 changes: 48 additions & 3 deletions apis/kafkamanagement/v1beta1/kafkauser_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1beta1

import (
k8sCore "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -26,9 +27,10 @@ import (

// KafkaUserSpec defines the desired state of KafkaUser
type KafkaUserSpec struct {
Options *KafkaUserOptions `json:"options"`
SecretRef *v1beta1.SecretReference `json:"secretRef"`
InitialPermissions string `json:"initialPermissions"`
Options *KafkaUserOptions `json:"options"`
SecretRef *v1beta1.SecretReference `json:"secretRef"`
CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"`
InitialPermissions string `json:"initialPermissions"`
}

type KafkaUserOptions struct {
Expand All @@ -41,6 +43,25 @@ type KafkaUserStatus struct {
ClustersEvents map[string]string `json:"clustersEvents,omitempty"`
}

type Certificate struct {
ID string `json:"id,omitempty"`
ExpiryDate string `json:"expiryDate,omitempty"`
SignedCertificate string `json:"signedCertificate,omitempty"`
}

type CertificateRequest struct {
SecretName string `json:"secretName"`
SecretNamespace string `json:"secretNamespace"`
ClusterID string `json:"clusterId"`
CSR string `json:"csr,omitempty"`
ValidPeriod int `json:"validPeriod"`
CommonName string `json:"commonName,omitempty"`
Country string `json:"country,omitempty"`
Organization string `json:"organization,omitempty"`
OrganizationalUnit string `json:"organizationalUnit,omitempty"`
AutoRenew bool `json:"autoRenew"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

Expand Down Expand Up @@ -83,6 +104,21 @@ func (ku *KafkaUser) GetID(clusterID, name string) string {
return clusterID + "_" + name
}

func (ku *KafkaUser) NewCertificateSecret(name, namespace string) *k8sCore.Secret {
return &k8sCore.Secret{
TypeMeta: metav1.TypeMeta{
Kind: models.SecretKind,
APIVersion: models.K8sAPIVersionV1,
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},

StringData: map[string]string{},
}
}

func init() {
SchemeBuilder.Register(&KafkaUser{}, &KafkaUserList{})
}
Expand All @@ -104,3 +140,12 @@ func (ko *KafkaUserOptions) ToInstAPI() *models.KafkaUserOptions {
}

}

func (cr *CertificateRequest) ToInstAPI(username string) *models.CertificateRequest {
return &models.CertificateRequest{
ClusterID: cr.ClusterID,
CSR: cr.CSR,
KafkaUsername: username,
ValidPeriod: cr.ValidPeriod,
}
}
39 changes: 37 additions & 2 deletions apis/kafkamanagement/v1beta1/kafkauser_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ limitations under the License.
package v1beta1

import (
"github.com/instaclustr/operator/pkg/models"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"

"github.com/instaclustr/operator/pkg/models"
)

// log is for logging in this package.
Expand All @@ -47,3 +47,38 @@ func (r *KafkaUser) Default() {
})
}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-kafkamanagement-instaclustr-com-v1beta1-kafkauser,mutating=false,failurePolicy=fail,sideEffects=None,groups=kafkamanagement.instaclustr.com,resources=kafkausers,verbs=create;update,versions=v1beta1,name=vkafkauser.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &KafkaUser{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *KafkaUser) ValidateCreate() error {
kafkauserlog.Info("validate create", "name", r.Name)

// TODO(user): fill in your validation logic upon object creation.
return nil
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *KafkaUser) ValidateUpdate(old runtime.Object) error {
kafkauserlog.Info("validate update", "name", r.Name)

for _, request := range r.Spec.CertificateRequests {
if request.CSR == "" {
if request.Organization == "" || request.OrganizationalUnit == "" || request.Country == "" || request.CommonName == "" {
return models.ErrEmptyCertGeneratingFields
}
}
}
return nil
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *KafkaUser) ValidateDelete() error {
kafkauserlog.Info("validate delete", "name", r.Name)

// TODO(user): fill in your validation logic upon object deletion.
return nil
}
41 changes: 41 additions & 0 deletions apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions config/crd/bases/clusters.instaclustr.com_kafkas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ spec:
type: boolean
bundledUseOnly:
type: boolean
clientAuthBrokerWithEncryption:
type: boolean
clientAuthBrokerWithoutEncryption:
type: boolean
clientBrokerAuthWithMtls:
type: boolean
clientToClusterEncryption:
Expand Down
Loading

0 comments on commit 400197d

Please sign in to comment.