Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Prefix sub-lp exec id with the parent exec-id #474

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface {
SetExecutionError(executionError *core.ExecutionError)
}

// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus.
// p returns ExecutableBranchNodeStatus, which permits some mutations
// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the
// GetExecutionStatus. p returns ExecutableBranchNodeStatus, which permits some mutations
type ExecutableBranchNode interface {
GetIf() ExecutableIfBlock
GetElse() *NodeID
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/nodes/handler/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ type DynamicNodeState struct {
}

type WorkflowNodeState struct {
Phase v1alpha1.WorkflowNodePhase
Error *core.ExecutionError
Phase v1alpha1.WorkflowNodePhase
Error *core.ExecutionError
Version uint32
}

type NodeStateWriter interface {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/transition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type DynamicNodeInfo struct {

type WorkflowNodeInfo struct {
LaunchedWorkflowID *core.WorkflowExecutionIdentifier
Version uint32
}

type BranchNodeInfo struct {
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/nodes/subworkflow/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func (w *workflowNodeHandler) Handle(ctx context.Context, nCtx handler.NodeExecu
return transition, err
}

workflowNodeState := handler.WorkflowNodeState{Phase: newPhase}
version := uint32(0)
if info := transition.Info().GetInfo(); info != nil && info.WorkflowNodeInfo != nil {
version = info.WorkflowNodeInfo.Version
}

workflowNodeState := handler.WorkflowNodeState{Phase: newPhase, Version: version}
err = nCtx.NodeStateWriter().PutWorkflowNodeState(workflowNodeState)
if err != nil {
logger.Errorf(ctx, "Failed to store WorkflowNodeState, err :%s", err.Error())
Expand Down
39 changes: 33 additions & 6 deletions pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import (
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
)

type NodeStatusVersion uint32

const (
NodeStatusVersion1 NodeStatusVersion = iota
NodeStatusVersion2
)

type launchPlanHandler struct {
launchPlan launchplan.Executor
recoveryClient recovery.Client
Expand Down Expand Up @@ -56,10 +63,11 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
if err != nil {
return handler.UnknownTransition, err
}
childID, err := GetChildWorkflowExecutionID(
childID, err := GetChildWorkflowExecutionIDV2(
parentNodeExecutionID,
nCtx.CurrentAttempt(),
)

if err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil
}
Expand Down Expand Up @@ -106,19 +114,38 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
}

return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoRunning(&handler.ExecutionInfo{
WorkflowNodeInfo: &handler.WorkflowNodeInfo{LaunchedWorkflowID: childID},
WorkflowNodeInfo: &handler.WorkflowNodeInfo{
LaunchedWorkflowID: childID,
Version: uint32(NodeStatusVersion2),
},
})), nil
}

func GetChildWorkflowExecutionForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) {
// Handle launch plan
if nCtx.NodeStateReader().GetWorkflowNodeState().Version == uint32(NodeStatusVersion2) {
return GetChildWorkflowExecutionIDV2(
parentNodeExecID,
nCtx.CurrentAttempt(),
)
}

return GetChildWorkflowExecutionID(
parentNodeExecID,
nCtx.CurrentAttempt(),
)
}

func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
if err != nil {
return handler.UnknownTransition, err
}

// Handle launch plan
childID, err := GetChildWorkflowExecutionID(
childID, err := GetChildWorkflowExecutionForExecution(
parentNodeExecutionID,
nCtx.CurrentAttempt(),
nCtx,
)

if err != nil {
Expand Down Expand Up @@ -203,9 +230,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx
if err != nil {
return err
}
childID, err := GetChildWorkflowExecutionID(
childID, err := GetChildWorkflowExecutionForExecution(
parentNodeExecutionID,
nCtx.CurrentAttempt(),
nCtx,
)
if err != nil {
// THIS SHOULD NEVER HAPPEN
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/nodes/subworkflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,26 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem
Name: name,
}, nil
}

func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) {
name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt)))
if err != nil {
return nil, err
}

// Restriction on name is 20 chars
return &core.WorkflowExecutionIdentifier{
Project: nodeExecID.ExecutionId.Project,
Domain: nodeExecID.ExecutionId.Domain,
Name: EnsureExecIDWithinLength(nodeExecID.ExecutionId.Name, name, maxLengthForSubWorkflow),
}, nil
}

func EnsureExecIDWithinLength(execID, subName string, maxLength int) string {
maxLengthRemaining := maxLength - len(subName)
if len(execID) < maxLengthRemaining {
return execID + subName
}

return execID[:maxLengthRemaining] + subName
}