diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 26cfd2321..df1e42a20 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface { SetExecutionError(executionError *core.ExecutionError) } -// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus. -// p returns ExecutableBranchNodeStatus, which permits some mutations +// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the +// GetExecutionStatus. p returns ExecutableBranchNodeStatus, which permits some mutations type ExecutableBranchNode interface { GetIf() ExecutableIfBlock GetElse() *NodeID diff --git a/pkg/controller/nodes/handler/state.go b/pkg/controller/nodes/handler/state.go index 2e456d817..3512a6c25 100644 --- a/pkg/controller/nodes/handler/state.go +++ b/pkg/controller/nodes/handler/state.go @@ -35,8 +35,9 @@ type DynamicNodeState struct { } type WorkflowNodeState struct { - Phase v1alpha1.WorkflowNodePhase - Error *core.ExecutionError + Phase v1alpha1.WorkflowNodePhase + Error *core.ExecutionError + Version uint32 } type NodeStateWriter interface { diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index d12ff9f8a..d9c31a60c 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ b/pkg/controller/nodes/handler/transition_info.go @@ -39,6 +39,7 @@ type DynamicNodeInfo struct { type WorkflowNodeInfo struct { LaunchedWorkflowID *core.WorkflowExecutionIdentifier + Version uint32 } type BranchNodeInfo struct { diff --git a/pkg/controller/nodes/subworkflow/handler.go b/pkg/controller/nodes/subworkflow/handler.go index bf7b5c393..7fefcd880 100644 --- a/pkg/controller/nodes/subworkflow/handler.go +++ b/pkg/controller/nodes/subworkflow/handler.go @@ -58,7 +58,12 @@ func (w *workflowNodeHandler) 
// NodeStatusVersion identifies the scheme used to derive the child workflow
// execution ID for a launch plan node. The value is stored (as a uint32) in
// WorkflowNodeState.Version, so the numeric values are spelled out explicitly.
type NodeStatusVersion uint32

const (
	// NodeStatusVersion1 is the zero value. State that carries no version
	// compares equal to it and resolves through GetChildWorkflowExecutionID.
	NodeStatusVersion1 NodeStatusVersion = 0
	// NodeStatusVersion2 marks executions whose child ID was derived with
	// GetChildWorkflowExecutionIDV2.
	NodeStatusVersion2 NodeStatusVersion = 1
)
Version: uint32(NodeStatusVersion2), + }, })), nil } +func GetChildWorkflowExecutionForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) { + // Handle launch plan + if nCtx.NodeStateReader().GetWorkflowNodeState().Version == uint32(NodeStatusVersion2) { + return GetChildWorkflowExecutionIDV2( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) + } + + return GetChildWorkflowExecutionID( + parentNodeExecID, + nCtx.CurrentAttempt(), + ) +} + func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) { parentNodeExecutionID, err := getParentNodeExecutionID(nCtx) if err != nil { return handler.UnknownTransition, err } + // Handle launch plan - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { @@ -203,9 +230,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx if err != nil { return err } - childID, err := GetChildWorkflowExecutionID( + childID, err := GetChildWorkflowExecutionForExecution( parentNodeExecutionID, - nCtx.CurrentAttempt(), + nCtx, ) if err != nil { // THIS SHOULD NEVER HAPPEN diff --git a/pkg/controller/nodes/subworkflow/util.go b/pkg/controller/nodes/subworkflow/util.go index 0b5bf715b..cedef8469 100644 --- a/pkg/controller/nodes/subworkflow/util.go +++ b/pkg/controller/nodes/subworkflow/util.go @@ -22,3 +22,26 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem Name: name, }, nil } + +func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) { + name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt))) + if err != nil { + return nil, err + } + 
// EnsureExecIDWithinLength builds an execution name of the form
// <execID prefix><subName> that is at most maxLength bytes long.
//
// subName is kept whole whenever it fits; execID is truncated from the right
// to make room for it. If subName alone is maxLength bytes or longer, the
// result is the first maxLength bytes of subName and execID contributes
// nothing. A non-positive maxLength yields the empty string.
//
// All lengths are byte lengths (len on a string), not rune counts.
func EnsureExecIDWithinLength(execID, subName string, maxLength int) string {
	if maxLength <= 0 {
		return ""
	}

	// Guard against a negative remainder: slicing execID with a negative bound
	// would panic, so handle an over-long subName before computing it.
	if len(subName) >= maxLength {
		return subName[:maxLength]
	}

	remaining := maxLength - len(subName)
	if len(execID) > remaining {
		execID = execID[:remaining]
	}

	return execID + subName
}