Skip to content

Commit

Permalink
[YUNIKORN-2475] Add support for K8s native sidecars (restartable init…
Browse files Browse the repository at this point in the history
… containers) (apache#801)

Closes: apache#801

Signed-off-by: Peter Bacsko <[email protected]>
  • Loading branch information
pbacsko committed Mar 20, 2024
1 parent 4dbf99b commit bcd8cb5
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 19 deletions.
35 changes: 29 additions & 6 deletions pkg/common/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ func GetPodResource(pod *v1.Pod) (resource *si.Resource) {
}

// each resource compare between initcontainer and sum of containers
// max(sum(Containers requirement), InitContainers requirement)
// InitContainers(i) requirement=sum(Sidecar requirement i-1)+InitContainer(i) request
// max(sum(Containers requirement)+sum(Sidecar requirement), InitContainer(i) requirement)
if pod.Spec.InitContainers != nil {
checkInitContainerRequest(pod, podResource)
podResource = checkInitContainerRequest(pod, podResource)
}

// K8s pod EnableOverHead from:
Expand Down Expand Up @@ -128,22 +129,44 @@ func getMaxResource(left v1.ResourceList, right v1.ResourceList) *si.Resource {
return combined
}

func checkInitContainerRequest(pod *v1.Pod, containersResources *si.Resource) {
func checkInitContainerRequest(pod *v1.Pod, containersResources *si.Resource) *si.Resource {
updatedRes := containersResources

// update total pod resource usage with sidecar containers
for _, c := range pod.Spec.InitContainers {
if isSideCarContainer(c) {
resourceList := c.Resources.Requests
sideCarResources := getResource(resourceList)
updatedRes = Add(updatedRes, sideCarResources)
}
}

var sideCarRequests *si.Resource // cumulative value of sidecar requests so far
for _, c := range pod.Spec.InitContainers {
resourceList := c.Resources.Requests
ICResource := getResource(resourceList)
if isSideCarContainer(c) {
sideCarRequests = Add(sideCarRequests, ICResource)
}
ICResource = Add(ICResource, sideCarRequests)
for resourceName, ICRequest := range ICResource.Resources {
containersRequests, exist := containersResources.Resources[resourceName]
containersRequests, exist := updatedRes.Resources[resourceName]
// addtional resource request from init cont, add it to request.
if !exist {
containersResources.Resources[resourceName] = ICRequest
updatedRes.Resources[resourceName] = ICRequest
continue
}
if ICRequest.GetValue() > containersRequests.GetValue() {
containersResources.Resources[resourceName] = ICRequest
updatedRes.Resources[resourceName] = ICRequest
}
}
}

return updatedRes
}

func isSideCarContainer(c v1.Container) bool {
return c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways
}

func GetNodeResource(nodeStatus *v1.NodeStatus) *si.Resource {
Expand Down
116 changes: 103 additions & 13 deletions pkg/common/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
apis "k8s.io/apimachinery/pkg/apis/meta/v1"
k8res "k8s.io/kubernetes/pkg/api/v1/resource"

siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
Expand Down Expand Up @@ -147,13 +148,13 @@ func TestEquals(t *testing.T) {
}

func TestParsePodResource(t *testing.T) {
containers := make([]v1.Container, 0)
containers := make([]v1.Container, 0, 2)

// container 01
c1Resources := make(map[v1.ResourceName]resource.Quantity)
c1Resources[v1.ResourceMemory] = resource.MustParse("500M")
c1Resources[v1.ResourceCPU] = resource.MustParse("1")
c1Resources[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("1")
c1Resources["nvidia.com/gpu"] = resource.MustParse("1")
containers = append(containers, v1.Container{
Name: "container-01",
Resources: v1.ResourceRequirements{
Expand All @@ -165,7 +166,7 @@ func TestParsePodResource(t *testing.T) {
c2Resources := make(map[v1.ResourceName]resource.Quantity)
c2Resources[v1.ResourceMemory] = resource.MustParse("1024M")
c2Resources[v1.ResourceCPU] = resource.MustParse("2")
c2Resources[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("4")
c2Resources["nvidia.com/gpu"] = resource.MustParse("4")
containers = append(containers, v1.Container{
Name: "container-02",
Resources: v1.ResourceRequirements{
Expand Down Expand Up @@ -194,28 +195,31 @@ func TestParsePodResource(t *testing.T) {
assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(3000))
assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(5))
assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
resK8s := siResourceFromList(k8res.PodRequests(pod, k8res.PodResourcesOptions{}))
assert.Assert(t, Equals(resK8s, res), "K8s pod resource request calculation yielded different results")

// Add pod OverHead
overHeadResources := make(map[v1.ResourceName]resource.Quantity)
overHeadResources[v1.ResourceMemory] = resource.MustParse("500M")
overHeadResources[v1.ResourceCPU] = resource.MustParse("1")
overHeadResources[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("1")
overHeadResources["nvidia.com/gpu"] = resource.MustParse("1")
// Set pod OverHead
pod.Spec.Overhead = overHeadResources

// verify we get aggregated resource from containers
res = GetPodResource(pod)
assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(2024*1000*1000))
assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(4000))
assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(6))
assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
resK8s = siResourceFromList(k8res.PodRequests(pod, k8res.PodResourcesOptions{ExcludeOverhead: false}))
assert.Assert(t, Equals(resK8s, res), "K8s pod resource request calculation yielded different results")

// test initcontainer and container resouce compare
initContainers := make([]v1.Container, 0)
initContainers := make([]v1.Container, 0, 2)
initc1Resources := make(map[v1.ResourceName]resource.Quantity)
initc1Resources[v1.ResourceMemory] = resource.MustParse("4096M")
initc1Resources[v1.ResourceCPU] = resource.MustParse("0.5")
initc1Resources[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("1")
initc1Resources["nvidia.com/gpu"] = resource.MustParse("1")
initContainers = append(initContainers, v1.Container{
Name: "initcontainer-01",
Resources: v1.ResourceRequirements{
Expand All @@ -226,7 +230,7 @@ func TestParsePodResource(t *testing.T) {
initc2Resources := make(map[v1.ResourceName]resource.Quantity)
initc2Resources[v1.ResourceMemory] = resource.MustParse("10000M")
initc2Resources[v1.ResourceCPU] = resource.MustParse("5.12")
initc2Resources[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("4")
initc2Resources["nvidia.com/gpu"] = resource.MustParse("4")
initContainers = append(initContainers, v1.Container{
Name: "initcontainer-02",
Resources: v1.ResourceRequirements{
Expand All @@ -236,11 +240,11 @@ func TestParsePodResource(t *testing.T) {

containers[0].Resources.Requests[v1.ResourceMemory] = resource.MustParse("2000M")
containers[0].Resources.Requests[v1.ResourceCPU] = resource.MustParse("4.096")
containers[0].Resources.Requests[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("2")
containers[0].Resources.Requests["nvidia.com/gpu"] = resource.MustParse("2")

containers[1].Resources.Requests[v1.ResourceMemory] = resource.MustParse("5000M")
containers[1].Resources.Requests[v1.ResourceCPU] = resource.MustParse("1.024")
containers[1].Resources.Requests[v1.ResourceName("nvidia.com/gpu")] = resource.MustParse("2")
containers[1].Resources.Requests["nvidia.com/gpu"] = resource.MustParse("2")

pod.ObjectMeta = apis.ObjectMeta{
Name: "pod-resource-test-00002",
Expand All @@ -262,11 +266,13 @@ func TestParsePodResource(t *testing.T) {
assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(5120))
assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(4))
assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
resK8s = siResourceFromList(k8res.PodRequests(pod, k8res.PodResourcesOptions{}))
assert.Assert(t, Equals(resK8s, res), "K8s pod resource request calculation yielded different results")

delete(containers[0].Resources.Requests, v1.ResourceName("nvidia.com/gpu"))
delete(containers[1].Resources.Requests, v1.ResourceName("nvidia.com/gpu"))
delete(containers[0].Resources.Requests, "nvidia.com/gpu")
delete(containers[1].Resources.Requests, "nvidia.com/gpu")
delete(initContainers[1].Resources.Requests, v1.ResourceCPU)
delete(initContainers[1].Resources.Requests, v1.ResourceName("nvidia.com/gpu"))
delete(initContainers[1].Resources.Requests, "nvidia.com/gpu")
pod.Spec = v1.PodSpec{
Containers: containers,
InitContainers: initContainers,
Expand All @@ -280,6 +286,90 @@ func TestParsePodResource(t *testing.T) {
assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(5120))
assert.Equal(t, res.Resources["nvidia.com/gpu"].GetValue(), int64(1))
assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
resK8s = siResourceFromList(k8res.PodRequests(pod, k8res.PodResourcesOptions{}))
assert.Assert(t, Equals(resK8s, res), "K8s pod resource request calculation yielded different results")

// single restartable init container (sidecar) {mem,CPU}
// IC1{10M, 1000mi}
// C1{2000M, 4096mi}
// C2{5000M, 1024mi}
// sum of containers{7010M, 6120mi}
initContainers = initContainers[:0]
alwaysRestart := v1.ContainerRestartPolicyAlways
smallResources := map[v1.ResourceName]resource.Quantity{
v1.ResourceMemory: resource.MustParse("10M"),
v1.ResourceCPU: resource.MustParse("1"),
}
initContainers = append(initContainers, v1.Container{
Name: "container-04",
Resources: v1.ResourceRequirements{
Requests: smallResources,
},
RestartPolicy: &alwaysRestart,
})
pod.Spec.InitContainers = initContainers
res = GetPodResource(pod)
assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(7010000000))
assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(6120))
assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
resK8s = siResourceFromList(k8res.PodRequests(pod, k8res.PodResourcesOptions{}))
assert.Assert(t, Equals(resK8s, res), "K8s pod resource request calculation yielded different results")

// restartable + non-restartable init containers {mem,CPU}
// C1{2000M, 4096mi}
// C2{5000M, 1024mi}
// IC1{10M, 1000mi} sidecar
// IC2{10M, 1000mi} sidecar
// IC3{4096M, 10000mi}
// usage calculation:
// When IC3 starts, necessary resource is Req1=IC1+IC2+IC3 {4116m, 12000mi}
// After IC3 completed, necessary resource is Req2=IC1,IC2,C1,C2 {7020m, 7120mi}
// Resource request for pod is max(Req1, Req2): {7020m, 12000m}
initContainers = initContainers[:0]
initContainers = append(initContainers, v1.Container{
Name: "container-05",
Resources: v1.ResourceRequirements{
Requests: smallResources,
},
RestartPolicy: &alwaysRestart,
})
initContainers = append(initContainers, v1.Container{
Name: "container-06",
Resources: v1.ResourceRequirements{
Requests: smallResources,
},
RestartPolicy: &alwaysRestart,
})
initContainers = append(initContainers, v1.Container{
Name: "container-07",
Resources: v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceMemory: resource.MustParse("4096M"),
v1.ResourceCPU: resource.MustParse("10"),
},
},
})
pod.Spec.InitContainers = initContainers
res = GetPodResource(pod)
assert.Equal(t, res.Resources[siCommon.Memory].GetValue(), int64(7020000000))
assert.Equal(t, res.Resources[siCommon.CPU].GetValue(), int64(12000))
assert.Equal(t, res.Resources["pods"].GetValue(), int64(1))
resK8s = siResourceFromList(k8res.PodRequests(pod, k8res.PodResourcesOptions{}))
assert.Assert(t, Equals(resK8s, res), "K8s pod resource request calculation yielded different results")
}

func siResourceFromList(list v1.ResourceList) *si.Resource {
builder := NewResourceBuilder()
for name, val := range list {
if name == v1.ResourceCPU {
builder.AddResource("vcore", val.MilliValue())
continue
}
builder.AddResource(name.String(), val.Value())
}

builder.AddResource("pods", 1)
return builder.Build()
}

//nolint:funlen
Expand Down

0 comments on commit bcd8cb5

Please sign in to comment.