Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IMC and Multi-Tenant Channel Based Broker retries #2932

Merged
merged 10 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/gofuzz v1.1.0
github.com/google/mako v0.0.0-20190821191249-122f8dcef9e3
github.com/google/uuid v1.1.1
github.com/hashicorp/go-retryablehttp v0.6.6
github.com/influxdata/tdigest v0.0.0-20191024211133-5d87a7585faa // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -635,14 +635,17 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-retryablehttp v0.6.6 h1:HJunrbHTDDbBb/ay4kxa1n+dLmttUlnP3V9oNE4hmsM=
github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
Expand Down
50 changes: 44 additions & 6 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ package fanout

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
"net/url"
"time"
Expand All @@ -33,17 +35,32 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
)

const (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your doing, but 15 minute timeout? :) I'll look what this is for. Just a note :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems too much :)

It's used here:

errorCh := make(chan error, subs)
for _, sub := range f.config.Subscriptions {
go func(s Subscription) {
errorCh <- f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
}(sub)
}
for range f.config.Subscriptions {
select {
case err := <-errorCh:
if err != nil {
f.logger.Error("Fanout had an error", zap.Error(err))
return err
}
case <-time.After(f.timeout):
f.logger.Error("Fanout timed out")
return errors.New("fanout timed out")
}
}

defaultTimeout = 15 * time.Minute
)

type Subscription struct {
eventingduck.SubscriberSpec
RetriesConfig kncloudevents.RetriesConfig
}

func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.SubscriberSpec)
}

func (s *Subscription) UnmarshalJSON(bytes []byte) error {
return json.Unmarshal(bytes, &s.SubscriberSpec)
}

// Config for a fanout.MessageHandler.
type Config struct {
Subscriptions []eventingduck.SubscriberSpec `json:"subscriptions"`
Subscriptions []Subscription `json:"subscriptions"`
// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
AsyncHandler bool `json:"asyncHandler,omitempty"`
Expand Down Expand Up @@ -78,9 +95,26 @@ func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDisp
return nil, err
}
handler.receiver = receiver

for i := range config.Subscriptions {
retriesConfig, err := retriesOf(config.Subscriptions[i].SubscriberSpec)
if err != nil {
return nil, fmt.Errorf("failed to create retries config from SubscriberSpec: %w", err)
}
config.Subscriptions[i].RetriesConfig = retriesConfig
}

return handler, nil
}

func retriesOf(spec eventingduck.SubscriberSpec) (kncloudevents.RetriesConfig, error) {
delivery := &eventingduckv1.DeliverySpec{}

_ = spec.ConvertTo(context.Background(), delivery)

return kncloudevents.RetryConfigFromDeliverySpec(*delivery)
}

func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
if f.config.AsyncHandler {
return func(ctx context.Context, _ channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
Expand Down Expand Up @@ -140,7 +174,7 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M

errorCh := make(chan error, subs)
for _, sub := range f.config.Subscriptions {
go func(s eventingduck.SubscriberSpec) {
go func(s Subscription) {
errorCh <- f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
}(sub)
}
Expand All @@ -163,18 +197,22 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M

// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub eventingduck.SubscriberSpec) error {
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error {

var destination *url.URL
if sub.SubscriberURI != nil {
destination = sub.SubscriberURI.URL()
}

var reply *url.URL
if sub.ReplyURI != nil {
reply = sub.ReplyURI.URL()
}
var deadLetter *url.URL

var deadLetterURL *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
deadLetter = sub.Delivery.DeadLetterSink.URI.URL()
deadLetterURL = sub.Delivery.DeadLetterSink.URI.URL()
}
return f.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter)

return f.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetterURL, sub.RetriesConfig)
}
78 changes: 49 additions & 29 deletions pkg/channel/fanout/fanout_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
testCases := map[string]struct {
receiverFunc channel.UnbufferedMessageReceiverFunc
timeout time.Duration
subs []eventingduck.SubscriberSpec
subs []Subscription
subscriber func(http.ResponseWriter, *http.Request)
subscriberReqs int
replier func(http.ResponseWriter, *http.Request)
Expand All @@ -75,9 +75,11 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
},
"fanout times out": {
timeout: time.Millisecond,
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
},
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -89,21 +91,23 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"zero subs succeed": {
subs: []eventingduck.SubscriberSpec{},
subs: []Subscription{},
expectedStatus: http.StatusAccepted,
asyncExpectedStatus: http.StatusAccepted,
},
"empty sub succeeds": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{},
},
expectedStatus: http.StatusAccepted,
asyncExpectedStatus: http.StatusAccepted,
},
"reply fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
ReplyURI: replaceReplier,
},
},
},
replier: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -114,9 +118,11 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"subscriber fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
},
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -127,10 +133,12 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"subscriber succeeds, result fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -143,10 +151,12 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"one sub succeeds": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -159,14 +169,18 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"one sub succeeds, one sub fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -177,18 +191,24 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"all subs succeed": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -211,7 +231,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
}
}

func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.UnbufferedMessageReceiverFunc, timeout time.Duration, inSubs []eventingduck.SubscriberSpec, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.UnbufferedMessageReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
var subscriberServerWg *sync.WaitGroup
if subscriberReqs != 0 {
subscriberServerWg = &sync.WaitGroup{}
Expand All @@ -235,7 +255,7 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
defer replyServer.Close()

// Rewrite the subs to use the servers we just started.
subs := make([]eventingduck.SubscriberSpec, 0)
subs := make([]Subscription, 0)
for _, sub := range inSubs {
if sub.SubscriberURI == replaceSubscriber {
sub.SubscriberURI = apis.HTTP(subscriberServer.URL[7:]) // strip the leading 'http://'
Expand Down
Loading