diff --git a/apis/clusterresources/v1beta1/cassandrauser_types.go b/apis/clusterresources/v1beta1/cassandrauser_types.go index 41f1e07d7..8f6d6840d 100644 --- a/apis/clusterresources/v1beta1/cassandrauser_types.go +++ b/apis/clusterresources/v1beta1/cassandrauser_types.go @@ -73,3 +73,7 @@ func (r *CassandraUser) ToInstAPI(username, password string) *models.InstaUser { InitialPermission: "standard", } } + +func (r *CassandraUser) GetDeletionFinalizer() string { + return models.DeletionFinalizer + "_" + r.Namespace + "_" + r.Name +} diff --git a/apis/clusters/v1beta1/cassandra_types.go b/apis/clusters/v1beta1/cassandra_types.go index 8c9a52a13..4c756fb39 100644 --- a/apis/clusters/v1beta1/cassandra_types.go +++ b/apis/clusters/v1beta1/cassandra_types.go @@ -69,7 +69,7 @@ type CassandraSpec struct { PasswordAndUserAuth bool `json:"passwordAndUserAuth,omitempty"` Spark []*Spark `json:"spark,omitempty"` BundledUseOnly bool `json:"bundledUseOnly,omitempty"` - UserRef []*UserReference `json:"userRef,omitempty"` + UserRefs []*UserReference `json:"userRefs,omitempty"` } // CassandraStatus defines the observed state of Cassandra diff --git a/apis/clusters/v1beta1/zz_generated.deepcopy.go b/apis/clusters/v1beta1/zz_generated.deepcopy.go index 608016be1..957868ae4 100644 --- a/apis/clusters/v1beta1/zz_generated.deepcopy.go +++ b/apis/clusters/v1beta1/zz_generated.deepcopy.go @@ -455,8 +455,8 @@ func (in *CassandraSpec) DeepCopyInto(out *CassandraSpec) { } } } - if in.UserRef != nil { - in, out := &in.UserRef, &out.UserRef + if in.UserRefs != nil { + in, out := &in.UserRefs, &out.UserRefs *out = make([]*UserReference, len(*in)) for i := range *in { if (*in)[i] != nil { diff --git a/controllers/clusterresources/cassandrauser_controller.go b/controllers/clusterresources/cassandrauser_controller.go index ad6ad3e28..5638dbd81 100644 --- a/controllers/clusterresources/cassandrauser_controller.go +++ b/controllers/clusterresources/cassandrauser_controller.go @@ -19,7 +19,6 @@ package clusterresources import ( "context" - "github.com/go-logr/logr" k8sCore "k8s.io/api/core/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" @@ -89,7 +88,27 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques return models.ReconcileRequeue, nil } + if controllerutil.AddFinalizer(s, u.GetDeletionFinalizer()) { + err = r.Update(ctx, s) + if err != nil { + l.Error(err, "Cannot update Cassandra user's secret with deletion finalizer", + "secret name", s.Name, "secret namespace", s.Namespace) + r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, + "Update secret with deletion finalizer has been failed. Reason: %v", err) + return models.ReconcileRequeue, nil + } + } + patch := u.NewPatch() + if controllerutil.AddFinalizer(u, u.GetDeletionFinalizer()) { + err = r.Patch(ctx, u, patch) + if err != nil { + l.Error(err, "Cannot patch Cassandra user with deletion finalizer") + r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, + "Patching Cassandra user with deletion finalizer has been failed. Reason: %v", err) + return models.ReconcileRequeue, nil + } + } username, password, err := getUserCreds(s) if err != nil { @@ -132,21 +151,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques "User has been created for a cluster. Cluster ID: %s, username: %s", clusterID, username) - finalizerNeeded := controllerutil.AddFinalizer(s, models.DeletionFinalizer) - if finalizerNeeded { - err = r.Update(ctx, s) - if err != nil { - l.Error(err, "Cannot update Cassandra user secret", - "secret name", s.Name, - "secret namespace", s.Namespace) - r.EventRecorder.Eventf(u, models.Warning, models.UpdatedEvent, - "Cannot assign Cassandra user to a k8s secret. Reason: %v", err) - - return models.ReconcileRequeue, nil - } - } - - controllerutil.AddFinalizer(u, models.DeletionUserFinalizer+clusterID) + controllerutil.AddFinalizer(u, getDeletionUserFinalizer(clusterID)) err = r.Patch(ctx, u, patch) if err != nil { l.Error(err, "Cannot patch Cassandra user resource", @@ -188,7 +193,7 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques return models.ReconcileRequeue, nil } - controllerutil.RemoveFinalizer(u, models.DeletionUserFinalizer+clusterID) + controllerutil.RemoveFinalizer(u, getDeletionUserFinalizer(clusterID)) err = r.Patch(ctx, u, patch) if err != nil { l.Error(err, "Cannot patch Cassandra user resource", @@ -202,58 +207,66 @@ func (r *CassandraUserReconciler) Reconcile(ctx context.Context, req ctrl.Reques continue } - } - - if u.DeletionTimestamp != nil { - err = r.handleDeleteUser(ctx, l, s, u) - if err != nil { - return models.ReconcileRequeue, nil - } - } - return models.ExitReconcile, nil -} + if event == models.ClusterDeletingEvent { + delete(u.Status.ClustersEvents, clusterID) + err = r.Status().Patch(ctx, u, patch) + if err != nil { + l.Error(err, "Cannot detach clusterID from the Cassandra user resource", + "cluster ID", clusterID) + r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, + "Detaching clusterID from the OpenSearch user resource has been failed. Reason: %v", err) + return models.ReconcileRequeue, nil + } -func (r *CassandraUserReconciler) handleDeleteUser( - ctx context.Context, - l logr.Logger, - s *k8sCore.Secret, - u *v1beta1.CassandraUser, -) error { - username, _, err := getUserCreds(s) - if err != nil { - l.Error(err, "Cannot get user credentials") - r.EventRecorder.Eventf(u, models.Warning, models.CreatingEvent, - "Cannot get user credentials. Reason: %v", err) + controllerutil.RemoveFinalizer(u, getDeletionUserFinalizer(clusterID)) + err = r.Patch(ctx, u, patch) + if err != nil { + l.Error(err, "Cannot delete finalizer from the Cassandra user resource", + "cluster ID", clusterID) + r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, + "Deleting finalizer from the Cassandra user resource has been failed. Reason: %v", err) + return models.ReconcileRequeue, nil + } - return err + l.Info("Cassandra user has been detached from the cluster", "cluster ID", clusterID) + r.EventRecorder.Eventf(u, models.Normal, models.Deleted, + "User has been detached from the cluster. ClusterID: %v", clusterID) + } } - for clusterID, event := range u.Status.ClustersEvents { - if event == models.Created || event == models.CreatingEvent { - l.Error(models.ErrUserStillExist, "please remove the user from the cluster specification", - "username", username, "cluster ID", clusterID) + if u.DeletionTimestamp != nil { + if u.Status.ClustersEvents != nil { + l.Error(models.ErrUserStillExist, "Please remove the user from the cluster specification") r.EventRecorder.Event(u, models.Warning, models.DeletingEvent, "The user is still attached to cluster, please remove the user from the cluster specification.") - return models.ErrUserStillExist + return models.ExitReconcile, nil } - } - l.Info("Cassandra user has been deleted", "username", username) - - controllerutil.RemoveFinalizer(s, models.DeletionFinalizer) - err = r.Update(ctx, s) - if err != nil { - l.Error(err, "Cannot remove finalizer from secret", "secret name", s.Name) + controllerutil.RemoveFinalizer(s, u.GetDeletionFinalizer()) + err = r.Update(ctx, s) + if err != nil { + l.Error(err, "Cannot delete finalizer from the user's secret") + r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, + "Deleting finalizer from the user's secret has been failed. Reason: %v", err) + return models.ReconcileRequeue, nil + } - r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, - "Resource patch is failed. Reason: %v", err) + controllerutil.RemoveFinalizer(u, u.GetDeletionFinalizer()) + err = r.Patch(ctx, u, patch) + if err != nil { + l.Error(err, "Cannot delete finalizer from the Cassandra user resource") + r.EventRecorder.Eventf(u, models.Warning, models.PatchFailed, + "Deleting finalizer from the OpenSearch user resource has been failed. Reason: %v", err) + return models.ReconcileRequeue, nil + } - return err + l.Info("The user resource has been deleted") + return models.ExitReconcile, nil } - return nil + return models.ExitReconcile, nil } // SetupWithManager sets up the controller with the Manager. diff --git a/controllers/clusters/cassandra_controller.go b/controllers/clusters/cassandra_controller.go index 62a66c7e2..0a0c65445 100644 --- a/controllers/clusters/cassandra_controller.go +++ b/controllers/clusters/cassandra_controller.go @@ -258,19 +258,17 @@ func (r *CassandraReconciler) handleCreateCluster( "Cluster backups check job is started", ) - if cassandra.Spec.UserRef != nil { - for _, uRef := range cassandra.Spec.UserRef { - // TODO: manage the graceful creation of users in a cluster that is not yet running. - // Adding users to the creating cluster; errors are potentially encountered, - // need to wait before the cluster is created - - err = r.handleUsersCreate(ctx, l, cassandra, uRef) - if err != nil { - l.Error(err, "Cannot create Cassandra user", "user", uRef) - r.EventRecorder.Eventf(cassandra, models.Warning, models.CreatingEvent, - "Cannot create user. Reason: %v", err) - } + if cassandra.Spec.UserRefs != nil { + err = r.startUsersCreationJob(cassandra) + if err != nil { + l.Error(err, "Failed to start user creation job") + r.EventRecorder.Eventf(cassandra, models.Warning, models.CreationFailed, + "User creation job is failed. Reason: %v", err) + return models.ReconcileRequeue } + + r.EventRecorder.Event(cassandra, models.Normal, models.Created, + "Cluster user creation job is started") } return models.ExitReconcile @@ -505,12 +503,11 @@ func (r *CassandraReconciler) handleDeleteCluster( } } - r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.StatusChecker)) + r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.StatusChecker)) - l.Info("Deleting cluster backup resources", - "cluster ID", cassandra.Status.ID, - ) + l.Info("Deleting cluster backup resources", "cluster ID", cassandra.Status.ID) err = r.deleteBackups(ctx, cassandra.Status.ID, cassandra.Namespace) if err != nil { @@ -534,6 +531,13 @@ func (r *CassandraReconciler) handleDeleteCluster( "Cluster backup resources are deleted", ) + for _, ref := range cassandra.Spec.UserRefs { + err = r.handleUsersDetach(ctx, l, cassandra, ref) + if err != nil { + return models.ReconcileRequeue + } + } + controllerutil.RemoveFinalizer(cassandra, models.DeletionFinalizer) cassandra.Annotations[models.ResourceStateAnnotation] = models.DeletedEvent err = r.Patch(ctx, cassandra, patch) @@ -691,6 +695,55 @@ func (r *CassandraReconciler) handleUsersDelete( return nil } +func (r *CassandraReconciler) handleUsersDetach( + ctx context.Context, + l logr.Logger, + c *v1beta1.Cassandra, + uRef *v1beta1.UserReference, +) error { + req := types.NamespacedName{ + Namespace: uRef.Namespace, + Name: uRef.Name, + } + + u := &clusterresourcesv1beta1.CassandraUser{} + err := r.Get(ctx, req, u) + if err != nil { + if k8serrors.IsNotFound(err) { + l.Error(err, "Cassandra user is not found", "request", req) + r.EventRecorder.Eventf(c, models.Warning, models.NotFound, + "User resource is not found, please provide correct userRef."+ + "Current provided reference: %v", uRef) + return err + } + + l.Error(err, "Cannot get Cassandra user", "user", u.Spec) + r.EventRecorder.Eventf(c, models.Warning, models.DeletionFailed, + "Cannot get Cassandra user. user reference: %v", uRef) + return err + } + + if _, exist := u.Status.ClustersEvents[c.Status.ID]; !exist { + l.Info("User is not existing in the cluster", "user reference", uRef) + r.EventRecorder.Eventf(c, models.Normal, models.DeletionFailed, + "User is not existing in the cluster. User reference: %v", uRef) + return nil + } + + patch := u.NewPatch() + u.Status.ClustersEvents[c.Status.ID] = models.ClusterDeletingEvent + err = r.Status().Patch(ctx, u, patch) + if err != nil { + l.Error(err, "Cannot patch the Cassandra user status with the ClusterDeletingEvent", + "cluster name", c.Spec.Name, "cluster ID", c.Status.ID) + r.EventRecorder.Eventf(c, models.Warning, models.DeletionFailed, + "Cannot patch the Cassandra user status with the ClusterDeletingEvent. Reason: %v", err) + return err + } + + return nil +} + func (r *CassandraReconciler) handleUserEvent( newObj *v1beta1.Cassandra, oldUsers []*v1beta1.UserReference, @@ -698,7 +751,7 @@ func (r *CassandraReconciler) handleUserEvent( ctx := context.TODO() l := log.FromContext(ctx) - for _, newUser := range newObj.Spec.UserRef { + for _, newUser := range newObj.Spec.UserRefs { var exist bool for _, oldUser := range oldUsers { @@ -726,7 +779,7 @@ func (r *CassandraReconciler) handleUserEvent( for _, oldUser := range oldUsers { var exist bool - for _, newUser := range newObj.Spec.UserRef { + for _, newUser := range newObj.Spec.UserRefs { if *oldUser == *newUser { exist = true @@ -769,6 +822,17 @@ func (r *CassandraReconciler) startClusterBackupsJob(cluster *v1beta1.Cassandra) return nil } +func (r *CassandraReconciler) startUsersCreationJob(cluster *v1beta1.Cassandra) error { + job := r.newUsersCreationJob(cluster) + + err := r.Scheduler.ScheduleJob(cluster.GetJobID(scheduler.UserCreator), scheduler.UserCreationInterval, job) + if err != nil { + return err + } + + return nil +} + func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) scheduler.Job { l := log.Log.WithValues("component", "CassandraStatusClusterJob") return func() error { @@ -778,6 +842,7 @@ func (r *CassandraReconciler) newWatchStatusJob(cassandra *v1beta1.Cassandra) sc l.Info("Resource is not found in the k8s cluster. Closing Instaclustr status sync.", "namespaced name", namespacedName) r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.BackupsChecker)) + r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.UserCreator)) r.Scheduler.RemoveJob(cassandra.GetJobID(scheduler.StatusChecker)) return nil } @@ -1039,6 +1104,49 @@ func (r *CassandraReconciler) newWatchBackupsJob(cluster *v1beta1.Cassandra) sch } } +func (r *CassandraReconciler) newUsersCreationJob(c *v1beta1.Cassandra) scheduler.Job { + l := log.Log.WithValues("component", "cassandraUsersCreationJob") + + return func() error { + ctx := context.Background() + + err := r.Get(ctx, types.NamespacedName{ + Namespace: c.Namespace, + Name: c.Name, + }, c) + if err != nil { + if k8serrors.IsNotFound(err) { + return nil + } + return err + } + + if c.Status.State != models.RunningStatus { + l.Info("User creation job is scheduled") + r.EventRecorder.Event(c, models.Normal, models.CreationFailed, + "User creation job is scheduled, cluster is not in the running state") + return nil + } + + for _, ref := range c.Spec.UserRefs { + err = r.handleUsersCreate(ctx, l, c, ref) + if err != nil { + l.Error(err, "Failed to create a user for the cluster", "user ref", ref) + r.EventRecorder.Eventf(c, models.Warning, models.CreationFailed, + "Failed to create a user for the cluster. Reason: %v", err) + return err + } + } + + l.Info("User creation job successfully finished", "resource name", c.Name) + r.EventRecorder.Eventf(c, models.Normal, models.Created, "User creation job successfully finished") + + go r.Scheduler.RemoveJob(c.GetJobID(scheduler.UserCreator)) + + return nil + } +} + func (r *CassandraReconciler) listClusterBackups(ctx context.Context, clusterID, namespace string) (*clusterresourcesv1beta1.ClusterBackupList, error) { backupsList := &clusterresourcesv1beta1.ClusterBackupList{} listOpts := []client.ListOption{ @@ -1118,7 +1226,7 @@ func (r *CassandraReconciler) SetupWithManager(mgr ctrl.Manager) error { oldObj := event.ObjectOld.(*v1beta1.Cassandra) - r.handleUserEvent(newObj, oldObj.Spec.UserRef) + r.handleUserEvent(newObj, oldObj.Spec.UserRefs) newObj.Annotations[models.ResourceStateAnnotation] = models.UpdatingEvent return true