forked from adjust/rmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
delivery.go
76 lines (61 loc) · 1.58 KB
/
delivery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package rmq
import (
"fmt"
"gopkg.in/redis.v3"
)
// Delivery is the consumer-facing handle for a single message. It exposes the
// message body and three ways of settling the message, each of which reports
// its outcome as a bool. wrapDelivery in this file is the Redis-backed
// implementation.
type Delivery interface {
	// Payload returns the message body.
	Payload() string

	// Ack acknowledges the message; true means the acknowledgement took effect.
	Ack() bool

	// Reject marks the message as rejected; true means the message was moved.
	Reject() bool

	// Push forwards the message onwards; true means the message was moved.
	Push() bool
}
// wrapDelivery is the Redis-backed Delivery. It carries the message body
// together with the keys of the Redis lists it may be removed from or moved
// to, and the client used to issue those commands.
type wrapDelivery struct {
	// payload is the message body; it is also the list element value used
	// in the LREM and LPUSH commands.
	payload string

	// unackedKey names the list the payload is removed from when the
	// delivery is acked or moved.
	unackedKey string

	// rejectedKey names the list Reject moves the payload to; Push falls
	// back to it when pushKey is empty.
	rejectedKey string

	// pushKey names the list Push moves the payload to. Empty means no push
	// target is configured.
	pushKey string

	// redisClient issues the list commands for this delivery.
	redisClient *redis.Client
}
// newDelivery builds a wrapDelivery for payload, storing the given list keys
// and Redis client verbatim. No Redis command is issued here.
func newDelivery(payload, unackedKey, rejectedKey, pushKey string, redisClient *redis.Client) *wrapDelivery {
	delivery := wrapDelivery{
		payload:     payload,
		unackedKey:  unackedKey,
		rejectedKey: rejectedKey,
		pushKey:     pushKey,
		redisClient: redisClient,
	}
	return &delivery
}
// String formats the delivery as "[<payload> <unackedKey>]", which makes
// *wrapDelivery satisfy fmt.Stringer.
func (delivery *wrapDelivery) String() string {
	const layout = "[%s %s]"
	return fmt.Sprintf(layout, delivery.payload, delivery.unackedKey)
}
// Payload returns the message body stored in this delivery.
func (delivery *wrapDelivery) Payload() string {
	return delivery.payload
}
// Ack removes one occurrence of the payload from the unacked list (LREM with
// count 1) and reports whether exactly one entry was removed.
//
// It returns false when redisErrIsNil reports true for the command result.
// NOTE(review): redisErrIsNil is defined elsewhere; presumably it detects a
// Redis nil reply, and how it treats other errors is not visible here.
func (delivery *wrapDelivery) Ack() bool {
	// debug(fmt.Sprintf("delivery ack %s", delivery)) // COMMENTOUT
	removal := delivery.redisClient.LRem(delivery.unackedKey, 1, delivery.payload)

	// Short-circuit keeps Val() from being consulted on a nil result.
	return !redisErrIsNil(removal) && removal.Val() == 1
}
// Reject moves the payload from the unacked list to the rejected list and
// returns whatever move reports.
func (delivery *wrapDelivery) Reject() bool {
	moved := delivery.move(delivery.rejectedKey)
	return moved
}
// Push moves the payload from the unacked list to the push list. When no push
// key is configured (empty string) it falls back to the rejected list, making
// it equivalent to Reject. It returns whatever move reports.
func (delivery *wrapDelivery) Push() bool {
	target := delivery.pushKey
	if target == "" {
		target = delivery.rejectedKey
	}
	return delivery.move(target)
}
// move transfers the payload to the list at key: it first LPUSHes the payload
// onto key, then removes one occurrence of it from the unacked list. The two
// steps are issued as separate commands, push first.
//
// It returns false as soon as redisErrIsNil reports true for either command
// result (the removal is not attempted if the push result is nil), and true
// otherwise.
// NOTE(review): unlike Ack, the number of entries LREM removed is not
// checked, so true is returned even if the payload was not in the unacked
// list — confirm this is intended.
func (delivery *wrapDelivery) move(key string) bool {
	client := delivery.redisClient

	pushed := client.LPush(key, delivery.payload)
	if redisErrIsNil(pushed) {
		return false
	}

	removed := client.LRem(delivery.unackedKey, 1, delivery.payload)
	if redisErrIsNil(removed) {
		return false
	}

	// debug(fmt.Sprintf("delivery rejected %s", delivery)) // COMMENTOUT
	return true
}