Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add support raw container in the map task #329

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const SIGKILL = 137
const defaultContainerTemplateName = "default"
const primaryContainerTemplateName = "primary"
const PrimaryContainerKey = "primary_container_name"
const Sidecar = "sidecar"

// ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified.
func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity) {
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil)
} else {
phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient)

logger.Infof(ctx, "Failed to launch subtask with error [%s]", perr)
// if launchSubtask fails we attempt to deallocate the (previously allocated)
// resource to mitigate leaks
if perr != nil {
Expand Down
22 changes: 21 additions & 1 deletion go/tasks/plugins/array/k8s/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"strings"
"time"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
Expand Down Expand Up @@ -159,6 +160,20 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor
return nil
}

// updateCopilotArgs append array index to the end of the output prefix
func updateCopilotArgs(pod *v1.Pod, stCtx SubTaskExecutionContext) {
for sidecarIndex, container := range pod.Spec.Containers {
if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
for i, arg := range pod.Spec.Containers[sidecarIndex].Args {
if arg == "--to-output-prefix" {
pod.Spec.Containers[sidecarIndex].Args[i+1] = fmt.Sprintf("%s/%s", pod.Spec.Containers[sidecarIndex].Args[i+1], strconv.Itoa(stCtx.originalIndex))
}
}
break
}
}
}

// launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config.
func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) {
o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx)
Expand Down Expand Up @@ -187,6 +202,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
})

pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, arrayJobEnvVars...)
updateCopilotArgs(pod, stCtx)

logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", pod.GetObjectKind().GroupVersionKind(), pod.GetNamespace(), pod.GetName())
err = kubeClient.GetClient().Create(ctx, pod)
Expand Down Expand Up @@ -330,6 +346,10 @@ func getTaskContainerIndex(pod *v1.Pod) (int, error) {
if len(pod.Spec.Containers) == 1 {
return 0, nil
}
// Copilot is always the second container if it is enabled.
if len(pod.Spec.Containers) == 2 && pod.Spec.Containers[1].Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is reasonable to assume this. Could there a scenario where the second container was not copilot?

return 0, nil
}
// For tasks with a K8sPod task target, they may produce multiple containers but at least one must be the designated primary.
return -1, stdErrors.Errorf(ErrBuildPodTemplate, "Expected a specified primary container key when building an array job with a K8sPod spec target")

Expand Down
26 changes: 25 additions & 1 deletion go/tasks/plugins/array/k8s/subtask_exec_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package k8s
import (
"context"
"fmt"

"testing"

podPlugin "github.com/flyteorg/flyteplugins/go/tasks/plugins/k8s/pod"
v1 "k8s.io/api/core/v1"

"github.com/flyteorg/flytestdlib/storage"

"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -36,3 +37,26 @@ func TestSubTaskExecutionContext(t *testing.T) {
assert.Equal(t, storage.DataReference("/prefix/"), stCtx.OutputWriter().GetOutputPrefixPath())
assert.Equal(t, storage.DataReference("/raw_prefix/5/1"), stCtx.OutputWriter().GetRawOutputPrefix())
}

func TestUpdateCopilotArgs(t *testing.T) {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "flyte-copilot-sidecar",
Args: []string{"--to-output-prefix", "s3://bucket/key"},
},
},
},
}
ctx := context.Background()

tCtx := getMockTaskExecutionContext(ctx, 0)
taskTemplate, err := tCtx.TaskReader().Read(ctx)
assert.Nil(t, err)

stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0))
assert.Nil(t, err)
updateCopilotArgs(pod, stCtx)
assert.Equal(t, pod.Spec.Containers[0].Args[1], "s3://bucket/key/5")
}
40 changes: 40 additions & 0 deletions go/tasks/plugins/array/k8s/subtask_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package k8s

import (
"testing"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"gotest.tools/assert"
v1 "k8s.io/api/core/v1"
)

func TestGetTaskContainerTask(t *testing.T) {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "PrimaryContainer",
},
},
},
}
index, err := getTaskContainerIndex(pod)
assert.NilError(t, err)
assert.Equal(t, index, 0)

pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar})
index, err = getTaskContainerIndex(pod)
assert.NilError(t, err)
assert.Equal(t, index, 0)

pod.Annotations = map[string]string{flytek8s.PrimaryContainerKey: "PrimaryContainer"}
index, err = getTaskContainerIndex(pod)
assert.NilError(t, err)
assert.Equal(t, index, 0)

pod.Spec.Containers[0].Name = "SecondaryContainer"
_, err = getTaskContainerIndex(pod)
assert.ErrorContains(t, err, "Couldn't find any container matching the primary container")

}