-
Notifications
You must be signed in to change notification settings - Fork 215
/
redis.go
254 lines (229 loc) · 7.53 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
package miniredis
import (
"context"
"fmt"
"math"
"math/big"
"strings"
"sync"
"time"
"github.com/alicebob/miniredis/v2/server"
)
const (
msgWrongType = "WRONGTYPE Operation against a key holding the wrong kind of value"
msgNotValidHllValue = "WRONGTYPE Key is not a valid HyperLogLog string value."
msgInvalidInt = "ERR value is not an integer or out of range"
msgIntOverflow = "ERR increment or decrement would overflow"
msgInvalidFloat = "ERR value is not a valid float"
msgInvalidMinMax = "ERR min or max is not a float"
msgInvalidRangeItem = "ERR min or max not valid string range item"
msgInvalidTimeout = "ERR timeout is not a float or out of range"
msgInvalidRange = "ERR value is out of range, must be positive"
msgSyntaxError = "ERR syntax error"
msgKeyNotFound = "ERR no such key"
msgOutOfRange = "ERR index out of range"
msgInvalidCursor = "ERR invalid cursor"
msgXXandNX = "ERR XX and NX options at the same time are not compatible"
msgTimeoutNegative = "ERR timeout is negative"
msgTimeoutIsOutOfRange = "ERR timeout is out of range"
msgInvalidSETime = "ERR invalid expire time in set"
msgInvalidSETEXTime = "ERR invalid expire time in setex"
msgInvalidPSETEXTime = "ERR invalid expire time in psetex"
msgInvalidKeysNumber = "ERR Number of keys can't be greater than number of args"
msgNegativeKeysNumber = "ERR Number of keys can't be negative"
msgFScriptUsage = "ERR unknown subcommand or wrong number of arguments for '%s'. Try SCRIPT HELP."
msgFScriptUsageSimple = "ERR unknown subcommand '%s'. Try SCRIPT HELP."
msgFPubsubUsage = "ERR unknown subcommand or wrong number of arguments for '%s'. Try PUBSUB HELP."
msgFPubsubUsageSimple = "ERR unknown subcommand '%s'. Try PUBSUB HELP."
msgFObjectUsage = "ERR unknown subcommand '%s'. Try OBJECT HELP."
msgScriptFlush = "ERR SCRIPT FLUSH only support SYNC|ASYNC option"
msgSingleElementPair = "ERR INCR option supports a single increment-element pair"
msgGTLTandNX = "ERR GT, LT, and/or NX options at the same time are not compatible"
msgInvalidStreamID = "ERR Invalid stream ID specified as stream command argument"
msgStreamIDTooSmall = "ERR The ID specified in XADD is equal or smaller than the target stream top item"
msgStreamIDZero = "ERR The ID specified in XADD must be greater than 0-0"
msgNoScriptFound = "NOSCRIPT No matching script. Please use EVAL."
msgUnsupportedUnit = "ERR unsupported unit provided. please use M, KM, FT, MI"
msgXreadUnbalanced = "ERR Unbalanced 'xread' list of streams: for each stream key an ID or '$' must be specified."
msgXgroupKeyNotFound = "ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically."
msgXtrimInvalidStrategy = "ERR unsupported XTRIM strategy. Please use MAXLEN, MINID"
msgXtrimInvalidMaxLen = "ERR value is not an integer or out of range"
msgXtrimInvalidLimit = "ERR syntax error, LIMIT cannot be used without the special ~ option"
msgDBIndexOutOfRange = "ERR DB index is out of range"
msgLimitCombination = "ERR syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"
msgRankIsZero = "ERR RANK can't be zero: use 1 to start from the first match, 2 from the second ... or use negative to start from the end of the list"
msgCountIsNegative = "ERR COUNT can't be negative"
msgMaxLengthIsNegative = "ERR MAXLEN can't be negative"
msgLimitIsNegative = "ERR LIMIT can't be negative"
msgMemorySubcommand = "ERR unknown subcommand '%s'. Try MEMORY HELP."
)
func errWrongNumber(cmd string) string {
return fmt.Sprintf("ERR wrong number of arguments for '%s' command", strings.ToLower(cmd))
}
func errLuaParseError(err error) string {
return fmt.Sprintf("ERR Error compiling script (new function): %s", err.Error())
}
func errReadgroup(key, group string) error {
return fmt.Errorf("NOGROUP No such key '%s' or consumer group '%s'", key, group)
}
func errXreadgroup(key, group string) error {
return fmt.Errorf("NOGROUP No such key '%s' or consumer group '%s' in XREADGROUP with GROUP option", key, group)
}
func msgNotFromScripts(sha string) string {
return fmt.Sprintf("This Redis command is not allowed from script script: %s, &c", sha)
}
// withTx wraps the non-argument-checking part of command handling code in
// transaction logic.
func withTx(
m *Miniredis,
c *server.Peer,
cb txCmd,
) {
ctx := getCtx(c)
if ctx.nested {
// this is a call via Lua's .call(). It's already locked.
cb(c, ctx)
m.signal.Broadcast()
return
}
if inTx(ctx) {
addTxCmd(ctx, cb)
c.WriteInline("QUEUED")
return
}
m.Lock()
cb(c, ctx)
// done, wake up anyone who waits on anything.
m.signal.Broadcast()
m.Unlock()
}
// blockCmd is executed returns whether it is done
type blockCmd func(*server.Peer, *connCtx) bool
// blocking keeps trying a command until the callback returns true. Calls
// onTimeout after the timeout (or when we call this in a transaction).
func blocking(
m *Miniredis,
c *server.Peer,
timeout time.Duration,
cb blockCmd,
onTimeout func(*server.Peer),
) {
var (
ctx = getCtx(c)
)
if inTx(ctx) {
addTxCmd(ctx, func(c *server.Peer, ctx *connCtx) {
if !cb(c, ctx) {
onTimeout(c)
}
})
c.WriteInline("QUEUED")
return
}
localCtx, cancel := context.WithCancel(m.Ctx)
defer cancel()
timedOut := false
if timeout != 0 {
go setCondTimer(localCtx, m.signal, &timedOut, timeout)
}
go func() {
<-localCtx.Done()
m.signal.Broadcast() // main loop might miss this signal
}()
if !ctx.nested {
// this is a call via Lua's .call(). It's already locked.
m.Lock()
defer m.Unlock()
}
for {
if c.Closed() {
return
}
if m.Ctx.Err() != nil {
return
}
done := cb(c, ctx)
if done {
return
}
if timedOut {
onTimeout(c)
return
}
m.signal.Wait()
}
}
func setCondTimer(ctx context.Context, sig *sync.Cond, timedOut *bool, timeout time.Duration) {
dl := time.NewTimer(timeout)
defer dl.Stop()
select {
case <-dl.C:
sig.L.Lock() // for timedOut
*timedOut = true
sig.Broadcast() // main loop might miss this signal
sig.L.Unlock()
case <-ctx.Done():
}
}
// formatBig formats a float the way redis does
func formatBig(v *big.Float) string {
// Format with %f and strip trailing 0s.
if v.IsInf() {
return "inf"
}
// if math.IsInf(v, -1) {
// return "-inf"
// }
return stripZeros(fmt.Sprintf("%.17f", v))
}
func stripZeros(sv string) string {
for strings.Contains(sv, ".") {
if sv[len(sv)-1] != '0' {
break
}
// Remove trailing 0s.
sv = sv[:len(sv)-1]
// Ends with a '.'.
if sv[len(sv)-1] == '.' {
sv = sv[:len(sv)-1]
break
}
}
return sv
}
// redisRange gives Go offsets for something l long with start/end in
// Redis semantics. Both start and end can be negative.
// Used for string range and list range things.
// The results can be used as: v[start:end]
// Note that GETRANGE (on a string key) never returns an empty string when end
// is a large negative number.
func redisRange(l, start, end int, stringSymantics bool) (int, int) {
if start < 0 {
start = l + start
if start < 0 {
start = 0
}
}
if start > l {
start = l
}
if end < 0 {
end = l + end
if end < 0 {
end = -1
if stringSymantics {
end = 0
}
}
}
if end < math.MaxInt32 {
end++ // end argument is inclusive in Redis.
}
if end > l {
end = l
}
if end < start {
return 0, 0
}
return start, end
}