forked from jrallison/go-workers
-
Notifications
You must be signed in to change notification settings - Fork 10
/
enqueue.go
137 lines (115 loc) · 3.36 KB
/
enqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package workers
import (
"crypto/rand"
"encoding/json"
"fmt"
"io"
"time"
"github.com/gomodule/redigo/redis"
)
// NanoSecondPrecision is the number of nanoseconds in one second. The time
// helpers in this file divide a nanosecond count by it to obtain fractional
// seconds as a float64.
const NanoSecondPrecision = 1e9
// EnqueueData is the job payload that EnqueueWithOptions marshals to JSON and
// writes to Redis.
type EnqueueData struct {
// Queue is the queue name exactly as passed to the Enqueue* call; the
// configured namespace prefix is not included.
Queue string `json:"queue,omitempty"`
// Class is the job class name supplied by the caller.
Class string `json:"class"`
// Args holds the caller-supplied job arguments; it must be JSON-marshalable.
Args interface{} `json:"args"`
// Jid is the job id produced by generateJid (24 hex characters).
Jid string `json:"jid"`
// EnqueuedAt is the enqueue time in seconds since the Unix epoch, with a
// fractional part.
EnqueuedAt float64 `json:"enqueued_at"`
// EnqueueOptions is embedded, so encoding/json emits its fields as members
// of the same JSON object rather than as a nested object.
EnqueueOptions
}
// EnqueueOptions carries the optional settings of a job. It is embedded in
// EnqueueData and therefore serialized as part of the job payload.
type EnqueueOptions struct {
// RetryCount, Retry and RetryMax are not read in this file.
// NOTE(review): presumably consumed by the retry handling on the worker
// side — confirm against the consumer.
RetryCount int `json:"retry_count,omitempty"`
Retry bool `json:"retry,omitempty"`
RetryMax int `json:"retry_max,omitempty"`
// At is the time the job should run, in seconds since the Unix epoch.
// EnqueueWithOptions schedules the job when At lies in the future and
// pushes it onto the queue immediately otherwise.
At float64 `json:"at,omitempty"`
// NOTE: omitempty has no effect on struct-typed fields in encoding/json,
// so retry_options and connection_options are always present in the JSON.
RetryOptions RetryOptions `json:"retry_options,omitempty"`
// ConnectionOptions selects the Redis connection pool for an immediate
// enqueue; when its Address is empty the default Config.Pool is used.
ConnectionOptions Options `json:"connection_options,omitempty"`
}
// RetryOptions is serialized with the job under the "retry_options" key.
// None of its fields are read in this file.
// NOTE(review): the names suggest parameters of a retry back-off (exponent,
// delay bounds, random jitter bound); units and exact semantics must be
// confirmed against the code that consumes them.
type RetryOptions struct {
Exp int `json:"exp"`
MinDelay int `json:"min_delay"`
MaxDelay int `json:"max_delay"`
MaxRand int `json:"max_rand"`
}
// generateJid returns a fresh job id: 12 bytes read from crypto/rand, rendered
// as 24 lowercase hexadecimal characters. If the random source cannot supply
// all 12 bytes, the empty string is returned.
func generateJid() string {
	const jidBytes = 12

	raw := make([]byte, jidBytes)
	if _, err := io.ReadFull(rand.Reader, raw); err != nil {
		return ""
	}
	return fmt.Sprintf("%x", raw)
}
func Enqueue(queue, class string, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision()})
}
func EnqueueIn(queue, class string, in float64, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: nowToSecondsWithNanoPrecision() + in})
}
func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error) {
return EnqueueWithOptions(queue, class, args, EnqueueOptions{At: timeToSecondsWithNanoPrecision(at)})
}
// EnqueueWithOptions builds the JSON payload for a job of the given class and
// args and hands it to Redis.
//
// If opts.At lies in the future, the payload is added to the scheduled-jobs
// sorted set via enqueueAt, which always uses the default Config.Pool.
// Otherwise the queue name is registered in the namespaced "queues" set and
// the payload is pushed onto the namespaced "queue:<queue>" list. For that
// immediate path the connection comes from opts.ConnectionOptions when its
// Address is set, and from the default Config.Pool when it is not.
//
// It returns the generated job id on success. On any failure it returns an
// empty id together with the error, so callers never receive the id of a job
// that was not stored.
func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error) {
	now := nowToSecondsWithNanoPrecision()
	data := EnqueueData{
		Queue:          queue,
		Class:          class,
		Args:           args,
		Jid:            generateJid(),
		EnqueuedAt:     now,
		EnqueueOptions: opts,
	}

	payload, err := json.Marshal(data)
	if err != nil {
		return "", err
	}

	// Future-dated job: park it in the scheduled set instead of the queue.
	if now < opts.At {
		if err := enqueueAt(data.At, payload); err != nil {
			return "", err
		}
		return data.Jid, nil
	}

	var conn redis.Conn
	if opts.ConnectionOptions.Address == "" {
		Logger.Debug("missing redis Address in EnqueueWithOptions. using default pool")
		conn = Config.Pool.Get()
	} else {
		conn = GetConnectionPool(opts.ConnectionOptions).Get()
	}
	defer func() {
		if err := conn.Close(); err != nil {
			// %v rather than %w: the wrapping verb is only understood by
			// fmt.Errorf and would be rendered as "%!w(...)" by a logger.
			Logger.Errorf("failed to close Redis connection in EnqueueWithOptions: %v", err)
		}
	}()

	// Record the queue name so it can be discovered, then push the job.
	if _, err := conn.Do("sadd", Config.Namespace+"queues", queue); err != nil {
		return "", err
	}
	if _, err := conn.Do("lpush", Config.Namespace+"queue:"+queue, payload); err != nil {
		return "", err
	}

	return data.Jid, nil
}
// enqueueAt adds an already-serialized job to the namespaced scheduled-jobs
// sorted set, using at (seconds since the Unix epoch) as its score. It always
// uses a connection from the default Config.Pool and returns the error of the
// ZADD command, if any.
func enqueueAt(at float64, bytes []byte) error {
	conn := Config.Pool.Get()
	defer conn.Close()

	scheduledKey := Config.Namespace + SCHEDULED_JOBS_KEY
	_, err := conn.Do("zadd", scheduledKey, at, bytes)
	return err
}
// timeToSecondsWithNanoPrecision converts t to seconds since the Unix epoch,
// keeping the sub-second part as the fraction of the returned float64.
//
// Note that a float64 has a 53-bit mantissa, so for present-day timestamps the
// result is accurate to a few hundred nanoseconds rather than to one.
func timeToSecondsWithNanoPrecision(t time.Time) float64 {
	nanos := t.UnixNano()
	return float64(nanos) / NanoSecondPrecision
}
// durationToSecondsWithNanoPrecision converts d to a floating-point number of
// seconds by dividing its nanosecond count by NanoSecondPrecision.
func durationToSecondsWithNanoPrecision(d time.Duration) float64 {
	nanos := d.Nanoseconds()
	return float64(nanos) / NanoSecondPrecision
}
// nowToSecondsWithNanoPrecision returns the current time as fractional seconds
// since the Unix epoch.
func nowToSecondsWithNanoPrecision() float64 {
	current := time.Now()
	return timeToSecondsWithNanoPrecision(current)
}