Skip to content

Commit

Permalink
chore: wip implements an inmemory message bus
Browse files Browse the repository at this point in the history
The goal is to avoid tight coupling between handlers and the API.

The next step is to use it for domain events and application messages (commands AND queries)
  • Loading branch information
YuukanOO committed Dec 5, 2023
1 parent d553b7c commit 77a217c
Show file tree
Hide file tree
Showing 4 changed files with 410 additions and 0 deletions.
126 changes: 126 additions & 0 deletions pkg/bus/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package bus

import (
"context"
"errors"
)

var ErrNoHandlerRegistered = errors.New("no_handler_registered")

type (
// Handler for a specific message.
RequestHandler[TResult any, TMsg Request[TResult]] func(context.Context, TMsg) (TResult, error)
// Handler for signal.
SignalHandler[TSignal Signal] func(context.Context, TSignal) error
// Generic handler (as seen by middlewares).
NextFunc func(context.Context, Message) (any, error)
// Middleware function used to add behavior to the dispatch process.
MiddlewareFunc func(NextFunc) NextFunc

Bus interface {
register(Message, NextFunc) // Register an handler for a specific message kind
handler(string) any // Get the handler for a specific message kind (any since for notification it will be an array of handlers)
}

inMemoryBus struct {
middlewares []MiddlewareFunc
handlers map[string]any
}
)

// Register an handler for a specific request on the provided bus.
func Register[TResult any, TMsg Request[TResult]](bus Bus, handler RequestHandler[TResult, TMsg]) {
var (
msg TMsg
h NextFunc = func(ctx context.Context, m Message) (any, error) {
return handler(ctx, m.(TMsg))
}
)

bus.register(msg, h)
}

// Register a signal handler for the given signal. Multiple signals can be registered for the same signal
// and will all be called.
func On[TSignal Signal](bus Bus, handler SignalHandler[TSignal]) {
var (
msg TSignal
h NextFunc = func(ctx context.Context, m Message) (any, error) {
return nil, handler(ctx, m.(TSignal))
}
)

bus.register(msg, h)
}

// Send the given message to the bus and return the result and an error if any.
func Send[TResult any, TMsg Request[TResult]](bus Bus, ctx context.Context, msg TMsg) (TResult, error) {
handler := bus.handler(msg.Name_())

if handler == nil {
var r TResult
return r, ErrNoHandlerRegistered
}

r, err := handler.(NextFunc)(ctx, msg)

return r.(TResult), err
}

// Call every signal handlers registered for the given signal.
func Notify[TSignal Signal](bus Bus, ctx context.Context, msg TSignal) error {
handlers := bus.handler(msg.Name_())

if handlers == nil {
return nil
}

hdls := handlers.([]NextFunc)

for _, h := range hdls {
_, err := h(ctx, msg)

if err != nil {
return err
}
}

return nil
}

// Creates a new in memory bus which will call the handlers synchronously.
func NewInMemoryBus(middlewares ...MiddlewareFunc) Bus {
return &inMemoryBus{
middlewares: middlewares,
handlers: make(map[string]any),
}
}

func (b *inMemoryBus) register(msg Message, handler NextFunc) {
name := msg.Name_()
_, exists := b.handlers[name]

// Apply middlewares to avoid doing it at runtime
for i := len(b.middlewares) - 1; i >= 0; i-- {
handler = b.middlewares[i](handler)
}

if msg.Kind_() == MessageKindNotification {
if !exists {
b.handlers[name] = []NextFunc{handler}
} else {
b.handlers[name] = append(b.handlers[name].([]NextFunc), handler)
}
return
}

if exists {
panic("an handler is already registered for " + name) // Panic since this should never happen outside of a dev environment
}

b.handlers[name] = handler
}

func (b *inMemoryBus) handler(name string) any {
return b.handlers[name]
}
182 changes: 182 additions & 0 deletions pkg/bus/dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package bus_test

import (
"context"
"errors"
"testing"

"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/testutil"
)

func TestInMemoryBus(t *testing.T) {
t.Run("should accepts registration of all message kind", func(t *testing.T) {
local := bus.NewInMemoryBus()

bus.Register(local, AddCommandHandler)
bus.Register(local, GetQueryHandler)
bus.On(local, NotificationHandler)
bus.On(local, OtherNotificationHandler)
})

t.Run("should panic if an handler is already registered for a request", func(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("should have panicked")
}
}()

local := bus.NewInMemoryBus()

bus.Register(local, AddCommandHandler)
bus.Register(local, AddCommandHandler)
})

t.Run("should returns an error if no handler is registered for a given request", func(t *testing.T) {
local := bus.NewInMemoryBus()

_, err := bus.Send(local, context.Background(), &AddCommand{})

testutil.ErrorIs(t, bus.ErrNoHandlerRegistered, err)
})

t.Run("should returns the request handler error back if any", func(t *testing.T) {
local := bus.NewInMemoryBus()
expectedErr := errors.New("handler error")

bus.Register(local, func(ctx context.Context, cmd AddCommand) (int, error) {
return 0, expectedErr
})

_, err := bus.Send(local, context.Background(), AddCommand{})

testutil.ErrorIs(t, expectedErr, err)
})

t.Run("should call the appropriate request handler and returns the result", func(t *testing.T) {
local := bus.NewInMemoryBus()

bus.Register(local, AddCommandHandler)
bus.Register(local, GetQueryHandler)
bus.On(local, NotificationHandler)
bus.On(local, OtherNotificationHandler)

result, err := bus.Send(local, context.Background(), AddCommand{A: 1, B: 2})

testutil.IsNil(t, err)
testutil.Equals(t, 3, result)

result, err = bus.Send(local, context.Background(), GetQuery{})

testutil.IsNil(t, err)
testutil.Equals(t, 42, result)
})

t.Run("should do nothing if no signal handler is registered for a given signal", func(t *testing.T) {
local := bus.NewInMemoryBus()

err := bus.Notify(local, context.Background(), RegisteredNotification{})

testutil.IsNil(t, err)
})

t.Run("should returns a signal handler error back if any", func(t *testing.T) {
local := bus.NewInMemoryBus()
expectedErr := errors.New("handler error")

bus.On(local, func(ctx context.Context, notif RegisteredNotification) error {
return nil
})

bus.On(local, func(ctx context.Context, notif RegisteredNotification) error {
return expectedErr
})

err := bus.Notify(local, context.Background(), RegisteredNotification{})

testutil.ErrorIs(t, expectedErr, err)
})

t.Run("should call every signal handlers registered for the given signal", func(t *testing.T) {
var (
local = bus.NewInMemoryBus()
firstOneCalled = false
secondOneCalled = false
)

bus.On(local, func(ctx context.Context, notif RegisteredNotification) error {
firstOneCalled = true
return nil
})

bus.On(local, func(ctx context.Context, notif RegisteredNotification) error {
secondOneCalled = true
return nil
})

err := bus.Notify(local, context.Background(), RegisteredNotification{})

testutil.IsNil(t, err)
testutil.IsTrue(t, firstOneCalled && secondOneCalled)
})

t.Run("should call every middlewares registered", func(t *testing.T) {
calls := make([]int, 0)

local := bus.NewInMemoryBus(
func(next bus.NextFunc) bus.NextFunc {
return func(ctx context.Context, m bus.Message) (any, error) {
calls = append(calls, 1)
r, err := next(ctx, m)
calls = append(calls, 1)
return r, err
}
},
func(next bus.NextFunc) bus.NextFunc {
return func(ctx context.Context, m bus.Message) (any, error) {
calls = append(calls, 2)
r, err := next(ctx, m)
calls = append(calls, 2)
return r, err
}
},
)

bus.Register(local, AddCommandHandler)
bus.Register(local, GetQueryHandler)
bus.On(local, NotificationHandler)
bus.On(local, OtherNotificationHandler)

r, err := bus.Send(local, context.Background(), AddCommand{
A: 1,
B: 2,
})

testutil.IsNil(t, err)
testutil.Equals(t, 3, r)
testutil.DeepEquals(t, []int{1, 2, 2, 1}, calls)

calls = make([]int, 0)

bus.Notify(local, context.Background(), RegisteredNotification{})

// Should have been called twice cuz 2 signal handlers are registered
testutil.DeepEquals(t, []int{1, 2, 2, 1, 1, 2, 2, 1}, calls)
})
}

func AddCommandHandler(ctx context.Context, cmd AddCommand) (int, error) {
return cmd.A + cmd.B, nil
}

func GetQueryHandler(ctx context.Context, query GetQuery) (int, error) {
return 42, nil
}

func NotificationHandler(ctx context.Context, notif RegisteredNotification) error {
return nil
}

func OtherNotificationHandler(ctx context.Context, notif RegisteredNotification) error {
return nil
}
57 changes: 57 additions & 0 deletions pkg/bus/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Mediator style message bus adapted to the Go language without requiring the reflect package.
// Message members are suffixed with an underscore to avoid name conflicts it a command need a field
// named "Name" for example.
//
// Performance wise, it is lightly slower than direct calls but you get a lower coupling
// and a way to add middlewares to your handlers.
package bus

const (
MessageKindNotification MessageKind = iota
MessageKindCommand
MessageKindQuery
)

type (
// Represent the kind of a message being dispatched. This is especially useful for middlewares
// to adapt their behavior depending on the message kind.
//
// For example, a command may need a transaction whereas a query may not.
MessageKind int8

// Message which can be sent in the bus and handled by a registered handler.
Message interface {
Name_() string // Unique name of the message (here to not require reflection)
Kind_() MessageKind // Type of the message to be able to customize middlewares
}

// Signal which do not need a result.
Signal interface {
Message
isSignal() // Marker method to differentiate signals from messages
}

// Message which requires a result.
Request[T any] interface {
Message
isRequest() T // Marker method. Without it, the compiler will not be able to infer the T.
}

// Message without result.
Notification struct{}

// Request to mutate the system.
Command[T any] struct{}

// Request to query the system.
Query[T any] struct{}
)

func (Notification) Kind_() MessageKind { return MessageKindNotification }
func (Notification) isSignal() {}

func (Command[T]) Kind_() MessageKind { return MessageKindCommand }
func (Command[T]) isRequest() (t T) { return t }

func (Query[T]) Kind_() MessageKind { return MessageKindQuery }
func (Query[T]) isRequest() (t T) { return t }
Loading

0 comments on commit 77a217c

Please sign in to comment.