Skip to content

Commit

Permalink
IMC and Multi-Tenant Channel Based Broker retries (#2932)
Browse files Browse the repository at this point in the history
* IMC and Multi-Tenant Channel Based Broker retries

- Add retries to message sender
- Add BrokerRedelivery e2e test

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Do not create a unnecessarily a default pooled client

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* No retries by default

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use DispatchMessage directly

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove redundant check

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Put NoRetries in a var

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Create a new struct with SubscriberSpec and RetriesConfig

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add DispatchMessageWithRetries to MessageDispatcher interface

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Rename to RetryConfig

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add UT for delivery spec

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Jul 21, 2020
1 parent 46a7cdb commit d5ea26f
Show file tree
Hide file tree
Showing 52 changed files with 4,242 additions and 116 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/google/gofuzz v1.1.0
github.com/google/mako v0.0.0-20190821191249-122f8dcef9e3
github.com/google/uuid v1.1.1
github.com/hashicorp/go-retryablehttp v0.6.6
github.com/influxdata/tdigest v0.0.0-20191024211133-5d87a7585faa // indirect
github.com/json-iterator/go v1.1.9 // indirect
github.com/kelseyhightower/envconfig v1.4.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -635,14 +635,17 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-retryablehttp v0.6.4/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-retryablehttp v0.6.6 h1:HJunrbHTDDbBb/ay4kxa1n+dLmttUlnP3V9oNE4hmsM=
github.com/hashicorp/go-retryablehttp v0.6.6/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
Expand Down
50 changes: 44 additions & 6 deletions pkg/channel/fanout/fanout_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ package fanout

import (
"context"
"encoding/json"
"errors"
"fmt"
nethttp "net/http"
"net/url"
"time"
Expand All @@ -33,17 +35,32 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduck "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
)

const (
defaultTimeout = 15 * time.Minute
)

type Subscription struct {
eventingduck.SubscriberSpec
RetriesConfig kncloudevents.RetryConfig
}

func (s *Subscription) MarshalJSON() ([]byte, error) {
return json.Marshal(s.SubscriberSpec)
}

func (s *Subscription) UnmarshalJSON(bytes []byte) error {
return json.Unmarshal(bytes, &s.SubscriberSpec)
}

// Config for a fanout.MessageHandler.
type Config struct {
Subscriptions []eventingduck.SubscriberSpec `json:"subscriptions"`
Subscriptions []Subscription `json:"subscriptions"`
// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously.
// It is expected to be false when used as a sidecar.
AsyncHandler bool `json:"asyncHandler,omitempty"`
Expand Down Expand Up @@ -78,9 +95,26 @@ func NewMessageHandler(logger *zap.Logger, messageDispatcher channel.MessageDisp
return nil, err
}
handler.receiver = receiver

for i := range config.Subscriptions {
retriesConfig, err := retriesOf(config.Subscriptions[i].SubscriberSpec)
if err != nil {
return nil, fmt.Errorf("failed to create retries config from SubscriberSpec: %w", err)
}
config.Subscriptions[i].RetriesConfig = retriesConfig
}

return handler, nil
}

func retriesOf(spec eventingduck.SubscriberSpec) (kncloudevents.RetryConfig, error) {
delivery := &eventingduckv1.DeliverySpec{}

_ = spec.ConvertTo(context.Background(), delivery)

return kncloudevents.RetryConfigFromDeliverySpec(*delivery)
}

func createMessageReceiverFunction(f *MessageHandler) func(context.Context, channel.ChannelReference, binding.Message, []binding.Transformer, nethttp.Header) error {
if f.config.AsyncHandler {
return func(ctx context.Context, _ channel.ChannelReference, message binding.Message, transformers []binding.Transformer, additionalHeaders nethttp.Header) error {
Expand Down Expand Up @@ -140,7 +174,7 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M

errorCh := make(chan error, subs)
for _, sub := range f.config.Subscriptions {
go func(s eventingduck.SubscriberSpec) {
go func(s Subscription) {
errorCh <- f.makeFanoutRequest(ctx, bufferedMessage, additionalHeaders, s)
}(sub)
}
Expand All @@ -163,18 +197,22 @@ func (f *MessageHandler) dispatch(ctx context.Context, bufferedMessage binding.M

// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub eventingduck.SubscriberSpec) error {
func (f *MessageHandler) makeFanoutRequest(ctx context.Context, message binding.Message, additionalHeaders nethttp.Header, sub Subscription) error {

var destination *url.URL
if sub.SubscriberURI != nil {
destination = sub.SubscriberURI.URL()
}

var reply *url.URL
if sub.ReplyURI != nil {
reply = sub.ReplyURI.URL()
}
var deadLetter *url.URL

var deadLetterURL *url.URL
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
deadLetter = sub.Delivery.DeadLetterSink.URI.URL()
deadLetterURL = sub.Delivery.DeadLetterSink.URI.URL()
}
return f.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter)

return f.dispatcher.DispatchMessageWithRetries(ctx, message, additionalHeaders, destination, reply, deadLetterURL, &sub.RetriesConfig)
}
78 changes: 49 additions & 29 deletions pkg/channel/fanout/fanout_message_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
testCases := map[string]struct {
receiverFunc channel.UnbufferedMessageReceiverFunc
timeout time.Duration
subs []eventingduck.SubscriberSpec
subs []Subscription
subscriber func(http.ResponseWriter, *http.Request)
subscriberReqs int
replier func(http.ResponseWriter, *http.Request)
Expand All @@ -75,9 +75,11 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
},
"fanout times out": {
timeout: time.Millisecond,
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
},
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -89,21 +91,23 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"zero subs succeed": {
subs: []eventingduck.SubscriberSpec{},
subs: []Subscription{},
expectedStatus: http.StatusAccepted,
asyncExpectedStatus: http.StatusAccepted,
},
"empty sub succeeds": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{},
},
expectedStatus: http.StatusAccepted,
asyncExpectedStatus: http.StatusAccepted,
},
"reply fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
ReplyURI: replaceReplier,
},
},
},
replier: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -114,9 +118,11 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"subscriber fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
},
},
},
subscriber: func(writer http.ResponseWriter, _ *http.Request) {
Expand All @@ -127,10 +133,12 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"subscriber succeeds, result fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -143,10 +151,12 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"one sub succeeds": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -159,14 +169,18 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"one sub succeeds, one sub fails": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -177,18 +191,24 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
asyncExpectedStatus: http.StatusAccepted,
},
"all subs succeed": {
subs: []eventingduck.SubscriberSpec{
subs: []Subscription{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
SubscriberSpec: eventingduck.SubscriberSpec{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceReplier,
},
},
},
subscriber: callableSucceed,
Expand All @@ -211,7 +231,7 @@ func TestFanoutMessageHandler_ServeHTTP(t *testing.T) {
}
}

func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.UnbufferedMessageReceiverFunc, timeout time.Duration, inSubs []eventingduck.SubscriberSpec, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.UnbufferedMessageReceiverFunc, timeout time.Duration, inSubs []Subscription, subscriberHandler func(http.ResponseWriter, *http.Request), subscriberReqs int, replierHandler func(http.ResponseWriter, *http.Request), replierReqs int, expectedStatus int) {
var subscriberServerWg *sync.WaitGroup
if subscriberReqs != 0 {
subscriberServerWg = &sync.WaitGroup{}
Expand All @@ -235,7 +255,7 @@ func testFanoutMessageHandler(t *testing.T, async bool, receiverFunc channel.Unb
defer replyServer.Close()

// Rewrite the subs to use the servers we just started.
subs := make([]eventingduck.SubscriberSpec, 0)
subs := make([]Subscription, 0)
for _, sub := range inSubs {
if sub.SubscriberURI == replaceSubscriber {
sub.SubscriberURI = apis.HTTP(subscriberServer.URL[7:]) // strip the leading 'http://'
Expand Down
Loading

0 comments on commit d5ea26f

Please sign in to comment.