From 47d5addf28898f8f8598793f97d21528ee0690de Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 14:21:06 -0800 Subject: [PATCH 01/14] Add support upload collections Signed-off-by: Kevin Su --- .../pluginmachinery/flytek8s/pod_helper.go | 8 ++++++-- .../flytek8s/pod_helper_test.go | 10 +++++----- go/tasks/plugins/array/k8s/management.go | 2 +- go/tasks/plugins/array/k8s/subtask.go | 14 +++++++++++++ go/tasks/plugins/k8s/pod/plugin.go | 20 ++++++++++++++++++- 5 files changed, 45 insertions(+), 9 deletions(-) diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index e9736ff6a..151165de6 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -26,6 +26,9 @@ const Interrupted = "Interrupted" const SIGKILL = 137 const defaultContainerTemplateName = "default" const primaryContainerTemplateName = "primary" +const PrimaryContainerKey = "primary_container_name" +const FlyteCopilotName = "flyte_copilot_name" +const Sidecar = "sidecar" // ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified. func ApplyInterruptibleNodeSelectorRequirement(interruptible bool, affinity *v1.Affinity) { @@ -122,6 +125,7 @@ func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (* TaskExecMetadata: tCtx.TaskExecutionMetadata(), } c, err := ToK8sContainer(ctx, task.GetContainer(), task.Interface, templateParameters) + if err != nil { return nil, err } @@ -403,10 +407,10 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo return pluginsCore.PhaseInfoSuccess(&info), nil } -// DeterminePrimaryContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object +// DetermineContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object // corresponding to the phase of the primaryContainer which is identified using the provided name. // This is useful in case of sidecars or pod jobs, where Flyte will monitor successful exit of a single container. -func DeterminePrimaryContainerPhase(primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { +func DetermineContainerPhase(primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { for _, s := range statuses { if s.Name == primaryContainerName { if s.State.Waiting != nil || s.State.Running != nil { diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index 716807452..6230a7646 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -950,7 +950,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { } var info = &pluginsCore.TaskInfo{} t.Run("primary container waiting", func(t *testing.T) { - phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -963,7 +963,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase()) }) t.Run("primary container running", func(t *testing.T) { - phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -976,7 +976,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase()) }) t.Run("primary container failed", func(t *testing.T) { - phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -993,7 +993,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, "foo failed", phaseInfo.Err().Message) }) t.Run("primary container succeeded", func(t *testing.T) { - phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -1006,7 +1006,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase()) }) t.Run("missing primary container", func(t *testing.T) { - phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, }, info) assert.Equal(t, pluginsCore.PhasePermanentFailure, phaseInfo.Phase()) diff --git a/go/tasks/plugins/array/k8s/management.go b/go/tasks/plugins/array/k8s/management.go index a29967ac1..ecdb752f2 100644 --- a/go/tasks/plugins/array/k8s/management.go +++ b/go/tasks/plugins/array/k8s/management.go @@ -187,7 +187,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil) } else { phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient) - + logger.Infof(ctx, "launchSubtask [%s]", perr) // if launchSubtask fails we attempt to deallocate the (previously allocated) // resource to mitigate leaks if perr != nil { diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index 5cdc80596..cd899a27a 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -160,15 +160,18 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor // launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config. func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) { + logger.Infof(ctx, "BuildResource BuildResource") o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx) pod := o.(*v1.Pod) if err != nil { + logger.Infof(ctx, "build resource with err [%v]", err) return pluginsCore.PhaseInfoUndefined, err } addMetadata(stCtx, cfg, config.GetK8sPluginConfig(), pod) // inject maptask specific container environment variables + logger.Infof(ctx, "pod.Spec.Containers pod.Spec.Containers") if len(pod.Spec.Containers) == 0 { return pluginsCore.PhaseInfoUndefined, stdErrors.Wrapf(ErrReplaceCmdTemplate, err, "No containers found in podSpec.") } @@ -185,6 +188,17 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf Value: strconv.Itoa(stCtx.originalIndex), }) + for sidecarIndex, container := range pod.Spec.Containers { + if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar { + for i, arg := range pod.Spec.Containers[sidecarIndex].Args { + if arg == "--to-output-prefix" { + pod.Spec.Containers[sidecarIndex].Args[i+1] = fmt.Sprintf("%s/%s", pod.Spec.Containers[sidecarIndex].Args[i+1], strconv.Itoa(stCtx.originalIndex)) + } + } + break + } + } + pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, arrayJobEnvVars...) logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", pod.GetObjectKind().GroupVersionKind(), pod.GetNamespace(), pod.GetName()) diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 2b492dc5c..2b86de390 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -2,6 +2,8 @@ package pod import ( "context" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" @@ -85,6 +87,11 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu return nil, err } + if task.GetContainer().DataConfig != nil && task.GetContainer().DataConfig.Enabled { + pod.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName + pod.Annotations[flytek8s.FlyteCopilotName] = config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar + } + return pod, nil } @@ -137,8 +144,19 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil } + // When the copilot is running, we should wait until the data is uploaded by the copilot. + copilotContainerName, exists := r.GetAnnotations()[flytek8s.FlyteCopilotName] + logger.Infof(ctx, "copilotContainerName copilotContainerName [%v]", copilotContainerName) + logger.Infof(ctx, "exists exists [%v]", exists) + if exists { + copilotContainerPhase := flytek8s.DetermineContainerPhase(copilotContainerName, pod.Status.ContainerStatuses, &info) + if copilotContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { + return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, copilotContainerPhase.Info()), nil + } + } + // if the primary container annotation exists, we use the status of the specified container - primaryContainerPhase := flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) + primaryContainerPhase := flytek8s.DetermineContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) if primaryContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, primaryContainerPhase.Info()), nil } From 57a2d3a30d95dd1cde48e18794395c863d001433 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 14:41:57 -0800 Subject: [PATCH 02/14] updated Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/management.go | 2 +- go/tasks/plugins/array/k8s/subtask.go | 4 +--- go/tasks/plugins/k8s/pod/plugin.go | 6 +----- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/go/tasks/plugins/array/k8s/management.go b/go/tasks/plugins/array/k8s/management.go index ecdb752f2..a3dbd64c1 100644 --- a/go/tasks/plugins/array/k8s/management.go +++ b/go/tasks/plugins/array/k8s/management.go @@ -187,7 +187,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon phaseInfo = core.PhaseInfoWaitingForResourcesInfo(time.Now(), core.DefaultPhaseVersion, "Exceeded ResourceManager quota", nil) } else { phaseInfo, perr = launchSubtask(ctx, stCtx, config, kubeClient) - logger.Infof(ctx, "launchSubtask [%s]", perr) + logger.Infof(ctx, "Failed to launch subtask with error [%s]", perr) // if launchSubtask fails we attempt to deallocate the (previously allocated) // resource to mitigate leaks if perr != nil { diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index cd899a27a..8a2e906c4 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -3,6 +3,7 @@ package k8s import ( "context" "fmt" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" "regexp" "strconv" "strings" @@ -160,18 +161,15 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor // launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config. func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) { - logger.Infof(ctx, "BuildResource BuildResource") o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx) pod := o.(*v1.Pod) if err != nil { - logger.Infof(ctx, "build resource with err [%v]", err) return pluginsCore.PhaseInfoUndefined, err } addMetadata(stCtx, cfg, config.GetK8sPluginConfig(), pod) // inject maptask specific container environment variables - logger.Infof(ctx, "pod.Spec.Containers pod.Spec.Containers") if len(pod.Spec.Containers) == 0 { return pluginsCore.PhaseInfoUndefined, stdErrors.Wrapf(ErrReplaceCmdTemplate, err, "No containers found in podSpec.") } diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 2b86de390..85830abf1 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -2,10 +2,8 @@ package pod import ( "context" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/flyteorg/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyteplugins/go/tasks/logs" @@ -146,8 +144,6 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin // When the copilot is running, we should wait until the data is uploaded by the copilot. copilotContainerName, exists := r.GetAnnotations()[flytek8s.FlyteCopilotName] - logger.Infof(ctx, "copilotContainerName copilotContainerName [%v]", copilotContainerName) - logger.Infof(ctx, "exists exists [%v]", exists) if exists { copilotContainerPhase := flytek8s.DetermineContainerPhase(copilotContainerName, pod.Status.ContainerStatuses, &info) if copilotContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { From 79095a0a6dc86532dcde1175fa135d7978733539 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 9 Mar 2023 15:15:27 -0800 Subject: [PATCH 03/14] Bump flyte plugins Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/subtask.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index 79c630448..cbf7531d6 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -11,7 +11,6 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/errors" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/tasklog" From 2e76f2994b15e977fd245c164090e3e0bc0c0545 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 10 Mar 2023 14:07:55 -0800 Subject: [PATCH 04/14] nit Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/pod/plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 9266c0765..0d3cf88d9 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -133,7 +133,7 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu pod.ObjectMeta = *objectMeta pod.Spec = *podSpec - if taskTemplate.GetContainer().DataConfig != nil && taskTemplate.GetContainer().DataConfig.Enabled { + if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().DataConfig != nil && taskTemplate.GetContainer().DataConfig.Enabled { pod.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName pod.Annotations[flytek8s.FlyteCopilotName] = config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar } From d20137836713ba83acdd029cb1f83eb28b82ceb4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 10 Mar 2023 14:26:33 -0800 Subject: [PATCH 05/14] nit Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/subtask.go | 3 ++- go/tasks/plugins/k8s/pod/plugin.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index cbf7531d6..2d9e80569 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -3,12 +3,13 @@ package k8s import ( "context" "fmt" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" "regexp" "strconv" "strings" "time" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" + "github.com/flyteorg/flyteplugins/go/tasks/errors" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 0d3cf88d9..779c222ab 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -2,6 +2,7 @@ package pod import ( "context" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" pluginserrors "github.com/flyteorg/flyteplugins/go/tasks/errors" From b9551e36f999313efb04650af9696a45530fe65d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 23 Mar 2023 13:39:50 -0700 Subject: [PATCH 06/14] lint Signed-off-by: Kevin Su --- go/tasks/pluginmachinery/flytek8s/pod_helper.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 432c62135..52064653f 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -555,9 +555,9 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo // DetermineContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object // corresponding to the phase of the primaryContainer which is identified using the provided name. // This is useful in case of sidecars or pod jobs, where Flyte will monitor successful exit of a single container. -func DetermineContainerPhase(primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { +func DetermineContainerPhase(containerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { for _, s := range statuses { - if s.Name == primaryContainerName { + if s.Name == containerName { if s.State.Waiting != nil || s.State.Running != nil { return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info) } @@ -574,7 +574,7 @@ func DetermineContainerPhase(primaryContainerName string, statuses []v1.Containe // If for some reason we can't find the primary container, always just return a permanent failure return pluginsCore.PhaseInfoFailure("PrimaryContainerMissing", - fmt.Sprintf("Primary container [%s] not found in pod's container statuses", primaryContainerName), info) + fmt.Sprintf("Primary container [%s] not found in pod's container statuses", containerName), info) } // DemystifyFailure resolves the various Kubernetes pod failure modes to determine From b1be55681ad2e3f66159cf2c1e66cb3dd3991173 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 28 Mar 2023 15:46:32 -0700 Subject: [PATCH 07/14] address comment Signed-off-by: Kevin Su --- .../pluginmachinery/flytek8s/pod_helper.go | 8 ++++---- .../flytek8s/pod_helper_test.go | 10 +++++----- go/tasks/plugins/array/k8s/subtask.go | 16 ++++++++++++++++ go/tasks/plugins/k8s/pod/plugin.go | 18 +----------------- 4 files changed, 26 insertions(+), 26 deletions(-) diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 52064653f..03655cf83 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -552,12 +552,12 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo return pluginsCore.PhaseInfoSuccess(&info), nil } -// DetermineContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object +// DeterminePrimaryContainerPhase as the name suggests, given all the containers, will return a pluginsCore.PhaseInfo object // corresponding to the phase of the primaryContainer which is identified using the provided name. // This is useful in case of sidecars or pod jobs, where Flyte will monitor successful exit of a single container. -func DetermineContainerPhase(containerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { +func DeterminePrimaryContainerPhase(primaryContainerName string, statuses []v1.ContainerStatus, info *pluginsCore.TaskInfo) pluginsCore.PhaseInfo { for _, s := range statuses { - if s.Name == containerName { + if s.Name == primaryContainerName { if s.State.Waiting != nil || s.State.Running != nil { return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info) } @@ -574,7 +574,7 @@ func DetermineContainerPhase(containerName string, statuses []v1.ContainerStatus // If for some reason we can't find the primary container, always just return a permanent failure return pluginsCore.PhaseInfoFailure("PrimaryContainerMissing", - fmt.Sprintf("Primary container [%s] not found in pod's container statuses", containerName), info) + fmt.Sprintf("Primary container [%s] not found in pod's container statuses", primaryContainerName), info) } // DemystifyFailure resolves the various Kubernetes pod failure modes to determine diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index 141007061..58a5e012a 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -949,7 +949,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { } var info = &pluginsCore.TaskInfo{} t.Run("primary container waiting", func(t *testing.T) { - phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -962,7 +962,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase()) }) t.Run("primary container running", func(t *testing.T) { - phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -975,7 +975,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase()) }) t.Run("primary container failed", func(t *testing.T) { - phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -992,7 +992,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, "foo failed", phaseInfo.Err().Message) }) t.Run("primary container succeeded", func(t *testing.T) { - phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, { Name: primaryContainerName, State: v1.ContainerState{ @@ -1005,7 +1005,7 @@ func TestDeterminePrimaryContainerPhase(t *testing.T) { assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase()) }) t.Run("missing primary container", func(t *testing.T) { - phaseInfo := DetermineContainerPhase(primaryContainerName, []v1.ContainerStatus{ + phaseInfo := DeterminePrimaryContainerPhase(primaryContainerName, []v1.ContainerStatus{ secondaryContainer, }, info) assert.Equal(t, pluginsCore.PhasePermanentFailure, phaseInfo.Phase()) diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index 2d9e80569..041a0e9c1 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -160,6 +160,18 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor return nil } +func addCopilotToPod(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, pod *v1.Pod) error { + taskTemplate, err := taskCtx.TaskReader().Read(ctx) + if err != nil { + logger.Warnf(ctx, "failed to read task information when trying to construct Pod, err: %s", err.Error()) + return err + } + if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().DataConfig != nil && taskTemplate.GetContainer().DataConfig.Enabled { + pod.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName + pod.Annotations[flytek8s.FlyteCopilotName] = config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar + } +} + // launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config. func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) { o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx) @@ -342,6 +354,10 @@ func getTaskContainerIndex(pod *v1.Pod) (int, error) { if len(pod.Spec.Containers) == 1 { return 0, nil } + // Copilot is always the second container if it is enabled. + if len(pod.Spec.Containers) == 2 && pod.Spec.Containers[1].Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar { + return 0, nil + } // For tasks with a K8sPod task target, they may produce multiple containers but at least one must be the designated primary. return -1, stdErrors.Errorf(ErrBuildPodTemplate, "Expected a specified primary container key when building an array job with a K8sPod spec target") diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 779c222ab..33f69b3c2 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -3,8 +3,6 @@ package pod import ( "context" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" - pluginserrors "github.com/flyteorg/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyteplugins/go/tasks/logs" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" @@ -134,11 +132,6 @@ func (p plugin) BuildResource(ctx context.Context, taskCtx pluginsCore.TaskExecu pod.ObjectMeta = *objectMeta pod.Spec = *podSpec - if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().DataConfig != nil && taskTemplate.GetContainer().DataConfig.Enabled { - pod.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName - pod.Annotations[flytek8s.FlyteCopilotName] = config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar - } - return pod, nil } @@ -191,17 +184,8 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil } - // When the copilot is running, we should wait until the data is uploaded by the copilot. - copilotContainerName, exists := r.GetAnnotations()[flytek8s.FlyteCopilotName] - if exists { - copilotContainerPhase := flytek8s.DetermineContainerPhase(copilotContainerName, pod.Status.ContainerStatuses, &info) - if copilotContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { - return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, copilotContainerPhase.Info()), nil - } - } - // if the primary container annotation exists, we use the status of the specified container - primaryContainerPhase := flytek8s.DetermineContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) + primaryContainerPhase := flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) if primaryContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, primaryContainerPhase.Info()), nil } From ea31d6c5da37aca803886d999ec8f7d568073a34 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 28 Mar 2023 15:47:56 -0700 Subject: [PATCH 08/14] address comment Signed-off-by: Kevin Su --- go/tasks/pluginmachinery/flytek8s/pod_helper.go | 1 - go/tasks/plugins/array/k8s/subtask.go | 12 ------------ 2 files changed, 13 deletions(-) diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 03655cf83..0e10808b7 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -31,7 +31,6 @@ const SIGKILL = 137 const defaultContainerTemplateName = "default" const primaryContainerTemplateName = "primary" const PrimaryContainerKey = "primary_container_name" -const FlyteCopilotName = "flyte_copilot_name" const Sidecar = "sidecar" // ApplyInterruptibleNodeSelectorRequirement configures the node selector requirement of the node-affinity using the configuration specified. diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index 041a0e9c1..e2392a97c 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -160,18 +160,6 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor return nil } -func addCopilotToPod(ctx context.Context, taskCtx pluginsCore.TaskExecutionContext, pod *v1.Pod) error { - taskTemplate, err := taskCtx.TaskReader().Read(ctx) - if err != nil { - logger.Warnf(ctx, "failed to read task information when trying to construct Pod, err: %s", err.Error()) - return err - } - if taskTemplate.GetContainer() != nil && taskTemplate.GetContainer().DataConfig != nil && taskTemplate.GetContainer().DataConfig.Enabled { - pod.Annotations[flytek8s.PrimaryContainerKey] = primaryContainerName - pod.Annotations[flytek8s.FlyteCopilotName] = config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar - } -} - // launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config. func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) { o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx) From 77e62b8da2db8e20616e952a28dbfb558ea8fe3f Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 28 Mar 2023 15:57:32 -0700 Subject: [PATCH 09/14] address comment Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/subtask.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/go/tasks/plugins/array/k8s/subtask.go b/go/tasks/plugins/array/k8s/subtask.go index e2392a97c..7fed8c704 100644 --- a/go/tasks/plugins/array/k8s/subtask.go +++ b/go/tasks/plugins/array/k8s/subtask.go @@ -160,6 +160,20 @@ func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCor return nil } +// updateCopilotArgs append array index to the end of the output prefix +func updateCopilotArgs(pod *v1.Pod, stCtx SubTaskExecutionContext) { + for sidecarIndex, container := range pod.Spec.Containers { + if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar { + for i, arg := range pod.Spec.Containers[sidecarIndex].Args { + if arg == "--to-output-prefix" { + pod.Spec.Containers[sidecarIndex].Args[i+1] = fmt.Sprintf("%s/%s", pod.Spec.Containers[sidecarIndex].Args[i+1], strconv.Itoa(stCtx.originalIndex)) + } + } + break + } + } +} + // launchSubtask creates a k8s pod defined by the SubTaskExecutionContext and Config. func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) (pluginsCore.PhaseInfo, error) { o, err := podPlugin.DefaultPodPlugin.BuildResource(ctx, stCtx) @@ -187,18 +201,8 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf Value: strconv.Itoa(stCtx.originalIndex), }) - for sidecarIndex, container := range pod.Spec.Containers { - if container.Name == config.GetK8sPluginConfig().CoPilot.NamePrefix+flytek8s.Sidecar { - for i, arg := range pod.Spec.Containers[sidecarIndex].Args { - if arg == "--to-output-prefix" { - pod.Spec.Containers[sidecarIndex].Args[i+1] = fmt.Sprintf("%s/%s", pod.Spec.Containers[sidecarIndex].Args[i+1], strconv.Itoa(stCtx.originalIndex)) - } - } - break - } - } - pod.Spec.Containers[containerIndex].Env = append(pod.Spec.Containers[containerIndex].Env, arrayJobEnvVars...) + updateCopilotArgs(pod, stCtx) logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", pod.GetObjectKind().GroupVersionKind(), pod.GetNamespace(), pod.GetName()) err = kubeClient.GetClient().Create(ctx, pod) From 2a9d1322959e79168906d2e7ee9febb6d3deb4ec Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 29 Mar 2023 11:28:27 -0700 Subject: [PATCH 10/14] add a test Signed-off-by: Kevin Su --- .../array/k8s/subtask_exec_context_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go index 77389514e..5facb89aa 100644 --- a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go +++ b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go @@ -3,6 +3,7 @@ package k8s import ( "context" "fmt" + v1 "k8s.io/api/core/v1" "testing" podPlugin "github.com/flyteorg/flyteplugins/go/tasks/plugins/k8s/pod" @@ -36,3 +37,25 @@ func TestSubTaskExecutionContext(t *testing.T) { assert.Equal(t, storage.DataReference("/prefix/"), stCtx.OutputWriter().GetOutputPrefixPath()) assert.Equal(t, storage.DataReference("/raw_prefix/5/1"), stCtx.OutputWriter().GetRawOutputPrefix()) } + +func TestUpdateCopilotArgs(t *testing.T) { + pod := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "flyte-copilot-sidecar", + Args: []string{"--to-output-prefix", "s3://bucket/key"}, + }, + }, + }, + } + ctx := context.Background() + + tCtx := getMockTaskExecutionContext(ctx, 0) + taskTemplate, err := tCtx.TaskReader().Read(ctx) + assert.Nil(t, err) + + stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0)) + updateCopilotArgs(pod, stCtx) + assert.Equal(t, pod.Spec.Containers[0].Args[1], "s3://bucket/key/5") +} From 4ec9d31ca83993123698a674155728ccf70c6128 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 29 Mar 2023 11:51:32 -0700 Subject: [PATCH 11/14] lint Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/subtask_exec_context_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go index 5facb89aa..4d45d0740 100644 --- a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go +++ b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go @@ -55,7 +55,8 @@ func TestUpdateCopilotArgs(t *testing.T) { taskTemplate, err := tCtx.TaskReader().Read(ctx) assert.Nil(t, err) - stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0)) + var stCtx SubTaskExecutionContext + stCtx, err = NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0)) updateCopilotArgs(pod, stCtx) assert.Equal(t, pod.Spec.Containers[0].Args[1], "s3://bucket/key/5") } From 06877a865bf20d5cb5155f28857a3bacf82ff9c6 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 29 Mar 2023 15:59:49 -0700 Subject: [PATCH 12/14] lint Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/subtask_exec_context_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go index 4d45d0740..6de79a3a2 100644 --- a/go/tasks/plugins/array/k8s/subtask_exec_context_test.go +++ b/go/tasks/plugins/array/k8s/subtask_exec_context_test.go @@ -3,13 +3,13 @@ package k8s import ( "context" "fmt" - v1 "k8s.io/api/core/v1" + "testing" podPlugin "github.com/flyteorg/flyteplugins/go/tasks/plugins/k8s/pod" + v1 "k8s.io/api/core/v1" "github.com/flyteorg/flytestdlib/storage" - "github.com/stretchr/testify/assert" ) @@ -55,8 +55,8 @@ func TestUpdateCopilotArgs(t *testing.T) { taskTemplate, err := tCtx.TaskReader().Read(ctx) assert.Nil(t, err) - var stCtx SubTaskExecutionContext - stCtx, err = NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0)) + stCtx, err := NewSubTaskExecutionContext(ctx, tCtx, taskTemplate, 0, 5, uint64(1), uint64(0)) + assert.Nil(t, err) updateCopilotArgs(pod, stCtx) assert.Equal(t, pod.Spec.Containers[0].Args[1], "s3://bucket/key/5") } From 10be909844b07c550a6dcee87958da4a327b105d Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 31 Mar 2023 01:16:20 -0700 Subject: [PATCH 13/14] test Signed-off-by: Kevin Su --- go/tasks/plugins/k8s/pod/plugin.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index d1c509fb7..095727f14 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -146,10 +146,12 @@ func (p plugin) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContex func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string) (pluginsCore.PhaseInfo, error) { pluginState := k8s.PluginState{} - _, err := pluginContext.PluginStateReader().Get(&pluginState) - if err != nil { - return pluginsCore.PhaseInfoUndefined, err - } + var err error + //_, err := pluginContext.PluginStateReader().Get(&pluginState) + //if err != nil { + // logger.Errorf(ctx, "kevin test [%v]", err) + // return pluginsCore.PhaseInfoUndefined, err + //} pod := r.(*v1.Pod) From b80c8a9251f7586739457b59dfe4a6496d3d9590 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Sun, 9 Apr 2023 12:27:03 -0700 Subject: [PATCH 14/14] more tests Signed-off-by: Kevin Su --- go/tasks/plugins/array/k8s/subtask_test.go | 40 ++++++++++++++++++++++ go/tasks/plugins/k8s/pod/plugin.go | 10 +++--- 2 files changed, 44 insertions(+), 6 deletions(-) create mode 100644 go/tasks/plugins/array/k8s/subtask_test.go diff --git a/go/tasks/plugins/array/k8s/subtask_test.go b/go/tasks/plugins/array/k8s/subtask_test.go new file mode 100644 index 000000000..19438f575 --- /dev/null +++ b/go/tasks/plugins/array/k8s/subtask_test.go @@ -0,0 +1,40 @@ +package k8s + +import ( + "testing" + + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" +) + +func TestGetTaskContainerTask(t *testing.T) { + pod := &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "PrimaryContainer", + }, + }, + }, + } + index, err := getTaskContainerIndex(pod) + assert.NilError(t, err) + assert.Equal(t, index, 0) + + pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{Name: config.GetK8sPluginConfig().CoPilot.NamePrefix + flytek8s.Sidecar}) + index, err = getTaskContainerIndex(pod) + assert.NilError(t, err) + assert.Equal(t, index, 0) + + pod.Annotations = map[string]string{flytek8s.PrimaryContainerKey: "PrimaryContainer"} + index, err = getTaskContainerIndex(pod) + assert.NilError(t, err) + assert.Equal(t, index, 0) + + pod.Spec.Containers[0].Name = "SecondaryContainer" + _, err = getTaskContainerIndex(pod) + assert.ErrorContains(t, err, "Couldn't find any container matching the primary container") + +} diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 095727f14..d1c509fb7 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -146,12 +146,10 @@ func (p plugin) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContex func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string) (pluginsCore.PhaseInfo, error) { pluginState := k8s.PluginState{} - var err error - //_, err := pluginContext.PluginStateReader().Get(&pluginState) - //if err != nil { - // logger.Errorf(ctx, "kevin test [%v]", err) - // return pluginsCore.PhaseInfoUndefined, err - //} + _, err := pluginContext.PluginStateReader().Get(&pluginState) + if err != nil { + return pluginsCore.PhaseInfoUndefined, err + } pod := r.(*v1.Pod)