NMT doesn't properly handle out of order data #41

Closed
Dieterbe opened this issue Nov 5, 2015 · 7 comments

Comments

@Dieterbe
Contributor

Dieterbe commented Nov 5, 2015

Due to the way nsqd currently spills traffic over from an in-memory channel to a diskqueue-backed channel (by selecting on both), it can arbitrarily reorder your data. Ideally this shouldn't happen if the in-memory channel is always empty, but minor hiccups can trigger it. We can see if increasing the size of the memory buffer helps, though obviously we would then incur more data loss in case of an nsqd crash.
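
To make the mechanism concrete, here is a minimal Go sketch (not nsqd's actual code) of why draining a memory channel and a diskqueue channel with a single select can reorder delivery: when both cases are ready, Go picks one at random, so a message that overflowed to disk earlier can be delivered after a newer in-memory message.

```go
package main

import "fmt"

func main() {
	memChan := make(chan int, 2)
	diskChan := make(chan int, 2)

	// message 1 overflowed to the diskqueue, messages 2 and 3 stayed in memory
	diskChan <- 1
	memChan <- 2
	memChan <- 3

	// drain both channels the way a consumer-facing loop might
	for i := 0; i < 3; i++ {
		select {
		case m := <-memChan:
			fmt.Println("delivered", m) // 2 or 3 can easily come out before 1
		case m := <-diskChan:
			fmt.Println("delivered", m)
		}
	}
}
```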

@woodsaj confirmed this by feeding data into nsqd in order and consuming it with an NMT consumer running a single concurrent handler: the data came out of order.

I've had conversations with Matt Reiferson (of nsq) about how feasible it would be to add simple ordering guarantees to nsqd, even if merely per-topic per nsqd instance. But even that seems quite complex/tricky: it would require a different model for requeues, defers, message timeouts, etc., and would be drastically different nsqd behavior, even with nsqio/nsq#625.

His recommendation was to use an ephemeral channel to always read the latest data to serve to users from RAM (just dropping what we can't handle), and additionally use a diskqueue-backed channel which you read from and store into something like HDFS, so that you can then use Hadoop to properly compute the chunks to store in archival storage (i.e. go-tsz chunks in Cassandra) even on out-of-order data.
That seems like far more complexity than we want. I do like the idea of separating in-memory data from archival storage, since that seems to let us simplify things, but using Hadoop to work around poor ordering after the fact...

What we can also do:

  • Keep the current approach, but maintain a window of messages which we sort, say 10 seconds long. After 10 seconds we can assume we have a good order, decode the messages, and commit their metrics to go-tsz chunks, with little risk of getting chunks that are >10s late. Of course, it would then also take 10 seconds for data to start showing up when NMT responds to queries. Hmm, I guess the query handler could also look through the messages in the window and pull data from there.
  • Related idea: don't explicitly keep a window of messages to sort, but keep simple non-go-tsz-optimized data structures of points (like plain arrays of uint32-float64 pairs) so that the metrics from all new messages can be added immediately and are available for querying. Whenever the data gets old enough to move to Cassandra, that's when we generate the chunks, at which point the data should be very stable (see the sketch after this list).
    However, this means that for update operations we might commit the wrong values if the two writes for the same slot happen in the wrong order (though we're not currently doing any updates), and it would also be less RAM-efficient to keep the data in such arrays.
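
A rough sketch of that second idea, with hypothetical names (this is not NMT's actual code): points are appended unsorted and stay queryable in RAM, and we only sort when draining the old portion to be encoded into a go-tsz chunk.

```go
package main

import (
	"fmt"
	"sort"
)

type point struct {
	ts  uint32
	val float64
}

type metricBuffer struct {
	points []point
}

// Add appends without sorting, so new points are immediately available to queries.
func (b *metricBuffer) Add(ts uint32, val float64) {
	b.points = append(b.points, point{ts, val})
}

// Drain returns all points older than cutoff in timestamp order (ready to be
// encoded into a go-tsz chunk) and keeps the newer points in RAM.
func (b *metricBuffer) Drain(cutoff uint32) []point {
	sort.Slice(b.points, func(i, j int) bool { return b.points[i].ts < b.points[j].ts })
	n := sort.Search(len(b.points), func(i int) bool { return b.points[i].ts >= cutoff })
	old := append([]point(nil), b.points[:n]...)
	b.points = append(b.points[:0], b.points[n:]...)
	return old
}

func main() {
	var b metricBuffer
	// out-of-order arrivals
	b.Add(30, 1.5)
	b.Add(10, 1.0)
	b.Add(20, 1.2)
	fmt.Println(b.Drain(25)) // [{10 1} {20 1.2}]; the point at ts=30 stays buffered
}
```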

Note that both approaches above assume that ordering the messages is all we need.
In reality, messages from the collectors can contain points for different timestamps (and this is hard to address in the collectors per AJ), so in NMT we would have to order the actual points, not just the messages.

@woodsaj
Member

woodsaj commented Nov 5, 2015

It should be noted that this is really only a problem when trying to backfill data (i.e. sending 24 hours of data in the span of a few seconds). Under normal operation we wouldn't see this problem.

@woodsaj
Member

woodsaj commented Nov 5, 2015

We really can't afford to let this block us from getting NMT into production.

The way I see it, NSQ is just the wrong solution for this problem. The primary reason for using NSQ was to allow metrics to be buffered to disk when we couldn't process them due to faults. If writing to disk causes the messages to then be sent to consumers out of order, that is a huge problem (it wasn't a problem for KairosDB, as we didn't really care about ordering).

Buffering messages for 10 seconds just introduces more issues. Either we ACK the messages immediately and then risk losing up to 10 seconds of data when NMT restarts, or we delay the ACKs for 10 seconds and put additional pressure on NSQ, causing it to buffer to disk and compound the ordering problem. Additionally, under normal operation metrics will generally be spaced at least 10 seconds apart anyway.

So I really feel that we should just accept that NSQ is the wrong tool for this job. We don't have time to change it right now, and it will be good enough in the near term. This problem will only show up when there is an outage causing metrics to be buffered on the collector, in Grafana, or in NSQ for more than 10 seconds. And even then, the worst-case scenario is that we drop some metrics for the duration of the outage.

Moving forward, I think we should just move to Kafka, as it will also help us address the fault-tolerance and scaling issues of NMT.

@Dieterbe thoughts/comments?

@Dieterbe
Contributor Author

Dieterbe commented Nov 5, 2015

Yeah, it's unfortunate that when we addressed the original scope of the raintank-metric (nsq) work we didn't have the foresight to anticipate the needs around data aggregation.

we delay the ACKs for 10 seconds and put additional pressure on NSQ, causing it to buffer to disk and compound the ordering problem.

What additional pressure? We've had this conversation before: nsqd doesn't incur a performance penalty from delayed or out-of-order ACKs. (I haven't personally verified that, but a lot of people, nsq users and developers alike, would be quite surprised if it were the case.)

Also keep in mind that nsqd is undergoing a refactor to send all data through a WAL, which is basically a disk-backed FIFO (no more separate memory vs. disk channels; see the link in the OP); it's just that message delivery will still be based on an out-of-order model. With that new mechanism we will probably end up with an order much closer to natural time sorting. Likely we can even achieve guaranteed ordering as long as we have a single thread/connection and make sure messages don't time out and trigger a requeue; we also don't do any defers.
I wonder what @mreiferson thinks; I don't remember asking him in such a specific way.
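
For reference, here's a hedged sketch of that single-connection, no-requeue consumption mode using the go-nsq client (option and function names as I understand that library; treat them as assumptions). To be clear, this does not make nsqd guarantee ordering today; it only removes the consumer-side sources of reordering (concurrent handlers, requeues, timeouts).

```go
package main

import (
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	cfg := nsq.NewConfig()
	cfg.MaxInFlight = 1               // only one unacknowledged message at a time
	cfg.MsgTimeout = 10 * time.Minute // generous timeout so slow handling doesn't trigger a requeue

	consumer, err := nsq.NewConsumer("metrics", "nmt", cfg)
	if err != nil {
		log.Fatal(err)
	}

	// a single handler means a single processing goroutine
	consumer.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		// decode the message and commit its metrics; returning nil FINs it
		return nil
	}))

	// connect to one nsqd directly rather than via lookupd, so there is
	// only ever a single connection feeding this consumer
	if err := consumer.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}
	select {}
}
```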

I thought you had several arguments against Kafka (what were they again?).
To me it looks operationally quite complex, and much of that complexity comes from features I'm not sure we actually need. For any given stream (metric) we only have one writer, so we don't really have to orchestrate ordering across different machines due to multiple writers.

We seem to agree that nsq will probably be OK for a bit longer, but let's try to get a really detailed picture of our current and future needs before starting a migration to Kafka.

@woodsaj
Member

woodsaj commented Nov 5, 2015

what additional pressure?

Memory pressure. If you delay ACKs, NSQ will consume more memory, resulting in more messages being sent to the diskqueue.

i thought you had several arguments against kafka

I really like Kafka. We were using it, but moved away due to problems with the Node.js ZooKeeper library. We also had some issues just keeping ZooKeeper up and running at large message rates, but I see the latest Kafka can now handle tracking consumers' read offsets itself, which was the majority of the workload for ZooKeeper. http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

I also agree that NSQ will be OK for a bit longer. I just don't want any more time sunk into trying to make it do something it is not designed for.

@Dieterbe
Contributor Author

Dieterbe commented Nov 5, 2015

Memory pressure. If you delay ACKs, NSQ will consume more memory, resulting in more messages being sent to the diskqueue.

Maybe. I'm not sure that ACKing immediately causes significantly less stress on the diskqueue. For example, the delay between a message going into the memory queue and being ACKed may be too long relative to the rate of inbound messages, so the vast majority of messages could end up in the diskqueue anyway even if you ACK "quickly" (e.g. with a mem-queue-size of 10,000 and tens of thousands of messages per second coming in, the memory queue only covers a fraction of a second of traffic).

Well, that's good news re: Kafka. There's also a rich ecosystem around it (Spark etc.).
A benefit of nsq is that it's easy to deploy, flexible in what kind of topology we can build, and arguably much easier (due to its simplicity and being written in Go) to dive into the code and tweak for our needs.

If we can't actually build something that meets our needs then of course we'll need to switch away at some point, but I think there's value in exploring further how we can mold nsqd to our needs, especially with the big WAL refactoring. That could become a timesink and cost more time than just switching to Kafka, though; I'm not quite sure how much time that would take. I presume Kafka's Go libraries have matured by now.

@Dieterbe
Contributor Author

We should also think about what the desired behavior should be when the pipeline (whether nsq or Kafka) accumulates several seconds, minutes, or hours worth of data.
Let's say we had an NMT/network outage and the pipe holds 4 hours worth of data. Instead of resuming where we left off and gradually catching up from the past until now (which may take hours), it may make more sense to start ingesting the current data first (say, 5 minutes ago until now, plus a realtime feed from there on) and only then start backfilling the old data. That is what's needed for alerting to be most reliable, and probably also the most valuable to people looking at graphs.
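
A very rough sketch of what that could look like, building on Matt's ephemeral-channel suggestion from the first comment (topic/channel names are made up for illustration): one consumer reads an #ephemeral channel, which never spills to disk and so always stays roughly "now", to keep queries and alerting fresh, while a second consumer on the durable channel chews through the backlog.

```go
package main

import (
	"log"

	"github.com/nsqio/go-nsq"
)

func startConsumer(channel string, handler nsq.HandlerFunc) {
	cfg := nsq.NewConfig()
	c, err := nsq.NewConsumer("metrics", channel, cfg)
	if err != nil {
		log.Fatal(err)
	}
	c.AddHandler(handler)
	if err := c.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Fatal(err)
	}
}

func main() {
	// realtime feed: #ephemeral channels drop messages instead of spilling to
	// disk, so this stream stays current even when there is a big backlog
	startConsumer("nmt-realtime#ephemeral", func(m *nsq.Message) error {
		// update the in-memory series used for queries and alerting
		return nil
	})

	// durable channel: works through the backlog (possibly hours of data)
	startConsumer("nmt", func(m *nsq.Message) error {
		// backfill older chunks
		return nil
	})

	select {}
}
```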

@woodsaj
Member

woodsaj commented Nov 20, 2015

This is a good point, though much lower on the priority list than other items. It would be hard to implement in code, though, as we know data needs to be processed in order. What I think would work, if an outage or simply a large backlog of data is detected:
Spin up a new node running NMT, configured in dry-run mode, then point Graphite at it. This will allow Graphite queries to get the latest data from memory (other queries will get a combination of memory and C*) while the existing cluster works through the backlog. Once the backlog is cleared we can just switch Graphite back to the original NMT node(s).
