diff --git a/apis/clusters/v1beta1/kafka_types.go b/apis/clusters/v1beta1/kafka_types.go index e542c093f..d75b55b02 100644 --- a/apis/clusters/v1beta1/kafka_types.go +++ b/apis/clusters/v1beta1/kafka_types.go @@ -79,14 +79,12 @@ type KafkaSpec struct { // Provision additional dedicated nodes for Apache Zookeeper to run on. // Zookeeper nodes will be co-located with Kafka if this is not provided - DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"` - ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"` - ClientAuthBrokerWithoutEncryption bool `json:"clientAuthBrokerWithoutEncryption,omitempty"` - ClientAuthBrokerWithEncryption bool `json:"clientAuthBrokerWithEncryption,omitempty"` - KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"` - KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"` - BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - UserRefs []*UserReference `json:"userRefs,omitempty"` + DedicatedZookeeper []*DedicatedZookeeper `json:"dedicatedZookeeper,omitempty"` + ClientBrokerAuthWithMTLS bool `json:"clientBrokerAuthWithMtls,omitempty"` + KarapaceRestProxy []*KarapaceRestProxy `json:"karapaceRestProxy,omitempty"` + KarapaceSchemaRegistry []*KarapaceSchemaRegistry `json:"karapaceSchemaRegistry,omitempty"` + BundledUseOnly bool `json:"bundledUseOnly,omitempty"` + UserRefs []*UserReference `json:"userRefs,omitempty"` } type KafkaDataCentre struct { @@ -140,27 +138,25 @@ func (k *Kafka) NewPatch() client.Patch { func (k *KafkaSpec) ToInstAPI() *models.KafkaCluster { return &models.KafkaCluster{ - SchemaRegistry: k.schemaRegistryToInstAPI(), - RestProxy: k.restProxyToInstAPI(), - PCIComplianceMode: k.PCICompliance, - DefaultReplicationFactor: k.ReplicationFactor, - DefaultNumberOfPartitions: k.PartitionsNumber, - TwoFactorDelete: k.TwoFactorDeletesToInstAPI(), - AllowDeleteTopics: k.AllowDeleteTopics, - AutoCreateTopics: k.AutoCreateTopics, - 
ClientToClusterEncryption: k.ClientToClusterEncryption, - DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(), - PrivateNetworkCluster: k.PrivateNetworkCluster, - Name: k.Name, - SLATier: k.SLATier, - KafkaVersion: k.Version, - DataCentres: k.dcToInstAPI(), - ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS, - ClientAuthBrokerWithoutEncryption: k.ClientAuthBrokerWithoutEncryption, - ClientAuthBrokerWithEncryption: k.ClientAuthBrokerWithEncryption, - BundledUseOnly: k.BundledUseOnly, - KarapaceRestProxy: k.karapaceRestProxyToInstAPI(), - KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(), + SchemaRegistry: k.schemaRegistryToInstAPI(), + RestProxy: k.restProxyToInstAPI(), + PCIComplianceMode: k.PCICompliance, + DefaultReplicationFactor: k.ReplicationFactor, + DefaultNumberOfPartitions: k.PartitionsNumber, + TwoFactorDelete: k.TwoFactorDeletesToInstAPI(), + AllowDeleteTopics: k.AllowDeleteTopics, + AutoCreateTopics: k.AutoCreateTopics, + ClientToClusterEncryption: k.ClientToClusterEncryption, + DedicatedZookeeper: k.dedicatedZookeeperToInstAPI(), + PrivateNetworkCluster: k.PrivateNetworkCluster, + Name: k.Name, + SLATier: k.SLATier, + KafkaVersion: k.Version, + DataCentres: k.dcToInstAPI(), + ClientBrokerAuthWithMtls: k.ClientBrokerAuthWithMTLS, + BundledUseOnly: k.BundledUseOnly, + KarapaceRestProxy: k.karapaceRestProxyToInstAPI(), + KarapaceSchemaRegistry: k.karapaceSchemaRegistryToInstAPI(), } } @@ -283,21 +279,19 @@ func (ks *KafkaSpec) FromInstAPI(iKafka *models.KafkaCluster) KafkaSpec { SLATier: iKafka.SLATier, TwoFactorDelete: ks.Cluster.TwoFactorDeleteFromInstAPI(iKafka.TwoFactorDelete), }, - SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry), - ReplicationFactor: iKafka.DefaultReplicationFactor, - PartitionsNumber: iKafka.DefaultNumberOfPartitions, - RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy), - AllowDeleteTopics: iKafka.AllowDeleteTopics, - AutoCreateTopics: iKafka.AutoCreateTopics, - ClientToClusterEncryption: 
iKafka.ClientToClusterEncryption, - DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres), - DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper), - ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls, - ClientAuthBrokerWithoutEncryption: iKafka.ClientAuthBrokerWithoutEncryption, - ClientAuthBrokerWithEncryption: iKafka.ClientAuthBrokerWithEncryption, - KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy), - KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry), - BundledUseOnly: iKafka.BundledUseOnly, + SchemaRegistry: ks.SchemaRegistryFromInstAPI(iKafka.SchemaRegistry), + ReplicationFactor: iKafka.DefaultReplicationFactor, + PartitionsNumber: iKafka.DefaultNumberOfPartitions, + RestProxy: ks.RestProxyFromInstAPI(iKafka.RestProxy), + AllowDeleteTopics: iKafka.AllowDeleteTopics, + AutoCreateTopics: iKafka.AutoCreateTopics, + ClientToClusterEncryption: iKafka.ClientToClusterEncryption, + DataCentres: ks.DCsFromInstAPI(iKafka.DataCentres), + DedicatedZookeeper: ks.DedicatedZookeeperFromInstAPI(iKafka.DedicatedZookeeper), + ClientBrokerAuthWithMTLS: iKafka.ClientBrokerAuthWithMtls, + KarapaceRestProxy: ks.KarapaceRestProxyFromInstAPI(iKafka.KarapaceRestProxy), + KarapaceSchemaRegistry: ks.KarapaceSchemaRegistryFromInstAPI(iKafka.KarapaceSchemaRegistry), + BundledUseOnly: iKafka.BundledUseOnly, } } @@ -399,8 +393,6 @@ func (a *KafkaSpec) IsEqual(b KafkaSpec) bool { a.AutoCreateTopics == b.AutoCreateTopics && a.ClientToClusterEncryption == b.ClientToClusterEncryption && a.ClientBrokerAuthWithMTLS == b.ClientBrokerAuthWithMTLS && - a.ClientAuthBrokerWithoutEncryption == b.ClientAuthBrokerWithoutEncryption && - a.ClientAuthBrokerWithEncryption == b.ClientAuthBrokerWithEncryption && a.BundledUseOnly == b.BundledUseOnly && isKafkaAddonsEqual[SchemaRegistry](a.SchemaRegistry, b.SchemaRegistry) && isKafkaAddonsEqual[RestProxy](a.RestProxy, b.RestProxy) && diff --git 
a/apis/clusters/v1beta1/kafka_webhook.go b/apis/clusters/v1beta1/kafka_webhook.go index 7c3cc3806..dd543cde5 100644 --- a/apis/clusters/v1beta1/kafka_webhook.go +++ b/apis/clusters/v1beta1/kafka_webhook.go @@ -318,31 +318,27 @@ type immutableKafkaFields struct { } type specificKafkaFields struct { - replicationFactor int - partitionsNumber int - allowDeleteTopics bool - autoCreateTopics bool - clientToClusterEncryption bool - bundledUseOnly bool - privateNetworkCluster bool - clientAuthBrokerWithEncryption bool - clientAuthBrokerWithoutEncryption bool - clientBrokerAuthWithMtls bool + replicationFactor int + partitionsNumber int + allowDeleteTopics bool + autoCreateTopics bool + clientToClusterEncryption bool + bundledUseOnly bool + privateNetworkCluster bool + clientBrokerAuthWithMtls bool } func (ks *KafkaSpec) newKafkaImmutableFields() *immutableKafkaFields { return &immutableKafkaFields{ specificFields: specificKafkaFields{ - replicationFactor: ks.ReplicationFactor, - partitionsNumber: ks.PartitionsNumber, - allowDeleteTopics: ks.AllowDeleteTopics, - autoCreateTopics: ks.AutoCreateTopics, - clientToClusterEncryption: ks.ClientToClusterEncryption, - bundledUseOnly: ks.BundledUseOnly, - privateNetworkCluster: ks.PrivateNetworkCluster, - clientAuthBrokerWithEncryption: ks.ClientAuthBrokerWithEncryption, - clientAuthBrokerWithoutEncryption: ks.ClientAuthBrokerWithoutEncryption, - clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS, + replicationFactor: ks.ReplicationFactor, + partitionsNumber: ks.PartitionsNumber, + allowDeleteTopics: ks.AllowDeleteTopics, + autoCreateTopics: ks.AutoCreateTopics, + clientToClusterEncryption: ks.ClientToClusterEncryption, + bundledUseOnly: ks.BundledUseOnly, + privateNetworkCluster: ks.PrivateNetworkCluster, + clientBrokerAuthWithMtls: ks.ClientBrokerAuthWithMTLS, }, cluster: ks.Cluster.newImmutableFields(), } diff --git a/apis/kafkamanagement/v1beta1/kafkauser_types.go b/apis/kafkamanagement/v1beta1/kafkauser_types.go index 
e6bff4cf6..a5f5fcbd1 100644 --- a/apis/kafkamanagement/v1beta1/kafkauser_types.go +++ b/apis/kafkamanagement/v1beta1/kafkauser_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1beta1 import ( + k8sCore "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" @@ -26,9 +27,10 @@ import ( // KafkaUserSpec defines the desired state of KafkaUser type KafkaUserSpec struct { - Options *KafkaUserOptions `json:"options"` - SecretRef *v1beta1.SecretReference `json:"secretRef"` - InitialPermissions string `json:"initialPermissions"` + Options *KafkaUserOptions `json:"options"` + SecretRef *v1beta1.SecretReference `json:"secretRef"` + CertificateRequests []*CertificateRequest `json:"certificateRequests,omitempty"` + InitialPermissions string `json:"initialPermissions"` } type KafkaUserOptions struct { @@ -41,6 +43,25 @@ type KafkaUserStatus struct { ClustersEvents map[string]string `json:"clustersEvents,omitempty"` } +type Certificate struct { + ID string `json:"id,omitempty"` + ExpiryDate string `json:"expiryDate,omitempty"` + SignedCertificate string `json:"signedCertificate,omitempty"` +} + +type CertificateRequest struct { + SecretName string `json:"secretName"` + SecretNamespace string `json:"secretNamespace"` + ClusterID string `json:"clusterId"` + CSR string `json:"csr,omitempty"` + ValidPeriod int `json:"validPeriod"` + CommonName string `json:"commonName"` + Country string `json:"country"` + Organization string `json:"organization"` + OrganizationalUnit string `json:"organizationalUnit"` + AutoRenew bool `json:"autoRenew"` +} + //+kubebuilder:object:root=true //+kubebuilder:subresource:status @@ -83,6 +104,21 @@ func (ku *KafkaUser) GetID(clusterID, name string) string { return clusterID + "_" + name } +func (ku *KafkaUser) NewCertificateSecret(name, namespace string) *k8sCore.Secret { + return &k8sCore.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: models.SecretKind, + APIVersion: 
models.K8sAPIVersionV1, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + + StringData: map[string]string{}, + } +} + func init() { SchemeBuilder.Register(&KafkaUser{}, &KafkaUserList{}) } @@ -104,3 +140,12 @@ func (ko *KafkaUserOptions) ToInstAPI() *models.KafkaUserOptions { } } + +func (cr *CertificateRequest) ToInstAPI(username string) *models.CertificateRequest { + return &models.CertificateRequest{ + ClusterID: cr.ClusterID, + CSR: cr.CSR, + KafkaUsername: username, + ValidPeriod: cr.ValidPeriod, + } +} diff --git a/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go b/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go index 88f88e4a5..e4ce1a987 100644 --- a/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go +++ b/apis/kafkamanagement/v1beta1/zz_generated.deepcopy.go @@ -41,6 +41,36 @@ func (in *ACL) DeepCopy() *ACL { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Certificate) DeepCopyInto(out *Certificate) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Certificate. +func (in *Certificate) DeepCopy() *Certificate { + if in == nil { + return nil + } + out := new(Certificate) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CertificateRequest) DeepCopyInto(out *CertificateRequest) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CertificateRequest. +func (in *CertificateRequest) DeepCopy() *CertificateRequest { + if in == nil { + return nil + } + out := new(CertificateRequest) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Connector) DeepCopyInto(out *Connector) { *out = *in @@ -252,6 +282,17 @@ func (in *KafkaUserSpec) DeepCopyInto(out *KafkaUserSpec) { *out = new(clusterresourcesv1beta1.SecretReference) **out = **in } + if in.CertificateRequests != nil { + in, out := &in.CertificateRequests, &out.CertificateRequests + *out = make([]*CertificateRequest, len(*in)) + for i := range *in { + if (*in)[i] != nil { + in, out := &(*in)[i], &(*out)[i] + *out = new(CertificateRequest) + **out = **in + } + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KafkaUserSpec. diff --git a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml index 97b39830e..3964a51c3 100644 --- a/config/crd/bases/clusters.instaclustr.com_kafkas.yaml +++ b/config/crd/bases/clusters.instaclustr.com_kafkas.yaml @@ -41,10 +41,6 @@ spec: type: boolean bundledUseOnly: type: boolean - clientAuthBrokerWithEncryption: - type: boolean - clientAuthBrokerWithoutEncryption: - type: boolean clientBrokerAuthWithMtls: type: boolean clientToClusterEncryption: diff --git a/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml b/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml index 08f3413f3..6126678c7 100644 --- a/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml +++ b/config/crd/bases/kafkamanagement.instaclustr.com_kafkausers.yaml @@ -35,6 +35,41 @@ spec: spec: description: KafkaUserSpec defines the desired state of KafkaUser properties: + certificateRequests: + items: + properties: + autoRenew: + type: boolean + clusterId: + type: string + commonName: + type: string + country: + type: string + csr: + type: string + organization: + type: string + organizationalUnit: + type: string + secretName: + type: string + secretNamespace: + type: string + validPeriod: + type: integer + required: + - autoRenew + - clusterId + - commonName + - country + - organization + - organizationalUnit 
+ - secretName + - secretNamespace + - validPeriod + type: object + type: array initialPermissions: type: string options: diff --git a/config/samples/clusters_v1beta1_kafka.yaml b/config/samples/clusters_v1beta1_kafka.yaml index 51bfc9d21..21afafd97 100644 --- a/config/samples/clusters_v1beta1_kafka.yaml +++ b/config/samples/clusters_v1beta1_kafka.yaml @@ -1,7 +1,7 @@ apiVersion: clusters.instaclustr.com/v1beta1 kind: Kafka metadata: - name: kafka-two + name: kafka spec: name: "Kafka-example" version: "3.3.1" @@ -14,8 +14,6 @@ spec: privateNetworkCluster: false slaTier: "NON_PRODUCTION" # bundledUseOnly: true - # clientAuthBrokerWithEncryption: true - # clientAuthBrokerWithoutEncryption: true # clientBrokerAuthWithMtls: true # dedicatedZookeeper: # - nodeSize: "KDZ-DEV-t4g.small-30" diff --git a/config/samples/kafkamanagement_v1beta1_kafkauser.yaml b/config/samples/kafkamanagement_v1beta1_kafkauser.yaml index 203125514..63455c054 100644 --- a/config/samples/kafkamanagement_v1beta1_kafkauser.yaml +++ b/config/samples/kafkamanagement_v1beta1_kafkauser.yaml @@ -14,7 +14,26 @@ spec: secretRef: name: "secret-test" namespace: "default" + certificateRequests: +# - secretName: "cert-secret-test" +# secretNamespace: "default" +# clusterId: "9f422a96-5e53-4e09-b789-efc852fd9d4a" +# commonName: "Sanch-two" +# country: "VN" +# organization: "Instaclustr" +# organizationalUnit: "IT" +# validPeriod: 6 +# autoRenew: true +# - secretName: "cert-secret-test-three" +# secretNamespace: "default" +# clusterId: "9f422a96-5e53-4e09-b789-efc852fd9d4a" +# commonName: "Sanch-two" +# country: "VN" +# organization: "Instaclustr" +# organizationalUnit: "IT" +# validPeriod: 6 +# autoRenew: false initialPermissions: "standard" options: overrideExistingUser: true - saslScramMechanism: "SCRAM-SHA-256" + saslScramMechanism: "SCRAM-SHA-256" \ No newline at end of file diff --git a/controllers/kafkamanagement/kafkauser_controller.go b/controllers/kafkamanagement/kafkauser_controller.go index 
87b5b5811..6c2fbd665 100644 --- a/controllers/kafkamanagement/kafkauser_controller.go +++ b/controllers/kafkamanagement/kafkauser_controller.go @@ -18,7 +18,14 @@ package kafkamanagement import ( "context" - + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "strings" + + "github.com/go-logr/logr" v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/fields" @@ -29,6 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -473,7 +481,17 @@ func (r *KafkaUserReconciler) SetupWithManager(mgr ctrl.Manager) error { } return ctrl.NewControllerManagedBy(mgr). - For(&v1beta1.KafkaUser{}, builder.WithPredicates(predicate.Funcs{})).Owns(&v1.Secret{}). + For(&v1beta1.KafkaUser{}, builder.WithPredicates(predicate.Funcs{ + UpdateFunc: func(event event.UpdateEvent) bool { + newObj := event.ObjectNew.(*v1beta1.KafkaUser) + oldObj := event.ObjectOld.(*v1beta1.KafkaUser) + if newObj.Generation != event.ObjectOld.GetGeneration() { + r.handleCertificateEvent(newObj, oldObj.Spec.CertificateRequests) + } + + return true + }, + })).Owns(&v1.Secret{}). 
Watches( &source.Kind{Type: &v1.Secret{}}, handler.EnqueueRequestsFromMapFunc(r.findSecretObjects), ).Complete(r) } @@ -533,3 +551,399 @@ func (r *KafkaUserReconciler) getKafkaUserCredsFromSecret( return string(username[:len(username)-1]), string(password[:len(password)-1]), nil } + +func (r *KafkaUserReconciler) getKafkaUserCertIDFromSecret( + ctx context.Context, + certRequest *v1beta1.CertificateRequest, +) (string, error) { + kafkaUserCertSecret := &v1.Secret{} + kafkaUserCertSecretNamespacedName := types.NamespacedName{ + Name: certRequest.SecretName, + Namespace: certRequest.SecretNamespace, + } + + err := r.Get(ctx, kafkaUserCertSecretNamespacedName, kafkaUserCertSecret) + if err != nil { + return "", err + } + + certID := kafkaUserCertSecret.Data["id"] + + if len(certID) == 0 { + return "", models.ErrMissingSecretKeys + } + + return string(certID), nil +} + +func (r *KafkaUserReconciler) GenerateCSR(certRequest *v1beta1.CertificateRequest) (string, error) { + keyBytes, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return "", err + } + + subj := pkix.Name{ + CommonName: certRequest.CommonName, + Country: []string{certRequest.Country}, + Organization: []string{certRequest.Organization}, + OrganizationalUnit: []string{certRequest.OrganizationalUnit}, + } + + template := x509.CertificateRequest{ + Subject: subj, + SignatureAlgorithm: x509.SHA256WithRSA, + } + + csrBytes, err := x509.CreateCertificateRequest(rand.Reader, &template, keyBytes) + if err != nil { + return "", err + } + strBuf := strings.Builder{} + err = pem.Encode(&strBuf, &pem.Block{Type: "NEW CERTIFICATE REQUEST", Bytes: csrBytes}) + if err != nil { + return "", err + } + + return strBuf.String(), nil +} + +func (r *KafkaUserReconciler) UpdateCertSecret(ctx context.Context, secret *v1.Secret, certResp *v1beta1.Certificate) error { + secret.StringData = make(map[string]string) + + secret.StringData["id"] = certResp.ID + secret.StringData["expiryDate"] = certResp.ExpiryDate + 
secret.StringData["signedCertificate"] = certResp.SignedCertificate + + err := r.Update(ctx, secret) + if err != nil { + return err + } + + return nil +} + +func (r *KafkaUserReconciler) handleCertificateEvent( + newObj *v1beta1.KafkaUser, + oldCerts []*v1beta1.CertificateRequest, +) { + ctx := context.TODO() + l := log.FromContext(ctx) + + for _, oldCert := range oldCerts { + var exist bool + for _, newCert := range newObj.Spec.CertificateRequests { + if *oldCert == *newCert { + exist = true + break + } + } + + if exist { + continue + } + + err := r.handleDeleteCertificate(ctx, newObj, l, oldCert) + if err != nil { + l.Error(err, "Cannot delete Kafka user mTLS certificate", "user", oldCert) + r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, + "Cannot delete mTlS certificate. Reason: %v", err) + } + } + + for _, newCert := range newObj.Spec.CertificateRequests { + var exist bool + for _, oldCert := range oldCerts { + if *newCert == *oldCert { + exist = true + break + } + } + + if exist { + if newCert.AutoRenew { + err := r.handleRenewCertificate(ctx, newObj, newCert, l) + if err != nil { + l.Error(err, "Cannot renew Kafka user mTLS certificate", "cert", newCert) + r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, + "Cannot renew user mTLS certificate. Reason: %v", err) + } + } + continue + } + + err := r.handleCreateCertificate(ctx, newObj, l, newCert) + if err != nil { + l.Error(err, "Cannot create Kafka user mTLS certificate", "cert", newCert) + r.EventRecorder.Eventf(newObj, models.Warning, models.CreatingEvent, + "Cannot create user mTLS certificate. 
Reason: %v", err) + } + + oldCerts = append(oldCerts, newCert) + } +} + +func (r *KafkaUserReconciler) handleCreateCertificate(ctx context.Context, user *v1beta1.KafkaUser, l logr.Logger, certRequest *v1beta1.CertificateRequest) error { + username, _, err := r.getKafkaUserCredsFromSecret(user.Spec) + if err != nil { + l.Error( + err, "Cannot get Kafka user creds from secret", + "kafka user spec", user.Spec, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.FetchFailed, + "Fetch user credentials from secret is failed. Reason: %v", + err, + ) + + return err + } + + var isCSRGenerated bool + + if certRequest.CSR == "" { + certRequest.CSR, err = r.GenerateCSR(certRequest) + if err != nil { + l.Error(err, "Cannot generate CSR for Kafka user certificate creation", + "user", user.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.GenerateFailed, + "Generate CSR is failed. Reason: %v", + err, + ) + + return err + } + isCSRGenerated = true + } + + certResponse, err := r.API.CreateKafkaUserCertificate(certRequest.ToInstAPI(username)) + if err != nil { + l.Error(err, "Cannot create Kafka user mTLS certificate", + "user", user.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.CreationFailed, + "Certificate creation is failed. Reason: %v", + err, + ) + + return err + } + + newSecret := user.NewCertificateSecret(certRequest.SecretName, certRequest.SecretNamespace) + err = r.Client.Create(ctx, newSecret) + if err != nil { + l.Error(err, "Cannot create Kafka user Cert Secret.", + "secret name", certRequest.SecretName, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.CreationFailed, + "create user Cert Secret is failed. 
Reason: %v", + err, + ) + + return err + + } + + controllerutil.AddFinalizer(newSecret, models.DeletionFinalizer) + + err = r.UpdateCertSecret(ctx, newSecret, certResponse) + if err != nil { + l.Error(err, "Cannot update certificate secret", + "user", user.Name, + "secret", newSecret.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.UpdateFailed, + "Certificate secret update is failed. Reason: %v", + err, + ) + + return err + } + + l.Info("Kafka user mTLS certificate has been created", + "User ID", user.GetID(certRequest.ClusterID, username), + ) + + if isCSRGenerated { + certRequest.CSR = "" + } + + return nil +} + +func (r *KafkaUserReconciler) handleDeleteCertificate(ctx context.Context, user *v1beta1.KafkaUser, l logr.Logger, certRequest *v1beta1.CertificateRequest) error { + certID, err := r.getKafkaUserCertIDFromSecret(ctx, certRequest) + if err != nil { + l.Error( + err, "Cannot get Kafka user certificate ID from secret", + "kafka user certificate secret name", certRequest.SecretName, + "kafka user certificate secret namespace", certRequest.SecretNamespace, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.FetchFailed, + "Fetch user certificate ID from secret is failed. Reason: %v", + err, + ) + + return err + } + + err = r.API.DeleteKafkaUserCertificate(certID) + if err != nil { + l.Error(err, "Cannot Delete Kafka user mTLS certificate", + "user", user.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.DeletionFailed, + "Certificate deletion is failed. 
Reason: %v", + err, + ) + + return err + } + + secret := &v1.Secret{} + certSecretNamespacedName := types.NamespacedName{ + Name: certRequest.SecretName, + Namespace: certRequest.SecretNamespace, + } + err = r.Client.Get(ctx, certSecretNamespacedName, secret) + if err != nil { + l.Error(err, "Cannot get Kafka user certificate secret.", + "kafka user certificate secret name", certRequest.SecretName, + "kafka user certificate secret namespace", certRequest.SecretNamespace, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.FetchFailed, + "Fetch user certificate secret is failed. Reason: %v", + err, + ) + + return err + } + + controllerutil.RemoveFinalizer(secret, models.DeletionFinalizer) + err = r.Update(ctx, secret) + if err != nil { + l.Error(err, "Cannot remove finalizer from secret", "secret name", secret.Name) + + r.EventRecorder.Eventf(user, models.Warning, models.PatchFailed, + "Resource patch is failed. Reason: %v", err) + + return err + } + + err = r.Client.Delete(ctx, secret) + if err != nil && !k8serrors.IsNotFound(err) { + l.Error(err, "Cannot delete Kafka user certificate secret", + "kafka user certificate secret name", certRequest.SecretName, + "kafka user certificate secret namespace", certRequest.SecretNamespace, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.DeletionFailed, + "Delete user certificate secret is failed. 
Reason: %v", + err, + ) + + return err + } + + l.Info("Kafka user mTLS certificate has been deleted", + "Certificate ID", certID, + ) + + return nil +} + +func (r *KafkaUserReconciler) handleRenewCertificate(ctx context.Context, user *v1beta1.KafkaUser, certRequest *v1beta1.CertificateRequest, l logr.Logger) error { + certID, err := r.getKafkaUserCertIDFromSecret(ctx, certRequest) + if err != nil { + l.Error( + err, "Cannot get Kafka user certificate ID from secret", + "kafka user certificate secret name", certRequest.SecretName, + "kafka user certificate secret namespace", certRequest.SecretNamespace, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.FetchFailed, + "Fetch user certificate ID from secret is failed. Reason: %v", + err, + ) + + return err + } + + newCert, err := r.API.RenewKafkaUserCertificate(certID) + if err != nil { + l.Error(err, "Cannot Renew Kafka user mTLS certificate", + "user", user.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.DeletionFailed, + "Certificate renew is failed. Reason: %v", + err, + ) + + return err + } + + secret := &v1.Secret{} + certSecretNamespacedName := types.NamespacedName{ + Name: certRequest.SecretName, + Namespace: certRequest.SecretNamespace, + } + err = r.Client.Get(ctx, certSecretNamespacedName, secret) + if err != nil { + l.Error(err, "Cannot get Kafka user certificate secret.", + "kafka user certificate secret name", certRequest.SecretName, + "kafka user certificate secret namespace", certRequest.SecretNamespace, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.FetchFailed, + "Fetch user certificate secret is failed. Reason: %v", + err, + ) + + return err + } + + err = r.UpdateCertSecret(ctx, secret, newCert) + if err != nil { + l.Error(err, "Cannot update certificate secret", + "user", user.Name, + "secret", secret.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.UpdateFailed, + "Certificate secret update is failed. 
Reason: %v", + err, + ) + + return err + } + + l.Info("Kafka user mTLS certificate has been renewed", + "Certificate ID", certID, + "New expiry date", newCert.ExpiryDate, + "New ID", newCert.ID, + ) + + err = r.API.DeleteKafkaUserCertificate(certID) + if err != nil { + l.Error(err, "Cannot Delete Kafka user mTLS certificate", + "user", user.Name, + ) + r.EventRecorder.Eventf( + user, models.Warning, models.DeletionFailed, + "Certificate deletion is failed. Reason: %v", + err, + ) + + return err + } + + return nil +} diff --git a/pkg/instaclustr/client.go b/pkg/instaclustr/client.go index b6dac101e..37c8149bb 100644 --- a/pkg/instaclustr/client.go +++ b/pkg/instaclustr/client.go @@ -978,6 +978,90 @@ func (c *Client) DeleteKafkaUser(kafkaUserID, kafkaUserEndpoint string) error { return nil } +func (c *Client) CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*kafkamanagementv1beta1.Certificate, error) { + data, err := json.Marshal(certRequest) + if err != nil { + return nil, err + } + + url := c.serverHostname + KafkauserCertificatesEndpoint + resp, err := c.DoRequest(url, http.MethodPost, data) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusAccepted { + return nil, fmt.Errorf("status code: %d, message: %s", resp.StatusCode, body) + } + + cert := &kafkamanagementv1beta1.Certificate{} + err = json.Unmarshal(body, cert) + if err != nil { + return nil, err + } + + return cert, nil +} + +func (c *Client) DeleteKafkaUserCertificate(certificateID string) error { + url := c.serverHostname + KafkauserCertificatesEndpoint + certificateID + resp, err := c.DoRequest(url, http.MethodDelete, nil) + if err != nil { + return err + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("status code: %d, message: %s", 
resp.StatusCode, body) + } + + return nil +} + +func (c *Client) RenewKafkaUserCertificate(certificateID string) (*kafkamanagementv1beta1.Certificate, error) { + payload := &struct { + CertificateID string `json:"certificateId"` + }{ + CertificateID: certificateID, + } + + data, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + url := c.serverHostname + KafkaUserCertificatesRenewEndpoint + resp, err := c.DoRequest(url, http.MethodPost, data) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + cert := &kafkamanagementv1beta1.Certificate{} + err = json.Unmarshal(body, cert) + if err != nil { + return nil, err + } + + return cert, nil +} + func (c *Client) CreateKafkaMirror(m *kafkamanagementv1beta1.MirrorSpec) (*kafkamanagementv1beta1.MirrorStatus, error) { data, err := json.Marshal(m) if err != nil { diff --git a/pkg/instaclustr/config.go b/pkg/instaclustr/config.go index 85cd56e96..20c69a6a0 100644 --- a/pkg/instaclustr/config.go +++ b/pkg/instaclustr/config.go @@ -40,6 +40,8 @@ const ( AWSSecurityGroupFirewallRuleEndpoint = "/cluster-management/v2/resources/providers/aws/security-group-firewall-rules/v2/" GCPPeeringEndpoint = "/cluster-management/v2/resources/providers/gcp/vpc-peers/v2/" KafkaUserEndpoint = "/cluster-management/v2/resources/applications/kafka/users/v2/" + KafkauserCertificatesEndpoint = "/cluster-management/v2/resources/applications/kafka/user-certificates/v2/" + KafkaUserCertificatesRenewEndpoint = "/cluster-management/v2/operations/applications/kafka/user-certificates/renew/v2/" KafkaACLEndpoint = "/cluster-management/v2/resources/applications/kafka/acls/v2/" CadenceEndpoint = "/cluster-management/v2/resources/applications/cadence/clusters/v2/" RedisEndpoint = "/cluster-management/v2/resources/applications/redis/clusters/v2/" diff --git a/pkg/instaclustr/interfaces.go b/pkg/instaclustr/interfaces.go index 19cb6f40a..ae82afff9 
100644 --- a/pkg/instaclustr/interfaces.go +++ b/pkg/instaclustr/interfaces.go @@ -44,6 +44,9 @@ type API interface { CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (*kafkamanagementv1beta1.KafkaUserStatus, error) UpdateKafkaUser(kafkaUserID string, kafkaUserSpec *models.KafkaUser) error DeleteKafkaUser(kafkaUserID, kafkaUserEndpoint string) error + CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*kafkamanagementv1beta1.Certificate, error) + DeleteKafkaUserCertificate(certificateID string) error + RenewKafkaUserCertificate(certificateID string) (*kafkamanagementv1beta1.Certificate, error) GetTopicStatus(id string) ([]byte, error) CreateKafkaTopic(url string, topic *kafkamanagementv1beta1.Topic) error DeleteKafkaTopic(url, id string) error diff --git a/pkg/instaclustr/mock/client.go b/pkg/instaclustr/mock/client.go index b9df227f1..543184b9a 100644 --- a/pkg/instaclustr/mock/client.go +++ b/pkg/instaclustr/mock/client.go @@ -117,6 +117,18 @@ func (c *mockClient) CreateKafkaUser(url string, kafkaUser *models.KafkaUser) (* panic("CreateKafkaUser: is not implemented") } +func (c *mockClient) CreateKafkaUserCertificate(certRequest *models.CertificateRequest) (*kafkamanagementv1beta1.Certificate, error) { + panic("CreateKafkaUserCertificate: is not implemented") +} + +func (c *mockClient) DeleteKafkaUserCertificate(certificateID string) error { + panic("DeleteKafkaUserCertificate: is not implemented") +} + +func (c *mockClient) RenewKafkaUserCertificate(certificateID string) (*kafkamanagementv1beta1.Certificate, error) { + panic("RenewKafkaUserCertificate: is not implemented") +} + func (c *mockClient) UpdateKafkaUser(kafkaUserID string, kafkaUserSpec *models.KafkaUser) error { panic("UpdateKafkaUser: is not implemented") } diff --git a/pkg/models/kafka_user_apv2.go b/pkg/models/kafka_user_apv2.go index 70303b716..97d556123 100644 --- a/pkg/models/kafka_user_apv2.go +++ b/pkg/models/kafka_user_apv2.go @@ -28,3 +28,10 @@ type KafkaUserOptions 
struct { OverrideExistingUser bool `json:"overrideExistingUser,omitempty"` SASLSCRAMMechanism string `json:"saslScramMechanism"` } + +type CertificateRequest struct { + ClusterID string `json:"clusterId"` + CSR string `json:"csr"` + KafkaUsername string `json:"kafkaUsername"` + ValidPeriod int `json:"validPeriod"` +} diff --git a/pkg/models/operator.go b/pkg/models/operator.go index 0f3c82c18..01b7d2dac 100644 --- a/pkg/models/operator.go +++ b/pkg/models/operator.go @@ -125,6 +125,7 @@ const ( NotFound = "NotFound" CreationFailed = "CreationFailed" FetchFailed = "FetchFailed" + GenerateFailed = "GenerateFailed" ConvertionFailed = "ConvertionFailed" ValidationFailed = "ValidationFailed" UpdateFailed = "UpdateFailed"