Skip to content

Commit

Permalink
Merge pull request #212 from Clever/SYNC-1445-try-to-use-input-from-l…
Browse files Browse the repository at this point in the history
…astjob

Use the input from lastJob when it's available
  • Loading branch information
Sayan- authored May 1, 2020
2 parents b546118 + b28d09d commit 1c2044d
Show file tree
Hide file tree
Showing 2 changed files with 452 additions and 16 deletions.
90 changes: 74 additions & 16 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,31 +360,60 @@ func (h Handler) ResumeWorkflowByID(ctx context.Context, input *models.ResumeWor
if !resources.WorkflowIsDone(&workflow) {
return &models.Workflow{}, fmt.Errorf("Workflow %s active: %s", workflow.ID, workflow.Status)
}
areDescendantsDone, err := areDescendantWorkflowsDone(ctx, h.store, workflow)
if err != nil {
return &models.Workflow{}, fmt.Errorf("Failed to check descendant workflows: %s", err)
}
if !areDescendantsDone {
return &models.Workflow{}, fmt.Errorf("Cannot resume workflow %s because it has active descendants", workflow.ID)
}
if _, ok := workflow.WorkflowDefinition.StateMachine.States[input.Overrides.StartAt]; !ok {
return &models.Workflow{}, fmt.Errorf("Invalid StartAt state %s", input.Overrides.StartAt)
}

// find the input to the StartAt state
effectiveInput := ""
for _, job := range workflow.Jobs {
if job.State == input.Overrides.StartAt {
// if job was never started then we should probably not trust the input
if job.Status == models.JobStatusAbortedDepsFailed ||
job.Status == models.JobStatusQueued ||
job.Status == models.JobStatusWaitingForDeps ||
job.Status == models.JobStatusCreated {

return &models.Workflow{},
fmt.Errorf("Job %s for StartAt %s was not started for Workflow: %s. Could not infer input",
job.ID, job.State, workflow.ID)
// Find the job that ran for the StartAt state so that its input can be used for the new workflow.
// Try to use lastJob if it's available and if we want to start the new workflow from that state,
// otherwise load the full execution history and search through it to find the desired input.
// Note: getting the workflow's execution history involves making expensive API calls to the
// stepfunctions GetExecutionHistory endpoint. GetWorkflowByID can be called beforehand to avoid
// the workflow.Jobs == nil path.
// Clients may wish to increase the timeout from the global default to handle workflows with long
// execution histories.
var originalJob *models.Job
if workflow.LastJob != nil && input.Overrides.StartAt == workflow.LastJob.State {
originalJob = workflow.LastJob
} else {
if workflow.Jobs == nil {
if err := h.manager.UpdateWorkflowHistory(ctx, &workflow); err != nil {
return &models.Workflow{}, err
}
updatedWorkflow, err := h.store.GetWorkflowByID(ctx, input.WorkflowID)
if err != nil {
return &models.Workflow{}, err
}
workflow = updatedWorkflow
}

effectiveInput = job.Input
break
for _, job := range workflow.Jobs {
if job.State == input.Overrides.StartAt {
originalJob = job
break
}
}
}

return h.manager.RetryWorkflow(ctx, workflow, input.Overrides.StartAt, effectiveInput)
if originalJob == nil {
return &models.Workflow{}, fmt.Errorf("No job found for StartAt %s", input.Overrides.StartAt)
}

// if job was never started then we should probably not trust the input
if !hasJobStarted(originalJob.Status) {
return &models.Workflow{},
fmt.Errorf("Job %s for StartAt %s was not started for Workflow: %s. Could not infer input",
originalJob.ID, originalJob.State, workflow.ID)
}

return h.manager.RetryWorkflow(ctx, workflow, input.Overrides.StartAt, originalJob.Input)
}

// ResolveWorkflowByID sets a workflow's ResolvedByUser to true if it is currently false.
Expand Down Expand Up @@ -430,6 +459,13 @@ func newWorkflowDefinitionFromRequest(req models.NewWorkflowDefinitionRequest) (
return resources.NewWorkflowDefinition(req.Name, req.Manager, req.StateMachine, req.DefaultTags)
}

func hasJobStarted(status models.JobStatus) bool {
return !(status == models.JobStatusAbortedDepsFailed ||
status == models.JobStatusQueued ||
status == models.JobStatusWaitingForDeps ||
status == models.JobStatusCreated)
}

// validateTagsMap ensures that all tags values are strings
func validateTagsMap(apiTags map[string]interface{}) error {
for _, val := range apiTags {
Expand All @@ -443,3 +479,25 @@ func validateTagsMap(apiTags map[string]interface{}) error {
func epochMillis(t time.Time) int {
return int(t.UnixNano() / int64(time.Millisecond))
}

// areDescendantWorkflowsDone does a depth first search on the workflow's retries and returns true
// if none of the descendant workflows are active.
func areDescendantWorkflowsDone(ctx context.Context, s store.Store, workflow models.Workflow) (bool, error) {
for _, childID := range workflow.Retries {
childWorkflow, err := s.GetWorkflowByID(ctx, childID)
if err != nil {
return false, err
}
if !resources.WorkflowIsDone(&childWorkflow) {
return false, nil
}
areDescendantsDone, err := areDescendantWorkflowsDone(ctx, s, childWorkflow)
if err != nil {
return false, err
}
if !areDescendantsDone {
return false, nil
}
}
return true, nil
}
Loading

0 comments on commit 1c2044d

Please sign in to comment.