Skip to content

Commit

Permalink
feat: pubsub and sentinel in resp2
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Oct 13, 2022
1 parent aa0a257 commit defba73
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 83 deletions.
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A Fast Golang Redis client that does auto pipelining and supports client side ca
* Auto pipeline for non-blocking redis commands
* Connection pooling for blocking redis commands
* Opt-in client side caching in RESP3
* Pub/Sub, Redis 7 Sharded Pub/Sub in RESP3
* Pub/Sub, Redis 7 Sharded Pub/Sub
* Redis Cluster, Sentinel, Streams, TLS, RedisJSON, RedisBloom, RediSearch, RedisGraph, RedisTimeseries, RedisAI, RedisGears
* IDE friendly redis command builder
* Generic Hash/RedisJSON Object Mapping with client side caching and optimistic locking
Expand All @@ -27,8 +27,6 @@ A Fast Golang Redis client that does auto pipelining and supports client side ca
Rueidis is built around RESP2 and RESP3 protocol, and supports almost all redis features.
However, the following features has not yet been implemented in RESP2 mode:

* PubSub only works in RESP3 and Redis >= 6.0
* Redis Sentinel only works in RESP3 and Redis >= 6.0
* Client side caching only works in RESP3 and Redis >= 6.0

## Getting Started
Expand Down Expand Up @@ -636,9 +634,3 @@ client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString()
client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice()
```

## Not Yet Implement

The following subjects are not yet implemented.
* PubSub in RESP2
* Sentinel in RESP2
12 changes: 12 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,18 @@ services:
ports:
- "6380:6380"
- "26379:26379"
sentinel5:
image: redis:5-alpine
entrypoint:
- /bin/sh
- -c
- |
redis-server --save "" --appendonly no --port 6385 &
echo "sentinel monitor test5 127.0.0.1 6385 2\n" > sentinel.conf
redis-server sentinel.conf --sentinel
ports:
- "6385:6385"
- "26355:26379"
cluster:
image: redis:7.0.5-alpine
entrypoint:
Expand Down
2 changes: 1 addition & 1 deletion dockertest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
set -ev

docker-compose up -d
go test -coverprofile=./c.out -v -race -timeout 20m ./...
go test -coverprofile=./c.out -v -race -timeout 30m ./...
cp c.out coverage.txt
docker-compose down -v
7 changes: 3 additions & 4 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ type mux struct {
func makeMux(dst string, option *ClientOption, dialFn dialFn) *mux {
dead := deadFn()
return newMux(dst, option, (*pipe)(nil), dead, func() (w wire) {
conn, err := dialFn(dst, option)
if err == nil {
w, err = newPipe(conn, option)
}
w, err := newPipe(func() (net.Conn, error) {
return dialFn(dst, option)
}, option)
if err != nil {
dead.error.Store(&errs{error: err})
w = dead
Expand Down
138 changes: 99 additions & 39 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,22 @@ type pipe struct {
error atomic.Value

onInvalidations func([]RedisMessage)

r2psFn func() (p *pipe, err error)
r2mu sync.Mutex
r2pipe *pipe
r2ps bool
}

func newPipe(connFn func() (net.Conn, error), option *ClientOption) (p *pipe, err error) {
return _newPipe(connFn, option, false)
}

func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) {
func _newPipe(connFn func() (net.Conn, error), option *ClientOption, r2ps bool) (p *pipe, err error) {
conn, err := connFn()
if err != nil {
return nil, err
}
p = &pipe{
conn: conn,
queue: newRing(option.RingScaleEachConn),
Expand All @@ -75,6 +88,13 @@ func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) {

timeout: option.ConnWriteTimeout,
pinggap: option.Dialer.KeepAlive,

r2ps: r2ps,
}
if !r2ps {
p.r2psFn = func() (p *pipe, err error) {
return _newPipe(connFn, option, true)
}
}
if !option.DisableCache {
p.cache = newLRU(option.CacheSizeEachConn)
Expand Down Expand Up @@ -112,29 +132,31 @@ func newPipe(conn net.Conn, option *ClientOption) (p *pipe, err error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

var resp2 bool
for i, r := range p.DoMulti(ctx, cmds.NewMultiCompleted(init)...) {
if i == 0 {
p.info, err = r.ToMap()
} else {
err = r.Error()
}
if err != nil {
if re, ok := err.(*RedisError); ok {
if !resp2 && noHello.MatchString(re.string) {
resp2 = true
continue
} else if strings.Contains(re.string, "wrong number of arguments for 'TRACKING'") {
err = fmt.Errorf("%s: %w", re.string, ErrNoCache)
} else if resp2 {
continue
var r2 bool
if !r2ps {
for i, r := range p.DoMulti(ctx, cmds.NewMultiCompleted(init)...) {
if i == 0 {
p.info, err = r.ToMap()
} else {
err = r.Error()
}
if err != nil {
if re, ok := err.(*RedisError); ok {
if !r2 && noHello.MatchString(re.string) {
r2 = true
continue
} else if strings.Contains(re.string, "wrong number of arguments for 'TRACKING'") {
err = fmt.Errorf("%s: %w", re.string, ErrNoCache)
} else if r2 {
continue
}
}
p.Close()
return nil, err
}
p.Close()
return nil, err
}
}
if !resp2 {
if !r2 && !r2ps {
if ver, ok := p.info["version"]; ok {
if v := strings.Split(ver.string, "."); len(v) != 0 {
vv, _ := strconv.ParseInt(v[0], 10, 32)
Expand Down Expand Up @@ -300,6 +322,7 @@ func (p *pipe) _backgroundRead() (err error) {
skip int // skip rest push messages
ver = p.version
pr bool // push reply
r2ps = p.r2ps
)

defer func() {
Expand All @@ -316,7 +339,7 @@ func (p *pipe) _backgroundRead() (err error) {
if msg, err = readNextMessage(p.r); err != nil {
return
}
if msg.typ == '>' {
if msg.typ == '>' || (r2ps && len(msg.values) != 0 && msg.values[0].string != "pong") {
if pr = p.handlePush(msg.values); !pr {
continue
}
Expand All @@ -325,7 +348,7 @@ func (p *pipe) _backgroundRead() (err error) {
pr = false
continue
}
} else if ver < 7 && len(msg.values) != 0 {
} else if ver >= 6 && ver < 7 && len(msg.values) != 0 {
// This is a workaround for Redis 6's broken invalidation protocol: https://github.com/redis/redis/issues/8935
// When Redis 6 handles MULTI, MGET, or other multi-keys command,
// it will send invalidation message immediately if it finds the keys are expired, thus causing the multi-keys command response to be broken.
Expand Down Expand Up @@ -516,10 +539,29 @@ func (p *pipe) handlePush(values []RedisMessage) (reply bool) {
}
return false
}
func (p *pipe) _r2pipe() (r2p *pipe) {
p.r2mu.Lock()
if p.r2pipe != nil {
r2p = p.r2pipe
} else {
var err error
if r2p, err = p.r2psFn(); err != nil {
r2p = epipeFn(err)
} else {
p.r2pipe = r2p
}
}
p.r2mu.Unlock()
return r2p
}

func (p *pipe) Receive(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error {
if p.nsubs == nil || p.psubs == nil || p.ssubs == nil {
return ErrClosing
return p.Error()
}

if p.version < 6 && p.r2psFn != nil {
return p._r2pipe().Receive(ctx, subscribe, fn)
}

var sb *subs
Expand All @@ -535,9 +577,6 @@ func (p *pipe) Receive(ctx context.Context, subscribe cmds.Completed, fn func(me
default:
panic(wrongreceive)
}
if p.version < 6 {
return ErrRESP2PubSub
}

if ch, cancel := sb.Subscribe(args); ch != nil {
defer cancel()
Expand Down Expand Up @@ -575,6 +614,9 @@ func (p *pipe) CleanSubscriptions() {
}

func (p *pipe) SetPubSubHooks(hooks PubSubHooks) <-chan error {
if p.version < 6 && p.r2psFn != nil {
return p._r2pipe().SetPubSubHooks(hooks)
}
if hooks.isZero() {
if old := p.pshks.Swap(emptypshks).(*pshks); old.close != nil {
close(old.close)
Expand Down Expand Up @@ -622,6 +664,12 @@ func (p *pipe) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
}()
}

if cmd.NoReply() {
if p.version < 6 && p.r2psFn != nil {
return p._r2pipe().Do(ctx, cmd)
}
}

waits := atomic.AddInt32(&p.waits, 1) // if this is 1, and background worker is not started, no need to queue
state := atomic.LoadInt32(&p.state)

Expand All @@ -634,10 +682,6 @@ func (p *pipe) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
goto queue
}
if cmd.NoReply() {
if p.version < 6 {
atomic.AddInt32(&p.waits, -1)
return newErrResult(ErrRESP2PubSub)
}
p.background()
goto queue
}
Expand Down Expand Up @@ -685,19 +729,23 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...cmds.Completed) []RedisResu
}

isOptIn := multi[0].IsOptIn() // len(multi) > 0 should have already been checked by upper layer
noReply := false
noReply := 0
isBlock := false

for _, cmd := range multi {
if cmd.NoReply() {
if p.version < 6 {
for i := 0; i < len(resp); i++ {
resp[i] = newErrResult(ErrRESP2PubSub)
}
return resp
noReply++
}
}

if p.version < 6 && noReply != 0 {
if noReply != len(multi) {
for i := 0; i < len(resp); i++ {
resp[i] = newErrResult(ErrRESP2PubSubMixed)
}
noReply = true
break
return resp
} else if p.r2psFn != nil {
return p._r2pipe().DoMulti(ctx, multi...)
}
}

Expand Down Expand Up @@ -731,7 +779,7 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...cmds.Completed) []RedisResu
if waits != 1 {
goto queue
}
if isOptIn || noReply {
if isOptIn || noReply != 0 {
p.background()
goto queue
}
Expand Down Expand Up @@ -1104,6 +1152,11 @@ func (p *pipe) Close() {
if p.conn != nil {
p.conn.Close()
}
p.r2mu.Lock()
if p.r2pipe != nil {
p.r2pipe.Close()
}
p.r2mu.Unlock()
}

type pshks struct {
Expand All @@ -1126,6 +1179,13 @@ func deadFn() *pipe {
return dead
}

func epipeFn(err error) *pipe {
dead := &pipe{state: 3}
dead.error.Store(&errs{error: err})
dead.pshks.Store(emptypshks)
return dead
}

const (
protocolbug = "protocol bug, message handled out of order"
wrongreceive = "only SUBSCRIBE, SSUBSCRIBE, or PSUBSCRIBE command are allowed in Receive"
Expand Down
Loading

0 comments on commit defba73

Please sign in to comment.