Dispatcher is an asynchronous task queue/job queue based on distributed message passing. Dispatcher can send tasks to a queue and execute them asynchronously on different servers.
Goals:
- Reconnection ability and its configuration
- Graceful quit (stop all workers, wait until all tasks have finished, close the connection)
- Ability to create as many workers as we wish from only one connection
- Simplicity
- Ability to configure timeouts for tasks
- Ability to limit the number of parallel tasks for every worker
Non goals:
- Handling results of executed tasks
Sender: link
Worker: link
$ go get github.com/gofort/dispatcher
$ cd $GOPATH/src/github.com/gofort/dispatcher
$ go run examples/sender/sender.go #sends one task as example
$ go run examples/worker/worker.go #executes one task and waits until you send more
Requirements:
- Go > 1.6
- RabbitMQ
go get github.com/gofort/dispatcher
This library is built around the AMQP protocol, which is why I advise reading the RabbitMQ tutorial first, because a lot of the concepts are covered there. If you don't know what a queue or a routing key is, it is a must.
The main goal of the server is to handle the AMQP connection properly. If the AMQP connection is broken, the server tries to reconnect, and once the connection is restored, all workers are restored too. The server also has a publisher inside. The publisher has its own AMQP channel and only sends messages.
This is the full server configuration:
type ServerConfig struct {
	AMQPConnectionString        string
	ReconnectionRetries         int
	ReconnectionIntervalSeconds int64
	TLSConfig                   *tls.Config
	SecureConnection            bool
	DebugMode                   bool // for default logger only
	InitQueues                  []Queue
	Exchange                    string // required
	DefaultRoutingKey           string // required
	Logger                      Log
}
Configuration description:
- AMQPConnectionString: example - amqp://guest:guest@localhost:5672/ (amqp://username:password@host:port)
- ReconnectionRetries: number of retries of reconnecting to AMQP after connection problems.
- ReconnectionIntervalSeconds: interval in seconds between reconnection retries.
- SecureConnection: if true, uses TLSConfig with param InsecureSkipVerify.
- DebugMode: enables extended logging; works only with the default logger (logrus). If you use a custom logger, enable the debug level in it yourself.
- InitQueues: queues and their binding keys which the server will create during the first start.
- Exchange: exchange which will be used by dispatcher.
- DefaultRoutingKey: default routing key for publishing tasks (you can set routing key manually in every task if you want).
- Logger: your custom logger which will be used everywhere in Dispatcher.
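The two reconnection settings can be pictured as a bounded retry loop. The following is only a sketch of that idea and not the library's own code: `dialFunc`, `reconnect` and `errDial` are hypothetical names, and the interval is passed as a `time.Duration` rather than whole seconds so the example runs quickly.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDial is a hypothetical error used to simulate a failed connection attempt.
var errDial = errors.New("connection refused")

// dialFunc is a hypothetical stand-in for whatever opens the AMQP connection.
type dialFunc func() error

// reconnect calls dial up to retries times and sleeps for interval between
// failed attempts. It returns the number of attempts made and a nil error
// on success, or the last error once all retries are used up.
func reconnect(dial dialFunc, retries int, interval time.Duration) (int, error) {
	if retries <= 0 {
		return 0, errors.New("no reconnection retries configured")
	}
	var err error
	for attempt := 1; attempt <= retries; attempt++ {
		if err = dial(); err == nil {
			return attempt, nil
		}
		if attempt < retries {
			time.Sleep(interval)
		}
	}
	return retries, fmt.Errorf("gave up after %d attempts: %w", retries, err)
}

func main() {
	// Simulated connection that fails twice and then succeeds.
	calls := 0
	flaky := func() error {
		calls++
		if calls < 3 {
			return errDial
		}
		return nil
	}
	attempts, err := reconnect(flaky, 5, 10*time.Millisecond)
	fmt.Println("attempts:", attempts, "error:", err)
}
```

In this picture, ReconnectionRetries corresponds to `retries` and ReconnectionIntervalSeconds to `interval`; what the server does once the retries are exhausted is described in the worker section.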
Custom logger interface:
type Log interface {
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}
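As an illustration, the interface above could be satisfied by a thin adapter over the standard library `log` package. This is a minimal sketch: `stdLogger`, `newStdLogger` and `captureLog` are hypothetical names, and the interface is re-declared locally so that the example compiles without the dispatcher package.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"
	"os"
)

// Log mirrors the interface shown above so this example compiles on its own.
type Log interface {
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
}

// stdLogger is a hypothetical adapter around the standard library logger.
// Debug messages are dropped unless debug is true.
type stdLogger struct {
	out   *log.Logger
	debug bool
}

func newStdLogger(w io.Writer, debug bool) *stdLogger {
	return &stdLogger{out: log.New(w, "", 0), debug: debug}
}

func (l *stdLogger) Info(args ...interface{}) { l.out.Print("INFO ", fmt.Sprint(args...)) }
func (l *stdLogger) Infof(format string, args ...interface{}) {
	l.out.Print("INFO ", fmt.Sprintf(format, args...))
}
func (l *stdLogger) Debug(args ...interface{}) {
	if l.debug {
		l.out.Print("DEBUG ", fmt.Sprint(args...))
	}
}
func (l *stdLogger) Debugf(format string, args ...interface{}) {
	if l.debug {
		l.out.Print("DEBUG ", fmt.Sprintf(format, args...))
	}
}
func (l *stdLogger) Error(args ...interface{}) { l.out.Print("ERROR ", fmt.Sprint(args...)) }
func (l *stdLogger) Errorf(format string, args ...interface{}) {
	l.out.Print("ERROR ", fmt.Sprintf(format, args...))
}

// Compile-time check that stdLogger satisfies Log.
var _ Log = (*stdLogger)(nil)

// captureLog is a demo helper: it runs fn with a logger that writes to memory
// and returns everything that was logged.
func captureLog(debug bool, fn func(Log)) string {
	var buf bytes.Buffer
	fn(newStdLogger(&buf, debug))
	return buf.String()
}

func main() {
	logger := newStdLogger(os.Stdout, false)
	logger.Infof("worker %s started", "worker_1")
	logger.Debug("not printed because debug is off")
	fmt.Print(captureLog(true, func(l Log) { l.Debugf("limit=%d", 4) }))
}
```

A value like `logger` would then be assigned to the Logger field of the server configuration.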
All tasks which workers handle and publisher sends have the following structure:
type Task struct {
	UUID       string                 `json:"uuid"`
	Name       string                 `json:"name"`
	RoutingKey string                 `json:"-"`
	Args       []TaskArgument         `json:"args"`
	Headers    map[string]interface{} `json:"-"`
}

type TaskArgument struct {
	Type  string      `json:"type"`
	Value interface{} `json:"value"`
}
Description of task structure:
- UUID: task uuid, if empty - will be generated by dispatcher.
- Name: task name, can't be empty. Workers handle only tasks that are registered in them, and each task name maps to its own function.
- RoutingKey: if empty, the task will be sent to the default routing key (which is in the server config).
- Args: task arguments, each consisting of a type and a value.
- Headers: are used to direct the task.
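The struct tags suggest how a task looks once it is encoded as JSON: RoutingKey and Headers are tagged `json:"-"` and therefore never appear in the encoded body. The sketch below re-declares the two types locally to show this with `encoding/json`. That the routing key and headers travel separately as AMQP message properties is an assumption here, and `encodeTask` is a hypothetical helper, not part of the library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the types shown above, so the example is self-contained.
type TaskArgument struct {
	Type  string      `json:"type"`
	Value interface{} `json:"value"`
}

type Task struct {
	UUID       string                 `json:"uuid"`
	Name       string                 `json:"name"`
	RoutingKey string                 `json:"-"`
	Args       []TaskArgument         `json:"args"`
	Headers    map[string]interface{} `json:"-"`
}

// encodeTask is a hypothetical helper that returns the JSON form of a task.
func encodeTask(t Task) (string, error) {
	b, err := json.Marshal(t)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	task := Task{
		UUID:       "example-uuid",
		Name:       "task_1",
		RoutingKey: "example_key", // not part of the JSON body
		Args: []TaskArgument{
			{Type: "string", Value: "simple string"},
			{Type: "int", Value: 1},
		},
		Headers: map[string]interface{}{"example": true}, // not part of the JSON body
	}
	body, err := encodeTask(task)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(body)
}
```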
TaskArgument example:
args := []dispatcher.TaskArgument{
	{
		Type:  "string",
		Value: "simple string",
	},
	{
		Type:  "int",
		Value: 1,
	},
}
// A task with these arguments will call this function:
func SomeFunc(somestr string, someint int) {}
// or the function can return results (remember, dispatcher ignores results; save them yourself where you need them):
func SomeFunc(somestr string, someint int) (string, error) {}
Available types of arguments:
- bool
- string
- int int8 int16 int32 int64
- uint uint8 uint16 uint32 uint64
- float32 float64
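To make the link between typed arguments and the called function concrete, here is one way such a call could be made with the `reflect` package. This is a sketch under assumptions and not the library's implementation: `toValue` and `call` are hypothetical names, only a subset of the types listed above is handled, and numbers are also accepted as `float64` because that is how `encoding/json` decodes them into an `interface{}`.

```go
package main

import (
	"fmt"
	"reflect"
)

// TaskArgument is a local copy of the type shown above.
type TaskArgument struct {
	Type  string
	Value interface{}
}

// toValue converts one argument into a reflect.Value of its declared type.
// Only bool, string, int and float64 are covered in this sketch.
func toValue(arg TaskArgument) (reflect.Value, error) {
	switch arg.Type {
	case "bool":
		if b, ok := arg.Value.(bool); ok {
			return reflect.ValueOf(b), nil
		}
	case "string":
		if s, ok := arg.Value.(string); ok {
			return reflect.ValueOf(s), nil
		}
	case "int":
		switch n := arg.Value.(type) {
		case int:
			return reflect.ValueOf(n), nil
		case float64: // numbers decoded from JSON arrive as float64
			return reflect.ValueOf(int(n)), nil
		}
	case "float64":
		if f, ok := arg.Value.(float64); ok {
			return reflect.ValueOf(f), nil
		}
	default:
		return reflect.Value{}, fmt.Errorf("unsupported argument type %q", arg.Type)
	}
	return reflect.Value{}, fmt.Errorf("value %v (%T) does not match type %q", arg.Value, arg.Value, arg.Type)
}

// call invokes fn with the converted arguments and discards any results.
func call(fn interface{}, args []TaskArgument) error {
	f := reflect.ValueOf(fn)
	if f.Kind() != reflect.Func {
		return fmt.Errorf("not a function: %T", fn)
	}
	if f.Type().NumIn() != len(args) {
		return fmt.Errorf("function takes %d arguments, task has %d", f.Type().NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		v, err := toValue(a)
		if err != nil {
			return err
		}
		if v.Type() != f.Type().In(i) {
			return fmt.Errorf("argument %d: have %s, want %s", i, v.Type(), f.Type().In(i))
		}
		in[i] = v
	}
	f.Call(in) // results are ignored, mirroring the "non goals" above
	return nil
}

func main() {
	args := []TaskArgument{
		{Type: "string", Value: "simple string"},
		{Type: "int", Value: float64(1)}, // as it would look after JSON decoding
	}
	err := call(func(s string, n int) { fmt.Println(s, n) }, args)
	fmt.Println("error:", err)
}
```

Checking the argument count and types before the call turns a mismatched task into an ordinary error instead of a panic inside `reflect`.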
Task config is what you register in worker (during worker creation).
type TaskConfig struct {
	TimeoutSeconds     int64
	Function           interface{}
	TaskUUIDAsFirstArg bool
}
Description:
- TimeoutSeconds - timeout after which the worker will take a new task (the timed-out task itself won't be stopped!). It is meant for cases where a task can hang on something.
- Function - function which will be called when the worker receives this task.
- TaskUUIDAsFirstArg - because dispatcher doesn't handle the results of your tasks, you have to handle them yourself. For that reason the worker can pass the task UUID as the first argument to the function.
// TaskUUIDAsFirstArg = false
func SomeFunc(somestr string, someint int) {}
// TaskUUIDAsFirstArg = true
func SomeFunc(taskuuid string, somestr string, someint int) {}
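The TimeoutSeconds behaviour (the worker moves on, but the task keeps running) can be sketched with a goroutine and a `select`. This only illustrates the described semantics and is not taken from the library; `runWithTimeout` is a hypothetical name, and a `time.Duration` is used instead of whole seconds to keep the demo fast.

```go
package main

import (
	"fmt"
	"time"
)

// runWithTimeout starts task in its own goroutine and waits at most timeout.
// It reports whether the task finished in time. A task that times out is NOT
// stopped: it keeps running in the background, the caller just stops waiting.
func runWithTimeout(task func(), timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		task()
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	finished := make(chan struct{})
	slow := func() {
		time.Sleep(50 * time.Millisecond)
		close(finished)
	}
	fmt.Println("finished in time:", runWithTimeout(slow, 10*time.Millisecond))
	<-finished // the slow task still completes after the timeout fired
	fmt.Println("slow task completed in the background")
}
```

Because a timed-out function is never interrupted, a task that can hang forever still leaks its goroutine, so long-running functions should enforce their own deadlines.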
Example: link
server, _, err := dispatcher.NewServer(&cfg)
if err != nil {
	log.Println(err.Error())
	return
}
task := dispatcher.Task{
	Name: "task_1",
	Args: []dispatcher.TaskArgument{
		{
			Type:  "string",
			Value: "simple string",
		},
		{
			Type:  "int",
			Value: 1,
		},
	},
}
// Here we send the task to a queue
if err := server.Publish(&task); err != nil {
	log.Println(err.Error())
	return
}
A worker receives tasks from queues and calls the functions which were registered in it, passing the arguments that were in the task. Remember, workers don't know anything about the results of your tasks; handle them yourself, which is why a worker can pass the task UUID (see TaskConfig) as the first argument to functions if you wish. If somebody closes the worker (you, or the server after all reconnection retries are exhausted), the worker will wait until all tasks it has started are finished.
Example: link
This is the full worker configuration:
type WorkerConfig struct {
	Limit       int
	Queue       string // required
	BindingKeys []string
	Name        string // required
}
Configuration description:
- Limit - number of tasks which worker can handle concurrently.
- Queue - queue which worker will consume.
- BindingKeys - keys which worker will bind to queue during its creation.
- Name - worker name (also consumer tag).
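The Limit setting and the graceful-quit behaviour can both be illustrated with standard concurrency primitives: a buffered channel used as a semaphore caps the number of running tasks, and a `sync.WaitGroup` makes the caller wait until every started task has finished. This is a sketch of the general technique and not the worker's actual code; `runLimited` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited runs all tasks with at most limit of them executing at once and
// returns only after every task has finished. It returns the highest number
// of tasks that were observed running at the same time.
func runLimited(tasks []func(), limit int) int {
	if limit < 1 {
		limit = 1 // a non-positive limit would block forever
	}
	sem := make(chan struct{}, limit) // one slot per concurrently running task
	var wg sync.WaitGroup
	var mu sync.Mutex
	running, peak := 0, 0

	for _, task := range tasks {
		sem <- struct{}{} // blocks while limit tasks are already running
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot when the task ends

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait() // graceful part: do not return while tasks are still running
	return peak
}

func main() {
	finished := make(chan struct{}, 10)
	tasks := make([]func(), 10)
	for i := range tasks {
		tasks[i] = func() { finished <- struct{}{} }
	}
	peak := runLimited(tasks, 3)
	fmt.Println("peak concurrency:", peak, "finished:", len(finished))
}
```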
Tasks registering example:
tasks := make(map[string]dispatcher.TaskConfig)
// Task configuration where we pass the function that this worker executes when the task is received
tasks["task_1"] = dispatcher.TaskConfig{
	Function: func(str string, someint int) {
		log.Printf("Example function arguments: string - %s, int - %d\n", str, someint)
		log.Println("Example function completed!")
	},
}
// This function creates the worker, but it won't start consuming messages yet
worker, err := server.NewWorker(&workercfg, tasks)
if err != nil {
	log.Println(err.Error())
	return
}
Updating dependencies:
$ go get github.com/tools/godep #if you don't have godep
$ go get -u
$ godep save
Testing:
For testing, set the environment variable DISPATCHER_AMQP_CON to an AMQP connection string, for example: amqp://guest:guest@localhost:5672/
$ go test -v
- Richard Knop for his Machinery project which was an example for this project and a bit of code was taken from there.