Skip to content

Commit

Permalink
Merge pull request #388 from Clever/INFRANG-6559
Browse files Browse the repository at this point in the history
Infrang 6559
  • Loading branch information
jakegut authored Oct 29, 2024
2 parents 467604b + 9d1ecf8 commit 5087c8e
Show file tree
Hide file tree
Showing 24 changed files with 646 additions and 141 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ SHELL := /bin/bash
APP_NAME ?= workflow-manager
EXECUTABLE = $(APP_NAME)
PKG = github.com/Clever/$(APP_NAME)
PKGS := $(shell go list ./... | grep -v /vendor | grep -v /gen-go | grep -v /workflow-ops | grep -v /dynamodb)
PKGS := $(shell go list ./... | grep -v /vendor | grep -v /gen-go | grep -v /workflow-ops | grep -v /dynamodb | grep -v /scripts)
PKGS := $(PKGS) $(PKG)/gen-go/server/db/dynamodb
APPS := $(shell [ -d "./cmd" ] && ls ./cmd/)

.PHONY: all test build run dynamodb-test $(APPS) $(APP_NAME)

WAG_VERSION := latest

$(eval $(call golang-version-check,1.16))
$(eval $(call golang-version-check,1.21))

all: test build

test: $(PKGS) dynamodb-test
test: $(PKGS) dynamodb-test

$(PKGS): golang-test-all-deps
$(call golang-test-all,$@)
Expand Down
2 changes: 2 additions & 0 deletions docs/definitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@
|---|---|
|**And** <br>*optional*|< [SLChoice](#slchoice) > array|
|**BooleanEquals** <br>*optional*|boolean|
|**IsNull** <br>*optional*|boolean|
|**IsPresent** <br>*optional*|boolean|
|**Next** <br>*optional*|string|
|**Not** <br>*optional*|[SLChoice](#slchoice)|
|**NumericEquals** <br>*optional*|integer|
Expand Down
2 changes: 1 addition & 1 deletion docs/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Orchestrator for AWS Step Functions


### Version information
*Version* : 0.16.1
*Version* : 0.16.2


### URI scheme
Expand Down
1 change: 1 addition & 0 deletions dynamodb-local-metadata.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"installationId":"998062c4-a80b-45d7-a1e3-a2ea548dc92a","telemetryEnabled":"true"}
55 changes: 33 additions & 22 deletions embedded/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ import (

// Embedded ...
type Embedded struct {
environment string
app string
sfnAccountID string
sfnRegion string
sfnRoleArn string
sfnAPI sfniface.SFNAPI
resources map[string]*sfnfunction.Resource
concurrencyLimits map[string]int64
workflowDefinitions []models.WorkflowDefinition
workerName string
environment string
app string
sfnAccountID string
sfnRegion string
sfnRoleArn string
sfnAPI sfniface.SFNAPI
resources map[string]*sfnfunction.Resource
concurrencyLimits map[string]int64
getActivityTaskPollRate time.Duration
workflowDefinitions []models.WorkflowDefinition
workerName string
}

var _ client.Client = &Embedded{}
Expand All @@ -62,12 +63,17 @@ type Config struct {
// It is a global setting for all resources that defaults to 1.
// It is safe use to alongside PerResourceConcurrencyLimits.
AllResourceConcurrencyLimit *int64
WorkflowDefinitions []byte
WorkerName string
// GetActivityTaskPollRate determines the rate in seconds
// the max interval at which to poll GetActivityTask per resource,
// defaults to 1 per second
GetActivityTaskPollRate *int64
WorkflowDefinitions []byte
WorkerName string
}

// DefaultResourceConcurrency ...
const DefaultResourceConcurrency int64 = 1
const DefaultGetActivityTaskPollRate int64 = 1

func (c Config) validate() error {
if c.Environment == "" {
Expand Down Expand Up @@ -100,6 +106,10 @@ func New(config *Config) (*Embedded, error) {
if err != nil {
return nil, err
}
getActivityTaskPollRate := DefaultGetActivityTaskPollRate
if config.GetActivityTaskPollRate != nil {
getActivityTaskPollRate = *config.GetActivityTaskPollRate
}
r := map[string]*sfnfunction.Resource{}
concurrencyLimits := make(map[string]int64)
concurrencyLimit := DefaultResourceConcurrency
Expand Down Expand Up @@ -133,16 +143,17 @@ func New(config *Config) (*Embedded, error) {
wn = fmt.Sprintf("%s-%s-%s", an, hn, randString(5))
}
return &Embedded{
environment: config.Environment,
app: config.App,
sfnAccountID: config.SFNAccountID,
sfnRegion: config.SFNRegion,
sfnRoleArn: config.SFNRoleArn,
sfnAPI: config.SFNAPI,
resources: r,
concurrencyLimits: concurrencyLimits,
workflowDefinitions: wfdefs,
workerName: wn,
environment: config.Environment,
app: config.App,
sfnAccountID: config.SFNAccountID,
sfnRegion: config.SFNRegion,
sfnRoleArn: config.SFNRoleArn,
sfnAPI: config.SFNAPI,
resources: r,
concurrencyLimits: concurrencyLimits,
workflowDefinitions: wfdefs,
workerName: wn,
getActivityTaskPollRate: time.Duration(getActivityTaskPollRate),
}, nil
}

Expand Down
8 changes: 5 additions & 3 deletions embedded/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func (e *Embedded) PollForWork(ctx context.Context) error {

func (e *Embedded) pollGetActivityTask(ctx context.Context, resourceName string, resource *sfnfunction.Resource, activityArn string) error {
concurrentExecutions := swag.Int64(0)
// allow one GetActivityTask per second, max 1 at a time
limiter := rate.NewLimiter(rate.Every(1*time.Second), 1)
// limit GetActivitTask calls based on rate provided in config, default to 1 per second
limiter := rate.NewLimiter(rate.Every(time.Second/e.getActivityTaskPollRate), 1)
for ctx.Err() == nil {
if err := limiter.Wait(ctx); err != nil {
continue
Expand All @@ -63,7 +63,9 @@ func (e *Embedded) pollGetActivityTask(ctx context.Context, resourceName string,
case <-ctx.Done():
log.Info("getactivitytask-stop")
default:
log.TraceD("getactivitytask-start", logger.M{"activity-arn": activityArn, "worker-name": e.workerName})
if e.getActivityTaskPollRate == 1 {
log.TraceD("getactivitytask-start", logger.M{"activity-arn": activityArn, "worker-name": e.workerName})
}
out, err := e.sfnAPI.GetActivityTaskWithContext(ctx, &sfn.GetActivityTaskInput{
ActivityArn: aws.String(activityArn),
WorkerName: aws.String(e.workerName),
Expand Down
98 changes: 81 additions & 17 deletions gen-go/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var _ = strconv.FormatInt
var _ = bytes.Compare

// Version of the client.
const Version = "0.16.1"
const Version = "0.16.2"

// VersionHeader is sent with every request.
const VersionHeader = "X-Client-Version"
Expand Down Expand Up @@ -221,7 +221,11 @@ func (c *WagClient) doHealthCheckRequest(ctx context.Context, req *http.Request,
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -325,7 +329,11 @@ func (c *WagClient) doPostStateResourceRequest(ctx context.Context, req *http.Re
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -430,7 +438,11 @@ func (c *WagClient) doDeleteStateResourceRequest(ctx context.Context, req *http.
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -538,7 +550,11 @@ func (c *WagClient) doGetStateResourceRequest(ctx context.Context, req *http.Req
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -661,7 +677,11 @@ func (c *WagClient) doPutStateResourceRequest(ctx context.Context, req *http.Req
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -759,7 +779,11 @@ func (c *WagClient) doGetWorkflowDefinitionsRequest(ctx context.Context, req *ht
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -868,7 +892,11 @@ func (c *WagClient) doNewWorkflowDefinitionRequest(ctx context.Context, req *htt
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -973,7 +1001,11 @@ func (c *WagClient) doGetWorkflowDefinitionVersionsByNameRequest(ctx context.Con
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1097,7 +1129,11 @@ func (c *WagClient) doUpdateWorkflowDefinitionRequest(ctx context.Context, req *
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1210,7 +1246,11 @@ func (c *WagClient) doGetWorkflowDefinitionByNameAndVersionRequest(ctx context.C
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1409,7 +1449,11 @@ func (c *WagClient) doGetWorkflowsRequest(ctx context.Context, req *http.Request
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1527,7 +1571,11 @@ func (c *WagClient) doStartWorkflowRequest(ctx context.Context, req *http.Reques
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1651,7 +1699,11 @@ func (c *WagClient) doCancelWorkflowRequest(ctx context.Context, req *http.Reque
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1759,7 +1811,11 @@ func (c *WagClient) doGetWorkflowByIDRequest(ctx context.Context, req *http.Requ
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1883,7 +1939,11 @@ func (c *WagClient) doResumeWorkflowByIDRequest(ctx context.Context, req *http.R
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down Expand Up @@ -1997,7 +2057,11 @@ func (c *WagClient) doResolveWorkflowByIDRequest(ctx context.Context, req *http.
"uri": req.URL,
"status_code": retCode,
}
if err == nil && retCode > 399 {
if err == nil && retCode > 399 && retCode < 500 {
logData["message"] = resp.Status
c.logger.Log(wcl.Warning, "client-request-finished", logData)
}
if err == nil && retCode > 499 {
logData["message"] = resp.Status
c.logger.Log(wcl.Error, "client-request-finished", logData)
}
Expand Down
2 changes: 1 addition & 1 deletion gen-go/client/go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

module github.com/Clever/workflow-manager/gen-go/client

go 1.16
go 1.21

require (
github.com/Clever/discovery-go v1.8.1
Expand Down
2 changes: 1 addition & 1 deletion gen-go/models/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
module github.com/Clever/workflow-manager/gen-go/models


go 1.16
go 1.21

require (
github.com/go-openapi/errors v0.20.2
Expand Down
Loading

0 comments on commit 5087c8e

Please sign in to comment.