Skip to content

Commit

Permalink
Merge pull request #288 from Clever/embedded-map
Browse files Browse the repository at this point in the history
Embedded map
  • Loading branch information
taylor-sutton authored Mar 9, 2021
2 parents 1992829 + 11592d7 commit 48be6cd
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions


### Version information
*Version* : 0.13.0
*Version* : 0.13.1


### URI scheme
Expand Down
35 changes: 34 additions & 1 deletion embedded/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,36 @@ func validateWorkflowDefinitionStates(wfd models.WorkflowDefinition, resources m
if state.Seconds <= 0 {
return errors.Errorf("invalid seconds parameter in wait %s.%s", wfd.Name, stateName)
}
case models.SLStateTypeSucceed, models.SLStateTypeFail, models.SLStateTypeParallel:
case models.SLStateTypeMap:
checkNextState = true
if state.Iterator == nil {
return errors.Errorf("required parameter Iterator for Map state not given in state %s.%s", wfd.Name, stateName)
}
if state.MaxConcurrency < 0 {
return errors.Errorf("invalid MaxConcurrency in Map state %s.%s, must, cannot be negative, got %d", wfd.Name, stateName, state.MaxConcurrency)
}
innerWFD := models.WorkflowDefinition{
Name: fmt.Sprintf("%s__Iterator", stateName),
StateMachine: state.Iterator,
}
if err := validateWorkflowDefinition(innerWFD, resources); err != nil {
return errors.Errorf("inside the Iterator for Map state %s.%s: %w",
wfd.Name, stateName, err,
)
}
case models.SLStateTypeParallel:
for i, branch := range state.Branches {
innerWFD := models.WorkflowDefinition{
Name: fmt.Sprintf("%s__Branch[%d]", stateName, i),
StateMachine: branch,
}
if err := validateWorkflowDefinition(innerWFD, resources); err != nil {
return errors.Errorf("inside the Branch[%d] for Parallel state %s.%s: %w",
i, wfd.Name, stateName,
)
}
}
case models.SLStateTypeSucceed, models.SLStateTypeFail:
// no op
default:
return errors.Errorf("invalid state type '%s' in %s.%s", state.Type, wfd.Name, stateName)
Expand Down Expand Up @@ -326,6 +355,10 @@ func (e *Embedded) setStateMachineResources(i *models.StartWorkflowRequest, stat
state.Branches[idx] = branch
}
stateMachine.States[stateName] = state

case models.SLStateTypeMap:
e.setStateMachineResources(i, state.Iterator)
stateMachine.States[stateName] = state
}
}
}
Expand Down
42 changes: 42 additions & 0 deletions embedded/embedded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,48 @@ var validateWorkflowDefinitionStatesTests = []validateWorkflowDefinitionStatesTe
require.NoError(t, err)
},
},
{
description: "validate map state (invalid inner)",
input: models.WorkflowDefinition{
StateMachine: &models.SLStateMachine{
States: map[string]models.SLState{
"map": models.SLState{
Type: models.SLStateTypeMap,
Iterator: &models.SLStateMachine{},
End: true,
},
},
},
},
assertions: func(t *testing.T, err error) {
require.Error(t, err)
},
},
{
description: "validate map state (valid inner)",
input: models.WorkflowDefinition{
StateMachine: &models.SLStateMachine{
States: map[string]models.SLState{
"map": models.SLState{
Type: models.SLStateTypeMap,
Iterator: &models.SLStateMachine{
States: map[string]models.SLState{
"pass": models.SLState{
End: true,
Type: models.SLStateTypePass,
Result: "result",
},
},
},
End: true,
},
},
},
},
assertions: func(t *testing.T, err error) {
require.NoError(t, err)
},
},
{
description: "invalid state type",
input: models.WorkflowDefinition{
Expand Down
37 changes: 19 additions & 18 deletions gen-go/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ = strconv.FormatInt
var _ = bytes.Compare

// Version of the client.
const Version = "0.13.0"
const Version = "0.13.1"

// VersionHeader is sent with every request.
const VersionHeader = "X-Client-Version"
Expand Down Expand Up @@ -54,7 +54,8 @@ func New(basePath string) *WagClient {
retry := retryDoer{d: tracing, retryPolicy: SingleRetryPolicy{}}
logger := logger.New("workflow-manager-wagclient")
circuit := &circuitBreakerDoer{
d: &retry,
d: &retry,
// TODO: INFRANG-4404 allow passing circuitBreakerOptions
debug: true,
// one circuit for each service + url pair
circuitName: fmt.Sprintf("workflow-manager-%s", shortHash(basePath)),
Expand Down Expand Up @@ -240,7 +241,7 @@ func (c *WagClient) doHealthCheckRequest(ctx context.Context, req *http.Request,
return &output

default:
return &models.InternalError{Message: "Unknown response"}
return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -347,7 +348,7 @@ func (c *WagClient) doPostStateResourceRequest(ctx context.Context, req *http.Re
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -453,7 +454,7 @@ func (c *WagClient) doDeleteStateResourceRequest(ctx context.Context, req *http.
return &output

default:
return &models.InternalError{Message: "Unknown response"}
return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -564,7 +565,7 @@ func (c *WagClient) doGetStateResourceRequest(ctx context.Context, req *http.Req
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -677,7 +678,7 @@ func (c *WagClient) doPutStateResourceRequest(ctx context.Context, req *http.Req
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -773,7 +774,7 @@ func (c *WagClient) doGetWorkflowDefinitionsRequest(ctx context.Context, req *ht
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -880,7 +881,7 @@ func (c *WagClient) doNewWorkflowDefinitionRequest(ctx context.Context, req *htt
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -991,7 +992,7 @@ func (c *WagClient) doGetWorkflowDefinitionVersionsByNameRequest(ctx context.Con
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1113,7 +1114,7 @@ func (c *WagClient) doUpdateWorkflowDefinitionRequest(ctx context.Context, req *
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1224,7 +1225,7 @@ func (c *WagClient) doGetWorkflowDefinitionByNameAndVersionRequest(ctx context.C
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1421,7 +1422,7 @@ func (c *WagClient) doGetWorkflowsRequest(ctx context.Context, req *http.Request
return nil, "", &output

default:
return nil, "", &models.InternalError{Message: "Unknown response"}
return nil, "", &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1537,7 +1538,7 @@ func (c *WagClient) doStartWorkflowRequest(ctx context.Context, req *http.Reques
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1654,7 +1655,7 @@ func (c *WagClient) doCancelWorkflowRequest(ctx context.Context, req *http.Reque
return &output

default:
return &models.InternalError{Message: "Unknown response"}
return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1765,7 +1766,7 @@ func (c *WagClient) doGetWorkflowByIDRequest(ctx context.Context, req *http.Requ
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -1887,7 +1888,7 @@ func (c *WagClient) doResumeWorkflowByIDRequest(ctx context.Context, req *http.R
return nil, &output

default:
return nil, &models.InternalError{Message: "Unknown response"}
return nil, &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down Expand Up @@ -2002,7 +2003,7 @@ func (c *WagClient) doResolveWorkflowByIDRequest(ctx context.Context, req *http.
return &output

default:
return &models.InternalError{Message: "Unknown response"}
return &models.InternalError{Message: fmt.Sprintf("Unknown status code %v", resp.StatusCode)}
}
}

Expand Down
9 changes: 9 additions & 0 deletions gen-go/client/doer.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ func (d *circuitBreakerDoer) init() {
if e.Type != "HystrixCommand" {
continue
}

// Today we are creating a stream for every client so lets only log events for this
// circuit. In an ideal world we only create a single stream and pass it to the client.
// Lets worry about doing this when we implement passing circuitBreakerOptions
// to disable debug mode
if e.Name != d.circuitName {
continue
}

lastSeen, ok := lastEventSeen[e.Name]
lastEventSeen[e.Name] = e

Expand Down
2 changes: 1 addition & 1 deletion gen-js/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2461,7 +2461,7 @@ module.exports.Errors = Errors;

module.exports.DefaultCircuitOptions = defaultCircuitOptions;

const version = "0.13.0";
const version = "0.13.1";
const versionHeader = "X-Client-Version";
module.exports.Version = version;
module.exports.VersionHeader = versionHeader;
2 changes: 1 addition & 1 deletion gen-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "workflow-manager",
"version": "0.13.0",
"version": "0.13.1",
"description": "Orchestrator for AWS Step Functions",
"main": "index.js",
"dependencies": {
Expand Down
2 changes: 1 addition & 1 deletion swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ info:
description: Orchestrator for AWS Step Functions
# when changing the version here, make sure to
# re-run `make generate` to generate clients and server
version: 0.13.0
version: 0.13.1
x-npm-package: workflow-manager
schemes:
- http
Expand Down

0 comments on commit 48be6cd

Please sign in to comment.