Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: disconnect with properties #915

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions rumqttc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

* `size()` method on `Packet` calculates size once serialized.
* `read()` and `write()` methods on `Packet`.
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`.
* `Auth` packet as per MQTT5 standards
* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions`
* `size()` method on `Packet` calculates size once serialized;
* `read()` and `write()` methods on `Packet`;
* `ConnectionAborted` variant on `StateError` type to denote abrupt end to a connection;
* `set_session_expiry_interval` and `session_expiry_interval` methods on `MqttOptions`;
* `Auth` packet as per MQTT5 standards;
* Allow configuring the `nodelay` property of underlying TCP client with the `tcp_nodelay` field in `NetworkOptions`;
* `disconnect_with_properties` and `try_disconnect_with_properties` methods on `Client` and `AsyncClient`, allowing disconnection from the broker with custom properties and reason.

### Changed

Expand All @@ -23,7 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* use `Login` to store credentials
* Made `DisconnectProperties` struct public.
* Replace `Vec<Option<u16>>` with `FixedBitSet` for managing packet ids of released QoS 2 publishes and incoming QoS 2 publishes in `MqttState`.
* Accept `native_tls::TlsConnector` as input for `Transport::tls_with_config`.
* Accept `native_tls::TlsConnector` as input for `Transport::tls_with_config`;
* Updated `Request::Disconnect` to include a Disconnect struct.

### Deprecated

Expand Down
97 changes: 87 additions & 10 deletions rumqttc/src/v5/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use super::mqttbytes::v5::{
Unsubscribe, UnsubscribeProperties,
};
use super::mqttbytes::QoS;
use super::{ConnectionError, Event, EventLoop, MqttOptions, Request};
use super::{
ConnectionError, Disconnect, DisconnectProperties, DisconnectReasonCode, Event, EventLoop,
MqttOptions, Request,
};
use crate::{valid_filter, valid_topic};

use bytes::Bytes;
Expand Down Expand Up @@ -429,19 +432,68 @@ impl AsyncClient {
self.handle_try_unsubscribe(topic, None)
}

/// Sends a MQTT disconnect to the `EventLoop`
/// Sends a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection
pub async fn disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
.await
}

/// Sends a MQTT disconnect to the `EventLoop` with properties
pub async fn disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.handle_disconnect(reason, Some(properties)).await
}

// Handle disconnect interface which can have properties or not
async fn handle_disconnect(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Result<(), ClientError> {
let request = self.build_disconnect_request(reason, properties);
self.request_tx.send_async(request).await?;
Ok(())
}

/// Attempts to send a MQTT disconnect to the `EventLoop`
/// Attempts to send a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection
pub fn try_disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.handle_try_disconnect(DisconnectReasonCode::NormalDisconnection, None)
}

/// Sends a MQTT disconnect to the `EventLoop` with properties
pub fn try_disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.handle_try_disconnect(reason, Some(properties))
}

// Handle disconnect interface which can have properties or not
fn handle_try_disconnect(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Result<(), ClientError> {
let request = self.build_disconnect_request(reason, properties);
self.request_tx.try_send(request)?;
Ok(())
}

// Helper function to build disconnect request
fn build_disconnect_request(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Request {
match properties {
Some(p) => Request::Disconnect(Disconnect::new_with_properties(reason, p)),
None => Request::Disconnect(Disconnect::new(reason)),
}
}
}

fn get_ack_req(publish: &Publish) -> Option<Request> {
Expand Down Expand Up @@ -732,17 +784,42 @@ impl Client {
self.client.try_unsubscribe(topic)
}

/// Sends a MQTT disconnect to the `EventLoop`
/// Sends a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection
pub fn disconnect(&self) -> Result<(), ClientError> {
let request = Request::Disconnect;
self.handle_disconnect(DisconnectReasonCode::NormalDisconnection, None)
}

/// Sends a MQTT disconnect to the `EventLoop` with properties
pub fn disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.handle_disconnect(reason, Some(properties))
}

fn handle_disconnect(
&self,
reason: DisconnectReasonCode,
properties: Option<DisconnectProperties>,
) -> Result<(), ClientError> {
let request = self.client.build_disconnect_request(reason, properties);
self.client.request_tx.send(request)?;
Ok(())
}

/// Sends a MQTT disconnect to the `EventLoop`
/// Try to send a MQTT disconnect to the `EventLoop` with default DisconnectReasonCode::NormalDisconnection
pub fn try_disconnect(&self) -> Result<(), ClientError> {
self.client.try_disconnect()?;
Ok(())
self.client.try_disconnect()
}

/// Try to sends a MQTT disconnect to the `EventLoop` with properties
pub fn try_disconnect_with_properties(
&self,
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Result<(), ClientError> {
self.client.handle_try_disconnect(reason, Some(properties))
}
}

Expand Down
2 changes: 1 addition & 1 deletion rumqttc/src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub enum Request {
SubAck(SubAck),
Unsubscribe(Unsubscribe),
UnsubAck(UnsubAck),
Disconnect,
Disconnect(Disconnect),
}

impl From<Subscribe> for Request {
Expand Down
10 changes: 10 additions & 0 deletions rumqttc/src/v5/mqttbytes/v5/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,16 @@ impl Disconnect {
}
}

pub fn new_with_properties(
reason: DisconnectReasonCode,
properties: DisconnectProperties,
) -> Self {
Self {
reason_code: reason,
properties: Some(properties),
}
}

fn len(&self) -> usize {
if self.reason_code == DisconnectReasonCode::NormalDisconnection
&& self.properties.is_none()
Expand Down
36 changes: 24 additions & 12 deletions rumqttc/src/v5/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,7 @@ impl MqttState {
Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?,
Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?,
Request::PingReq => self.outgoing_ping()?,
Request::Disconnect => {
self.outgoing_disconnect(DisconnectReasonCode::NormalDisconnection)?
}
Request::Disconnect(disconnect) => self.outgoing_disconnect(disconnect)?,
Request::PubAck(puback) => self.outgoing_puback(puback)?,
Request::PubRec(pubrec) => self.outgoing_pubrec(pubrec)?,
_ => unimplemented!(),
Expand Down Expand Up @@ -228,7 +226,8 @@ impl MqttState {

pub fn handle_protocol_error(&mut self) -> Result<Option<Packet>, StateError> {
// send DISCONNECT packet with REASON_CODE 0x82
self.outgoing_disconnect(DisconnectReasonCode::ProtocolError)
let disconnect = Disconnect::new(DisconnectReasonCode::ProtocolError);
self.outgoing_disconnect(disconnect)
}

fn handle_incoming_suback(
Expand All @@ -242,7 +241,7 @@ impl MqttState {
}
_ => {
warn!("SubAck Pkid = {:?}, Reason = {:?}", suback.pkid, reason);
},
}
}
}
Ok(None)
Expand Down Expand Up @@ -364,7 +363,10 @@ impl MqttState {
if puback.reason != PubAckReason::Success
&& puback.reason != PubAckReason::NoMatchingSubscribers
{
warn!("PubAck Pkid = {:?}, reason: {:?}", puback.pkid, puback.reason);
warn!(
"PubAck Pkid = {:?}, reason: {:?}",
puback.pkid, puback.reason
);
return Ok(None);
}

Expand Down Expand Up @@ -397,7 +399,10 @@ impl MqttState {
if pubrec.reason != PubRecReason::Success
&& pubrec.reason != PubRecReason::NoMatchingSubscribers
{
warn!("PubRec Pkid = {:?}, reason: {:?}", pubrec.pkid, pubrec.reason);
warn!(
"PubRec Pkid = {:?}, reason: {:?}",
pubrec.pkid, pubrec.reason
);
return Ok(None);
}

Expand All @@ -417,7 +422,10 @@ impl MqttState {
self.incoming_pub.set(pubrel.pkid as usize, false);

if pubrel.reason != PubRelReason::Success {
warn!("PubRel Pkid = {:?}, reason: {:?}", pubrel.pkid, pubrel.reason);
warn!(
"PubRel Pkid = {:?}, reason: {:?}",
pubrel.pkid, pubrel.reason
);
return Ok(None);
}

Expand All @@ -444,7 +452,10 @@ impl MqttState {
self.outgoing_rel.set(pubcomp.pkid as usize, false);

if pubcomp.reason != PubCompReason::Success {
warn!("PubComp Pkid = {:?}, reason: {:?}", pubcomp.pkid, pubcomp.reason);
warn!(
"PubComp Pkid = {:?}, reason: {:?}",
pubcomp.pkid, pubcomp.reason
);
return Ok(None);
}

Expand Down Expand Up @@ -614,15 +625,16 @@ impl MqttState {
Ok(Some(Packet::Unsubscribe(unsub)))
}

/// Send Disconnect packet to broker
fn outgoing_disconnect(
&mut self,
reason: DisconnectReasonCode,
disconnect: Disconnect,
) -> Result<Option<Packet>, StateError> {
debug!("Disconnect with {:?}", reason);
debug!("Disconnect with {:?}", disconnect.reason_code);
let event = Event::Outgoing(Outgoing::Disconnect);
self.events.push_back(event);

Ok(Some(Packet::Disconnect(Disconnect::new(reason))))
Ok(Some(Packet::Disconnect(disconnect)))
}

fn check_collision(&mut self, pkid: u16) -> Option<Publish> {
Expand Down