diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index ee26ce4dc..3412b92ab 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -31,6 +31,7 @@ const SIGKILL = 137 const defaultContainerTemplateName = "default" const primaryContainerTemplateName = "primary" const PrimaryContainerKey = "primary_container_name" +const Sidecar = "sidecar" // ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified. func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity) { diff --git a/go/tasks/plugins/array/k8s/management.go b/go/tasks/plugins/array/k8s/management.go index 32a3970b4..bfb81491f 100644 --- a/go/tasks/plugins/array/k8s/management.go +++ b/go/tasks/plugins/array/k8s/management.go @@ -187,7 +187,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil) } else { phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient) - + logger.Infof(ctx, "Failed to launch subtask with error [%s]", perr) // if launchSubtask fails we attempt to deallocate the (previously allocated) // resource to mitigate leaks if perr != nil { diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index 6411f06c0..7fed8c704 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -8,9 +8,10 @@ import ( "strings" "time" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" + "github.com/flyteorg/flyteplugins/go/tasks/errors" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" 
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
 	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
 	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog"
@@ -159,6 +160,29 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor
 	return nil
 }
 
+// updateCopilotArgs appends the subtask's original array index to the value of the
+// "--to-output-prefix" argument of the co-pilot sidecar container. Only the first
+// container whose name equals the configured co-pilot name prefix followed by
+// flytek8s.Sidecar is modified; a pod without such a container is left untouched.
+func updateCopilotArgs(pod *v1.Pod, stCtx SubTaskExecutionContext) {
+	sidecarName := config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar
+	for sidecarIndex := range pod.Spec.Containers {
+		if pod.Spec.Containers[sidecarIndex].Name != sidecarName {
+			continue
+		}
+
+		args := pod.Spec.Containers[sidecarIndex].Args
+		for i, arg := range args {
+			// The flag's value is the argument that follows it. A flag in the last position
+			// has no value, so it is skipped rather than indexing past the end of the slice.
+			if arg == "--to-output-prefix" && i+1 < len(args) {
+				args[i+1] = fmt.Sprintf("%s/%s", args[i+1], strconv.Itoa(stCtx.originalIndex))
+			}
+		}
+		return
+	}
+}
+
 // launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config.
 func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) {
 	o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx)
@@ -187,6 +211,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
 	})
 
 	pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, arrayJobEnvVars...)
+	updateCopilotArgs(pod, stCtx)
 
 	logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", pod.GetObjectKind().GroupVersionKind(), pod.GetNamespace(), pod.GetName())
 	err = kubeClient.GetClient().Create(ctx, pod)
@@ -330,6 +355,10 @@ func getTaskContainerIndex(pod *v1.Pod) (int, error) {
 		if len(pod.Spec.Containers) == 1 {
 			return 0, nil
 		}
+		// Copilot is always the second container if it is enabled.
+		if len(pod.Spec.Containers) == 2 && pod.Spec.Containers[1].Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar {
+			return 0, nil
+		}
 		// For tasks with a K8sPod task target, they may produce multiple containers but at least one must be the designated primary.
 		return -1, stdErrors.Errorf(ErrBuildPodTemplate,
 			"Expected a specified primary container key when building an array job with a K8sPod spec target")
diff --git a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go
index 77389514e..6de79a3a2 100644
--- a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go
+++ b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go
@@ -3,12 +3,13 @@ package k8s
 import (
 	"context"
 	"fmt"
+
 	"testing"
 
 	podPlugin "github.com/flyteorg/flyteplugins/go/tasks/plugins/k8s/pod"
+	v1 "k8s.io/api/core/v1"
 
 	"github.com/flyteorg/flytestdlib/storage"
-
 	"github.com/stretchr/testify/assert"
 )
 
@@ -36,3 +37,33 @@ func TestSubTaskExecutionContext(t *testing.T) {
 	assert.Equal(t, storage.DataReference("/prefix/"), stCtx.OutputWriter().GetOutputPrefixPath())
 	assert.Equal(t, storage.DataReference("/raw_prefix/5/1"), stCtx.OutputWriter().GetRawOutputPrefix())
 }
+
+// TestUpdateCopilotArgs checks that updateCopilotArgs rewrites the value following
+// "--to-output-prefix" on the co-pilot sidecar by appending "/5". The 5 is expected to be
+// the index value handed to NewSubTaskExecutionContext below.
+func TestUpdateCopilotArgs(t *testing.T) {
+	pod := &v1.Pod{
+		Spec: v1.PodSpec{
+			Containers: []v1.Container{
+				{
+					// NOTE(review): this literal presumably equals the default
+					// CoPilot.NamePrefix + flytek8s.Sidecar; confirm against the plugin config.
+					Name: "flyte-copilot-sidecar",
+					Args: []string{"--to-output-prefix", "s3://bucket/key"},
+				},
+			},
+		},
+	}
+	ctx := context.Background()
+
+	tCtx := getMockTaskExecutionContext(ctx, 0)
+	taskTemplate, err := tCtx.TaskReader().Read(ctx)
+	assert.NoError(t, err)
+
+	stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0))
+	assert.NoError(t, err)
+
+	updateCopilotArgs(pod, stCtx)
+	// testify's assert.Equal takes the expected value before the actual one.
+	assert.Equal(t, "s3://bucket/key/5", pod.Spec.Containers[0].Args[1])
+}
diff --git a/go/tasks/plugins/array/k8s/subtask_test.go b/go/tasks/plugins/array/k8s/subtask_test.go
new file mode 100644
index 000000000..19438f575
--- /dev/null
+++
b/go/tasks/plugins/array/k8s/subtask_test.go
@@ -0,0 +1,42 @@
+package k8s
+
+import (
+	"testing"
+
+	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
+	"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
+	"gotest.tools/assert"
+	v1 "k8s.io/api/core/v1"
+)
+
+// TestGetTaskContainerTask walks getTaskContainerIndex through four pod shapes: a single
+// container, a task container followed by the co-pilot sidecar, the same pod with a matching
+// primary-container annotation, and finally an annotation that matches no container.
+func TestGetTaskContainerTask(t *testing.T) {
+	const primaryName = "PrimaryContainer"
+
+	// One container and no annotation.
+	pod := &v1.Pod{}
+	pod.Spec.Containers = []v1.Container{{Name: primaryName}}
+	got, err := getTaskContainerIndex(pod)
+	assert.NilError(t, err)
+	assert.Equal(t, got, 0)
+
+	// Add a second container named like the co-pilot sidecar.
+	sidecar := v1.Container{Name: config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar}
+	pod.Spec.Containers = append(pod.Spec.Containers, sidecar)
+	got, err = getTaskContainerIndex(pod)
+	assert.NilError(t, err)
+	assert.Equal(t, got, 0)
+
+	// Name the first container as primary through the pod annotation.
+	pod.Annotations = map[string]string{flytek8s.PrimaryContainerKey: primaryName}
+	got, err = getTaskContainerIndex(pod)
+	assert.NilError(t, err)
+	assert.Equal(t, got, 0)
+
+	// Rename the first container so the annotation no longer matches any container.
+	pod.Spec.Containers[0].Name = "SecondaryContainer"
+	_, err = getTaskContainerIndex(pod)
+	assert.ErrorContains(t, err, "Couldn't find any container matching the primary container")
+}