Skip to content

Commit

Permalink
Reconcile k8s CSIDriver resource
Browse files Browse the repository at this point in the history
Signed-off-by: nb-ohad <[email protected]>
  • Loading branch information
nb-ohad committed Jun 27, 2024
1 parent d935b17 commit beda133
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 8 deletions.
114 changes: 106 additions & 8 deletions internal/controller/driver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,26 @@ package controller

import (
"context"
"encoding/json"
"fmt"
"maps"
"reflect"
"regexp"
"slices"
"strings"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

csiv1a1 "github.com/ceph/ceph-csi-operator/api/v1alpha1"
"github.com/ceph/ceph-csi-operator/utils"
Expand All @@ -38,6 +46,9 @@ import (
//+kubebuilder:rbac:groups=csi.ceph.io,resources=drivers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=csi.ceph.io,resources=drivers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=csi.ceph.io,resources=drivers/finalizers,verbs=update
//+kubebuilder:rbac:groups=csi.ceph.io,resources=drivers,verbs=update

const ownerRefAnnotationKey = "csi.ceph.io/ownerref"

// A regexp used to parse driver short name and driver type from the
// driver's full name
Expand All @@ -63,8 +74,27 @@ type driverReconcile struct {

// SetupWithManager sets up the controller with the Manager.
func (r *DriverReconciler) SetupWithManager(mgr ctrl.Manager) error {

enqueueFromOwnerRefAnnotation := handler.EnqueueRequestsFromMapFunc(
func(_ context.Context, obj client.Object) []reconcile.Request {
ownerRef := obj.GetAnnotations()[ownerRefAnnotationKey]
ownerObjKey := client.ObjectKey{}
if err := json.Unmarshal([]byte(ownerRef), &ownerObjKey); err != nil {
return nil
}

return []reconcile.Request{{
NamespacedName: types.NamespacedName{
Name: ownerObjKey.Name,
Namespace: ownerObjKey.Namespace,
},
}}
},
)

return ctrl.NewControllerManagedBy(mgr).
For(&csiv1a1.Driver{}).
Watches(&storagev1.CSIDriver{}, enqueueFromOwnerRefAnnotation).
Complete(r)
}

Expand Down Expand Up @@ -95,10 +125,10 @@ func (r *driverReconcile) reconcile() (ctrl.Result, error) {
// Concurrently reconcile different aspects of the clusters actual state to meet
// the desired state defined on the driver object
errors := utils.RunConcurrently(
r.upsertPluginDeamonSet,
r.upsertProvisionerDeployment,
r.upsertK8sCSIDriver,
r.upsertLivnessService,
r.reconcileK8sCsiDriver,
r.reconcilePluginDeamonSet,
r.reconcileProvisionerDeployment,
r.reconcileLivnessService,
)

// Check if any reconcilatin error where raised during the concurrent execution
Expand All @@ -114,6 +144,21 @@ func (r *driverReconcile) reconcile() (ctrl.Result, error) {
}

func (r *driverReconcile) LoadAndValidateDesiredState() error {
// Validate that the requested name for the CSI driver isn't already claimed by an existing CSI driver
// (Can happen if a driver with an identical name was created in a different namespace)
if err := r.Get(
r.ctx,
client.ObjectKey{Name: r.driver.Name},
&storagev1.CSIDriver{},
); client.IgnoreNotFound(err) != nil {
if errors.IsAlreadyExists(err) {
r.log.Error(err, "Desired name already in use by a different CSI Driver", "name", r.driver.Name)
} else {
r.log.Error(err, "Failed to query the existence of a CSI Driver", "name", r.driver.Name)
}
return err
}

// Load operator configuration resource
opConfig := csiv1a1.OperatorConfig{}
opConfig.Name = operatorConfigName
Expand Down Expand Up @@ -159,19 +204,72 @@ func (r *driverReconcile) LoadAndValidateDesiredState() error {
return nil
}

func (r *driverReconcile) upsertPluginDeamonSet() error {
func (r *driverReconcile) reconcileK8sCsiDriver() error {
existingCsiDriver := &storagev1.CSIDriver{}
existingCsiDriver.Name = r.driver.Name

log := r.log.WithValues("driverName", existingCsiDriver.Name)
log.Info("Reconciling CSI Driver resource")

if err := r.Get(r.ctx, client.ObjectKeyFromObject(existingCsiDriver), existingCsiDriver); client.IgnoreNotFound(err) != nil {
log.Error(err, "Failed to load CSI Driver resource")
return err
}

desiredCsiDriver := existingCsiDriver.DeepCopy()
desiredCsiDriver.Spec = storagev1.CSIDriverSpec{
AttachRequired: r.driver.Spec.AttachRequired,
PodInfoOnMount: ptr.To(false),
FSGroupPolicy: &r.driver.Spec.FsGroupPolicy,
}

ownerObjKey := client.ObjectKeyFromObject(&r.driver)
if bytes, err := json.Marshal(ownerObjKey); err != nil {
log.Error(
err,
"Failed to JSON marshal owner obj key for CSI driver resource",
"ownerObjKey",
ownerObjKey,
)
return err
} else {
utils.AddAnnotation(desiredCsiDriver, ownerRefAnnotationKey, string(bytes))
}

if existingCsiDriver.UID == "" || !reflect.DeepEqual(desiredCsiDriver, existingCsiDriver) {
if existingCsiDriver.UID != "" {
log.Info("CSI Driver resource exist but does not meet desired state")
if err := r.Delete(r.ctx, existingCsiDriver); err != nil {
log.Error(err, "Failed to delete existing CSI Driver resource")
return err
}
log.Info("CSI Driver resource deleted successfully")
} else {
log.Info("CSI Driver resource does not exist")
}

if err := r.Create(r.ctx, desiredCsiDriver); err != nil {
log.Error(err, "Failed to create a CSI Driver resource")
return err
}

log.Info("CSI Driver resource created successfully")
} else {
log.Info("CSI Driver resource already meets desired state")
}

return nil
}

func (r *driverReconcile) upsertProvisionerDeployment() error {
func (r *driverReconcile) reconcilePluginDeamonSet() error {
return nil
}

func (r *driverReconcile) upsertK8sCSIDriver() error {
func (r *driverReconcile) reconcileProvisionerDeployment() error {
return nil
}

func (r *driverReconcile) upsertLivnessService() error {
func (r *driverReconcile) reconcileLivnessService() error {
return nil
}

Expand Down
16 changes: 16 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package utils

import (
"sync"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func RunConcurrently(fnList ...func() error) chan error {
Expand Down Expand Up @@ -51,3 +53,17 @@ func ChannelToSlice[T any](c chan T) []T {
}
return list
}

// AddAnnotation adds an annotation to a resource metadata, returns true if added else false
func AddAnnotation(obj metav1.Object, key string, value string) bool {
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
obj.SetAnnotations(annotations)
}
if oldValue, exist := annotations[key]; !exist || oldValue != value {
annotations[key] = value
return true
}
return false
}

0 comments on commit beda133

Please sign in to comment.