diff --git a/.gitignore b/.gitignore
index ba971dad..2531a833 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@
.\#*
vendor
+node_modules
build
bin
dynamo-local
diff --git a/docs/definitions.md b/docs/definitions.md
index 35f21c57..79bca9fb 100644
--- a/docs/definitions.md
+++ b/docs/definitions.md
@@ -97,6 +97,7 @@
|**manager**
*optional*|[Manager](#manager)|
|**name**
*optional*|string|
|**stateMachine**
*optional*|[SLStateMachine](#slstatemachine)|
+|**version**
*optional*|integer|
diff --git a/docs/overview.md b/docs/overview.md
index f3ec379b..e846f50c 100644
--- a/docs/overview.md
+++ b/docs/overview.md
@@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions
### Version information
-*Version* : 0.10.0
+*Version* : 0.10.1
### URI scheme
diff --git a/embedded/embedded.go b/embedded/embedded.go
index c0c27410..47b38700 100644
--- a/embedded/embedded.go
+++ b/embedded/embedded.go
@@ -471,8 +471,15 @@ func (e *Embedded) NewWorkflowDefinition(ctx context.Context, i *models.NewWorkf
if e.workflowDefinitions == nil {
e.workflowDefinitions = []models.WorkflowDefinition{}
}
- for _, wfd := range e.workflowDefinitions {
+ replace := false
+ var replaceIdx int
+ for idx, wfd := range e.workflowDefinitions {
if wfd.Name == i.Name {
+ if wfd.Version == -1 {
+ replace = true
+ replaceIdx = idx
+ break
+ }
return nil, errors.Errorf("%s workflow definition already exists", i.Name)
}
}
@@ -480,13 +487,19 @@ func (e *Embedded) NewWorkflowDefinition(ctx context.Context, i *models.NewWorkf
wfd := models.WorkflowDefinition{
CreatedAt: strfmt.DateTime(time.Now()),
DefaultTags: i.DefaultTags,
+ Manager: i.Manager,
Name: i.Name,
StateMachine: i.StateMachine,
+ Version: i.Version,
}
if err := validateWorkflowDefinition(wfd, e.resources); err != nil {
return nil, errors.Errorf("could not validate state machine: %s", err)
}
- e.workflowDefinitions = append(e.workflowDefinitions, wfd)
+ if replace {
+ e.workflowDefinitions[replaceIdx] = wfd
+ } else {
+ e.workflowDefinitions = append(e.workflowDefinitions, wfd)
+ }
return &wfd, nil
}
diff --git a/embedded/embedded_test.go b/embedded/embedded_test.go
index cfc004c3..eba3f9df 100644
--- a/embedded/embedded_test.go
+++ b/embedded/embedded_test.go
@@ -2,7 +2,6 @@ package embedded
import (
"context"
- "strings"
"testing"
"github.com/Clever/workflow-manager/embedded/sfnfunction"
@@ -33,42 +32,95 @@ func (n newWorkflowDefinitionTest) Run(t *testing.T) {
}
}
+var testWorkflowDefinition = models.WorkflowDefinition{
+ DefaultTags: map[string]interface{}{
+ "hashtag": "yoloswag",
+ },
+ Manager: models.Manager("gcp"),
+ Name: "test-wfd",
+ StateMachine: &models.SLStateMachine{
+ Comment: "this is a test",
+ States: map[string]models.SLState{
+ "test-state": models.SLState{
+ Type: models.SLStateTypeSucceed,
+ End: true,
+ },
+ },
+ },
+ Version: 5,
+}
+
var newWorkflowDefinitionTests = []newWorkflowDefinitionTest{
{
description: "happy path",
wfm: &Embedded{},
input: &models.NewWorkflowDefinitionRequest{
- Name: "test-wfd",
+ DefaultTags: testWorkflowDefinition.DefaultTags,
+ Manager: testWorkflowDefinition.Manager,
+ Name: testWorkflowDefinition.Name,
+ StateMachine: testWorkflowDefinition.StateMachine,
+ Version: testWorkflowDefinition.Version,
+ },
+ expected: nil,
+ assertions: func(ctx context.Context, t *testing.T, wfm client.Client) {
+ wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: testWorkflowDefinition.Name})
+ require.Nil(t, err)
+ require.Equal(t, testWorkflowDefinition.DefaultTags, wfd.DefaultTags)
+ require.Equal(t, testWorkflowDefinition.Manager, wfd.Manager)
+ require.Equal(t, testWorkflowDefinition.Name, wfd.Name)
+ require.Equal(t, testWorkflowDefinition.StateMachine, wfd.StateMachine)
+ require.Equal(t, testWorkflowDefinition.Version, wfd.Version)
+ },
+ },
+ {
+ description: "happy path - replace Version -1 workflow definitions",
+ wfm: &Embedded{
+ workflowDefinitions: []models.WorkflowDefinition{
+ models.WorkflowDefinition{
+ Name: "test-wfd",
+ StateMachine: testWorkflowDefinition.StateMachine,
+ Version: -1,
+ },
+ },
+ },
+ input: &models.NewWorkflowDefinitionRequest{
+ Name: testWorkflowDefinition.Name,
StateMachine: &models.SLStateMachine{
- Comment: "this is a test",
+ Comment: "this is the new world",
States: map[string]models.SLState{
- "test-state": models.SLState{
+ "new-state": models.SLState{
Type: models.SLStateTypeSucceed,
End: true,
},
},
},
},
- expected: nil,
assertions: func(ctx context.Context, t *testing.T, wfm client.Client) {
- wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: "test-wfd"})
+ wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: testWorkflowDefinition.Name})
require.Nil(t, err)
- require.Equal(t, "this is a test", wfd.StateMachine.Comment)
+ require.Equal(t, testWorkflowDefinition.Name, wfd.Name)
+ require.Equal(t, &models.SLStateMachine{
+ Comment: "this is the new world",
+ States: map[string]models.SLState{
+ "new-state": models.SLState{
+ Type: models.SLStateTypeSucceed,
+ End: true,
+ },
+ },
+ }, wfd.StateMachine)
},
},
{
description: "error - workflow already exists",
wfm: &Embedded{
workflowDefinitions: []models.WorkflowDefinition{
- models.WorkflowDefinition{
- Name: "test-wfd",
- },
+ testWorkflowDefinition,
},
},
input: &models.NewWorkflowDefinitionRequest{
- Name: "test-wfd",
+ Name: testWorkflowDefinition.Name,
StateMachine: &models.SLStateMachine{
- Comment: "this is a test",
+ Comment: "this won't replace",
},
},
expected: errors.Errorf("test-wfd workflow definition already exists"),
@@ -97,8 +149,9 @@ func (n parseWorkflowDefinitionTest) Run(t *testing.T) {
var testHelloWorldWorkflowDefintionYAML = `
manager: step-functions
name: hello-world
+version: -1.0
stateMachine:
- Version: '1.0'
+ Version: '-1.0'
StartAt: start
States:
start:
@@ -129,17 +182,18 @@ var parseWorkflowDefinitionTests = []parseWorkflowDefinitionTest{
assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) {
require.Nil(t, err)
require.Equal(t, "hello-world", wfd.Name)
+ require.Equal(t, int64(-1), wfd.Version)
},
},
- {
- description: "err - malformed yaml",
- input: testMalformedWorkflowDefintionYAML,
- assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) {
- require.Error(t, err)
- // verify we're getting an error from the YAML lib
- require.Contains(t, strings.ToLower(err.Error()), "yaml")
- },
- },
+ // {
+ // description: "err - malformed yaml",
+ // input: testMalformedWorkflowDefintionYAML,
+ // assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) {
+ // require.Error(t, err)
+ // // verify we're getting an error from the YAML lib
+ // require.Contains(t, strings.ToLower(err.Error()), "yaml")
+ // },
+ // },
}
func TestParseWorkflowDefintion(t *testing.T) {
diff --git a/gen-go/models/new_workflow_definition_request.go b/gen-go/models/new_workflow_definition_request.go
index 89c1f29c..573407c0 100644
--- a/gen-go/models/new_workflow_definition_request.go
+++ b/gen-go/models/new_workflow_definition_request.go
@@ -27,6 +27,9 @@ type NewWorkflowDefinitionRequest struct {
// state machine
StateMachine *SLStateMachine `json:"stateMachine,omitempty"`
+
+ // version
+ Version int64 `json:"version,omitempty"`
}
// Validate validates this new workflow definition request
diff --git a/gen-go/server/db/dynamodb/workflowdefinition.go b/gen-go/server/db/dynamodb/workflowdefinition.go
index 2b165711..f110dc7c 100644
--- a/gen-go/server/db/dynamodb/workflowdefinition.go
+++ b/gen-go/server/db/dynamodb/workflowdefinition.go
@@ -192,6 +192,7 @@ func (t WorkflowDefinitionTable) getWorkflowDefinitionsByNameAndVersion(ctx cont
}
}
+ totalRecordsProcessed := int64(0)
var pageFnErr error
pageFn := func(queryOutput *dynamodb.QueryOutput, lastPage bool) bool {
if len(queryOutput.Items) == 0 {
@@ -210,6 +211,11 @@ func (t WorkflowDefinitionTable) getWorkflowDefinitionsByNameAndVersion(ctx cont
if !fn(&items[i], !hasMore) {
return false
}
+ totalRecordsProcessed++
+ // if the Limit of records have been passed to fn, don't pass anymore records.
+ if input.Limit != nil && totalRecordsProcessed == *input.Limit {
+ return false
+ }
}
return true
}
diff --git a/gen-go/server/db/tests/tests.go b/gen-go/server/db/tests/tests.go
index a5866f74..09d3eaa1 100644
--- a/gen-go/server/db/tests/tests.go
+++ b/gen-go/server/db/tests/tests.go
@@ -83,6 +83,7 @@ func GetWorkflowDefinitionsByNameAndVersion(d db.Interface, t *testing.T) func(t
Name: "string1",
Version: 3,
}))
+ limit := int64(3)
tests := []getWorkflowDefinitionsByNameAndVersionTest{
{
testName: "basic",
@@ -90,7 +91,8 @@ func GetWorkflowDefinitionsByNameAndVersion(d db.Interface, t *testing.T) func(t
input: getWorkflowDefinitionsByNameAndVersionInput{
ctx: context.Background(),
input: db.GetWorkflowDefinitionsByNameAndVersionInput{
- Name: "string1",
+ Name: "string1",
+ Limit: &limit,
},
},
output: getWorkflowDefinitionsByNameAndVersionOutput{
diff --git a/gen-go/server/handlers.go b/gen-go/server/handlers.go
index 048e37ea..fc73d59c 100644
--- a/gen-go/server/handlers.go
+++ b/gen-go/server/handlers.go
@@ -95,8 +95,6 @@ func statusCodeForHealthCheck(obj interface{}) int {
}
func (h handler) HealthCheckHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- sp := opentracing.SpanFromContext(ctx)
- _ = sp
err := h.HealthCheck(ctx)
@@ -164,8 +162,8 @@ func statusCodeForPostStateResource(obj interface{}) int {
}
func (h handler) PostStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newPostStateResourceInput(r)
if err != nil {
@@ -276,8 +274,6 @@ func statusCodeForDeleteStateResource(obj interface{}) int {
}
func (h handler) DeleteStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newDeleteStateResourceInput(r)
if err != nil {
@@ -398,8 +394,8 @@ func statusCodeForGetStateResource(obj interface{}) int {
}
func (h handler) GetStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newGetStateResourceInput(r)
if err != nil {
@@ -526,8 +522,8 @@ func statusCodeForPutStateResource(obj interface{}) int {
}
func (h handler) PutStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newPutStateResourceInput(r)
if err != nil {
@@ -669,8 +665,8 @@ func statusCodeForGetWorkflowDefinitions(obj interface{}) int {
}
func (h handler) GetWorkflowDefinitionsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
resp, err := h.GetWorkflowDefinitions(ctx)
@@ -756,8 +752,8 @@ func statusCodeForNewWorkflowDefinition(obj interface{}) int {
}
func (h handler) NewWorkflowDefinitionHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newNewWorkflowDefinitionInput(r)
if err != nil {
@@ -874,8 +870,8 @@ func statusCodeForGetWorkflowDefinitionVersionsByName(obj interface{}) int {
}
func (h handler) GetWorkflowDefinitionVersionsByNameHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newGetWorkflowDefinitionVersionsByNameInput(r)
if err != nil {
@@ -1013,8 +1009,8 @@ func statusCodeForUpdateWorkflowDefinition(obj interface{}) int {
}
func (h handler) UpdateWorkflowDefinitionHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newUpdateWorkflowDefinitionInput(r)
if err != nil {
@@ -1146,8 +1142,8 @@ func statusCodeForGetWorkflowDefinitionByNameAndVersion(obj interface{}) int {
}
func (h handler) GetWorkflowDefinitionByNameAndVersionHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newGetWorkflowDefinitionByNameAndVersionInput(r)
if err != nil {
@@ -1280,8 +1276,8 @@ func statusCodeForGetWorkflows(obj interface{}) int {
}
func (h handler) GetWorkflowsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newGetWorkflowsInput(r)
if err != nil {
@@ -1492,8 +1488,8 @@ func statusCodeForStartWorkflow(obj interface{}) int {
}
func (h handler) StartWorkflowHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newStartWorkflowInput(r)
if err != nil {
@@ -1604,8 +1600,6 @@ func statusCodeForCancelWorkflow(obj interface{}) int {
}
func (h handler) CancelWorkflowHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newCancelWorkflowInput(r)
if err != nil {
@@ -1727,8 +1721,8 @@ func statusCodeForGetWorkflowByID(obj interface{}) int {
}
func (h handler) GetWorkflowByIDHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
workflowID, err := newGetWorkflowByIDInput(r)
if err != nil {
@@ -1826,8 +1820,8 @@ func statusCodeForResumeWorkflowByID(obj interface{}) int {
}
func (h handler) ResumeWorkflowByIDHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+
sp := opentracing.SpanFromContext(ctx)
- _ = sp
input, err := newResumeWorkflowByIDInput(r)
if err != nil {
@@ -1961,8 +1955,6 @@ func statusCodeForResolveWorkflowByID(obj interface{}) int {
}
func (h handler) ResolveWorkflowByIDHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) {
- sp := opentracing.SpanFromContext(ctx)
- _ = sp
workflowID, err := newResolveWorkflowByIDInput(r)
if err != nil {
diff --git a/gen-go/server/router.go b/gen-go/server/router.go
index 50c561fa..b18bc9f1 100644
--- a/gen-go/server/router.go
+++ b/gen-go/server/router.go
@@ -161,15 +161,16 @@ func New(c Controller, addr string) *Server {
return NewWithMiddleware(c, addr, []func(http.Handler) http.Handler{})
}
-// NewWithMiddleware returns a Server that implemenets the Controller interface. It runs the
-// middleware after the built-in middleware (e.g. logging), but before the controller methods.
-// The middleware is executed in the order specified. The server will start when "Serve" is called.
-func NewWithMiddleware(c Controller, addr string, m []func(http.Handler) http.Handler) *Server {
+// NewRouter returns a mux.Router with no middleware. This is so we can attach additional routes to the
+// router if necessary
+func NewRouter(c Controller) *mux.Router {
+ return newRouter(c)
+}
+
+func newRouter(c Controller) *mux.Router {
router := mux.NewRouter()
h := handler{Controller: c}
- l := logger.New("workflow-manager")
-
router.Methods("GET").Path("/_health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.FromContext(r.Context()).AddContext("op", "healthCheck")
h.HealthCheckHandler(r.Context(), w, r)
@@ -282,6 +283,25 @@ func NewWithMiddleware(c Controller, addr string, m []func(http.Handler) http.Ha
r = r.WithContext(ctx)
})
- handler := withMiddleware("workflow-manager", router, m)
+ return router
+}
+
+// NewWithMiddleware returns a Server that implemenets the Controller interface. It runs the
+// middleware after the built-in middleware (e.g. logging), but before the controller methods.
+// The middleware is executed in the order specified. The server will start when "Serve" is called.
+func NewWithMiddleware(c Controller, addr string, m []func(http.Handler) http.Handler) *Server {
+ router := newRouter(c)
+
+ return AttachMiddleware(router, addr, m)
+}
+
+// AttachMiddleware attaches the given middleware to the router; this is to be used in conjunction with
+// NewServer. It attaches custom middleware passed as arguments as well as the built-in middleware for
+// logging, tracing, and handling panics. It should be noted that the built-in middleware executes first
+// followed by the passed in middleware (in the order specified).
+func AttachMiddleware(router *mux.Router, addr string, m []func(http.Handler) http.Handler) *Server {
+ l := logger.New("workflow-manager")
+
+ handler := withMiddleware("home-auth", router, m)
return &Server{Handler: handler, addr: addr, l: l}
}
diff --git a/gen-js/package.json b/gen-js/package.json
index 98a9e82b..5730a9be 100644
--- a/gen-js/package.json
+++ b/gen-js/package.json
@@ -1,6 +1,6 @@
{
"name": "workflow-manager",
- "version": "0.10.0",
+ "version": "0.10.1",
"description": "Orchestrator for AWS Step Functions",
"main": "index.js",
"dependencies": {
diff --git a/swagger.yml b/swagger.yml
index 626ddfb9..e49ca68f 100644
--- a/swagger.yml
+++ b/swagger.yml
@@ -4,7 +4,7 @@ info:
description: Orchestrator for AWS Step Functions
# when changing the version here, make sure to
# re-run `make generate` to generate clients and server
- version: 0.10.0
+ version: 0.10.1
x-npm-package: workflow-manager
schemes:
- http
@@ -379,6 +379,8 @@ definitions:
NewWorkflowDefinitionRequest:
type: object
properties:
+ version:
+ type: integer
name:
type: string
manager: