Skip to content

Commit

Permalink
HandlerFunc ctx is .Done() when server is stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
akarl committed Aug 28, 2020
1 parent 400ff5b commit c06b33d
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 10 deletions.
25 changes: 18 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,21 @@ type Server struct {
// print most of what is happening internally.
// If nil, logging is not done.
debugLog LogFunc

baseContext context.Context
baseContextCancel context.CancelFunc
}

// NewServer will return a pointer to a new Server.
func NewServer(url string) *Server {
baseContext, cancelFunc := context.WithCancel(context.Background())

server := Server{
url: url,
bindings: []HandlerBinding{},
middlewares: []ServerMiddlewareFunc{},
url: url,
bindings: []HandlerBinding{},
middlewares: []ServerMiddlewareFunc{},
baseContext: baseContext,
baseContextCancel: cancelFunc,
dialconfig: amqp.Config{
Dial: DefaultDialer,
},
Expand Down Expand Up @@ -341,18 +348,22 @@ func (s *Server) listenAndServe() error {
return err
}

// 2. We've told amqp to stop delivering messages, now we wait for all
// 2. Tell all handlers that we are stopping, in case they have any long
// running functions.
s.baseContextCancel()

// 3. We've told amqp to stop delivering messages, now we wait for all
// the consumers to finish inflight messages.
consumersWg.Done()
consumersWg.Wait()

// 3. Close the responses chan and wait until the consumers are finished.
// 4. Close the responses chan and wait until the consumers are finished.
// We might still have responses we want to send.
close(s.responses)
responderWg.Done()
responderWg.Wait()

// 4. We have no more messages incoming and we've published all our
// 5. We have no more messages incoming and we've published all our
// responses. The closing of connections and channels are deferred so we can
// just return now.
return nil
Expand Down Expand Up @@ -429,7 +440,7 @@ func (s *Server) runHandler(
},
}

ctx := context.WithValue(context.Background(), CtxQueueName, queueName)
ctx := context.WithValue(s.baseContext, CtxQueueName, queueName)

go func(delivery amqp.Delivery) {
handler(ctx, &rw, delivery)
Expand Down
42 changes: 39 additions & 3 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func TestNoAutomaticAck(t *testing.T) {

calls := make(chan struct{}, 2)

server.Bind(DirectBinding("no-auto-ack", func(ctc context.Context, responseWriter *ResponseWriter, d amqp.Delivery) {
calls <- struct{}{}
}))
server.Bind(
DirectBinding("no-auto-ack", func(ctc context.Context, responseWriter *ResponseWriter, d amqp.Delivery) {
calls <- struct{}{}
}),
)

start()

Expand Down Expand Up @@ -234,3 +236,37 @@ func TestServerConfig(t *testing.T) {
assert.Equal(t, s.consumeSettings, cSettings)
assert.Equal(t, s.exchangeDeclareSettings, eSettings)
}

func TestContextDoneWhenServerStopped(t *testing.T) {
server, client, start, stop := initTest()

ctxDone := make(chan bool, 1)

server.Bind(DirectBinding("context.test", func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
select {
case <-ctx.Done():
ctxDone <- true
case <-time.After(5 * time.Second):
ctxDone <- false
}
}))

start()

_, err := client.Send(
NewRequest().
WithRoutingKey("context.test").
WithResponse(false),
)

require.NoError(t, err)

stop()

select {
case wasDone := <-ctxDone:
assert.True(t, wasDone)
case <-time.After(10 * time.Second):
t.Fatalf("handler was never called")
}
}

0 comments on commit c06b33d

Please sign in to comment.