nsqd: add compression/decompression for messages #1149
Conversation
WIP: waiting on @ploxiln's redesign of the nsqd storage backend.
force-pushed from 667c030 to c649166
```go
	chanMsg.Timestamp = msg.Timestamp
	chanMsg.deferred = msg.deferred
}
for _, channel := range chans {
```
This is mainly to avoid a race condition with the topic's `PutMessage` method, which is detected by `go test -race`.
force-pushed from c649166 to 2bbd7c2
```go
}

msg.Body = adjustedBody
_, err = msg.WriteTo(buf)
```
I'm still confused why the changes here are necessary; doesn't the existing compression functionality already deliver compressed messages to consumers?
Because the snappy or deflate compression is implemented by wrapping the socket, i.e., nsqd writes the original message to the wrapped socket and the compressed bytes are what actually get written to the wire. Reading works the same way.
Isn't that what we want to happen?
As we had discussed on #1148 (comment), I feel like the way to accomplish what you want is for your producers and consumers to compress data.
It probably makes sense to get on the same page about the feature and whether or not it belongs in NSQ core before we discuss code and implementation.
Yes, @mreiferson, you're right. I have also considered this pattern of compressing messages in the producer and decompressing them in the consumer. However, that way we would need to:

1. Pass the compression algorithm information to the consumer so it can determine how to decompress. To accomplish this, we would need to change the current wire format while keeping it backward compatible; IMHO, that is difficult.
2. Handle the fact that not all compression algorithms are supported by the Go/Python standard libraries, for example snappy. To decompress a message correctly, the consumer must know which algorithm was used; otherwise the message cannot be consumed correctly and will be requeued, or worse, dropped once `max_tries` is reached. (I know we can just log an error and exit the consumer to signal that something is wrong, but what if users set `max_tries` to 1 and the message is dropped the next time it is consumed?)
How you compress the messages is entirely up to you. In most cases you control both the producer and consumer — is there a need for more than one compression type to be used?
If multiple compression formats do need to be supported, you could also build some identifier into the message structure to indicate to consumers how to decompress.
If you have to transition from uncompressed messages to compressed messages, one way to do it is to start a new topic for the compressed messages ... so if you have "events" you might create another topic "events_gzip" and during the transition consume from both. Messages from "events_gzip" are gzipped, in this hypothetical case. But pick whatever is supported by both producer and consumer.
This scheme is nice because nsqd does not have to compress for storage and then decompress for delivery.
> If multiple compression formats do need to be supported, you could also build some identifier into the message structure to indicate to consumers how to decompress.

I know this. But we do not control both producer and consumer in every case, nor does every producer need to add a compression identifier byte to the message.

> If you have to transition from uncompressed messages to compressed messages, one way to do it is to start a new topic for the compressed messages ... so if you have "events" you might create another topic "events_gzip" and during the transition consume from both. Messages from "events_gzip" are gzipped, in this hypothetical case. But pick whatever is supported by both producer and consumer.

Yes, I agree with this. But what I want is to do this transparently to end users (producers and consumers).
```go
func DeflateDecompress(compressedMsg []byte) ([]byte, error) {
	fr := flate.NewReader(bytes.NewReader(compressedMsg))
	defer fr.Close()
	body, err := ioutil.ReadAll(fr)
```
mark
force-pushed from 2bbd7c2 to a19f720
```go
fr := flate.NewReader(br)
defer fr.Close()
body, err := ioutil.ReadAll(fr)
if err != nil && !(err == io.ErrUnexpectedEOF && br.Len() == 0) {
```
new way
Great! 👍
Let's wait for #1170 to be merged; then it will be easy to add this feature. :)
@andyxning sorry I've been away for so long!

Context: the intended use case of the existing compression features of nsqd is compression of the client connection itself.

I noticed that you had a design doc linked in the original issue description with the following motivation:
I think we should discuss these individually:
For this PR specifically, I would consider landing (2), i.e. just the changes to the HTTP publish methods to support `Content-Encoding`. What do you think @andyxning and @ploxiln?
👍 for adding
👍 from me as well for HTTP `Content-Encoding` gzip/deflate (since those are already standard). That probably won't satisfy @andyxning ... but I still think the way to go, when you really need good compression, is for the producer/consumer to do it. That way avoids nsqd doing extra compress and uncompress work for each message, and allows you to squeeze the most efficiency out of the system, if you need it.
Closing this, opened #1313 to track compression over HTTP pub. |
Fix #1148
This PR adds a compression/decompression pipeline to the nsqd server.
Update:
/cc @mreiferson @ploxiln