Skip to content

Commit

Permalink
Merge pull request #97 from 0x4b53/remove-ctx-cancel
Browse files Browse the repository at this point in the history
Ensure new context for each request
  • Loading branch information
akarl authored Nov 18, 2021
2 parents d83f837 + e533046 commit 7f5a4a2
Show file tree
Hide file tree
Showing 22 changed files with 87 additions and 66 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/pr-and-merge.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: lint
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v2
with:
version: v1.42
version: v1.43

tests:
name: tests
needs: golangci-lint # run after golangci-lint action to not produce duplicated errors
strategy:
matrix:
go:
- 1.16
- 1.17
os:
- ubuntu-latest
Expand Down
6 changes: 2 additions & 4 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ output:
# print linter name in the end of issue text, default is true
print-linter-name: true


# all available settings of specific linters
linters-settings:
errcheck:
Expand Down Expand Up @@ -167,21 +166,21 @@ linters:
- prealloc
- scopelint
- testpackage
- varnamelen
- wrapcheck
disable-all: false
# presets:
# - bugs
# - unused
fast: false


issues:
# List of regexps of issue texts to exclude, empty list by default.
# But independently from this option we use default exclude patterns,
# it can be disabled by `exclude-use-default: false`. To list all
# excluded by default patterns execute `golangci-lint run --help`
exclude:
- 'can be .*testify/assert.TestingT'
- "can be .*testify/assert.TestingT"
- 'declaration of "err" shadows declaration'
# Global variables ending with Re should be ignored (like with Err and _).
# See https://github.com/leighmcculloch/gochecknoglobals/issues/6
Expand Down Expand Up @@ -209,5 +208,4 @@ issues:
# of integration: much better don't allow issues in new code.
# Default is false.
new: false

# vim: set sw=2 ts=2 et:
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ test: compose ## Run all tests (with race detection)

coverage:
go test -coverprofile c.out ./...
@sed -i "s%github.com/0x4b53/%%" c.out
@sed -i "s%github.com/0x4b53/amqp-rpc/v2/%amqp-rpc/%" c.out

.PHONY: all compose compose-down help hooks lint test
2 changes: 1 addition & 1 deletion amqprpctest/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package amqprpctest

import (
amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

// NewTestClient returns a client with a custom send function to use for testing.
Expand Down
32 changes: 32 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package amqprpc

import "context"

type ctxKey int

const (
queueNameKey ctxKey = iota
shutdownChanKey
)

// ContextWithQueueName adds the given queueName to the provided context.
func ContextWithQueueName(ctx context.Context, queueName string) context.Context {
return context.WithValue(ctx, queueNameKey, queueName)
}

// QueueNameFromContext returns the queue name for the current request.
func QueueNameFromContext(ctx context.Context) (string, bool) {
queueName, ok := ctx.Value(queueNameKey).(string)
return queueName, ok
}

// ContextWithShutdownChan adds a shutdown chan to the given context.
func ContextWithShutdownChan(ctx context.Context, ch chan struct{}) context.Context {
return context.WithValue(ctx, shutdownChanKey, ch)
}

// ShutdownChanFromContext returns the shutdown chan.
func ShutdownChanFromContext(ctx context.Context) (chan struct{}, bool) {
ch, ok := ctx.Value(shutdownChanKey).(chan struct{})
return ch, ok
}
3 changes: 1 addition & 2 deletions examples/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"time"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

func main() {
Expand Down Expand Up @@ -41,7 +41,6 @@ func heartbeat(c *amqprpc.Client) {
WithBody(time.Now().String()).
WithTimeout(100 * time.Millisecond),
)

if err != nil {
fmt.Println("Heartbeat error: ", err)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/fanout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"fmt"
"time"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"

amqp "github.com/rabbitmq/amqp091-go"
)
Expand Down
2 changes: 1 addition & 1 deletion examples/middlewares/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/google/uuid"
amqp "github.com/rabbitmq/amqp091-go"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

var (
Expand Down
4 changes: 2 additions & 2 deletions examples/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"strings"
"syscall"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpcmw "github.com/0x4b53/amqp-rpc/middleware"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
amqprpcmw "github.com/0x4b53/amqp-rpc/v2/middleware"

amqp "github.com/rabbitmq/amqp091-go"
)
Expand Down
2 changes: 1 addition & 1 deletion examples/tls-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"log"
"os"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"

amqp "github.com/rabbitmq/amqp091-go"
)
Expand Down
14 changes: 9 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
module github.com/0x4b53/amqp-rpc
module github.com/0x4b53/amqp-rpc/v2

go 1.15
go 1.17

require (
github.com/google/uuid v1.3.0
github.com/rabbitmq/amqp091-go v1.2.0
github.com/stretchr/testify v1.4.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.1.1
github.com/kr/pretty v0.1.0 // indirect
github.com/rabbitmq/amqp091-go v1.1.0
github.com/stretchr/testify v1.4.0
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
)
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.1.0 h1:qx8cGMJha71/5t31Z+LdPLdPrkj/BvD38cqC3Bi1pNI=
github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/rabbitmq/amqp091-go v1.2.0 h1:1pHBxAsQh54R9eX/xo679fUEAfv3loMqi0pvRFOj2nk=
github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down
5 changes: 2 additions & 3 deletions logging_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package amqprpc

import (
"io"
"io/ioutil"
"log"
"testing"
"time"
Expand All @@ -28,7 +27,7 @@ func TestServerLogging(t *testing.T) {
require.NoError(t, writer.Close())
}()

buf, err := ioutil.ReadAll(reader)
buf, err := io.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -56,7 +55,7 @@ func TestClientLogging(t *testing.T) {
_ = writer.Close()
}()

buf, err := ioutil.ReadAll(reader)
buf, err := io.ReadAll(reader)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion middleware/ack.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

amqp "github.com/rabbitmq/amqp091-go"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

// AckDelivery is a middleware that will acknowledge the delivery after the
Expand Down
2 changes: 1 addition & 1 deletion middleware/ack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

func TestAckDelivery(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion middleware/panic_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

amqp "github.com/rabbitmq/amqp091-go"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

// PanicRecovery is a middleware that will recover any panics caused by a
Expand Down
2 changes: 1 addition & 1 deletion middleware/panic_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"

amqprpc "github.com/0x4b53/amqp-rpc"
amqprpc "github.com/0x4b53/amqp-rpc/v2"
)

func TestPanicRecovery(t *testing.T) {
Expand Down
29 changes: 6 additions & 23 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,6 @@ import (
amqp "github.com/rabbitmq/amqp091-go"
)

type ctxKey string

const (
// CtxQueueName can be used to get the queue name from the context.Context
// inside the HandlerFunc.
CtxQueueName ctxKey = "queue_name"
)

// HandlerFunc is the function that handles all request based on the routing key.
type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)

Expand Down Expand Up @@ -88,21 +80,14 @@ type Server struct {
// print most of what is happening internally.
// If nil, logging is not done.
debugLog LogFunc

baseContext context.Context
baseContextCancel context.CancelFunc
}

// NewServer will return a pointer to a new Server.
func NewServer(url string) *Server {
baseContext, cancelFunc := context.WithCancel(context.Background())

server := Server{
url: url,
bindings: []HandlerBinding{},
middlewares: []ServerMiddlewareFunc{},
baseContext: baseContext,
baseContextCancel: cancelFunc,
url: url,
bindings: []HandlerBinding{},
middlewares: []ServerMiddlewareFunc{},
dialconfig: amqp.Config{
Dial: DefaultDialer,
},
Expand Down Expand Up @@ -348,10 +333,6 @@ func (s *Server) listenAndServe() error {
return err
}

// 2. Tell all handlers that we are stopping, in case they have any long
// running functions.
s.baseContextCancel()

// 3. We've told amqp to stop delivering messages, now we wait for all
// the consumers to finish inflight messages.
consumersWg.Done()
Expand Down Expand Up @@ -440,7 +421,9 @@ func (s *Server) runHandler(
},
}

ctx := context.WithValue(s.baseContext, CtxQueueName, queueName)
ctx := context.Background()
ctx = ContextWithShutdownChan(ctx, s.stopChan)
ctx = ContextWithQueueName(ctx, queueName)

go func(delivery amqp.Delivery) {
handler(ctx, &rw, delivery)
Expand Down
15 changes: 9 additions & 6 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestNoAutomaticAck(t *testing.T) {
func TestMiddleware(t *testing.T) {
mw := func(next HandlerFunc) HandlerFunc {
return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
if ctx.Value(CtxQueueName).(string) == "denied" {
if queueName, _ := QueueNameFromContext(ctx); queueName == "denied" {
fmt.Fprint(rw, "routing key 'denied' is not allowed")
return
}
Expand Down Expand Up @@ -240,14 +240,17 @@ func TestServerConfig(t *testing.T) {
func TestContextDoneWhenServerStopped(t *testing.T) {
server, client, start, stop := initTest()

ctxDone := make(chan bool, 1)
isShuttingDown := make(chan bool, 1)

server.Bind(DirectBinding("context.test", func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
shutdownCh, ok := ShutdownChanFromContext(ctx)
require.True(t, ok)

select {
case <-ctx.Done():
ctxDone <- true
case <-shutdownCh:
isShuttingDown <- true
case <-time.After(5 * time.Second):
ctxDone <- false
isShuttingDown <- false
}
}))

Expand All @@ -264,7 +267,7 @@ func TestContextDoneWhenServerStopped(t *testing.T) {
stop()

select {
case wasDone := <-ctxDone:
case wasDone := <-isShuttingDown:
assert.True(t, wasDone)
case <-time.After(10 * time.Second):
t.Fatalf("handler was never called")
Expand Down
4 changes: 2 additions & 2 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func startAndWait(s *Server) func() {
func deleteQueue(name string) {
queueURL := fmt.Sprintf("%s/queues/%s/%s", serverAPITestURL, url.PathEscape("/"), url.PathEscape(name))

req, err := http.NewRequest("DELETE", queueURL, nil)
req, err := http.NewRequest("DELETE", queueURL, http.NoBody)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func closeConnections(names ...string) {

connectionURL := fmt.Sprintf("%s/connections/%s", serverAPITestURL, url.PathEscape(conn["name"].(string)))

req, err := http.NewRequest("DELETE", connectionURL, nil)
req, err := http.NewRequest("DELETE", connectionURL, http.NoBody)
if err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 7f5a4a2

Please sign in to comment.