From ce0ec33fc4e43d9487240297ff10255df827e85e Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 22 Aug 2022 22:34:48 -0700 Subject: [PATCH 1/3] Prefix sub-lp exec id with the parent exec-id Signed-off-by: Haytham Abuelfutuh --- pkg/apis/flyteworkflow/v1alpha1/iface.go | 14 ++++++- .../flyteworkflow/v1alpha1/node_status.go | 13 +++++++ pkg/controller/nodes/handler/state.go | 5 ++- .../nodes/handler/transition_info.go | 1 + pkg/controller/nodes/subworkflow/handler.go | 7 +++- .../nodes/subworkflow/launchplan.go | 39 ++++++++++++++++--- pkg/controller/nodes/subworkflow/util.go | 23 +++++++++++ pkg/controller/nodes/transformers.go | 1 + 8 files changed, 92 insertions(+), 11 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 26cfd2321..b71ebd925 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface { SetExecutionError(executionError *core.ExecutionError) } -// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus. -// p returns ExecutableBranchNodeStatus, which permits some mutations +// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the +// GetExecutionStatus. p returns ExecutableBranchNodeStatus, which permits some mutations type ExecutableBranchNode interface { GetIf() ExecutableIfBlock GetElse() *NodeID @@ -246,6 +246,7 @@ type ExecutableBranchNode interface { } type ExecutableWorkflowNodeStatus interface { + Versioned GetWorkflowNodePhase() WorkflowNodePhase GetExecutionError() *core.ExecutionError } @@ -253,6 +254,7 @@ type ExecutableWorkflowNodeStatus interface { type MutableWorkflowNodeStatus interface { Mutable ExecutableWorkflowNodeStatus + MutableVersioned SetWorkflowNodePhase(phase WorkflowNodePhase) SetExecutionError(executionError *core.ExecutionError) } @@ -261,6 +263,14 @@ type Mutable interface { IsDirty() bool } +type Versioned interface { + GetVersion() uint32 +} + +type MutableVersioned interface { + SetVersion(version uint32) +} + type MutableNodeStatus interface { Mutable // Mutation API's diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index cc78b492a..72a57fb6b 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -17,6 +17,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type VersionedStruct struct { + version uint32 +} + +func (in *VersionedStruct) SetVersion(version uint32) { + in.version = version +} + +func (in VersionedStruct) GetVersion() uint32 { + return in.version +} + type MutableStruct struct { isDirty bool } @@ -158,6 +170,7 @@ const ( type WorkflowNodeStatus struct { MutableStruct + VersionedStruct Phase WorkflowNodePhase `json:"phase,omitempty"` ExecutionError *core.ExecutionError `json:"executionError,omitempty"` } diff --git a/pkg/controller/nodes/handler/state.go b/pkg/controller/nodes/handler/state.go index 2e456d817..3512a6c25 100644 --- a/pkg/controller/nodes/handler/state.go +++ b/pkg/controller/nodes/handler/state.go @@ -35,8 +35,9 @@ type DynamicNodeState struct { } type WorkflowNodeState struct { - Phase v1alpha1.WorkflowNodePhase - Error *core.ExecutionError + Phase v1alpha1.WorkflowNodePhase + Error *core.ExecutionError + Version uint32 } type NodeStateWriter interface { diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index d12ff9f8a..d9c31a60c 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ b/pkg/controller/nodes/handler/transition_info.go @@ -39,6 +39,7 @@ type DynamicNodeInfo struct { type WorkflowNodeInfo struct { LaunchedWorkflowID *core.WorkflowExecutionIdentifier + Version uint32 } type BranchNodeInfo struct { diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index bf7b5c393..7fefcd880 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -58,7 +58,12 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu return transition, err } - workflowNodeState := handler.WorkflowNodeState{Phase: newPhase} + version := uint32(0) + if info := transition.Info().GetInfo(); info != nil && info.WorkflowNodeInfo != nil { + version = info.WorkflowNodeInfo.Version + } + + workflowNodeState := handler.WorkflowNodeState{Phase: newPhase, Version: version} err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState) if err != nil { logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error()) diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go index 8d14968d0..983e10508 100644 --- a/pkg/controller/nodes/subworkflow/launchplan.go +++ b/pkg/controller/nodes/subworkflow/launchplan.go @@ -22,6 +22,13 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" ) +type NodeStatusVersion uint32 + +const ( + NodeStatusVersion1 NodeStatusVersion = iota + NodeStatusVersion2 +) + type launchPlanHandler struct { launchPlan launchplan.Executor recoveryClient recovery.Client @@ -56,10 +63,11 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No if err != nil { return handler.UnknownTransition, err } - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionIDV2( parentNodeExecutionID, nCtx.CurrentAttempt(), ) + if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil } @@ -106,19 +114,38 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No } return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{ - WorkflowNodeInfo: &handler.WorkflowNodeInfo{LaunchedWorkflowID: childID}, + WorkflowNodeInfo: &handler.WorkflowNodeInfo{ + LaunchedWorkflowID: childID, + Version: uint32(NodeStatusVersion2), + }, })), nil } +func GetChildWorkflowExecutionForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { + // Handle launch plan + if nCtx.NodeStateReader().GetWorkflowNodeState().Version == uint32(NodeStatusVersion2) { + return GetChildWorkflowExecutionIDV2( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) + } + + return GetChildWorkflowExecutionID( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) +} + func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) if err != nil { return handler.UnknownTransition, err } + // Handle launch plan - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { @@ -203,9 +230,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx if err != nil { return err } - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { // THIS SHOULD NEVER HAPPEN diff --git a/pkg/controller/nodes/subworkflow/util.go b/pkg/controller/nodes/subworkflow/util.go index 0b5bf715b..cedef8469 100644 --- a/pkg/controller/nodes/subworkflow/util.go +++ b/pkg/controller/nodes/subworkflow/util.go @@ -22,3 +22,26 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem Name: name, }, nil } + +func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))) + if err != nil { + return nil, err + } + + // Restriction on name is 20 chars + return &core.WorkflowExecutionIdentifier{ + Project: nodeExecID.ExecutionId.Project, + Domain: nodeExecID.ExecutionId.Domain, + Name: EnsureExecIDWithinLength(nodeExecID.ExecutionId.Name, name, maxLengthForSubWorkflow), + }, nil +} + +func EnsureExecIDWithinLength(execID, subName string, maxLength int) string { + maxLengthRemaining := maxLength - len(subName) + if len(execID) < maxLengthRemaining { + return execID + subName + } + + return execID[:maxLengthRemaining] + subName +} diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index 68f0d70bb..94c13735d 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -252,5 +252,6 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa t := s.GetOrCreateWorkflowStatus() t.SetWorkflowNodePhase(n.w.Phase) t.SetExecutionError(n.w.Error) + t.SetVersion(p.GetInfo().WorkflowNodeInfo.Version) } } From e73a7aa3cc9f2cff4c45cc0205e84eaf8867df5e Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 22 Aug 2022 22:41:36 -0700 Subject: [PATCH 2/3] cleanup Signed-off-by: Haytham Abuelfutuh --- pkg/apis/flyteworkflow/v1alpha1/iface.go | 10 ---------- pkg/apis/flyteworkflow/v1alpha1/node_status.go | 13 ------------- 2 files changed, 23 deletions(-) diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index b71ebd925..df1e42a20 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -246,7 +246,6 @@ type ExecutableBranchNode interface { } type ExecutableWorkflowNodeStatus interface { - Versioned GetWorkflowNodePhase() WorkflowNodePhase GetExecutionError() *core.ExecutionError } @@ -254,7 +253,6 @@ type ExecutableWorkflowNodeStatus interface { type MutableWorkflowNodeStatus interface { Mutable ExecutableWorkflowNodeStatus - MutableVersioned SetWorkflowNodePhase(phase WorkflowNodePhase) SetExecutionError(executionError *core.ExecutionError) } @@ -263,14 +261,6 @@ type Mutable interface { IsDirty() bool } -type Versioned interface { - GetVersion() uint32 -} - -type MutableVersioned interface { - SetVersion(version uint32) -} - type MutableNodeStatus interface { Mutable // Mutation API's diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 72a57fb6b..cc78b492a 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -17,18 +17,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -type VersionedStruct struct { - version uint32 -} - -func (in *VersionedStruct) SetVersion(version uint32) { - in.version = version -} - -func (in VersionedStruct) GetVersion() uint32 { - return in.version -} - type MutableStruct struct { isDirty bool } @@ -170,7 +158,6 @@ const ( type WorkflowNodeStatus struct { MutableStruct - VersionedStruct Phase WorkflowNodePhase `json:"phase,omitempty"` ExecutionError *core.ExecutionError `json:"executionError,omitempty"` } From 4fb8fb1526edb08674a2c197a8e54e884f9bac56 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Mon, 22 Aug 2022 22:43:10 -0700 Subject: [PATCH 3/3] cleanup Signed-off-by: Haytham Abuelfutuh --- pkg/controller/nodes/transformers.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index 94c13735d..68f0d70bb 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -252,6 +252,5 @@ func UpdateNodeStatus(np v1alpha1.NodePhase, p handler.PhaseInfo, n *nodeStateMa t := s.GetOrCreateWorkflowStatus() t.SetWorkflowNodePhase(n.w.Phase) t.SetExecutionError(n.w.Error) - t.SetVersion(p.GetInfo().WorkflowNodeInfo.Version) } }