
esp_mqtt_client_enqueue() can block the caller in unreliable connection cases (IDFGH-7853) #230

Closed
michaelgaehwiler opened this issue Jul 18, 2022 · 19 comments


@michaelgaehwiler

Hi all,

Finding

esp_mqtt_client_enqueue() can block for the full duration of the MQTT network timeout.

Expectations

  • esp_mqtt_client_enqueue() never blocks, under any circumstances (or at least blocks only for a very short time, in the range of milliseconds).
  • The blocking behavior of esp_mqtt_client_enqueue() is documented.

Long Version

From the documentation (... could be used as a non blocking version of esp_mqtt_client_publish()), I had the expectation that esp_mqtt_client_enqueue() never blocks (or at least blocks only very briefly, in the range of milliseconds). I wanted to use esp_mqtt_client_enqueue() to avoid creating an additional MQTT task (to save resources), since ESP-MQTT already creates a task. But the function can block for the full connection timeout, and I'm using a watchdog with a much shorter timeout for the caller task.

The happy path works great, no problem. I came across the bug while doing smoke tests, as I must expect both an unreliable internet connection and an unreliable MQTT server. If the MQTT server becomes unavailable (e.g., unplugging the server's LAN port to simulate an ungraceful shutdown), the network communication hangs for the timeout (10 seconds by default). If I call esp_mqtt_client_enqueue() during this time, the function blocks the caller until the network connection timeout expires.

The same happens if the MQTT server is unavailable from the start of the ESP32 program (e.g., pointing the client at ws://8.8.8.8:80): calling esp_mqtt_client_enqueue(), regardless of the connection state of ESP-MQTT, blocks the caller until the network connection timeout triggers.

I came across this problem because my device rebooted due to a watchdog trigger. The caller of esp_mqtt_client_enqueue() is a task with the default watchdog timeout of 5 seconds. It also handles the GUI, which is now unresponsive from time to time. I know I could create a separate FreeRTOS task as a workaround for this case, but it is ridiculous to need an additional task when ESP-MQTT already uses a dedicated one...

Details

I'm sorry not to provide an example that reproduces the problem, as it would require a setup of multiple hosts and manually unplugging the LAN connection on the server.

The dependency on network timeouts arises from the use of MQTT_API_LOCK() / MQTT_API_UNLOCK(): the same lock is also used in esp_mqtt_task(), where it can be acquired and then held across calls to blocking functions.

In my understanding, the lock should only be held very briefly and never while calling blocking functions. Alternatively, the lock for esp_mqtt_task() could be separated from the lock used by esp_mqtt_client_enqueue(), or a buffer could be used that never blocks the writer when putting messages into the queue. A rough sketch of the separate-lock idea follows below.
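For illustration only, a minimal sketch of the separate-lock idea (all names here are hypothetical, not actual esp-mqtt API): a dedicated mutex that protects only the outbox and is never held across network calls, so enqueueing fails fast instead of waiting out the network timeout.

#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"

static SemaphoreHandle_t outbox_lock;  // created once with xSemaphoreCreateMutex(); guards only the outbox list

// Hypothetical non-blocking enqueue: a bounded wait instead of portMAX_DELAY.
int outbox_enqueue_nonblocking(void *outbox, void *msg)
{
    if (xSemaphoreTake(outbox_lock, pdMS_TO_TICKS(10)) != pdTRUE) {
        return -1;  // lock busy; the caller can retry instead of tripping its watchdog
    }
    // ... append msg to the outbox (STAILQ insert), without any network I/O ...
    xSemaphoreGive(outbox_lock);
    return 0;
}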

Question

Can you confirm this bug? Do you need any additional information?

Thanks in advance

@github-actions github-actions bot changed the title esp_mqtt_client_enqueue() can block the caller in unreliable connection cases esp_mqtt_client_enqueue() can block the caller in unreliable connection cases (IDFGH-7853) Jul 18, 2022
@someburner

Just a suggestion, as this is what I plan to do: couldn't you just build without API locks and ensure you only access the API from a single task?

I agree, though, that it seems a different mutex could be used for enqueuing/dequeuing messages when locks are enabled.

@michaelgaehwiler
Author

@someburner Good point, but I think this option won't work. Why: there are always two tasks involved: 1) the "user task", which enqueues the messages, and 2) the ESP-MQTT task, which dequeues them. The ESP-MQTT task is spawned automatically, with no option to disable it. Therefore, disabling the lock would imply that the underlying STAILQ queue is thread-safe, which is not mentioned in the manpage (man 3 queue). Correct me if I'm wrong and STAILQ is thread-safe. BTW: STAILQ is used in mqtt_outbox.c.

@someburner

someburner commented Jul 21, 2022

@michaelgaehwiler

Yeah, I don't think STAILQ is thread-safe. I suppose you could assume that if the esp-mqtt task is blocked because it's not connected, then it's probably not accessing those queues, but that's not really safe.

I think my proposed solution of a separate lock for enqueue/dequeue would work in theory, but looking at the esp-mqtt codebase, the API_LOCK is on the entire client struct and is required as part of formatting the message before it even goes into the outbox, so that's not really an option without a major overhaul. It would probably be better if esp-mqtt could simply release the mutex while it's waiting to reconnect.

However, the MQTT_DISABLE_API_LOCKS option's help text says It is possible to disable these locks if the user code doesn't access MQTT API from multiple concurrent tasks. To me, that reads like it should be okay to use a single task to interact with the available API methods. But I agree that with MQTT running in its own task, it may not be safe to enqueue from another task. Can @david-cermak comment on this?

If it were possible to use a counting or binary semaphore for the API_LOCK, you could implement your own queue and then call uxSemaphoreGetCount to see if it's safe to dequeue from your queue and call the publish API (see the sketch below). Or, similarly, keep your own queue and only dequeue and publish when MQTT is connected. But both of those seem ugly.
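A minimal sketch of that semaphore-count check, purely hypothetical since the API lock is in fact a recursive mutex and is not exposed to user code:

// Peek whether the lock appears free before touching the publish API.
// Inherently racy (the count can change right after the check), which is
// part of why this approach seems ugly.
if (uxSemaphoreGetCount(api_semaphore) > 0) {
    // lock appears free: dequeue from our own queue and publish
}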

FYI:

I gave this a try myself and do not get a reboot, but the calling task does seem to stall for 5-10 seconds, and after the stall I get this log:

W (303792) TRANSPORT_BASE: Poll timeout or error, errno=Success, fd=54, timeout_ms=10000
E (303792) MQTT_CLIENT: Writing didn't complete in specified timeout: errno=0

It only stalls once. After a little while, esp_transport_write returns. And it only stalls the calling task. So you could just use a queue for all the data you want to send, and another task to send that data. Then that task will stall, but the rest of your application won't. Like this:

#include <stdbool.h>
#include <string.h>
#include <stdlib.h>
#include "freertos/FreeRTOS.h"
#include "freertos/task.h"
#include "freertos/queue.h"
#include "esp_log.h"
#include "mqtt_client.h"

#define MQTT_TX_TASK_STACK_SIZE 4096
#define MQTT_TX_TASK_PRIORITY   5
#define MQTT_TX_TASK_CORE       0

static const char *TAG = "mqtt_tx";

extern esp_mqtt_client_handle_t mqtt_client;  // client handle, started elsewhere

typedef struct {
    char *payload;
    int len;
    int qos;
} MSG_T;

static QueueHandle_t mqtt_tx_queue;

// Dedicated sender task: only this task can stall on the network timeout.
void mqtt_tx_task(void *arg)
{
    MSG_T *m;
    while (1) {
        if (xQueueReceive(mqtt_tx_queue, &m, portMAX_DELAY) == pdTRUE) {
            int msg_id = esp_mqtt_client_publish(mqtt_client, "your/topic", m->payload, m->len, m->qos, 0);
            if (m->payload) {
                free(m->payload);
            }
            free(m);
            ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id);
        }
    }
}

// Called from the application task; never blocks on the network.
bool your_publish_func(MSG_T *m)
{
    if (!m) {
        return false;
    }
    if (xQueueSend(mqtt_tx_queue, &m, portMAX_DELAY) != pdPASS) {
        free(m);
        return false;
    }
    return true;
}

void app_main(void)
{
    mqtt_tx_queue = xQueueCreate(32, sizeof(MSG_T *));

    xTaskCreatePinnedToCore(mqtt_tx_task, "mqtt_tx", MQTT_TX_TASK_STACK_SIZE,
                            NULL, MQTT_TX_TASK_PRIORITY, NULL, MQTT_TX_TASK_CORE);

    while (1) {
        vTaskDelay(2000 / portTICK_PERIOD_MS);

        MSG_T *m = malloc(sizeof(MSG_T));
        m->payload = malloc(5);
        memcpy(m->payload, "test", 5);  // copies the terminating NUL too
        m->len = 5;
        m->qos = 0;
        your_publish_func(m);
    }
}

Alternatively, you can set MQTT_NETWORK_TIMEOUT_MS in esp-mqtt/lib/include/mqtt_config.h to something lower, although in my experience, going too low causes problems on certain bad connections.
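If your esp-mqtt version exposes it, the timeout can also be set per client via the config struct instead of editing the header (the network_timeout_ms field name is taken from IDF 4.x; newer releases may nest it differently, so treat this as a sketch):

const esp_mqtt_client_config_t mqtt_cfg = {
    .uri = "mqtt://broker.example.com",  // placeholder broker URI
    .network_timeout_ms = 5000,          // abort network ops after 5 s instead of the 10 s default
};
esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg);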

@michaelgaehwiler
Author

@someburner
Yes, I fully agree, and I see the same challenges. And yes, I have also thought about the workaround of creating an additional task to isolate the blocking behavior from my watchdog-watched user task. But as already mentioned in my initial issue description, it's ridiculous to spawn another task when ESP-MQTT already spawns one – especially if the API is blocking and STAILQ is not thread-safe. Nevertheless, as we will only use official releases of ESP-IDF, I have to implement this workaround in our software for the time being. Thanks for your opinions and suggestions.

I have given some more thought to what could be improved. The idea is to optionally externalize the ESP-MQTT task. Going this path would allow having one task that synchronizes everything. If the (to-be-created) Kconfig option is enabled (default disabled for backwards compatibility), the user must take care of calling the processing part of esp_mqtt_task() (which would have to be made available externally); concretely, the function must be split into multiple functions, with the while loop living in the external task. The user can then call the enqueue function at the right time (outside of today's esp_mqtt_task()), fully synchronized. I think that shouldn't be much work and would improve flexibility, even to the extent of using ESP-MQTT in the main task, without any internal or external task beyond main. Users who don't want to care much go with the internal task and accept blocking, and users who cannot accept blocking can implement as much synchronization as needed, in the way that works for them.

@someburner
Do you have any feedback on this idea?

@david-cermak
What do you think of this approach? If you agree, I could prepare a PR.

Thanks a lot

@someburner

@michaelgaehwiler - Honestly, to me that sounds like a lot more work on your part, but yes, there could be a Kconfig option to provide your own task method, like the existing option to provide your own outbox implementation.

I was going to say that just using an extra queue is enough for my purposes, but actually the issue is more insidious for me too, I think. In my case, I have QoS 1 messages that I must send only one at a time, and to do so, I have to check the number of QoS 1 messages in the outbox. To make that safe, I obviously need API locks, but that would lead to the same stalling in another thread. However, I think in that case it would be better for the mqtt_task to signal my other task when the outbox has no QoS 1 items.

I'm not constrained to the official SDK, and I just copy esp-mqtt into my own project. But if I were, this would be a second case of needing a custom task.

@david-cermak
Collaborator

Hi @michaelgaehwiler and @someburner

Thanks for reporting this problem and elaborating on potential solutions.
I think the two options you suggested (separate locks, an external MQTT task) make sense and would fix the issue. The cleaner fix, IMO, would be using dedicated locks for the outbox. Moreover, making the current MQTT task more adjustable for users, with the ability to add their own actions, seems like a bright idea too, as I can think of many other possible use cases.
Both of these approaches would, however, need non-trivial changes to the library.

Also, thanks for the idea of disabling the API locks! Maybe this could be used as a workaround: have the internal event loop also handle user events and call esp_mqtt_client_enqueue() in the context of the MQTT thread.
I've tried to add the support for the user defined events in (very WIP):
WIP-support-user-events.patch.txt

Then we'd be able to post publish requests from user threads:

    char *data;
    asprintf(&data, "data %d", i);  // 'i' comes from the surrounding loop
    esp_mqtt_event_t event = { .event_id = MQTT_USER_EVENT, .client = client, .topic = "/topic/qos0", .data = data, .qos = 0 };
    esp_err_t ret = esp_mqtt_dispatch_custom_event(client, MQTT_USER_EVENT, &event);
    ESP_LOGI(TAG, "[APP] Posted event with ret-code %d", ret);

and process them directly in the event handler:

static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
{
    ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id);
    esp_mqtt_event_handle_t event = event_data;
    esp_mqtt_client_handle_t client = event->client;
    int msg_id;
    switch ((esp_mqtt_event_id_t)event_id) {
    case MQTT_EVENT_CONNECTED:
    ....
    ....
    case MQTT_USER_EVENT: {
        ESP_LOGI(TAG, "MQTT_USER_EVENT");
        esp_err_t ret = esp_mqtt_client_enqueue(client, event->topic, event->data, 0, event->qos, 0, 1); // or publish
        free(event->data);
        ESP_LOGD(TAG, "[APP] Enqueued posted data with ret-code %d", ret);
        break;
    }

Note that this should work with MQTT_DISABLE_API_LOCKS both on and off.

@michaelgaehwiler
Author

Hi @david-cermak

Thank you for the feedback. I have no experience with event loops, but if I understand correctly, mqtt_event_handler() would run in the context of the internal ESP-MQTT task and would thus run synchronized. Sounds like a good approach that would solve my problem. Thanks!

How should we proceed? Will you take care of finalizing it, or what would you expect from me?

@michaelgaehwiler
Author

Hi @david-cermak again,

I've just thought about how I would integrate this into my code. As I'm using C++ as much as possible, and to have a more generic MQTT_USER_EVENT that could also be used for other user-specific events in the future, I would propose replacing the fields topic, data, and qos with a void pointer, so that I can handle everything needed in mqtt_event_handler().

Why:
I create the topics and data dynamically, because I use a lot of different topics at very distinct points in time, which in the end saves memory while it's not needed. Therefore, I would like to pass a (C++) object as the event, which would handle the deletion of the dynamically allocated memory. If at some point in the future I needed to call something other than esp_mqtt_client_enqueue(), I could just add that case to the C++ object and handle it accordingly in mqtt_event_handler().

What do you think of that approach?

@david-cermak
Collaborator

Hi @michaelgaehwiler

Thanks! Just wanted to hear the feedback to see if the proposed user-event would help and fix your issues.

About the handler's arguments: it uses the standard type, the same as for other events:

typedef struct esp_mqtt_event_t {
    esp_mqtt_event_id_t event_id;         /*!< *MQTT* event type */
    esp_mqtt_client_handle_t client;      /*!< *MQTT* client handle for this event */
    char *data;                           /*!< Data associated with this event */
    int data_len;                         /*!< Length of the data for this event */
    int total_data_len;                   /*!< Total length of the data (longer data are supplied with multiple events) */
    int current_data_offset;              /*!< Actual offset for the data associated with this event */
    char *topic;                          /*!< Topic associated with this event */
    int topic_len;                        /*!< Length of the topic associated with this event */
    int msg_id;                           /*!< *MQTT* message id of message */
    int session_present;                  /*!< *MQTT* session_present flag for connection event */
    esp_mqtt_error_codes_t *error_handle; /*!< esp-mqtt error handle including esp-tls errors as well as internal *MQTT* errors */
    bool retain;                          /*!< Retained flag of the message associated with this event */
    int qos;                              /*!< QoS of the messages associated with this event */
    bool dup;                             /*!< dup flag of the message associated with this event */
    esp_mqtt_protocol_ver_t protocol_ver; /*!< MQTT protocol version used for connection, defaults to value from menuconfig */
#ifdef CONFIG_MQTT_PROTOCOL_5
    esp_mqtt5_event_property_t *property; /*!< MQTT 5 property associated with this event */
#endif
} esp_mqtt_event_t;

I think we could add another field, void *user_ctx, or send user events with a different argument. Alternatively, we could reuse the data pointer and describe its special meaning when MQTT_USER_EVENT is posted, here:

MQTT_EVENT_DATA, /*!< data event, additional context:
                      - msg_id     message id
                      - topic      pointer to the received topic
                      - topic_len  length of the topic
                      - data       pointer to the received data

(The downside is that the data pointer is char * rather than void *.)

PS: About C++

As I'm using C++ as much as possible

Have you seen IDF's C++ wrapper? https://github.com/espressif/esp-idf/blob/master/examples/cxx/experimental/esp_mqtt_cxx/tcp/main/mqtt_tcp_example.cpp

@someburner

@david-cermak Just want to throw in my vote for separate outbox locks :) It seems the main difficulty there is handling msg_id properly. What other pitfalls are there? Perhaps I should open a new issue for this feature?

For the time being, I like the idea of custom user events that could be used to enqueue within the esp-mqtt task. I still need a way to ensure only one QoS 1 publish is in flight at a time (and to be able to check that before actually calling any enqueue methods), so I think I will manually hack in task signalling for that. But being able to lock/unlock the outbox specifically would be great in the future.

@david-cermak
Collaborator

@someburner As said above, using separate outbox locks would be my preferred choice, so yes, please create a new issue for this.

I still need a way to ensure only one QoS 1 publish is in flight at a time (and to be able to check that before actually calling any enqueue methods)

Could you please elaborate? What kind of signalling would you need? Are you trying to use QoS 1 messages for QoS 2 purposes?

@someburner

@david-cermak Okay, I will make a separate issue for that.

Could you please elaborate? What kind of signalling would you need? Are you trying to use QoS 1 messages for QoS 2 purposes?

Sure. Essentially, I need to guarantee in-order delivery of QoS 1 messages, similar to setting the mosquitto option max_inflight_messages to 1 (but that is no good for me, as it is not per-client in mosquitto). Using either QoS 1 or QoS 2, it's possible to fire off multiple messages and get PUBACKs at different times. Say your first QoS 1 message does not get a PUBACK, but the second does. In my case, the data being sent is time-series data that must be inserted into a DB in the correct order, so if the first payload arrives after the second payload, it will be discarded. The backend is ingesting a lot of data, so it's impractical to keep track of this on the server side. The server is capable of identifying duplicates based on timestamps within the payload, so QoS 2 is not required (in my experience, it adds a lot of overhead and does not scale well, so it's best avoided).

Right now, what I have done is add some methods like this:

// Counts pending QoS 1 messages; assumes the caller holds the API lock.
int outbox_get_qos1_count(outbox_handle_t outbox)
{
    int count = 0;
    outbox_item_handle_t item;
    STAILQ_FOREACH(item, outbox, next) {
        if (item->msg_qos == 1) {
            count += 1;
        }
    }
    return count;
}

// Public wrapper: takes the API lock, so it can stall like the other APIs.
int esp_mqtt_client_get_outbox_qos1_count(esp_mqtt_client_handle_t client)
{
    int outbox_size = 1;

    if (client == NULL) {
        return 1;
    }

    MQTT_API_LOCK(client);

    if (client->outbox) {
        outbox_size = outbox_get_qos1_count(client->outbox);
    }

    MQTT_API_UNLOCK(client);

    return outbox_size;
}

Also, inside outbox_delete_single_expired and outbox_delete_expired, I added (item->msg_qos == 0) && checks, since I don't want to expire QoS 1 messages.

I can then call esp_mqtt_client_get_outbox_qos1_count externally before enqueueing any more of this type of data. Once the outbox is cleared of pending QoS 1 messages, I know the correct PUBACK has been seen, and I can queue the next set.

I understand this is a pretty specific use case, although I do believe it is the correct approach, since neither QoS 1 nor QoS 2 makes any claims about the order of delivery.

If esp-mqtt had a specific lock for the outbox, I could continue with this approach. But as that might take a while to implement, I was thinking of a signalling approach. E.g., you have user_task and esp_mqtt_task. user_task starts out assuming the outbox is empty. It then enqueues a QoS 1 message and assumes the outbox is full until it receives a notification from esp_mqtt_task. For this, I was thinking xTaskNotify / ulTaskNotifyTake would be the fastest approach, but a queue might be simpler. So somewhere inside is_valid_mqtt_msg / outbox_delete, I'll want to know if the in-flight QoS 1 message has been cleared and signal to user_task that the next payload can be enqueued. A minimal sketch of that signalling follows below.
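A minimal sketch of that notification scheme, assuming a hand-added on_qos1_cleared() hook in the outbox deletion path (both helper names are hypothetical):

#include "freertos/FreeRTOS.h"
#include "freertos/task.h"

extern void publish_next_qos1_payload(void);  // hypothetical helper

static TaskHandle_t user_task_handle;

// user_task side: enqueue one QoS 1 message, then block until esp_mqtt_task
// signals that the outbox holds no pending QoS 1 items.
void user_task(void *arg)
{
    user_task_handle = xTaskGetCurrentTaskHandle();
    while (1) {
        publish_next_qos1_payload();
        ulTaskNotifyTake(pdTRUE, portMAX_DELAY);  // wait for the all-clear
    }
}

// esp_mqtt_task side: call this from (e.g.) outbox_delete once the in-flight
// QoS 1 message has been removed.
void on_qos1_cleared(void)
{
    if (user_task_handle) {
        xTaskNotifyGive(user_task_handle);
    }
}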

@michaelgaehwiler
Author

@david-cermak Sorry for the delayed response.

Thanks! Just wanted to hear the feedback to see if the proposed user-event would help and fix your issues.

Yes, it would be great if that solution could be provided until #231 becomes available.

I think we could add another field, void *user_ctx, or send user events with a different argument. Alternatively, we could reuse the data pointer and describe its special meaning when MQTT_USER_EVENT is posted:

You're right, and yes, reusing the data pointer would suffice – no need for an additional void *user_ctx.

Have you seen IDF's C++ wrapper?

Thanks for the suggestion! I'll take a look at it.

@david-cermak
Collaborator

@someburner Thanks for explaining! If I understand correctly, you're trying to implement some kind of streaming with MQTT messages. Would a platform that inherently supports streaming help in your use case? For example, I have been thinking (for some time already) about adding support for a Kafka client API. Do you think using Apache Kafka would be a solution?

I suspect you have already thought about alternatives, but let me mention this anyway:
What if you keep track of the in-flight message ID(s) and publish the next message only after getting the MQTT_EVENT_PUBLISHED event with the same message ID (it might be useful to set incremental message IDs)? A rough sketch of this idea follows below.
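For illustration, a minimal sketch of that approach (the binary semaphore and helper names are illustrative, not esp-mqtt API):

#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"
#include "mqtt_client.h"

static SemaphoreHandle_t puback_sem;       // create once with xSemaphoreCreateBinary()
static volatile int inflight_msg_id = -1;

// Publisher side: allow only one QoS 1 message in flight at a time.
void publish_in_order(esp_mqtt_client_handle_t client, const char *payload, int len)
{
    inflight_msg_id = esp_mqtt_client_publish(client, "your/topic", payload, len, 1, 0);
    xSemaphoreTake(puback_sem, portMAX_DELAY);  // block until the matching event arrives
}

// Call this from mqtt_event_handler() on MQTT_EVENT_PUBLISHED.
void handle_published(esp_mqtt_event_handle_t event)
{
    if (event->msg_id == inflight_msg_id) {
        xSemaphoreGive(puback_sem);  // matching PUBACK seen, release the publisher
    }
}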

@david-cermak
Collaborator

@michaelgaehwiler Thanks for the feedback. Yes, I think the user event would be provided before separate locking is implemented.

@someburner

@david-cermak

We already have an architecture in place with thousands of ESP8266s, and that is basically what we do there, using edits to the tuanpm library and a custom QoS 1 outbox implementation. I don't think we have the resources for, or want, a switch to a different library/platform for ingesting data, although it is something we have thought about. I think we would run into the same issue using Kafka.

Keeping track of in-flight message IDs does work, but I haven't investigated how message IDs behave in this library. E.g., does a message ID survive a reconnect and re-send? I'm guessing it does, so that would probably be an option. I forgot that I just went with the "quick and dirty" way of tracking outbox size. Perhaps message IDs would work better, as I could add my own lock just to check for that specific message ID. However, it's still a bit of a circular issue if the enqueue code is blocking and I can't obtain a message ID while the client is locked. But perhaps I could just assume I'm waiting for a message ID immediately after generating the payload.

@michaelgaehwiler
Author

@david-cermak Thanks for the implementation! I've quickly reviewed the changes in 97503cc, but from my understanding, the user event is executed after MQTT_API_LOCK() is taken, so I couldn't use esp_mqtt_client_enqueue(), because the latter also calls MQTT_API_LOCK(). I haven't tested it and perhaps I'm wrong, but it looks like that would result in a deadlock, wouldn't it?

@david-cermak
Collaborator

Yes, the event is posted after locking the client, but that does not prevent calling the client's other APIs, in the same way the default example publishes from within the event handler:

https://github.com/espressif/esp-idf/blob/release/v4.4/examples/protocols/mqtt/ssl/main/app_main.c#L86

It's because the lock used here is a recursive mutex:

#define MQTT_API_LOCK(c)    xSemaphoreTakeRecursive(c->api_lock, portMAX_DELAY)
#define MQTT_API_UNLOCK(c)  xSemaphoreGiveRecursive(c->api_lock)
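Since the mutex is recursive, the task that already holds it can take it again without blocking; only the final balanced give actually releases it. A minimal standalone illustration (not esp-mqtt code):

#include "freertos/FreeRTOS.h"
#include "freertos/semphr.h"

void recursive_lock_demo(void)
{
    SemaphoreHandle_t lock = xSemaphoreCreateRecursiveMutex();

    xSemaphoreTakeRecursive(lock, portMAX_DELAY);  // e.g., esp_mqtt_task, before dispatching the event
    xSemaphoreTakeRecursive(lock, portMAX_DELAY);  // nested take inside esp_mqtt_client_enqueue(): returns immediately
    xSemaphoreGiveRecursive(lock);                 // undoes the nested take
    xSemaphoreGiveRecursive(lock);                 // now the lock is actually released
}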

@michaelgaehwiler
Author

Oh, I missed that – thanks for the clarification!

espressif-bot pushed a commit to espressif/esp-idf that referenced this issue Dec 22, 2022
* Update submodule: git log --oneline ae53d799da294f03ef65c33e88fa33648e638134..fde00340f19b9f5ae81fff02ccfa9926f0e33687

Detailed description of the changes:
* Fix the default configuration for event queue
  - See merge request espressif/esp-mqtt!153
  - See commit espressif/esp-mqtt@fb42588
* Adds missing header.
  - See merge request espressif/esp-mqtt!152
  - See commit espressif/esp-mqtt@8a60057
* Moves state change when stopping the client
  - See merge request espressif/esp-mqtt!150
  - Closes espressif/esp-mqtt#239
  - See commit espressif/esp-mqtt@3738fcd
* Adds error code to MQTT_EVENT_SUBSCRIBED in case of failure
  - See merge request espressif/esp-mqtt!143
  - Closes espressif/esp-mqtt#233
  - See commit espressif/esp-mqtt@9af5c26
* Adds debug information on sending dup messages
  - See merge request espressif/esp-mqtt!145
  - See commit espressif/esp-mqtt@47b3f9b
* ci: Fix qemu build
  - See merge request espressif/esp-mqtt!147
  - See commit espressif/esp-mqtt@68e8c4f
* ci: Build and Test QEMU on v5.0
  - See merge request espressif/esp-mqtt!142
  - See commit espressif/esp-mqtt@9db9ee7
* client: Add support for user events
  - See merge request espressif/esp-mqtt!140
  - Closes espressif/esp-mqtt#230
  - See commit espressif/esp-mqtt@97503cc
* Adds unregister event API
  - See merge request espressif/esp-mqtt!139
  - Closes #9194
  - See commit espressif/esp-mqtt@a9a9fe7
egnor pushed a commit to egnor/esp-mqtt that referenced this issue Dec 23, 2022
Also supporting configurable queue size for the internal event loop.

Closes espressif#230