Bug: Index Workers lose connection to RMQ while queue still full #118

Open
artntek opened this issue Jul 23, 2024 · 8 comments

@artntek (Collaborator) commented Jul 23, 2024

When doing a reindex-all, the indexing runs well until it starts to process the resource maps, which take a long time. At this point, the RMQ channels get closed in a timeframe determined by the consumer_timeout (default 30 mins).

We're ACK-ing messages immediately, in an attempt to circumvent this issue, but we're still having problems, even with longer timeout settings. The problem is with messages being sent to indexers that are still working on the previous job - the next message cannot be delivered, and so times out.

For more details, see Metacat Issue 1932

Proposed fix is to use @jeanetteclark's solution from metadig engine: catch the resulting com.rabbitmq.client.AlreadyClosedException, and re-open the connections.
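
For reference, a minimal sketch of that catch-and-reopen approach (illustrative only, using the RabbitMQ Java client; the `reconnect()` helper and the fields are placeholders, not the metadig or dataone-indexer code):

```java
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Sketch only: if the broker has already closed the channel (e.g. because
// consumer_timeout expired), the ACK throws AlreadyClosedException; the
// unacked delivery is requeued by the broker, so we just re-open the
// connection and channel and keep consuming.
class AckRecoverySketch {
    private final ConnectionFactory factory = new ConnectionFactory();
    private Connection connection;
    private Channel channel;

    void safeAck(long deliveryTag) throws Exception {
        try {
            channel.basicAck(deliveryTag, false);
        } catch (AlreadyClosedException e) {
            reconnect(); // re-establish the connection before continuing
        }
    }

    private void reconnect() throws Exception {
        connection = factory.newConnection();
        channel = connection.createChannel();
        // placeholder: re-register the consumer on the new channel
    }
}
```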

@artntek self-assigned this Jul 23, 2024
@artntek added this to the 3.1.0 milestone Jul 23, 2024
@artntek added the "bug" label Jul 23, 2024
@mbjones (Member) commented Jul 24, 2024

@artntek Some brief thoughts on this, hopefully they are useful to you.

You wrote:

The problem is with messages being sent to indexers that are still working on the previous job - the next message cannot be delivered, and so times out.

This seems like a design issue. Workers should always be able to respond to new messages coming in. If they block while processing, I think that is a problem. At a minimum, it seems like they should always be able to say "Yes, I've got it" or "No, go away" to incoming messages. To elaborate...

In addition to actively managing AlreadyClosedException, which seems clearly needed, we might also consider:

  1. Why can't the worker accept more than one message? Can't it be multi-threaded to read multiple delivered messages?
  2. If it can be re-written to accept multiple messages, what is a good message limit to set with [basic.qos](https://www.rabbitmq.com/docs/confirms#channel-qos-prefetch)?
  3. If it can't be re-written to accept multiple messages, shouldn't we configure rmq to only post one message per worker until the worker is done with its prior message? Maybe that last one is a side-effect of our premature ACK of the message, which tells the queue that the worker is ready for a new message.

In addition, if a worker can be written to accept multiple messages and it hits its capacity limit, can't we use a [basic.reject](https://www.rabbitmq.com/docs/confirms#consumer-nacks-requeue) response to tell rmq to redeliver the message to another worker? Seems like this is what it is designed for, and it would let the worker indicate it's really busy (e.g., processing a long resource map). This is the scenario in which having more workers helps a lot when some of them get tied up for long periods of time.

In summary, it strikes me that each worker should 1) process incoming messages in its main thread, 2) be set to accept a maximum number of messages via basic.qos (possibly 1), 3) start a new thread to process the index task associated with each message, and then send a basic.ack for that message, and 4) use basic.reject to send back messages that can't be processed because the worker is too busy or resource-constrained.
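
For concreteness, the two AMQP primitives being discussed map to these Java-client calls (a minimal illustration, not the indexer's actual code):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

// Illustration of the two primitives above (RabbitMQ Java client).
class QosAndRejectSketch {

    // basic.qos: allow at most one unacknowledged delivery per consumer,
    // so the broker stops pushing more work to a busy worker.
    static void limitPrefetch(Channel channel) throws IOException {
        channel.basicQos(1);
    }

    // basic.reject with requeue=true: hand the delivery back so the broker
    // can redeliver it to another (less busy) worker.
    static void rejectWhenBusy(Channel channel, Envelope envelope) throws IOException {
        channel.basicReject(envelope.getDeliveryTag(), true);
    }
}
```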

I think a sequence diagram showing this process would be really useful. Here's the analogous sequence diagram for MetaDIG.

@mbjones (Member) commented Jul 24, 2024

Also, for a little commiseration, see this Reddit thread:

The issue with this is as soon as you ack a message, the server delivers the next one and starts the consumer timeout ticking! Even if the consumer hasn't read the message yet... So after 30 minutes the unread message times out and the connection is closed.

Maybe this is what is causing your AlreadyClosedException? Once you ack a message, it seems like you need to be prepared to basic.reject future messages, and keep basic.qos at a low number like 1. I wonder if, when you ack the message, you can also say "I'm busy, don't send more messages till I say"?

@jeanetteclark commented:

Even if basic.qos is 1, you still get the AlreadyClosedException with a pre-emptive ack, because of exactly what that quote above says. I'm still looking for some way to say "I'm busy," but I'm not sure that exists, since I think that is really the purpose of the ack.

@artntek (Collaborator, Author) commented Jul 24, 2024

Thank you - great info. I will regroup and propose next steps.

In the meantime, the reindex finished on test.adc k8s; I'll post remaining findings from that run below (since we're drifting off-topic for the original title of Metacat Issue 1932, and the info is more relevant here.)

@artntek (Collaborator, Author) commented Jul 24, 2024

After helpful discussions in the 7/23/24 dev meeting, and afterwards with @taojing2002 and @jeanetteclark, I tried the following:

The RMQ docs state:

Channels and Concurrency Considerations (Thread Safety)

Sharing Channel instances between threads should be avoided. Applications should be using a Channel per thread instead of sharing the same Channel across multiple threads.
[...] Concurrent publishing on a shared channel can result in incorrect frame interleaving on the wire, triggering a connection-level protocol exception and immediate connection closure by the broker. [...] Concurrent publishing on a shared channel is best avoided entirely, e.g. by using a channel per thread.

Indexer currently defaults to a pool size of 5 threads per worker. I tried reducing the limit to 1 thread per worker, to see if that would stop the channel closures. Deployed 7/23 at 13:29. This change DID NOT resolve the channel closure issue.
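
For reference, the "channel per thread" pattern those docs describe looks roughly like this (a sketch assuming one shared Connection; the host and pool size are illustrative, not the indexer's configuration):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// Sketch: the Connection is thread-safe and shared; each worker thread
// opens and uses its own Channel, as the RMQ docs recommend.
public class ChannelPerThreadSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker location, for illustration
        Connection connection = factory.newConnection();

        int poolSize = 5; // e.g. the indexer's default of 5 threads per worker
        for (int i = 0; i < poolSize; i++) {
            new Thread(() -> {
                try (Channel channel = connection.createChannel()) {
                    channel.basicQos(1);
                    // ... consume and publish on this thread's own channel ...
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}
```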

@artntek (Collaborator, Author) commented Jul 24, 2024

Experimental worker-code changes, based on discussions with J&J:

  1. The worker sends an ACK before processing a message (since resource map processing can take up to 3 hours). I moved the ACK from the processing method directly into the com.rabbitmq.client.DefaultConsumer#handleDelivery method called by the amqp client code.

  2. I adopted @jeanetteclark's metadig solution of surrounding the ACK call with a try ... catch (AlreadyClosedException e) and re-opening the connection.

  3. I also implemented an override of the com.rabbitmq.client.DefaultConsumer#handleShutdownSignal method, to re-open the connection if it is closed. (A rough sketch of these three changes follows this list.)
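
Illustratively, the three changes together might look like this (a sketch only: `reopenConnection()` and `processIndexTask()` are placeholders, not the actual code in the PR):

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;

// Sketch of the experimental consumer: ACK immediately on delivery (1),
// recover from AlreadyClosedException around the ACK (2), and re-open
// the connection on a shutdown signal (3).
class ExperimentalIndexConsumer extends DefaultConsumer {

    ExperimentalIndexConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        try {
            // (1) ACK before processing, since resource maps can take hours
            getChannel().basicAck(envelope.getDeliveryTag(), false);
        } catch (AlreadyClosedException e) {
            // (2) the broker already closed the channel; re-open and carry on
            reopenConnection();
        }
        processIndexTask(body); // the long-running indexing work
    }

    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // (3) re-open the connection if the broker or network closed it
        reopenConnection();
    }

    private void reopenConnection() {
        // placeholder: rebuild the Connection/Channel and re-register this consumer
    }

    private void processIndexTask(byte[] body) {
        // placeholder: parse the message and run the indexing task
    }
}
```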

Deployments

  1. 7/23 at 17:59: Deployed these changes across 25 workers, each still limited to 1 thread. Logs indicated the "reopen" code was working, but the original 25 channels (1 per worker) eventually dwindled to only a few (though never zero)

  2. 7/23 at 18:19: Reduced number of workers to 1, still with only 1 thread, and left running overnight (to reduce [DEBUG] logging noise). Still connected and processing successfully on 7/24, but queue size was still > 16,000 messages

  3. 7/24 at 08:27: Increased number of workers back to 25, and assigned 5 threads per worker. Almost immediately, the queue was emptied of the remaining messages!

[Attached screenshot: "final 10 mins-ALL"]

@artntek (Collaborator, Author) commented Jul 24, 2024

Notes from our 7/24/24 meeting

(Attendees: @artntek, @mbjones, @jeanetteclark, @taojing2002)

Problem Statement

rmq will always try to "pre-fetch" additional messages for the worker to process, even if it's busy. The prefetch limit can be set using Channel#basicQos, but it cannot be set any lower than 1 (a value of 0 means "unlimited"). So, irrespective of how many threads the worker runs, if all of its threads are busy with long-lived tasks, rmq will always pre-fetch at least 1 additional message. If none of the worker threads frees up to process this message before the timeout expires, the channel is closed.

Proposed Solution

Have a non-blocking thread that immediately handles Consumer#handleDelivery callbacks, and decides whether to process the incoming message or to explicitly reject it (see the sketch below):

  1. If at least one worker thread is available, ack the message and process it in that thread.
  2. If no threads are available, wait for a short time [1] before issuing a Channel#basicReject

Footnotes

  1. The "short time" needs some thought/experimentation... if we reject the message immediately, this increases churn - rmq will continue sending messages, thus using system resources and bandwidth unnecessarily. If we wait too long, the worker may be sitting idle, when it could be processing more messages. In the extreme case, if we wait until the chanel timeout limit, the channel will be closed

@artntek (Collaborator, Author) commented Jul 24, 2024

We agreed that, short term, I will tidy up and release the fixes that I made in the experimental code yesterday (PR #119), since this successfully completed indexing and works for now. However, the next step is to fix this the right way, per the description above.

Useful links:

@artntek modified the milestones: 3.1.0, 3.2.0 Oct 15, 2024