Skip to content

Commit

Permalink
Merge pull request #213 from Clever/wfm-tracing
Browse files Browse the repository at this point in the history
Workflow Manager Tracing
  • Loading branch information
taylor-sutton authored Sep 3, 2019
2 parents 2f0927e + 44ae686 commit c6af8b8
Show file tree
Hide file tree
Showing 9 changed files with 494 additions and 43 deletions.
2 changes: 1 addition & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions


### Version information
*Version* : 0.10.1
*Version* : 0.11.0


### URI scheme
Expand Down
14 changes: 12 additions & 2 deletions executor/workflow_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)

// WorkflowManager is the interface for creating, stopping and checking status for Workflows
Expand All @@ -37,17 +39,21 @@ func PollForPendingWorkflowsAndUpdateStore(ctx context.Context, wm WorkflowManag
log.Info("poll-for-pending-workflows-done")
return
default:
out, err := sqsapi.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
span, innerCtx := opentracing.StartSpanFromContext(ctx, "updating-pending-workflows")

out, err := sqsapi.ReceiveMessageWithContext(innerCtx, &sqs.ReceiveMessageInput{
MaxNumberOfMessages: aws.Int64(10),
QueueUrl: aws.String(sqsQueueURL),
WaitTimeSeconds: aws.Int64(10),
})
if err != nil {
log.ErrorD("poll-for-pending-workflows", logger.M{"error": err.Error()})
ext.Error.Set(span, true)
span.SetTag("errorMessage", err.Error())
}

for _, message := range out.Messages {
if id, err := updatePendingWorkflow(ctx, message, wm, thestore, sqsapi, sqsQueueURL); err != nil {
if id, err := updatePendingWorkflow(innerCtx, message, wm, thestore, sqsapi, sqsQueueURL); err != nil {
log.ErrorD("update-pending-workflow", logger.M{"id": id, "error": err.Error()})

// If we're seeing DynamoDB throttling, let's wait before running our next poll loop
Expand All @@ -62,6 +68,7 @@ func PollForPendingWorkflowsAndUpdateStore(ctx context.Context, wm WorkflowManag
log.InfoD("update-pending-workflow", logger.M{"id": id})
}
}
span.Finish()
}
}
}
Expand All @@ -88,6 +95,8 @@ func createPendingWorkflow(ctx context.Context, workflowID string, sqsapi sqsifa
}

func updatePendingWorkflow(ctx context.Context, m *sqs.Message, wm WorkflowManager, thestore store.Store, sqsapi sqsiface.SQSAPI, sqsQueueURL string) (string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "workflow-update")
defer span.Finish()
deleteMsg := func() {
if _, err := sqsapi.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{
QueueUrl: aws.String(sqsQueueURL),
Expand All @@ -107,6 +116,7 @@ func updatePendingWorkflow(ctx context.Context, m *sqs.Message, wm WorkflowManag
}

wfID := *m.Body
span.SetTag("workflow-id", wfID)
wf, err := thestore.GetWorkflowByID(ctx, wfID)
if err != nil {
if _, ok := err.(models.NotFound); ok {
Expand Down
53 changes: 27 additions & 26 deletions executor/workflow_manager_sfn.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,11 @@ func toSFNTags(wmTags map[string]interface{}) []*sfn.Tag {
return sfnTags
}

func (wm *SFNWorkflowManager) describeOrCreateStateMachine(wd models.WorkflowDefinition, namespace, queue string) (*sfn.DescribeStateMachineOutput, error) {
describeOutput, err := wm.sfnapi.DescribeStateMachine(&sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(sfnconventions.StateMachineArn(wm.region, wm.accountID, wd.Name, wd.Version, namespace, wd.StateMachine.StartAt)),
})
func (wm *SFNWorkflowManager) describeOrCreateStateMachine(ctx context.Context, wd models.WorkflowDefinition, namespace, queue string) (*sfn.DescribeStateMachineOutput, error) {
describeOutput, err := wm.sfnapi.DescribeStateMachineWithContext(ctx,
&sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(sfnconventions.StateMachineArn(wm.region, wm.accountID, wd.Name, wd.Version, namespace, wd.StateMachine.StartAt)),
})
if err == nil {
return describeOutput, nil
}
Expand All @@ -152,25 +153,26 @@ func (wm *SFNWorkflowManager) describeOrCreateStateMachine(wd models.WorkflowDef
// this effectively creates a new workflow definition in each namespace we deploy into
awsStateMachineName := sfnconventions.StateMachineName(wd.Name, wd.Version, namespace, wd.StateMachine.StartAt)
log.InfoD("create-state-machine", logger.M{"definition": awsStateMachineDef, "name": awsStateMachineName})
_, err = wm.sfnapi.CreateStateMachine(&sfn.CreateStateMachineInput{
Name: aws.String(awsStateMachineName),
Definition: aws.String(awsStateMachineDef),
RoleArn: aws.String(wm.roleARN),
Tags: append([]*sfn.Tag{
{Key: aws.String("environment"), Value: aws.String(namespace)},
{Key: aws.String("workflow-definition-name"), Value: aws.String(wd.Name)},
{Key: aws.String("workflow-definition-version"), Value: aws.String(fmt.Sprintf("%d", wd.Version))},
{Key: aws.String("workflow-definition-start-at"), Value: aws.String(wd.StateMachine.StartAt)},
}, toSFNTags(wd.DefaultTags)...),
})
_, err = wm.sfnapi.CreateStateMachineWithContext(ctx,
&sfn.CreateStateMachineInput{
Name: aws.String(awsStateMachineName),
Definition: aws.String(awsStateMachineDef),
RoleArn: aws.String(wm.roleARN),
Tags: append([]*sfn.Tag{
{Key: aws.String("environment"), Value: aws.String(namespace)},
{Key: aws.String("workflow-definition-name"), Value: aws.String(wd.Name)},
{Key: aws.String("workflow-definition-version"), Value: aws.String(fmt.Sprintf("%d", wd.Version))},
{Key: aws.String("workflow-definition-start-at"), Value: aws.String(wd.StateMachine.StartAt)},
}, toSFNTags(wd.DefaultTags)...),
})
if err != nil {
return nil, fmt.Errorf("CreateStateMachine error: %s", err.Error())
}

return wm.describeOrCreateStateMachine(wd, namespace, queue)
return wm.describeOrCreateStateMachine(ctx, wd, namespace, queue)
}

func (wm *SFNWorkflowManager) startExecution(stateMachineArn *string, workflowID, input string) error {
func (wm *SFNWorkflowManager) startExecution(ctx context.Context, stateMachineArn *string, workflowID, input string) error {
executionName := aws.String(workflowID)

var inputJSON map[string]interface{}
Expand All @@ -192,7 +194,7 @@ func (wm *SFNWorkflowManager) startExecution(stateMachineArn *string, workflowID
// - aws.String(""): leads to InvalidExecutionInput AWS error
// - aws.String("[]"): leads to an input of an empty array "[]"
startExecutionInput := aws.String(string(marshaledInput))
_, err = wm.sfnapi.StartExecution(&sfn.StartExecutionInput{
_, err = wm.sfnapi.StartExecutionWithContext(ctx, &sfn.StartExecutionInput{
StateMachineArn: stateMachineArn,
Input: startExecutionInput,
Name: executionName,
Expand All @@ -207,7 +209,7 @@ func (wm *SFNWorkflowManager) CreateWorkflow(ctx context.Context, wd models.Work
queue string,
tags map[string]interface{}) (*models.Workflow, error) {

describeOutput, err := wm.describeOrCreateStateMachine(wd, namespace, queue)
describeOutput, err := wm.describeOrCreateStateMachine(ctx, wd, namespace, queue)
if err != nil {
return nil, err
}
Expand All @@ -225,13 +227,13 @@ func (wm *SFNWorkflowManager) CreateWorkflow(ctx context.Context, wd models.Work
// i.e. execution was started but we failed to save workflow
// If we fail starting the execution, we can resolve this out of band (TODO: should support cancelling)
workflow := resources.NewWorkflow(&wd, input, namespace, queue, mergedTags)
logger.FromContext(ctx).AddContext("wf-id", workflow.ID)
logger.FromContext(ctx).AddContext("workflow-id", workflow.ID)
if err := wm.store.SaveWorkflow(ctx, *workflow); err != nil {
return nil, err
}

// submit an execution using input, set execution name == our workflow GUID
err = wm.startExecution(describeOutput.StateMachineArn, workflow.ID, input)
err = wm.startExecution(ctx, describeOutput.StateMachineArn, workflow.ID, input)
if err != nil {
// since we failed to start execution, remove Workflow from store
if delErr := wm.store.DeleteWorkflowByID(ctx, workflow.ID); delErr != nil {
Expand Down Expand Up @@ -267,7 +269,7 @@ func (wm *SFNWorkflowManager) RetryWorkflow(ctx context.Context, ogWorkflow mode
if err := resources.RemoveInactiveStates(newDef.StateMachine); err != nil {
return nil, err
}
describeOutput, err := wm.describeOrCreateStateMachine(newDef, ogWorkflow.Namespace, ogWorkflow.Queue)
describeOutput, err := wm.describeOrCreateStateMachine(ctx, newDef, ogWorkflow.Namespace, ogWorkflow.Queue)
if err != nil {
return nil, err
}
Expand All @@ -287,7 +289,7 @@ func (wm *SFNWorkflowManager) RetryWorkflow(ctx context.Context, ogWorkflow mode
}

// submit an execution using input, set execution name == our workflow GUID
err = wm.startExecution(describeOutput.StateMachineArn, workflow.ID, input)
err = wm.startExecution(ctx, describeOutput.StateMachineArn, workflow.ID, input)
if err != nil {
return nil, err
}
Expand All @@ -308,7 +310,7 @@ func (wm *SFNWorkflowManager) CancelWorkflow(ctx context.Context, workflow *mode

wd := workflow.WorkflowDefinition
execARN := wm.executionArn(workflow, wd)
if _, err := wm.sfnapi.StopExecution(&sfn.StopExecutionInput{
if _, err := wm.sfnapi.StopExecutionWithContext(ctx, &sfn.StopExecutionInput{
ExecutionArn: aws.String(execARN),
Cause: aws.String(reason),
// Error: aws.String(""), // TODO: Can we use this? "An arbitrary error code that identifies the cause of the termination."
Expand Down Expand Up @@ -430,8 +432,7 @@ func (wm *SFNWorkflowManager) UpdateWorkflowHistory(ctx context.Context, workflo

// Setup a context with a timeout of one minute since
// we don't want to pull very large workflow histories
// TODO: this should be a context passed by the handler
ctx, cancel := context.WithTimeout(context.Background(), durationToFetchHistoryPages)
ctx, cancel := context.WithTimeout(ctx, durationToFetchHistoryPages)
defer cancel()

var jobs []*models.Job
Expand Down
22 changes: 11 additions & 11 deletions executor/workflow_manager_sfn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,14 @@ func TestCreateWorkflow(t *testing.T) {
c.workflowDefinition.StateMachine.StartAt,
)
c.mockSFNAPI.EXPECT().
DescribeStateMachine(&sfn.DescribeStateMachineInput{
DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(stateMachineArn),
}).
Return(&sfn.DescribeStateMachineOutput{
StateMachineArn: aws.String(stateMachineArn),
}, nil)
c.mockSFNAPI.EXPECT().
StartExecution(gomock.Any()).
StartExecutionWithContext(gomock.Any(), gomock.Any()).
Return(&sfn.StartExecutionOutput{}, nil)
c.mockSQSAPI.EXPECT().
SendMessageWithContext(gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -240,14 +240,14 @@ func TestCreateWorkflow(t *testing.T) {
c.workflowDefinition.StateMachine.StartAt,
)
c.mockSFNAPI.EXPECT().
DescribeStateMachine(&sfn.DescribeStateMachineInput{
DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(stateMachineArn),
}).
Return(&sfn.DescribeStateMachineOutput{
StateMachineArn: aws.String(stateMachineArn),
}, nil)
c.mockSFNAPI.EXPECT().
StartExecution(gomock.Any()).
StartExecutionWithContext(gomock.Any(), gomock.Any()).
Return(&sfn.StartExecutionOutput{}, nil)
c.mockSQSAPI.EXPECT().
SendMessageWithContext(gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -291,14 +291,14 @@ func TestCreateWorkflow(t *testing.T) {
)
awsError := awserr.New("test", "test", errors.New(""))
c.mockSFNAPI.EXPECT().
DescribeStateMachine(&sfn.DescribeStateMachineInput{
DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(stateMachineArn),
}).
Return(&sfn.DescribeStateMachineOutput{
StateMachineArn: aws.String(stateMachineArn),
}, nil)
c.mockSFNAPI.EXPECT().
StartExecution(gomock.Any()).
StartExecutionWithContext(gomock.Any(), gomock.Any()).
Return(nil, awsError)

workflow, err := c.manager.CreateWorkflow(ctx, *c.workflowDefinition,
Expand Down Expand Up @@ -331,14 +331,14 @@ func TestRetryWorkflow(t *testing.T) {
c.workflowDefinition.StateMachine.StartAt,
)
c.mockSFNAPI.EXPECT().
DescribeStateMachine(&sfn.DescribeStateMachineInput{
DescribeStateMachineWithContext(gomock.Any(), &sfn.DescribeStateMachineInput{
StateMachineArn: aws.String(stateMachineArn),
}).
Return(&sfn.DescribeStateMachineOutput{
StateMachineArn: aws.String(stateMachineArn),
}, nil)
c.mockSFNAPI.EXPECT().
StartExecution(gomock.Any()).
StartExecutionWithContext(gomock.Any(), gomock.Any()).
Return(&sfn.StartExecutionOutput{}, nil)
c.mockSQSAPI.EXPECT().
SendMessageWithContext(gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -370,12 +370,12 @@ func TestRetryWorkflow(t *testing.T) {
workflow.Status = models.WorkflowStatusFailed

c.mockSFNAPI.EXPECT().
DescribeStateMachine(gomock.Any()).
DescribeStateMachineWithContext(gomock.Any(), gomock.Any()).
Return(&sfn.DescribeStateMachineOutput{
StateMachineArn: aws.String(stateMachineArn),
}, nil)
c.mockSFNAPI.EXPECT().
StartExecution(gomock.Any()).
StartExecutionWithContext(gomock.Any(), gomock.Any()).
Return(&sfn.StartExecutionOutput{}, nil)
c.mockSQSAPI.EXPECT().
SendMessageWithContext(gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestCancelWorkflow(t *testing.T) {
reason := "i have my reasons"
sfnExecutionARN := c.manager.executionArn(workflow, c.workflowDefinition)
c.mockSFNAPI.EXPECT().
StopExecution(&sfn.StopExecutionInput{
StopExecutionWithContext(gomock.Any(), &sfn.StopExecutionInput{
ExecutionArn: aws.String(sfnExecutionARN),
Cause: aws.String(reason),
}).
Expand Down
Loading

0 comments on commit c6af8b8

Please sign in to comment.