Skip to content

Commit

Permalink
add compression/decompression pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Mar 25, 2019
1 parent 3eb619f commit 2bbd7c2
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 15 deletions.
31 changes: 31 additions & 0 deletions internal/compress/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package compress

import (
"compress/flate"
"fmt"
"io/ioutil"

"bytes"

"github.com/golang/snappy"
)

func SnappyDecompress(compressedMsg []byte) ([]byte, error) {
body, err := snappy.Decode(nil, compressedMsg)
if err != nil {
return nil, fmt.Errorf("error in decode snappy compressed message: %v", err)
}

return body, nil
}

func DeflateDecompress(compressedMsg []byte) ([]byte, error) {
fr := flate.NewReader(bytes.NewReader(compressedMsg))
defer fr.Close()
body, err := ioutil.ReadAll(fr)
if err != nil {
return nil, fmt.Errorf("error in decode deflate compressed message: %v", err)
}

return body, nil
}
8 changes: 5 additions & 3 deletions nsqd/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,10 @@ type clientV2 struct {
IdentifyEventChan chan identifyEvent
SubEventChan chan *Channel

TLS int32
Snappy int32
Deflate int32
TLS int32
Snappy int32
Deflate int32
DeflateLevel int32

// re-usable buffer for reading the 4-byte lengths off the wire
lenBuf [4]byte
Expand Down Expand Up @@ -511,6 +512,7 @@ func (c *clientV2) UpgradeDeflate(level int) error {
c.Writer = bufio.NewWriterSize(fw, c.OutputBufferSize)

atomic.StoreInt32(&c.Deflate, 1)
atomic.StoreInt32(&c.DeflateLevel, int32(level))

return nil
}
Expand Down
41 changes: 41 additions & 0 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,27 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
}
}

var isSnappy, isDeflate bool
if value, exist := req.Header["Content-Encoding"]; exist {
for _, encoding := range value {
if encoding == "snappy" {
isSnappy = true
}

if encoding == "deflate" {
isDeflate = true
}
}
}

if isSnappy && isDeflate {
return nil, http_api.Err{400, "INVALID_CONTENT_ENCODING"}
}

msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
msg.snappyCompressed = isSnappy
msg.deflateCompressed = isDeflate
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
Expand All @@ -248,6 +267,23 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
return nil, err
}

var isSnappy, isDeflate bool
if value, exist := req.Header["Content-Encoding"]; exist {
for _, encoding := range value {
if encoding == "snappy" {
isSnappy = true
}

if encoding == "deflate" {
isDeflate = true
}
}
}

if isSnappy && isDeflate {
return nil, http_api.Err{400, "INVALID_CONTENT_ENCODING"}
}

// text mode is default, but unrecognized binary opt considered true
binaryMode := false
if vals, ok := reqParams["binary"]; ok {
Expand Down Expand Up @@ -302,6 +338,11 @@ func (s *httpServer) doMPUB(w http.ResponseWriter, req *http.Request, ps httprou
}
}

for _, msg := range msgs {
msg.snappyCompressed = isSnappy
msg.deflateCompressed = isDeflate
}

err = topic.PutMessages(msgs)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
Expand Down
3 changes: 3 additions & 0 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ type Message struct {
pri int64
index int
deferred time.Duration

snappyCompressed bool
deflateCompressed bool
}

func NewMessage(id MessageID, body []byte) *Message {
Expand Down
24 changes: 23 additions & 1 deletion nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"
"unsafe"

"github.com/nsqio/nsq/internal/compress"
"github.com/nsqio/nsq/internal/protocol"
"github.com/nsqio/nsq/internal/version"
)
Expand Down Expand Up @@ -126,7 +127,28 @@ func (p *protocolV2) SendMessage(client *clientV2, msg *Message) error {
p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): writing msg(%s) to client(%s) - %s", msg.ID, client, msg.Body)
var buf = &bytes.Buffer{}

_, err := msg.WriteTo(buf)
adjustedBody := msg.Body
var err error
if msg.snappyCompressed {
// message is snappy compressed
adjustedBody, err = compress.SnappyDecompress(msg.Body)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): decompress msg(%s) with snappy error", msg.ID)
return err
}
msg.snappyCompressed = false
} else if msg.deflateCompressed {
// message is deflate compressed
adjustedBody, err = compress.DeflateDecompress(msg.Body)
if err != nil {
p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): decompress msg(%s) with deflate error", msg.ID)
return err
}
msg.deflateCompressed = false
}

msg.Body = adjustedBody
_, err = msg.WriteTo(buf)
if err != nil {
return err
}
Expand Down
17 changes: 6 additions & 11 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,17 +307,12 @@ func (t *Topic) messagePump() {
goto exit
}

for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
for _, channel := range chans {
chanMsg := NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
chanMsg.deflateCompressed = msg.deflateCompressed
chanMsg.snappyCompressed = msg.snappyCompressed
if chanMsg.deferred != 0 {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
Expand Down

0 comments on commit 2bbd7c2

Please sign in to comment.