Skip to content

Commit

Permalink
[YUNIKORN-1041] Fix binding of dynamic volumes to pod (#359)
Browse files Browse the repository at this point in the history
Closes: #359
  • Loading branch information
craigcondit committed Jan 26, 2022
1 parent 8118ecf commit 43a7871
Showing 1 changed file with 53 additions and 3 deletions.
56 changes: 53 additions & 3 deletions pkg/cache/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/volume/scheduling"

"github.com/apache/incubator-yunikorn-k8shim/pkg/appmgmt/interfaces"
schedulercache "github.com/apache/incubator-yunikorn-k8shim/pkg/cache/external"
Expand Down Expand Up @@ -356,19 +357,68 @@ func (ctx *Context) bindPodVolumes(pod *v1.Pod) error {
zap.String("podName", pod.Name))
} else {
log.Logger().Info("Binding Pod Volumes", zap.String("podName", pod.Name))
boundClaims, claimsToBind, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
boundClaims, claimsToBind, unboundClaimsImmediate, err := ctx.apiProvider.GetAPIs().VolumeBinder.GetPodVolumes(assumedPod)
if err != nil {
log.Logger().Error("Failed to get pod volumes",
zap.String("podName", assumedPod.Name),
zap.Error(err))
return err
}
if len(unboundClaimsImmediate) > 0 {
err = fmt.Errorf("pod %s has unbound immediate claims", pod.Name)
log.Logger().Error("Pod has unbound immediate claims",
zap.String("podName", assumedPod.Name),
zap.Error(err))
return err
}
node, err := ctx.schedulerCache.GetNodeInfo(assumedPod.Spec.NodeName)
if err != nil {
log.Logger().Error("Failed to get node info",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Error(err))
return err
}
volumes, reasons, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node)
if err != nil {
log.Logger().Error("Failed to find pod volumes",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("claimsToBind", len(claimsToBind)),
zap.Error(err))
return err
}
volumes, _, err := ctx.apiProvider.GetAPIs().VolumeBinder.FindPodVolumes(assumedPod, boundClaims, claimsToBind, node)
if len(reasons) > 0 {
sReasons := make([]string, 0)
for _, reason := range reasons {
sReasons = append(sReasons, string(reason))
}
sReason := strings.Join(sReasons, ", ")
err = fmt.Errorf("pod %s has conflicting volume claims: %s", pod.Name, sReason)
log.Logger().Error("Pod has conflicting volume claims",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("claimsToBind", len(claimsToBind)),
zap.Error(err))
return err
}
if volumes.StaticBindings == nil {
// convert nil to empty array
volumes.StaticBindings = make([]*scheduling.BindingInfo, 0)
}
if volumes.DynamicProvisions == nil {
// convert nil to empty array
volumes.DynamicProvisions = make([]*v1.PersistentVolumeClaim, 0)
}
err = ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes)
if err != nil {
log.Logger().Error("Failed to bind pod volumes",
zap.String("podName", assumedPod.Name),
zap.String("nodeName", assumedPod.Spec.NodeName),
zap.Int("dynamicProvisions", len(volumes.DynamicProvisions)),
zap.Int("staticBindings", len(volumes.StaticBindings)))
return err
}
return ctx.apiProvider.GetAPIs().VolumeBinder.BindPodVolumes(assumedPod, volumes)
}
}
return nil
Expand Down

0 comments on commit 43a7871

Please sign in to comment.