Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Flyte webhooks #583

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,5 @@ replace github.com/robfig/cron/v3 => github.com/unionai/cron/v3 v3.0.2-0.2022091
// Retracted versions
// This was published in error when attempting to create 1.5.1 Flyte release.
retract v1.1.94

replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.17-0.20230821222808-485ae223c7f1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ github.com/fatih/structs v1.0.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.5.14 h1:+3ewipoOp82fPyIVgvvrMq1lorl5Kz3Lh6sh/a9+loI=
github.com/flyteorg/flyteidl v1.5.14/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteidl v1.5.17-0.20230821222808-485ae223c7f1 h1:B0OlEJujyYKWVkgJ7cqsIVbhaRGwMgdToCIAau8JmAY=
github.com/flyteorg/flyteidl v1.5.17-0.20230821222808-485ae223c7f1/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og=
github.com/flyteorg/flyteplugins v1.0.67 h1:d2FXpwxQwX/k4YdmhuusykOemHb/cUTPEob4WBmdpjE=
github.com/flyteorg/flyteplugins v1.0.67/go.mod h1:HHt4nKDKVwrZPKDsj99dNtDSIJL378xNotYMA3a/TFA=
github.com/flyteorg/flytepropeller v1.1.98 h1:Zk2ENYB9VZRT5tFUIFjm+aCkr0TU2EuyJ5gh52fpLoA=
Expand Down
6 changes: 3 additions & 3 deletions pkg/async/notifications/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var getTemplateValueFuncs = map[string]GetTemplateValue{
launchPlanVersion: getLaunchPlanVersion,
}

func substituteEmailParameters(message string, request admin.WorkflowExecutionEventRequest, execution *admin.Execution) string {
func SubstituteParameters(message string, request admin.WorkflowExecutionEventRequest, execution *admin.Execution) string {
for template, function := range getTemplateValueFuncs {
message = strings.Replace(message, fmt.Sprintf(substitutionParam, template), function(request, execution), replaceAllInstances)
message = strings.Replace(message, fmt.Sprintf(substitutionParamNoSpaces, template), function(request, execution), replaceAllInstances)
Expand All @@ -118,9 +118,9 @@ func ToEmailMessageFromWorkflowExecutionEvent(
execution *admin.Execution) *admin.EmailMessage {

return &admin.EmailMessage{
SubjectLine: substituteEmailParameters(config.NotificationsEmailerConfig.Subject, request, execution),
SubjectLine: SubstituteParameters(config.NotificationsEmailerConfig.Subject, request, execution),
SenderEmail: config.NotificationsEmailerConfig.Sender,
RecipientsEmail: emailNotification.GetRecipientsEmail(),
Body: substituteEmailParameters(config.NotificationsEmailerConfig.Body, request, execution),
Body: SubstituteParameters(config.NotificationsEmailerConfig.Body, request, execution),
}
}
8 changes: 4 additions & 4 deletions pkg/async/notifications/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func TestSubstituteEmailParameters(t *testing.T) {
},
}
assert.Equal(t, "{{ unused }}. {{project }} and prod and e124 ended up in succeeded.",
substituteEmailParameters(message, request, workflowExecution))
SubstituteParameters(message, request, workflowExecution))
request.Event.OutputResult = &event.WorkflowExecutionEvent_Error{
Error: &core.ExecutionError{
Message: "uh-oh",
},
}
assert.Equal(t, "{{ unused }}. {{project }} and prod and e124 ended up in succeeded. The execution failed with error: [uh-oh].",
substituteEmailParameters(message, request, workflowExecution))
SubstituteParameters(message, request, workflowExecution))
}

func TestSubstituteAllTemplates(t *testing.T) {
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestSubstituteAllTemplates(t *testing.T) {
},
}
assert.Equal(t, strings.Join(desiredResult, ","),
substituteEmailParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
SubstituteParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
}

func TestSubstituteAllTemplatesNoSpaces(t *testing.T) {
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestSubstituteAllTemplatesNoSpaces(t *testing.T) {
},
}
assert.Equal(t, strings.Join(desiredResult, ","),
substituteEmailParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
SubstituteParameters(strings.Join(messageTemplate, ","), request, workflowExecution))
}

func TestToEmailMessageFromWorkflowExecutionEvent(t *testing.T) {
Expand Down
97 changes: 21 additions & 76 deletions pkg/async/notifications/implementations/aws_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package implementations

import (
"context"
"encoding/base64"
"encoding/json"
"time"

"github.com/NYTimes/gizmo/pubsub"
Expand All @@ -17,12 +15,11 @@ import (

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type Processor struct {
sub pubsub.Subscriber
email interfaces.Emailer
systemMetrics processorSystemMetrics
email interfaces.Emailer
interfaces.BaseProcessor
}

// Currently only email is the supported notification because slack and pagerduty both use
// StartProcessing Currently only email is the supported notification because slack and pagerduty both use
// email client to trigger those notifications.
// When Pagerduty and other notifications are supported, a publisher per type should be created.
func (p *Processor) StartProcessing() {
Expand All @@ -37,102 +34,50 @@ func (p *Processor) StartProcessing() {
func (p *Processor) run() error {
var emailMessage admin.EmailMessage
var err error
for msg := range p.sub.Start() {
p.systemMetrics.MessageTotal.Inc()
// Currently this is safe because Gizmo takes a string and casts it to a byte array.
for msg := range p.Sub.Start() {
p.SystemMetrics.MessageTotal.Inc()
stringMsg := string(msg.Message())

var snsJSONFormat map[string]interface{}

// At Lyft, SNS populates SQS. This results in the message body of SQS having the SNS message format.
// The message format is documented here: https://docs.aws.amazon.com/sns/latest/dg/sns-message-and-json-formats.html
// The notification published is stored in the message field after unmarshalling the SQS message.
if err := json.Unmarshal(msg.Message(), &snsJSONFormat); err != nil {
p.systemMetrics.MessageDecodingError.Inc()
logger.Errorf(context.Background(), "failed to unmarshall JSON message [%s] from processor with err: %v", stringMsg, err)
p.markMessageDone(msg)
continue
}

var value interface{}
var ok bool
var valueString string

if value, ok = snsJSONFormat["Message"]; !ok {
logger.Errorf(context.Background(), "failed to retrieve message from unmarshalled JSON object [%s]", stringMsg)
p.systemMetrics.MessageDataError.Inc()
p.markMessageDone(msg)
_, messageByte, ok := p.FromSQSMessage(msg)
if !ok {
continue
}

if valueString, ok = value.(string); !ok {
p.systemMetrics.MessageDataError.Inc()
logger.Errorf(context.Background(), "failed to retrieve notification message (in string format) from unmarshalled JSON object for message [%s]", stringMsg)
p.markMessageDone(msg)
continue
}

// The Publish method for SNS Encodes the notification using Base64 then stringifies it before
// setting that as the message body for SNS. Do the inverse to retrieve the notification.
notificationBytes, err := base64.StdEncoding.DecodeString(valueString)
if err != nil {
logger.Errorf(context.Background(), "failed to Base64 decode from message string [%s] from message [%s] with err: %v", valueString, stringMsg, err)
p.systemMetrics.MessageDecodingError.Inc()
p.markMessageDone(msg)
continue
}

if err = proto.Unmarshal(notificationBytes, &emailMessage); err != nil {
logger.Debugf(context.Background(), "failed to unmarshal to notification object from decoded string[%s] from message [%s] with err: %v", valueString, stringMsg, err)
p.systemMetrics.MessageDecodingError.Inc()
p.markMessageDone(msg)
if err = proto.Unmarshal(messageByte, &emailMessage); err != nil {
logger.Debugf(context.Background(), "failed to unmarshal to notification object from decoded string from message [%s] with err: %v", stringMsg, err)
p.SystemMetrics.MessageDecodingError.Inc()
p.MarkMessageDone(msg)
continue
}

if err = p.email.SendEmail(context.Background(), emailMessage); err != nil {
p.systemMetrics.MessageProcessorError.Inc()
p.SystemMetrics.MessageProcessorError.Inc()
logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err)
} else {
p.systemMetrics.MessageSuccess.Inc()
p.SystemMetrics.MessageSuccess.Inc()
}

p.markMessageDone(msg)

p.MarkMessageDone(msg)
}

// According to https://github.com/NYTimes/gizmo/blob/f2b3deec03175b11cdfb6642245a49722751357f/pubsub/pubsub.go#L36-L39,
// the channel backing the subscriber will just close if there is an error. The call to Err() is needed to identify
// there was an error in the channel or there are no more messages left (resulting in no errors when calling Err()).
if err = p.sub.Err(); err != nil {
p.systemMetrics.ChannelClosedError.Inc()
if err = p.Sub.Err(); err != nil {
p.SystemMetrics.ChannelClosedError.Inc()
logger.Warningf(context.Background(), "The stream for the subscriber channel closed with err: %v", err)
}

// If there are no errors, nil will be returned.
return err
}

func (p *Processor) markMessageDone(message pubsub.SubscriberMessage) {
if err := message.Done(); err != nil {
p.systemMetrics.MessageDoneError.Inc()
logger.Errorf(context.Background(), "failed to mark message as Done() in processor with err: %v", err)
}
}

func (p *Processor) StopProcessing() error {
// Note: If the underlying channel is already closed, then Stop() will return an error.
err := p.sub.Stop()
if err != nil {
p.systemMetrics.StopError.Inc()
logger.Errorf(context.Background(), "Failed to stop the subscriber channel gracefully with err: %v", err)
}
return err
}

func NewProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor {
return &Processor{
sub: sub,
email: emailer,
systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("processor")),
email: emailer,
BaseProcessor: interfaces.BaseProcessor{
Sub: sub,
SystemMetrics: interfaces.NewProcessorSystemMetrics(scope.NewSubScope("processor")),
},
}
}
49 changes: 16 additions & 33 deletions pkg/async/notifications/implementations/gcp_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ import (

// TODO: Add a counter that encompasses the publisher stats grouped by project and domain.
type GcpProcessor struct {
sub pubsub.Subscriber
email interfaces.Emailer
systemMetrics processorSystemMetrics
email interfaces.Emailer
interfaces.BaseProcessor
}

func NewGcpProcessor(sub pubsub.Subscriber, emailer interfaces.Emailer, scope promutils.Scope) interfaces.Processor {
return &GcpProcessor{
sub: sub,
email: emailer,
systemMetrics: newProcessorSystemMetrics(scope.NewSubScope("gcp_processor")),
email: emailer,
BaseProcessor: interfaces.BaseProcessor{
Sub: sub,
SystemMetrics: interfaces.NewProcessorSystemMetrics(scope.NewSubScope("processor")),
},
}
}

Expand All @@ -40,52 +41,34 @@ func (p *GcpProcessor) StartProcessing() {
func (p *GcpProcessor) run() error {
var emailMessage admin.EmailMessage

for msg := range p.sub.Start() {
p.systemMetrics.MessageTotal.Inc()
for msg := range p.Sub.Start() {
p.SystemMetrics.MessageTotal.Inc()

if err := proto.Unmarshal(msg.Message(), &emailMessage); err != nil {
logger.Debugf(context.Background(), "failed to unmarshal to notification object message [%s] with err: %v", string(msg.Message()), err)
p.systemMetrics.MessageDecodingError.Inc()
p.markMessageDone(msg)
p.SystemMetrics.MessageDecodingError.Inc()
p.MarkMessageDone(msg)
continue
}

if err := p.email.SendEmail(context.Background(), emailMessage); err != nil {
p.systemMetrics.MessageProcessorError.Inc()
p.SystemMetrics.MessageProcessorError.Inc()
logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err)
} else {
p.systemMetrics.MessageSuccess.Inc()
p.SystemMetrics.MessageSuccess.Inc()
}

p.markMessageDone(msg)
p.MarkMessageDone(msg)
}

// According to https://github.com/NYTimes/gizmo/blob/f2b3deec03175b11cdfb6642245a49722751357f/pubsub/pubsub.go#L36-L39,
// the channel backing the subscriber will just close if there is an error. The call to Err() is needed to identify
// there was an error in the channel or there are no more messages left (resulting in no errors when calling Err()).
if err := p.sub.Err(); err != nil {
p.systemMetrics.ChannelClosedError.Inc()
if err := p.Sub.Err(); err != nil {
p.SystemMetrics.ChannelClosedError.Inc()
logger.Warningf(context.Background(), "The stream for the subscriber channel closed with err: %v", err)
return err
}

return nil
}

func (p *GcpProcessor) markMessageDone(message pubsub.SubscriberMessage) {
if err := message.Done(); err != nil {
p.systemMetrics.MessageDoneError.Inc()
logger.Errorf(context.Background(), "failed to mark message as Done() in processor with err: %v", err)
}
}

func (p *GcpProcessor) StopProcessing() error {
// Note: If the underlying channel is already closed, then Stop() will return an error.
if err := p.sub.Stop(); err != nil {
p.systemMetrics.StopError.Inc()
logger.Errorf(context.Background(), "Failed to stop the subscriber channel gracefully with err: %v", err)
return err
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestGcpProcessor_StartProcessing(t *testing.T) {

// Check fornumber of messages processed.
m := &dto.Metric{}
err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageSuccess.Write(m)
err := testGcpProcessor.(*GcpProcessor).SystemMetrics.MessageSuccess.Write(m)
assert.Nil(t, err)
assert.Equal(t, "counter:<value:1 > ", m.String())
}
Expand All @@ -60,7 +60,7 @@ func TestGcpProcessor_StartProcessingNoMessages(t *testing.T) {

// Check fornumber of messages processed.
m := &dto.Metric{}
err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageSuccess.Write(m)
err := testGcpProcessor.(*GcpProcessor).SystemMetrics.MessageSuccess.Write(m)
assert.Nil(t, err)
assert.Equal(t, "counter:<value:0 > ", m.String())
}
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestGcpProcessor_StartProcessingEmailError(t *testing.T) {

// Check for an email error stat.
m := &dto.Metric{}
err := testGcpProcessor.(*GcpProcessor).systemMetrics.MessageProcessorError.Write(m)
err := testGcpProcessor.(*GcpProcessor).SystemMetrics.MessageProcessorError.Write(m)
assert.Nil(t, err)
assert.Equal(t, "counter:<value:1 > ", m.String())
}
Expand Down
Loading
Loading