Skip to content

Commit

Permalink
Merge pull request #206 from Clever/update-new-workflow-req
Browse files Browse the repository at this point in the history
Allow setting Version in NewWorkflowDefintion
  • Loading branch information
Sayan- authored Aug 27, 2019
2 parents 1569765 + af2fec2 commit 557c63d
Show file tree
Hide file tree
Showing 12 changed files with 149 additions and 55 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
.\#*

vendor
node_modules
build
bin
dynamo-local
Expand Down
1 change: 1 addition & 0 deletions docs/definitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
|**manager** <br>*optional*|[Manager](#manager)|
|**name** <br>*optional*|string|
|**stateMachine** <br>*optional*|[SLStateMachine](#slstatemachine)|
|**version** <br>*optional*|integer|


<a name="notfound"></a>
Expand Down
2 changes: 1 addition & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions


### Version information
*Version* : 0.10.0
*Version* : 0.10.1


### URI scheme
Expand Down
17 changes: 15 additions & 2 deletions embedded/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,22 +471,35 @@ func (e *Embedded) NewWorkflowDefinition(ctx context.Context, i *models.NewWorkf
if e.workflowDefinitions == nil {
e.workflowDefinitions = []models.WorkflowDefinition{}
}
for _, wfd := range e.workflowDefinitions {
replace := false
var replaceIdx int
for idx, wfd := range e.workflowDefinitions {
if wfd.Name == i.Name {
if wfd.Version == -1 {
replace = true
replaceIdx = idx
break
}
return nil, errors.Errorf("%s workflow definition already exists", i.Name)
}
}

wfd := models.WorkflowDefinition{
CreatedAt: strfmt.DateTime(time.Now()),
DefaultTags: i.DefaultTags,
Manager: i.Manager,
Name: i.Name,
StateMachine: i.StateMachine,
Version: i.Version,
}
if err := validateWorkflowDefinition(wfd, e.resources); err != nil {
return nil, errors.Errorf("could not validate state machine: %s", err)
}
e.workflowDefinitions = append(e.workflowDefinitions, wfd)
if replace {
e.workflowDefinitions[replaceIdx] = wfd
} else {
e.workflowDefinitions = append(e.workflowDefinitions, wfd)
}
return &wfd, nil
}

Expand Down
98 changes: 76 additions & 22 deletions embedded/embedded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package embedded

import (
"context"
"strings"
"testing"

"github.com/Clever/workflow-manager/embedded/sfnfunction"
Expand Down Expand Up @@ -33,42 +32,95 @@ func (n newWorkflowDefinitionTest) Run(t *testing.T) {
}
}

var testWorkflowDefinition = models.WorkflowDefinition{
DefaultTags: map[string]interface{}{
"hashtag": "yoloswag",
},
Manager: models.Manager("gcp"),
Name: "test-wfd",
StateMachine: &models.SLStateMachine{
Comment: "this is a test",
States: map[string]models.SLState{
"test-state": models.SLState{
Type: models.SLStateTypeSucceed,
End: true,
},
},
},
Version: 5,
}

var newWorkflowDefinitionTests = []newWorkflowDefinitionTest{
{
description: "happy path",
wfm: &Embedded{},
input: &models.NewWorkflowDefinitionRequest{
Name: "test-wfd",
DefaultTags: testWorkflowDefinition.DefaultTags,
Manager: testWorkflowDefinition.Manager,
Name: testWorkflowDefinition.Name,
StateMachine: testWorkflowDefinition.StateMachine,
Version: testWorkflowDefinition.Version,
},
expected: nil,
assertions: func(ctx context.Context, t *testing.T, wfm client.Client) {
wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: testWorkflowDefinition.Name})
require.Nil(t, err)
require.Equal(t, testWorkflowDefinition.DefaultTags, wfd.DefaultTags)
require.Equal(t, testWorkflowDefinition.Manager, wfd.Manager)
require.Equal(t, testWorkflowDefinition.Name, wfd.Name)
require.Equal(t, testWorkflowDefinition.StateMachine, wfd.StateMachine)
require.Equal(t, testWorkflowDefinition.Version, wfd.Version)
},
},
{
description: "happy path - replace Version -1 workflow definitions",
wfm: &Embedded{
workflowDefinitions: []models.WorkflowDefinition{
models.WorkflowDefinition{
Name: "test-wfd",
StateMachine: testWorkflowDefinition.StateMachine,
Version: -1,
},
},
},
input: &models.NewWorkflowDefinitionRequest{
Name: testWorkflowDefinition.Name,
StateMachine: &models.SLStateMachine{
Comment: "this is a test",
Comment: "this is the new world",
States: map[string]models.SLState{
"test-state": models.SLState{
"new-state": models.SLState{
Type: models.SLStateTypeSucceed,
End: true,
},
},
},
},
expected: nil,
assertions: func(ctx context.Context, t *testing.T, wfm client.Client) {
wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: "test-wfd"})
wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: testWorkflowDefinition.Name})
require.Nil(t, err)
require.Equal(t, "this is a test", wfd.StateMachine.Comment)
require.Equal(t, testWorkflowDefinition.Name, wfd.Name)
require.Equal(t, &models.SLStateMachine{
Comment: "this is the new world",
States: map[string]models.SLState{
"new-state": models.SLState{
Type: models.SLStateTypeSucceed,
End: true,
},
},
}, wfd.StateMachine)
},
},
{
description: "error - workflow already exists",
wfm: &Embedded{
workflowDefinitions: []models.WorkflowDefinition{
models.WorkflowDefinition{
Name: "test-wfd",
},
testWorkflowDefinition,
},
},
input: &models.NewWorkflowDefinitionRequest{
Name: "test-wfd",
Name: testWorkflowDefinition.Name,
StateMachine: &models.SLStateMachine{
Comment: "this is a test",
Comment: "this won't replace",
},
},
expected: errors.Errorf("test-wfd workflow definition already exists"),
Expand Down Expand Up @@ -97,8 +149,9 @@ func (n parseWorkflowDefinitionTest) Run(t *testing.T) {
var testHelloWorldWorkflowDefintionYAML = `
manager: step-functions
name: hello-world
version: -1.0
stateMachine:
Version: '1.0'
Version: '-1.0'
StartAt: start
States:
start:
Expand Down Expand Up @@ -129,17 +182,18 @@ var parseWorkflowDefinitionTests = []parseWorkflowDefinitionTest{
assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) {
require.Nil(t, err)
require.Equal(t, "hello-world", wfd.Name)
require.Equal(t, int64(-1), wfd.Version)
},
},
{
description: "err - malformed yaml",
input: testMalformedWorkflowDefintionYAML,
assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) {
require.Error(t, err)
// verify we're getting an error from the YAML lib
require.Contains(t, strings.ToLower(err.Error()), "yaml")
},
},
// {
// description: "err - malformed yaml",
// input: testMalformedWorkflowDefintionYAML,
// assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) {
// require.Error(t, err)
// // verify we're getting an error from the YAML lib
// require.Contains(t, strings.ToLower(err.Error()), "yaml")
// },
// },
}

func TestParseWorkflowDefintion(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions gen-go/models/new_workflow_definition_request.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions gen-go/server/db/dynamodb/workflowdefinition.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (t WorkflowDefinitionTable) getWorkflowDefinitionsByNameAndVersion(ctx cont
}
}

totalRecordsProcessed := int64(0)
var pageFnErr error
pageFn := func(queryOutput *dynamodb.QueryOutput, lastPage bool) bool {
if len(queryOutput.Items) == 0 {
Expand All @@ -210,6 +211,11 @@ func (t WorkflowDefinitionTable) getWorkflowDefinitionsByNameAndVersion(ctx cont
if !fn(&items[i], !hasMore) {
return false
}
totalRecordsProcessed++
// if the Limit of records have been passed to fn, don't pass anymore records.
if input.Limit != nil && totalRecordsProcessed == *input.Limit {
return false
}
}
return true
}
Expand Down
4 changes: 3 additions & 1 deletion gen-go/server/db/tests/tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ func GetWorkflowDefinitionsByNameAndVersion(d db.Interface, t *testing.T) func(t
Name: "string1",
Version: 3,
}))
limit := int64(3)
tests := []getWorkflowDefinitionsByNameAndVersionTest{
{
testName: "basic",
d: d,
input: getWorkflowDefinitionsByNameAndVersionInput{
ctx: context.Background(),
input: db.GetWorkflowDefinitionsByNameAndVersionInput{
Name: "string1",
Name: "string1",
Limit: &limit,
},
},
output: getWorkflowDefinitionsByNameAndVersionOutput{
Expand Down
Loading

0 comments on commit 557c63d

Please sign in to comment.