-
Notifications
You must be signed in to change notification settings - Fork 0
/
lambda.go
79 lines (73 loc) · 2.19 KB
/
lambda.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package lambdag
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/Songmu/flextime"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/lambda/messages"
"github.com/google/uuid"
)
// LambdaHandler adapts a DAG to the AWS Lambda handler model. Its Invoke
// method runs the wrapped DAG for each incoming payload.
type LambdaHandler struct {
	// dag is the DAG executed on every invocation.
	dag *DAG
}
// NewLambdaHandler wraps dag in a LambdaHandler and returns the handler's
// Invoke method adapted to a lambda.Handler via lambda.NewHandler.
func NewLambdaHandler(dag *DAG) lambda.Handler {
	return lambda.NewHandler((&LambdaHandler{dag: dag}).Invoke)
}
// DAGRunContext is the JSON-serializable state of a single DAG run. It is
// decoded from the Lambda payload and returned as the Lambda response, so the
// field order and JSON names below are part of the wire format.
type DAGRunContext struct {
	// DAGRunID identifies the run; note the JSON key is "DAGRunId".
	DAGRunID string `json:"DAGRunId"`
	// DAGRunStartAt is the time at which the run was started.
	DAGRunStartAt time.Time `json:"DAGRunStartAt"`
	// DAGRunConfig holds the run configuration as raw, undecoded JSON.
	DAGRunConfig json.RawMessage `json:"DAGRunConfig"`
	// TaskResponses maps a string key to a raw JSON response; it is omitted
	// from the encoded output when empty.
	// NOTE(review): keys are presumably task identifiers — confirm.
	TaskResponses map[string]json.RawMessage `json:"TaskResponses,omitempty"`
	// LambdaCallCount is a counter carried across invocations.
	// NOTE(review): presumably the number of Lambda calls made for this run,
	// compared against the DAG's circuit breaker limit — confirm.
	LambdaCallCount int `json:"LambdaCallCount"`
	// Continue signals that the run is not finished and should be invoked
	// again with this context.
	Continue bool `json:"Continue"`
	// IsCircuitBreak marks the run as stopped by the circuit breaker.
	IsCircuitBreak bool `json:"IsCircuitBreak"`
}
// Invoke runs the DAG once for the given Lambda payload.
//
// The payload is decoded as a DAGRunContext. When decoding fails, or the
// decoded value has an empty DAGRunID, the payload is treated as the
// configuration of a new DAG run: a fresh context is built with a random UUID
// as the run ID, flextime.Now() as the start time, the raw payload as
// DAGRunConfig and an empty TaskResponses map.
//
// Outcomes:
//   - On success, the context returned by h.dag.Execute is returned.
//   - On a *TaskRetryableError, if the DAG runs more than one task per
//     invocation, the returned context is handed back with Continue set to
//     true and no error; otherwise a "LambDAG.Retryable" error is returned.
//   - On a *json.MarshalerError, a "LambDAG.ResponseInvalid" error is returned.
//   - Any other error from Execute, or from UUID generation, is returned as is.
//   - If the returned context has IsCircuitBreak set, a "LambDAG.CircuitBreak"
//     error is returned.
func (h *LambdaHandler) Invoke(ctx context.Context, payload json.RawMessage) (interface{}, error) {
	var dagRunCtx DAGRunContext
	if err := json.Unmarshal(payload, &dagRunCtx); err != nil || dagRunCtx.DAGRunID == "" {
		uuidObj, err := uuid.NewRandom()
		if err != nil {
			return nil, err
		}
		// Build the new run's context from scratch instead of patching the
		// decoded value: json.Unmarshal may have filled fields such as
		// Continue or IsCircuitBreak from keys in the payload even though the
		// payload is not a run context, and those must not leak into a fresh
		// run. LambdaCallCount starts at its zero value.
		dagRunCtx = DAGRunContext{
			DAGRunID:      uuidObj.String(),
			DAGRunStartAt: flextime.Now(),
			DAGRunConfig:  payload,
			TaskResponses: make(map[string]json.RawMessage),
		}
	}

	updatedDAGRunCtx, err := h.dag.Execute(ctx, &dagRunCtx)
	if err != nil {
		var tre *TaskRetryableError
		if errors.As(err, &tre) {
			if h.dag.NumOfTasksInSingleInvoke() > 1 {
				// Hand the partial progress back so the caller can invoke again.
				// NOTE(review): assumes Execute returns a usable context
				// alongside a TaskRetryableError — confirm.
				updatedDAGRunCtx.Continue = true
				return updatedDAGRunCtx, nil
			}
			return nil, messages.InvokeResponse_Error{
				Message: tre.Error(),
				Type:    "LambDAG.Retryable",
			}
		}
		var jme *json.MarshalerError
		if errors.As(err, &jme) {
			return nil, messages.InvokeResponse_Error{
				Message: err.Error(),
				Type:    "LambDAG.ResponseInvalid",
			}
		}
		return nil, err
	}

	if updatedDAGRunCtx.IsCircuitBreak {
		// NOTE(review): this returns the input context (by value) rather than
		// updatedDAGRunCtx, unlike the other error paths which return nil.
		// Kept as is for compatibility with existing callers — confirm intent.
		return dagRunCtx, messages.InvokeResponse_Error{
			Message: fmt.Sprintf("CircuitBreak: lambda call count over %d", h.dag.CircuitBreaker()),
			Type:    "LambDAG.CircuitBreak",
		}
	}
	return updatedDAGRunCtx, nil
}