Skip to content

Commit

Permalink
When using AMQP as backend, set x-expires parameter so task state que…
Browse files Browse the repository at this point in the history
…ues will eventually be cleaned up.
  • Loading branch information
RichardKnop committed Aug 28, 2017
1 parent 00d09f6 commit 5643426
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 0 deletions.
7 changes: 7 additions & 0 deletions example/machinery.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -259,5 +260,11 @@ func send() error {
return fmt.Errorf("Could not send task: %s", err.Error())
}

results, err = asyncResult.Get(time.Duration(time.Millisecond * 5))
if err == nil {
return errors.New("Error should not be nil if task panicked")
}
log.INFO.Printf("Task panicked and returned error = %v\n", err.Error())

return nil
}
12 changes: 12 additions & 0 deletions v1/backends/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ func (b *AMQPBackend) SetStateFailure(signature *tasks.Signature, err string) er
// as the message will get consumed and removed from the queue.
func (b *AMQPBackend) GetState(taskUUID string) (*tasks.TaskState, error) {
declareQueueArgs := amqp.Table{
// Time in milliseconds
// after that message will expire
"x-message-ttl": int32(b.getExpiresIn()),
// Time after that the queue will be deleted.
"x-expires": int32(b.getExpiresIn()),
}
conn, channel, _, _, _, err := b.Connect(
b.cnf.Broker,
Expand Down Expand Up @@ -261,7 +265,11 @@ func (b *AMQPBackend) updateState(taskState *tasks.TaskState) error {
}

declareQueueArgs := amqp.Table{
// Time in milliseconds
// after that message will expire
"x-message-ttl": int32(b.getExpiresIn()),
// Time after that the queue will be deleted.
"x-expires": int32(b.getExpiresIn()),
}
conn, channel, queue, confirmsChan, _, err := b.Connect(
b.cnf.Broker,
Expand Down Expand Up @@ -328,7 +336,11 @@ func (b *AMQPBackend) markTaskCompleted(signature *tasks.Signature, taskState *t
}

declareQueueArgs := amqp.Table{
// Time in milliseconds
// after that message will expire
"x-message-ttl": int32(b.getExpiresIn()),
// Time after that the queue will be deleted.
"x-expires": int32(b.getExpiresIn()),
}
conn, channel, queue, confirmsChan, _, err := b.Connect(
b.cnf.Broker,
Expand Down

0 comments on commit 5643426

Please sign in to comment.