nsqd: per-topic message IDs #741

Merged: 3 commits merged into nsqio:master from the id-gen branch on Jan 3, 2017

Conversation

@mreiferson (Member) commented Apr 16, 2016

(Pushing up rebased branches I've had lying around locally for ages.)

This modifies message ID generation such that each topic maintains a monotonic/atomic counter to generate message IDs, a more scalable and performant approach.
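
For illustration, a minimal sketch of what a per-topic atomic counter could look like; the names here (topic, idSequence, GenerateID) are hypothetical stand-ins rather than the PR's actual code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// MessageID mirrors nsqd's 16-byte, ASCII-encoded message ID.
type MessageID [16]byte

// topic is a pared-down stand-in for nsqd's Topic struct; only the
// piece relevant to ID generation is shown.
type topic struct {
	idSequence uint64 // incremented atomically on every publish
}

// GenerateID returns the next per-topic message ID by atomically
// incrementing the counter; no lock, no time source, no worker ID.
func (t *topic) GenerateID() MessageID {
	var id MessageID
	n := atomic.AddUint64(&t.idSequence, 1)
	copy(id[:], fmt.Sprintf("%016x", n)) // hex-encode into 16 ASCII bytes
	return id
}

func main() {
	t := &topic{}
	a, b := t.GenerateID(), t.GenerateID()
	fmt.Println(string(a[:]), string(b[:])) // 0000000000000001 0000000000000002
}
```

Because the counter is per-topic and purely in memory, the IDs reset on restart and can repeat across topics and nodes, which is the trade-off debated below.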

@jehiah (Member) commented Apr 16, 2016

Interesting. Thoughts off the top of my head:

This would mean per-topic message IDs are not unique across nsqd restarts (no timestamp or worker component, and not persisted), and are less likely to be unique from the client's perspective across multiple nsqd nodes (we didn't guarantee this already, but it's nice for logging).

We do persist message IDs in the disk-backed queue, so it seems per-topic message IDs could get out of sync and overlap with this sequence-based message ID generation, no? It seems reasonable that a client should be able to expect the same message ID for a given message from a given nsqd node every time it gets that message, so I like that we persist the message ID.

One approach is that we could narrow the definition of a message ID to make it more of an ACK ID, where its only purpose is to serve as an identifier to FIN/ACK/TOUCH a message, and it's not an ID for the message itself. (I'm not sure I like this, just putting it out there.)

@mreiferson (Member Author)

One approach is that we could narrow the definition of a message ID to make it more of an ACK ID, where its only purpose is to serve as an identifier to FIN/ACK/TOUCH a message, and it's not an ID for the message itself. (I'm not sure I like this, just putting it out there.)

Practically speaking, that's all it really is, and I regret treating them any differently. Agreed that we need to account for the edge cases you mentioned, but they're pretty easy once you've decided on the above.

By logging I assume you mean in the nsqd logs?

Let me get some benchmarks here so we have some numbers to make decisions.

@mreiferson (Member Author)

Benchmark added; benchcmp output below.

NOTE: I think at 16+ topics my laptop just doesn't have anything left to give.

benchmark                                old ns/op     new ns/op     delta
BenchmarkProtocolV2PubMultiTopic1-4      1325          1231          -7.09%
BenchmarkProtocolV2PubMultiTopic2-4      890           826           -7.19%
BenchmarkProtocolV2PubMultiTopic4-4      927           633           -31.72%
BenchmarkProtocolV2PubMultiTopic8-4      880           638           -27.50%
BenchmarkProtocolV2PubMultiTopic16-4     745           730           -2.01%
BenchmarkProtocolV2PubMultiTopic32-4     750           685           -8.67%

benchmark                                old MB/s     new MB/s     speedup
BenchmarkProtocolV2PubMultiTopic1-4      150.86       162.44       1.08x
BenchmarkProtocolV2PubMultiTopic2-4      224.62       242.12       1.08x
BenchmarkProtocolV2PubMultiTopic4-4      215.71       315.50       1.46x
BenchmarkProtocolV2PubMultiTopic8-4      227.19       313.21       1.38x
BenchmarkProtocolV2PubMultiTopic16-4     268.36       273.73       1.02x
BenchmarkProtocolV2PubMultiTopic32-4     266.48       291.83       1.10x

@judwhite (Contributor)

I share @jehiah's concerns about this approach. It most noticeably affects nsqd instances that persist every message to disk, and long-running work queues, but I believe it could impact in-memory queues also.

Scenario 1:

  • nsqd receives message, assigns messageID 1
  • Consumer picks up messageID 1
  • nsqd restarts before Consumer FIN's the message
  • Consumer reconnects to nsqd before attempting to FIN
  • nsqd receives a new message, assigns messageID 1
  • Original Consumer FIN's the original messageID 1, which actually FIN's the new message.

Scenario 2:

  • Message goes to disk without being picked up by a Consumer
  • nsqd restarts
  • On next PUB, will there be two messages with messageID 1 in the diskqueue?

I think there was always an edge-case problem with using the wall clock to generate the ID, since clocks change and cannot be relied upon to be monotonic. There's code to guard against this, but it could make publishing to nsqd unavailable until time catches up (ever have NTP send you back in time 5 minutes?) - this could actually be another NOK scenario. It's also not guaranteed that time hasn't gone backwards between nsqd restarts, which the current solution wouldn't be able to guard against.
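
For context, a rough sketch of how a snowflake-style, wall-clock-based generator typically guards against a backwards clock; the bit widths, field names, and errors below are illustrative, not nsqd's exact code:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTimeBackwards = errors.New("time has gone backwards")

const (
	workerIDBits = 10
	sequenceBits = 12
	sequenceMask = (1 << sequenceBits) - 1
)

// guidFactory is a simplified snowflake-style generator:
// millisecond timestamp | worker ID | per-millisecond sequence.
type guidFactory struct {
	workerID int64
	lastTS   int64
	sequence int64
}

func (f *guidFactory) NewGUID() (int64, error) {
	ts := time.Now().UnixNano() / int64(time.Millisecond)

	// If the wall clock stepped backwards (NTP correction, VM migration, ...),
	// refuse to hand out IDs until time catches up again; this is the
	// "publishing becomes unavailable" failure mode described above.
	if ts < f.lastTS {
		return 0, errTimeBackwards
	}

	if ts == f.lastTS {
		f.sequence = (f.sequence + 1) & sequenceMask
		if f.sequence == 0 {
			return 0, errors.New("sequence exhausted for this millisecond")
		}
	} else {
		f.sequence = 0
	}
	f.lastTS = ts

	return ts<<(workerIDBits+sequenceBits) | f.workerID<<sequenceBits | f.sequence, nil
}

func main() {
	f := &guidFactory{workerID: 1}
	id, err := f.NewGUID()
	fmt.Println(id, err)
}
```

The guard only helps within a single process lifetime; nothing stops the clock from reading earlier after a restart, which is the restart case mentioned above.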

One possible solution could be to persist the sequence with the metadata. This comes with the same issues as Scenario 1 above for message IDs handed out between persists, when nsqd restarts without the ability to sync.

Two ideas around using persisted IDs:

  1. If it's purely sequential we'd almost be guaranteeing collisions, given the possibility of the persisted ID not being current. A wall-clock component would narrow the window for collisions, but wouldn't completely eliminate it.
  2. To get globally unique IDs I think you'd have to buffer writes for a very short duration, a few ms at most, and only reply once the latest sequence has been persisted and synced, allowing you to remove the time component (if you trust fsync). If you specify that disk crashes/corruption require assigning a new -worker-id, you should have globally unique IDs without the above problems. I'd have to benchmark this idea; I don't have a sense of how much it will impact write throughput, and the answer is probably _It Depends_™.

My suggestion is a hybrid: start with 1 and work towards 2 if it's feasible, but keep the time component. If NTP and fsync both work against you, it's just not your day.

Thoughts?

@jehiah (Member) commented Apr 18, 2016

@judwhite w/r/t scenario 1: it's interesting, but message IDs are only valid on the TCP connection they were received over, so we shouldn't have to worry about cross-connection ID re-use (I'm 90% sure on this).

w/r/t number two: it's easy to just renumber the message IDs when you read them from the backend, because the ID wouldn't be about the message contents; it's just an ephemeral ID when you send it to the client.

My biggest concern is that, because we expose the ID to the client, people might have mistakenly used it as an ID for the message contents, and this would break those semantics. =(

@mreiferson (Member Author)

My biggest concern is that, because we expose the ID to the client, people might have mistakenly used it as an ID for the message contents, and this would break those semantics. =(

Yeah, but that was always a bad idea.

#625 would, in theory, make the IDs actually useful with stronger guarantees.

@stephensearles

Thinking about the planned resilience work... I think this can still work, but I'm chiming in to be sure. As long as nsqd nodes are replicating peers' backlogs independently, if an nsqd node goes down, those peers can still agree on the set of messages that need to be recovered. They can treat that whole node-specific backlog as a list of messages and re-ID them as part of recovery.

message IDs are only valid on the TCP connection they were received over

@jehiah, question: to which TCP connection are you referring, the one from nsqd to the consumer? Is that channel-specific? Does that mean, even in a simple scenario with one nsqd, that when a message is requeued and sent to a different consumer (or even the same one after reconnecting), it may get a new message ID? Might two consumers, on the same topic but different channels, get different message IDs for the same message? If so, in a recovery scenario, how will we be able to tell which channels a message has already been FIN'd for? On recovery, we could just duplicate messages back to channels that already got them, so maybe that's a non-issue. The only other thing I can see is that it perhaps makes it harder to track when we no longer need to retain a message's contents, because the IDs in the REQ won't necessarily match up to the original message. (Do we store the message content multiple times, per channel?)

@mreiferson (Member Author) commented Apr 18, 2016

Thinking about the planned resilience work... I think this can still work, but I'm chiming in to be sure. As long as nsqd nodes are replicating peers' backlogs independently, if an nsqd node goes down, those peers can still agree on the set of messages that need to be recovered. They can treat that whole node-specific backlog as a list of messages and re-ID them as part of recovery.

I think the outcome of this PR would be that we would officially declare that NSQ's message IDs are an internal implementation detail.

It's possible for us to later provide stronger guarantees around message IDs, especially in light of #625, or not.

@jehiah, question: to which TCP connection are you referring, the one from nsqd to the consumer? Is that channel-specific? Does that mean, even in a simple scenario with one nsqd, that when a message is requeued and sent to a different consumer (or even the same one after reconnecting), it may get a new message ID? Might two consumers, on the same topic but different channels, get different message IDs for the same message? If so, in a recovery scenario, how will we be able to tell which channels a message has already been FIN'd for? On recovery, we could just duplicate messages back to channels that already got them, so maybe that's a non-issue. The only other thing I can see is that it perhaps makes it harder to track when we no longer need to retain a message's contents, because the IDs in the REQ won't necessarily match up to the original message. (Do we store the message content multiple times, per channel?)

Right now (regardless of this PR), NSQ's message IDs are client-specific. Only the client that received the message, and only on the original connection that received it, can respond to it. This means that message IDs, as used by nsqd, only live for the duration of a single consumer connection. They aren't used for anything else internally.

Technically, because of this property, we could generate the IDs on a per-client basis too, but I think generating them at the topic level makes more sense, especially in light of potential work in #625.

@mreiferson (Member Author)

We've always advised users to implement their own domain-level message IDs and put them in the message body.

This PR is simply "the nail in the coffin".

@judwhite (Contributor)

message IDs are only valid on the TCP connection they were received over

@jehiah I checked Channel.FinishMessage; you're right, you can only FIN a message ID that is in flight for that connection.

I think the outcome of this PR would be that we would officially declare that NSQ's message IDs are an internal implementation detail.

I worry it may be too late; changing the message ID to be ephemeral risks breaking some users, advised or not 😃 It's exported by go-nsq, and weakening the current guarantees (even if not in writing) will probably cause issues for some people, for example tracking retries by message ID.

Would you be open to helping move #625 forward, and strengthening the guarantees instead?

@ploxiln (Member) commented Apr 18, 2016

Some alternative ideas to reduce contention and improve performance:

  • Save and reuse the last timestamp; don't check the time again until after generating N (e.g. 40) message IDs.
  • Keep the current ID generation logic with timestamp and worker ID, but switch to per-topic ID generators (assuming that messages from separate topics having the same ID won't be problematic for clients that consume multiple topics in the same process).

@mreiferson (Member Author) commented Apr 19, 2016

Save and reuse the last timestamp; don't check the time again until after generating N (e.g. 40) message IDs.

Not following this one?

Keep the current ID generation logic with timestamp and worker ID, but switch to per-topic ID generators (assuming that messages from separate topics having the same ID won't be problematic for clients that consume multiple topics in the same process).

I thought about this; I need to see if we can modify the algorithm to support concurrent operations so that we don't need to add a goroutine per topic in exchange.

@ploxiln (Member) commented Apr 19, 2016

Save and reuse the last timestamp; don't check the time again until after generating N (e.g. 40) message IDs.

Not following this one?

The idea is that you can avoid calling time.Now() on every single call to NewGUID(). Instead, store the last result in a var like guidFactory.lastnanos, and only refresh that timestamp when the sequence gets up to 40 (or something like that). Now you're calling time.Now() 2.5% as often as before.

If, on a slow topic, lastnanos is actually from hundreds of seconds (or even hours or days) ago, that's fine; the message IDs will still be unique. If nsqd is restarted, lastnanos will jump forward to the current time, which is also fine.

EDIT: the theoretical member name would more accurately be "lastmillis", or probably "lastTS". Also, to get the same reduction in calls to time.Now() when many messages are being produced in (roughly) the same millisecond, you'd re-call time.Now() when sequence % N == N - 1.

EDIT 2: the best value for N is probably 2 ^ sequenceBits: don't bother to update the timestamp part until the sequence wraps.
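
A minimal sketch of that idea, with made-up names (guidFactory, lastTS, sequenceBits) and the caller assumed to already hold the factory's mutex, as with the existing generator:

```go
package main

import (
	"fmt"
	"time"
)

const (
	sequenceBits = 12
	sequenceMask = (1 << sequenceBits) - 1
)

// guidFactory here is a stripped-down stand-in; callers are assumed to
// hold the factory's mutex, exactly as with the existing generator.
type guidFactory struct {
	lastTS   int64 // cached millisecond timestamp
	sequence int64
}

func (f *guidFactory) next() int64 {
	f.sequence = (f.sequence + 1) & sequenceMask
	if f.sequence == 0 || f.lastTS == 0 {
		// Consult the clock only once per 2^sequenceBits IDs (or on first use).
		now := time.Now().UnixNano() / int64(time.Millisecond)
		if now <= f.lastTS {
			// Clock hasn't advanced (or stepped backwards): bump the cached
			// value so the (timestamp, sequence) pair never repeats.
			f.lastTS++
		} else {
			f.lastTS = now
		}
	}
	return f.lastTS<<sequenceBits | f.sequence
}

func main() {
	f := &guidFactory{}
	fmt.Println(f.next(), f.next(), f.next())
}
```

Whether the shift layout matches the real guid format doesn't matter for the point here: the clock is consulted only once per 2^sequenceBits IDs.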

@judwhite (Contributor)

@ploxiln I tried prototyping your idea, and so far I haven't found a way to do it safely without a mutex, since there are two variables involved (base time + offset) - maybe you could show me? time.Now().UnixNano() benches at 180 ns/op on a modest Digital Ocean box and 12 ns/op on my dev box. With some not-so-safe (but on the right path) use of sync/atomic, without a mutex, the prototype benches at 19 ns/op on DO and 10 ns/op on my dev box. Even so, time isn't a good uniqueifier for the reasons above: you can get duplicate IDs if time goes backwards between nsqd restarts, which is a (possible) problem for the diskqueue, or an unavailability problem if it happens while nsqd is running. If you want to accept that situation, that's fine, I'll back off; just let me know.

@ploxiln (Member) commented Apr 19, 2016

I probably should have reordered the bullet points in my first post; my proposal for the timestamp only makes sense assuming nsqd keeps basically the same ID structure/algorithm (but perhaps switches to independent instances for each topic to reduce contention). The same algorithm would indeed require something like a mutex, because of the treatment of the timestamp. (A mutex around a fast critical section might still be noticeably faster than a Go channel.)

I'm just throwing out an idea; if the maintainers of NSQ want to change to just an incrementing int64 sequence per topic, I won't argue; I'm really not very familiar with the NSQ protocol internals.

@mreiferson (Member Author)

@judwhite we should just bench the simple path: the current algorithm in master refactored to be per-topic (as in this PR), with a mutex around the critical section.

If that shows better scalability characteristics than master, it's a win.

@mreiferson (Member Author) commented Jan 2, 2017

@jehiah @judwhite @ploxiln

In light of #367 and #838 landing, which will make this a "backwards incompatible" release due to the removal of deprecated features, I'd like to revisit this before we stamp 1.0.

My preference, in order:

  1. Land this as is. Despite our original design, relying on NSQ message IDs for deduping has always been effectively broken. Users should be adding their own domain-specific IDs to messages for those purposes. This PR would need validation around the DiskQueue semantics so that it's impossible for a message ID to be repeated within the current session, despite persistence.

  2. Modify this PR to use the existing master algorithm, but on a per-topic basis (with locks around the critical section), likely improving multi-topic publishing contention (needs some benchmarking), though not quite as impactful as the current PR.

I'll probably give in to (2) if (1) makes everyone's lives difficult. Speak up!

@mreiferson added the 1.0 label Jan 2, 2017
@mreiferson (Member Author)

The last commit restores the time/node-based ID generator.

benchmark                                old ns/op     new ns/op     delta
BenchmarkProtocolV2PubMultiTopic1-4      597           626           +4.86%
BenchmarkProtocolV2PubMultiTopic2-4      388           392           +1.03%
BenchmarkProtocolV2PubMultiTopic4-4      329           332           +0.91%
BenchmarkProtocolV2PubMultiTopic8-4      360           360           +0.00%
BenchmarkProtocolV2PubMultiTopic16-4     384           387           +0.78%
BenchmarkProtocolV2PubMultiTopic32-4     409           418           +2.20%

benchmark                                old MB/s     new MB/s     speedup
BenchmarkProtocolV2PubMultiTopic1-4      334.67       319.20       0.95x
BenchmarkProtocolV2PubMultiTopic2-4      515.39       509.50       0.99x
BenchmarkProtocolV2PubMultiTopic4-4      607.72       601.08       0.99x
BenchmarkProtocolV2PubMultiTopic8-4      554.42       555.32       1.00x
BenchmarkProtocolV2PubMultiTopic16-4     519.98       516.47       0.99x
BenchmarkProtocolV2PubMultiTopic32-4     488.85       477.89       0.98x

Only nominally slower, but still with much better scaling characteristics vs. master:

benchmark                                old ns/op     new ns/op     delta
BenchmarkProtocolV2PubMultiTopic1-4      685           626           -8.61%
BenchmarkProtocolV2PubMultiTopic2-4      616           392           -36.36%
BenchmarkProtocolV2PubMultiTopic4-4      578           332           -42.56%
BenchmarkProtocolV2PubMultiTopic8-4      551           360           -34.66%
BenchmarkProtocolV2PubMultiTopic16-4     626           387           -38.18%
BenchmarkProtocolV2PubMultiTopic32-4     514           418           -18.68%

benchmark                                old MB/s     new MB/s     speedup
BenchmarkProtocolV2PubMultiTopic1-4      291.94       319.20       1.09x
BenchmarkProtocolV2PubMultiTopic2-4      324.19       509.50       1.57x
BenchmarkProtocolV2PubMultiTopic4-4      345.50       601.08       1.74x
BenchmarkProtocolV2PubMultiTopic8-4      362.92       555.32       1.53x
BenchmarkProtocolV2PubMultiTopic16-4     319.21       516.47       1.62x
BenchmarkProtocolV2PubMultiTopic32-4     388.86       477.89       1.23x

@ploxiln (Member) commented Jan 2, 2017

I would go for approach (2).

Either approach, by being per-topic, improves scalability. But there's another trade-off: approach (2) avoids changing ID behavior too drastically all at once, so pretty much all tools and consumers will be fine. Approach (1), on the other hand, rips off the bandaid and produces duplicate IDs pretty quickly: from separate nsqd instances, and from the same nsqd if it restarts after not too many messages. (But not within the same TCP connection, of course.)

@jehiah (Member) commented Jan 3, 2017

Thumbs up on (2).

Also, this reminds me of a tangential topic: worker-id. Do we really want/need it?

@mreiferson (Member Author)

Also, this reminds me of a tangential topic: worker-id. Do we really want/need it?

No, we don't want it, but I think we need to keep it in order to preserve existing behavior. Given that, I'd vote to rename it node-id or just id; I honestly have no idea what we were thinking when we called it worker.

@jehiah (Member) commented Jan 3, 2017

but I think we need to keep it in order to preserve existing behavior

Which parts? We don't promise or guarantee worker ID uniqueness in a cluster, and we are not promising message ID uniqueness across nsqd nodes or topics. We also don't need it for naming the nsqd.%s.dat file (we already require exactly one nsqd pointed at a given data directory, so we could glob-match for backwards compatibility).

@mreiferson (Member Author)

We don't promise or guarantee worker ID uniqueness in a cluster, and we are not promising message ID uniqueness across nsqd nodes or topics

In order to preserve what some people are using message IDs for, continuing to provide --worker-id does give them a mechanism to ensure uniqueness across nsqd nodes for a specific topic.

@jehiah (Member) commented Jan 3, 2017

Fair, a properly managed --worker-id does provide that; I guess I'm just saying that the switch to 1.0 is the ideal time to remove it.

(Also, as I'm thinking through this, it'd probably be nice to switch from nsqd.%s.dat to nsqd.dat, separate from the worker-id param.)

@ploxiln (Member) commented Jan 3, 2017

If worker-id is removed, then it seems like you might as well switch to just an atomically incremented uint64, since without worker-id you're pretty likely to get duplicate IDs in the same topic, at the same time, from two separate nsqd instances.

Here's another idea: switch to an incrementing uint64, but with a random initial value, so low-numbered IDs are not repeated so soon from the same nsqd across restarts.
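
A tiny sketch of that variant; the helper names are made up for illustration, and crypto/rand is used only to pick the starting point:

```go
package main

import (
	"crypto/rand"
	"encoding/binary"
	"fmt"
	"sync/atomic"
)

// newSeededCounter returns a per-topic counter whose starting point is
// random, so a restarted nsqd doesn't immediately re-issue the same
// low-numbered IDs. This doesn't make collisions impossible; it only
// makes early re-use across restarts unlikely.
func newSeededCounter() *uint64 {
	var b [8]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err) // no entropy available; fail loudly in this sketch
	}
	seed := binary.BigEndian.Uint64(b[:])
	return &seed
}

// nextID atomically increments the counter and returns the new value.
func nextID(counter *uint64) uint64 {
	return atomic.AddUint64(counter, 1)
}

func main() {
	c := newSeededCounter()
	fmt.Printf("%016x\n%016x\n", nextID(c), nextID(c))
}
```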

@mreiferson (Member Author)

Agreed with @ploxiln: given our choice to maintain compatibility, I think we must continue to provide --worker-id (although for 1.0 I'm OK with renaming it, and we probably should).

(Also, as I'm thinking through this, it'd probably be nice to switch from nsqd.%s.dat to nsqd.dat, separate from the worker-id param.)

👍 let's land that separately though

Otherwise, this should be RTM.

@jehiah merged commit ecfb30f into nsqio:master on Jan 3, 2017
@mreiferson deleted the id-gen branch on January 16, 2017 02:03