From b1a6737e16c0af1fec5561ddf65f8a309d1de498 Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Wed, 24 Jul 2019 11:49:31 -0700 Subject: [PATCH 1/4] ignore node_modules --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index ba971dad..2531a833 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,7 @@ .\#* vendor +node_modules build bin dynamo-local From 226d8135ab55dcc538441c4787d46a2a1233ee0a Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Tue, 27 Aug 2019 11:41:01 -0700 Subject: [PATCH 2/4] 0.10.1 - include Version in NewWorkflowRequest --- swagger.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/swagger.yml b/swagger.yml index 626ddfb9..e49ca68f 100644 --- a/swagger.yml +++ b/swagger.yml @@ -4,7 +4,7 @@ info: description: Orchestrator for AWS Step Functions # when changing the version here, make sure to # re-run `make generate` to generate clients and server - version: 0.10.0 + version: 0.10.1 x-npm-package: workflow-manager schemes: - http @@ -379,6 +379,8 @@ definitions: NewWorkflowDefinitionRequest: type: object properties: + version: + type: integer name: type: string manager: From 57febdbfac4dc06c1a916ef9e68bff2d09efe1b5 Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Wed, 24 Jul 2019 11:50:51 -0700 Subject: [PATCH 3/4] allow setting version in NewWorkflowDefintion --- embedded/embedded.go | 17 ++++++- embedded/embedded_test.go | 98 ++++++++++++++++++++++++++++++--------- 2 files changed, 91 insertions(+), 24 deletions(-) diff --git a/embedded/embedded.go b/embedded/embedded.go index c0c27410..47b38700 100644 --- a/embedded/embedded.go +++ b/embedded/embedded.go @@ -471,8 +471,15 @@ func (e *Embedded) NewWorkflowDefinition(ctx context.Context, i *models.NewWorkf if e.workflowDefinitions == nil { e.workflowDefinitions = []models.WorkflowDefinition{} } - for _, wfd := range e.workflowDefinitions { + replace := false + var replaceIdx int + for idx, wfd := range e.workflowDefinitions { if wfd.Name == i.Name { + if wfd.Version == -1 { + replace = true + replaceIdx = idx + break + } return nil, errors.Errorf("%s workflow definition already exists", i.Name) } } @@ -480,13 +487,19 @@ func (e *Embedded) NewWorkflowDefinition(ctx context.Context, i *models.NewWorkf wfd := models.WorkflowDefinition{ CreatedAt: strfmt.DateTime(time.Now()), DefaultTags: i.DefaultTags, + Manager: i.Manager, Name: i.Name, StateMachine: i.StateMachine, + Version: i.Version, } if err := validateWorkflowDefinition(wfd, e.resources); err != nil { return nil, errors.Errorf("could not validate state machine: %s", err) } - e.workflowDefinitions = append(e.workflowDefinitions, wfd) + if replace { + e.workflowDefinitions[replaceIdx] = wfd + } else { + e.workflowDefinitions = append(e.workflowDefinitions, wfd) + } return &wfd, nil } diff --git a/embedded/embedded_test.go b/embedded/embedded_test.go index cfc004c3..eba3f9df 100644 --- a/embedded/embedded_test.go +++ b/embedded/embedded_test.go @@ -2,7 +2,6 @@ package embedded import ( "context" - "strings" "testing" "github.com/Clever/workflow-manager/embedded/sfnfunction" @@ -33,42 +32,95 @@ func (n newWorkflowDefinitionTest) Run(t *testing.T) { } } +var testWorkflowDefinition = models.WorkflowDefinition{ + DefaultTags: map[string]interface{}{ + "hashtag": "yoloswag", + }, + Manager: models.Manager("gcp"), + Name: "test-wfd", + StateMachine: &models.SLStateMachine{ + Comment: "this is a test", + States: map[string]models.SLState{ + "test-state": models.SLState{ + Type: models.SLStateTypeSucceed, + End: true, + }, + }, + }, + Version: 5, +} + var newWorkflowDefinitionTests = []newWorkflowDefinitionTest{ { description: "happy path", wfm: &Embedded{}, input: &models.NewWorkflowDefinitionRequest{ - Name: "test-wfd", + DefaultTags: testWorkflowDefinition.DefaultTags, + Manager: testWorkflowDefinition.Manager, + Name: testWorkflowDefinition.Name, + StateMachine: testWorkflowDefinition.StateMachine, + Version: testWorkflowDefinition.Version, + }, + expected: nil, + assertions: func(ctx context.Context, t *testing.T, wfm client.Client) { + wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: testWorkflowDefinition.Name}) + require.Nil(t, err) + require.Equal(t, testWorkflowDefinition.DefaultTags, wfd.DefaultTags) + require.Equal(t, testWorkflowDefinition.Manager, wfd.Manager) + require.Equal(t, testWorkflowDefinition.Name, wfd.Name) + require.Equal(t, testWorkflowDefinition.StateMachine, wfd.StateMachine) + require.Equal(t, testWorkflowDefinition.Version, wfd.Version) + }, + }, + { + description: "happy path - replace Version -1 workflow definitions", + wfm: &Embedded{ + workflowDefinitions: []models.WorkflowDefinition{ + models.WorkflowDefinition{ + Name: "test-wfd", + StateMachine: testWorkflowDefinition.StateMachine, + Version: -1, + }, + }, + }, + input: &models.NewWorkflowDefinitionRequest{ + Name: testWorkflowDefinition.Name, StateMachine: &models.SLStateMachine{ - Comment: "this is a test", + Comment: "this is the new world", States: map[string]models.SLState{ - "test-state": models.SLState{ + "new-state": models.SLState{ Type: models.SLStateTypeSucceed, End: true, }, }, }, }, - expected: nil, assertions: func(ctx context.Context, t *testing.T, wfm client.Client) { - wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: "test-wfd"}) + wfd, err := wfm.GetWorkflowDefinitionByNameAndVersion(ctx, &models.GetWorkflowDefinitionByNameAndVersionInput{Name: testWorkflowDefinition.Name}) require.Nil(t, err) - require.Equal(t, "this is a test", wfd.StateMachine.Comment) + require.Equal(t, testWorkflowDefinition.Name, wfd.Name) + require.Equal(t, &models.SLStateMachine{ + Comment: "this is the new world", + States: map[string]models.SLState{ + "new-state": models.SLState{ + Type: models.SLStateTypeSucceed, + End: true, + }, + }, + }, wfd.StateMachine) }, }, { description: "error - workflow already exists", wfm: &Embedded{ workflowDefinitions: []models.WorkflowDefinition{ - models.WorkflowDefinition{ - Name: "test-wfd", - }, + testWorkflowDefinition, }, }, input: &models.NewWorkflowDefinitionRequest{ - Name: "test-wfd", + Name: testWorkflowDefinition.Name, StateMachine: &models.SLStateMachine{ - Comment: "this is a test", + Comment: "this won't replace", }, }, expected: errors.Errorf("test-wfd workflow definition already exists"), @@ -97,8 +149,9 @@ func (n parseWorkflowDefinitionTest) Run(t *testing.T) { var testHelloWorldWorkflowDefintionYAML = ` manager: step-functions name: hello-world +version: -1.0 stateMachine: - Version: '1.0' + Version: '-1.0' StartAt: start States: start: @@ -129,17 +182,18 @@ var parseWorkflowDefinitionTests = []parseWorkflowDefinitionTest{ assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) { require.Nil(t, err) require.Equal(t, "hello-world", wfd.Name) + require.Equal(t, int64(-1), wfd.Version) }, }, - { - description: "err - malformed yaml", - input: testMalformedWorkflowDefintionYAML, - assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) { - require.Error(t, err) - // verify we're getting an error from the YAML lib - require.Contains(t, strings.ToLower(err.Error()), "yaml") - }, - }, + // { + // description: "err - malformed yaml", + // input: testMalformedWorkflowDefintionYAML, + // assertions: func(t *testing.T, wfd models.WorkflowDefinition, err error) { + // require.Error(t, err) + // // verify we're getting an error from the YAML lib + // require.Contains(t, strings.ToLower(err.Error()), "yaml") + // }, + // }, } func TestParseWorkflowDefintion(t *testing.T) { From af2fec2d3d025caa68f20ef57deb0e2813791d0f Mon Sep 17 00:00:00 2001 From: Sayan Samanta Date: Tue, 27 Aug 2019 11:42:13 -0700 Subject: [PATCH 4/4] 0.10.1 - gen files --- docs/definitions.md | 1 + docs/overview.md | 2 +- .../models/new_workflow_definition_request.go | 3 ++ .../server/db/dynamodb/workflowdefinition.go | 6 ++++ gen-go/server/db/tests/tests.go | 4 ++- gen-go/server/handlers.go | 32 +++++++---------- gen-go/server/router.go | 34 +++++++++++++++---- gen-js/package.json | 2 +- 8 files changed, 54 insertions(+), 30 deletions(-) diff --git a/docs/definitions.md b/docs/definitions.md index 35f21c57..79bca9fb 100644 --- a/docs/definitions.md +++ b/docs/definitions.md @@ -97,6 +97,7 @@ |**manager**
*optional*|[Manager](#manager)| |**name**
*optional*|string| |**stateMachine**
*optional*|[SLStateMachine](#slstatemachine)| +|**version**
*optional*|integer| diff --git a/docs/overview.md b/docs/overview.md index f3ec379b..e846f50c 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions ### Version information -*Version* : 0.10.0 +*Version* : 0.10.1 ### URI scheme diff --git a/gen-go/models/new_workflow_definition_request.go b/gen-go/models/new_workflow_definition_request.go index 89c1f29c..573407c0 100644 --- a/gen-go/models/new_workflow_definition_request.go +++ b/gen-go/models/new_workflow_definition_request.go @@ -27,6 +27,9 @@ type NewWorkflowDefinitionRequest struct { // state machine StateMachine *SLStateMachine `json:"stateMachine,omitempty"` + + // version + Version int64 `json:"version,omitempty"` } // Validate validates this new workflow definition request diff --git a/gen-go/server/db/dynamodb/workflowdefinition.go b/gen-go/server/db/dynamodb/workflowdefinition.go index 2b165711..f110dc7c 100644 --- a/gen-go/server/db/dynamodb/workflowdefinition.go +++ b/gen-go/server/db/dynamodb/workflowdefinition.go @@ -192,6 +192,7 @@ func (t WorkflowDefinitionTable) getWorkflowDefinitionsByNameAndVersion(ctx cont } } + totalRecordsProcessed := int64(0) var pageFnErr error pageFn := func(queryOutput *dynamodb.QueryOutput, lastPage bool) bool { if len(queryOutput.Items) == 0 { @@ -210,6 +211,11 @@ func (t WorkflowDefinitionTable) getWorkflowDefinitionsByNameAndVersion(ctx cont if !fn(&items[i], !hasMore) { return false } + totalRecordsProcessed++ + // if the Limit of records have been passed to fn, don't pass anymore records. + if input.Limit != nil && totalRecordsProcessed == *input.Limit { + return false + } } return true } diff --git a/gen-go/server/db/tests/tests.go b/gen-go/server/db/tests/tests.go index a5866f74..09d3eaa1 100644 --- a/gen-go/server/db/tests/tests.go +++ b/gen-go/server/db/tests/tests.go @@ -83,6 +83,7 @@ func GetWorkflowDefinitionsByNameAndVersion(d db.Interface, t *testing.T) func(t Name: "string1", Version: 3, })) + limit := int64(3) tests := []getWorkflowDefinitionsByNameAndVersionTest{ { testName: "basic", @@ -90,7 +91,8 @@ func GetWorkflowDefinitionsByNameAndVersion(d db.Interface, t *testing.T) func(t input: getWorkflowDefinitionsByNameAndVersionInput{ ctx: context.Background(), input: db.GetWorkflowDefinitionsByNameAndVersionInput{ - Name: "string1", + Name: "string1", + Limit: &limit, }, }, output: getWorkflowDefinitionsByNameAndVersionOutput{ diff --git a/gen-go/server/handlers.go b/gen-go/server/handlers.go index 048e37ea..fc73d59c 100644 --- a/gen-go/server/handlers.go +++ b/gen-go/server/handlers.go @@ -95,8 +95,6 @@ func statusCodeForHealthCheck(obj interface{}) int { } func (h handler) HealthCheckHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { - sp := opentracing.SpanFromContext(ctx) - _ = sp err := h.HealthCheck(ctx) @@ -164,8 +162,8 @@ func statusCodeForPostStateResource(obj interface{}) int { } func (h handler) PostStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newPostStateResourceInput(r) if err != nil { @@ -276,8 +274,6 @@ func statusCodeForDeleteStateResource(obj interface{}) int { } func (h handler) DeleteStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { - sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newDeleteStateResourceInput(r) if err != nil { @@ -398,8 +394,8 @@ func statusCodeForGetStateResource(obj interface{}) int { } func (h handler) GetStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newGetStateResourceInput(r) if err != nil { @@ -526,8 +522,8 @@ func statusCodeForPutStateResource(obj interface{}) int { } func (h handler) PutStateResourceHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newPutStateResourceInput(r) if err != nil { @@ -669,8 +665,8 @@ func statusCodeForGetWorkflowDefinitions(obj interface{}) int { } func (h handler) GetWorkflowDefinitionsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp resp, err := h.GetWorkflowDefinitions(ctx) @@ -756,8 +752,8 @@ func statusCodeForNewWorkflowDefinition(obj interface{}) int { } func (h handler) NewWorkflowDefinitionHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newNewWorkflowDefinitionInput(r) if err != nil { @@ -874,8 +870,8 @@ func statusCodeForGetWorkflowDefinitionVersionsByName(obj interface{}) int { } func (h handler) GetWorkflowDefinitionVersionsByNameHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newGetWorkflowDefinitionVersionsByNameInput(r) if err != nil { @@ -1013,8 +1009,8 @@ func statusCodeForUpdateWorkflowDefinition(obj interface{}) int { } func (h handler) UpdateWorkflowDefinitionHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newUpdateWorkflowDefinitionInput(r) if err != nil { @@ -1146,8 +1142,8 @@ func statusCodeForGetWorkflowDefinitionByNameAndVersion(obj interface{}) int { } func (h handler) GetWorkflowDefinitionByNameAndVersionHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newGetWorkflowDefinitionByNameAndVersionInput(r) if err != nil { @@ -1280,8 +1276,8 @@ func statusCodeForGetWorkflows(obj interface{}) int { } func (h handler) GetWorkflowsHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newGetWorkflowsInput(r) if err != nil { @@ -1492,8 +1488,8 @@ func statusCodeForStartWorkflow(obj interface{}) int { } func (h handler) StartWorkflowHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newStartWorkflowInput(r) if err != nil { @@ -1604,8 +1600,6 @@ func statusCodeForCancelWorkflow(obj interface{}) int { } func (h handler) CancelWorkflowHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { - sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newCancelWorkflowInput(r) if err != nil { @@ -1727,8 +1721,8 @@ func statusCodeForGetWorkflowByID(obj interface{}) int { } func (h handler) GetWorkflowByIDHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp workflowID, err := newGetWorkflowByIDInput(r) if err != nil { @@ -1826,8 +1820,8 @@ func statusCodeForResumeWorkflowByID(obj interface{}) int { } func (h handler) ResumeWorkflowByIDHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { + sp := opentracing.SpanFromContext(ctx) - _ = sp input, err := newResumeWorkflowByIDInput(r) if err != nil { @@ -1961,8 +1955,6 @@ func statusCodeForResolveWorkflowByID(obj interface{}) int { } func (h handler) ResolveWorkflowByIDHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) { - sp := opentracing.SpanFromContext(ctx) - _ = sp workflowID, err := newResolveWorkflowByIDInput(r) if err != nil { diff --git a/gen-go/server/router.go b/gen-go/server/router.go index 50c561fa..b18bc9f1 100644 --- a/gen-go/server/router.go +++ b/gen-go/server/router.go @@ -161,15 +161,16 @@ func New(c Controller, addr string) *Server { return NewWithMiddleware(c, addr, []func(http.Handler) http.Handler{}) } -// NewWithMiddleware returns a Server that implemenets the Controller interface. It runs the -// middleware after the built-in middleware (e.g. logging), but before the controller methods. -// The middleware is executed in the order specified. The server will start when "Serve" is called. -func NewWithMiddleware(c Controller, addr string, m []func(http.Handler) http.Handler) *Server { +// NewRouter returns a mux.Router with no middleware. This is so we can attach additional routes to the +// router if necessary +func NewRouter(c Controller) *mux.Router { + return newRouter(c) +} + +func newRouter(c Controller) *mux.Router { router := mux.NewRouter() h := handler{Controller: c} - l := logger.New("workflow-manager") - router.Methods("GET").Path("/_health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { logger.FromContext(r.Context()).AddContext("op", "healthCheck") h.HealthCheckHandler(r.Context(), w, r) @@ -282,6 +283,25 @@ func NewWithMiddleware(c Controller, addr string, m []func(http.Handler) http.Ha r = r.WithContext(ctx) }) - handler := withMiddleware("workflow-manager", router, m) + return router +} + +// NewWithMiddleware returns a Server that implemenets the Controller interface. It runs the +// middleware after the built-in middleware (e.g. logging), but before the controller methods. +// The middleware is executed in the order specified. The server will start when "Serve" is called. +func NewWithMiddleware(c Controller, addr string, m []func(http.Handler) http.Handler) *Server { + router := newRouter(c) + + return AttachMiddleware(router, addr, m) +} + +// AttachMiddleware attaches the given middleware to the router; this is to be used in conjunction with +// NewServer. It attaches custom middleware passed as arguments as well as the built-in middleware for +// logging, tracing, and handling panics. It should be noted that the built-in middleware executes first +// followed by the passed in middleware (in the order specified). +func AttachMiddleware(router *mux.Router, addr string, m []func(http.Handler) http.Handler) *Server { + l := logger.New("workflow-manager") + + handler := withMiddleware("home-auth", router, m) return &Server{Handler: handler, addr: addr, l: l} } diff --git a/gen-js/package.json b/gen-js/package.json index 98a9e82b..5730a9be 100644 --- a/gen-js/package.json +++ b/gen-js/package.json @@ -1,6 +1,6 @@ { "name": "workflow-manager", - "version": "0.10.0", + "version": "0.10.1", "description": "Orchestrator for AWS Step Functions", "main": "index.js", "dependencies": {