feat: buffer Framed<.., Codec> packets #826

Closed · de-sh wants to merge 20 commits into main from buffers

Conversation


@de-sh commented Mar 20, 2024

resolves #810 on top of #825

Defaults to buffering 10 packets before each flush; this behavior can be changed with MqttOptions.set_max_request_batch(). Outgoing PingReq packets are flushed instantly as well (required so that the connection isn't closed).
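
A rough usage sketch of the proposed option. set_max_request_batch() is the setter introduced by this PR (not part of the released rumqttc API); the client id, host and port are placeholders:

```rust
use rumqttc::{AsyncClient, MqttOptions, QoS};

#[tokio::main]
async fn main() {
    let mut options = MqttOptions::new("batched-client", "localhost", 1883);
    // Proposed by this PR: buffer up to 25 outgoing packets per flush
    // (the default would be 10).
    options.set_max_request_batch(25);

    let (client, mut eventloop) = AsyncClient::new(options, 10);
    client
        .publish("hello/world", QoS::AtLeastOnce, false, b"hi".to_vec())
        .await
        .unwrap();

    // Packets are written to the wire in batches as the event loop is polled.
    while let Ok(event) = eventloop.poll().await {
        println!("{event:?}");
    }
}
```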

Type of change

Checklist:

  • Formatted with cargo fmt
  • Make an entry to CHANGELOG.md if it's relevant to the users of the library. If it's not relevant mention why.

@de-sh changed the title from "feat: buffer packets" to "feat: buffer Framed<.., Codec> packets" on Mar 20, 2024
@flxo left a comment:

Thanks for the update!

I don't understand why there's home-brewed counting of the outgoing packets. The counter also includes packets generated in response to incoming packets, so the name of the configuration option is misleading.

The manual count + flush also works against the capacity of Framed (which is left at its default here). Wouldn't it be better to leave the implicit flush to Framed? Flushing must still happen after each batch of incoming packets and requests.

I totally agree with wrapping SinkExt::flush with the timeout.

My suggestion is:

  • rename max_request_batch to io_buffer_capacity. Apply that when Framed is constructed.
  • Remove Network::write and use SinkExt::feed instead (or write a wrapper if you want to hide the Sink).
  • Keep Network::flush with the timeout feature (a rough sketch of these points follows below).

cheers
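
A sketch of what these suggestions could look like. Network here is a simplified stand-in for rumqttc's internal type, LinesCodec/String stand in for the MQTT Codec/Packet, and flush_timeout is an assumed parameter:

```rust
use std::time::Duration;

use futures_util::SinkExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::time::timeout;
use tokio_util::codec::{Framed, LinesCodec};

// Simplified stand-in for rumqttc's internal `Network`.
struct Network<T> {
    framed: Framed<T, LinesCodec>,
    flush_timeout: Duration,
}

impl<T: AsyncRead + AsyncWrite + Unpin> Network<T> {
    fn new(io: T, flush_timeout: Duration) -> Self {
        Self {
            framed: Framed::new(io, LinesCodec::new()),
            flush_timeout,
        }
    }

    /// Queue an outgoing frame without forcing a write to the socket;
    /// `Framed` flushes on its own only once its write buffer crosses
    /// the internal backpressure boundary.
    async fn feed(&mut self, packet: String) -> Result<(), Box<dyn std::error::Error>> {
        self.framed.feed(packet).await?;
        Ok(())
    }

    /// Flush everything buffered so far, bounded by a timeout as in this PR.
    async fn flush(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // The turbofish pins the sink item type, since `LinesCodec` encodes any `AsRef<str>`.
        timeout(self.flush_timeout, SinkExt::<String>::flush(&mut self.framed)).await??;
        Ok(())
    }
}
```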


@@ -185,7 +185,7 @@ impl EventLoop {
            o = network.read() => {
                let incoming = o?;
                if let Some(packet) = self.state.handle_incoming_packet(incoming)? {
-                   network.send(packet).await?;
+                   network.write(packet).await?;
@flxo commented:

This packet is never flushed unless there's a ping, or until options.max_request_batch - 1 more packets are "sent".

@de-sh commented:

Had the same doubt and was using tests as a crutch, but I see how even the tests aren't complete.

What type of acks should we instantly respond to? I guess only PingResp?

@flxo commented:

I'm not that deep into the MQTT standard, but I think it's just PingResp.

@de-sh commented Mar 21, 2024

> Wouldn't it be better to leave the implicit flush to Framed?

How does Framed do implicit flushing again, just by calling feed()? I have validated that framed.feed() doesn't flush when the buffer limit is breached; what could I be doing wrong?

> Remove Network::write and use SinkExt::feed instead (or write a wrapper if you want to hide the Sink)

I would like to keep write and wrap the internal workings for the time being, mainly because this isn't directly exposed to the user and readability of code is important for maintenance.
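
For context (not something this PR changes): tokio_util's Framed sink only flushes implicitly from feed once its write buffer crosses the internal backpressure boundary, roughly 8 KiB by default, not after a configurable number of frames; that may explain the observation above. A self-contained sketch, with LinesCodec standing in for the MQTT codec:

```rust
use futures_util::SinkExt;
use tokio_util::codec::{Framed, LinesCodec};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // An in-memory duplex pipe keeps the example self-contained;
    // any AsyncRead + AsyncWrite transport behaves the same way.
    let (client, _server) = tokio::io::duplex(64 * 1024);
    let mut framed = Framed::new(client, LinesCodec::new());

    for i in 0..100 {
        // `feed` only enqueues the frame. It flushes implicitly once the
        // accumulated write buffer exceeds Framed's backpressure boundary
        // (about 8 KiB by default), not after every Nth frame.
        framed.feed(format!("{i:0>128}")).await?;
    }

    // Whatever is still buffered below the boundary must be flushed explicitly.
    SinkExt::<String>::flush(&mut framed).await?;
    Ok(())
}
```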

@flxo left a comment:

Left some comments. My suggestion is to move the "when to flush" logic to EventLoop. See comments.

                .await
                .map_err(StateError::Deserialization)?;

        if should_flush || self.framed.write_buffer().len() + packet_size >= self.buffer_capacity {
@flxo commented:

FramedWrite already does that job for you here. I think removing the manual buffer size check is good and avoids unnecessary complexity here.

I'd also put the "when to flush" logic in EventLoop and not in here, because it must reflect MQTT semantics while the IO part should stay agnostic.

Add a loop (of up to e.g. 10 iterations) around the select! that breaks once pending and request_rx are empty and the rx buffer of Framed is empty after a fill_buf. There is no need for extra logic in Network.

like this:

for _ in 0..10 {
  select! {
    r = next_request() => {
       packet = state.process(r)
       network.feed(packet)
    }
    p = network.read() => {
       packet = state.handle_incoming(p)
       network.feed(packet)
       if matches!(packet, Packet::PingResp(_)) {
           // Ensure the flush by breaking.
           break;
       }
    }
    _ = ping_interval.tick() => {
       packet = PingReq
       network.feed(packet)
    }
  }

  let data_pending = network.fill_rx_buf(); // get inner of framed; get read_buffer_mut, fill it.

  // Break if there is nothing to do. 
  if pending.is_empty() && requests_rx.is_empty() && !data_pending {
    break;
  }
}
network.flush()

    pub async fn write(&mut self, packet: Packet) -> Result<(), StateError> {
        let packet_size = packet.size();
        let should_flush = match packet {
            Packet::Connect(..) | Packet::PingReq(_) | Packet::PingResp(_) => true,
@flxo commented:

Same as above.

Base automatically changed from codec to main March 25, 2024 10:02
@de-sh commented Mar 25, 2024

closed in favor of #823

@de-sh closed this on Mar 25, 2024
@de-sh deleted the buffers branch on July 31, 2024 at 06:20

Successfully merging this pull request may close these issues.

rumqttc: Outgoing publications are sent one by one