From f6d54aa78a8ee75fe4896bf1fc9cebf6aef13ea4 Mon Sep 17 00:00:00 2001 From: Wannes De Smet Date: Mon, 25 Sep 2023 19:39:33 +0200 Subject: [PATCH] Implement MQTT reconnection logic --- ebusd-thermostat/src/ebusd.rs | 2 +- ebusd-thermostat/src/main.rs | 40 +++++++++++++++++++++-------------- 2 files changed, 25 insertions(+), 17 deletions(-) diff --git a/ebusd-thermostat/src/ebusd.rs b/ebusd-thermostat/src/ebusd.rs index 3804057..4b8a146 100644 --- a/ebusd-thermostat/src/ebusd.rs +++ b/ebusd-thermostat/src/ebusd.rs @@ -1,6 +1,6 @@ use crate::HeaterSettings; use anyhow::bail; -use log::{debug, trace}; +use log::debug; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; diff --git a/ebusd-thermostat/src/main.rs b/ebusd-thermostat/src/main.rs index 9da26b2..f7501c0 100644 --- a/ebusd-thermostat/src/main.rs +++ b/ebusd-thermostat/src/main.rs @@ -145,8 +145,9 @@ impl Default for HeaterSettings { pub struct Thermostat { ebusd: Ebusd, ha_api: Api, - mqtt_client: AsyncClient, - mqtt_eventloop: EventLoop, + mqtt_host: String, + mqtt_username: String, + mqtt_password: String, thermometer_entity: String, prefs: TemperaturePreferences, active_mode: HeaterSettings, @@ -188,18 +189,13 @@ impl Thermostat { let mut ebusd = Ebusd::new(ebusd_address).await?; ebusd.define_message( "wi,BAI,SetModeOverride,Betriebsart,,08,B510,00,hcmode,,UCH,,,,flowtempdesired,,D1C,,,,hwctempdesired,,D1C,,,,hwcflowtempdesired,,UCH,,,,setmode1,,UCH,,,,disablehc,,BI0,,,,disablehwctapping,,BI1,,,,disablehwcload,,BI2,,,,setmode2,,UCH,,,,remoteControlHcPump,,BI0,,,,releaseBackup,,BI1,,,,releaseCooling,,BI2".to_string()).await?; - let mut mqttoptions = MqttOptions::new("ebusd-thermostat", mqtt_host, 1883); - mqttoptions.set_keep_alive(Duration::from_secs(5)); - mqttoptions.set_credentials(mqtt_username, mqtt_password); - - let (client, eventloop) = AsyncClient::new(mqttoptions, 10); - Ok(Self { ebusd, ha_api: api, thermometer_entity, - mqtt_client: client, - mqtt_eventloop: eventloop, + mqtt_host, + mqtt_username, + mqtt_password, prefs: TemperaturePreferences::default(), active_mode: HeaterSettings::default(), last_mode_set_time: None, @@ -208,11 +204,22 @@ impl Thermostat { }) } - pub async fn run(&mut self) -> anyhow::Result<()> { - self.mqtt_client + async fn mqtt_reconnect(&self) -> anyhow::Result<(AsyncClient, EventLoop)> { + let mut mqttoptions = MqttOptions::new("ebusd-thermostat", self.mqtt_host.clone(), 1883); + mqttoptions.set_keep_alive(Duration::from_secs(5)); + mqttoptions.set_credentials(self.mqtt_username.clone(), self.mqtt_password.clone()); + + let (client, eventloop) = AsyncClient::new(mqttoptions, 10); + + client .subscribe("ebusd-thermostat/#", QoS::AtLeastOnce) - .await - .unwrap(); + .await?; + + Ok((client, eventloop)) + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + let (mut client, mut mqtt_eventloop) = self.mqtt_reconnect().await?; let mut temp_rx = self.temperature_changes().await?; @@ -231,13 +238,14 @@ impl Thermostat { } select! { - event = self.mqtt_eventloop.poll() => { + event = mqtt_eventloop.poll() => { match event { Ok(ev) => { self.handle_mqtt_message(ev).await?; } Err(e) => { error!("MQTT error: {:?}", e); + (client, mqtt_eventloop) = self.mqtt_reconnect().await?; } } } @@ -246,7 +254,7 @@ impl Thermostat { continue; } let temp = temp.unwrap(); - self.mqtt_client.publish("ebusd-thermostat/temp", QoS::AtLeastOnce, true, format!("{}", temp)).await?; + client.publish("ebusd-thermostat/temp", QoS::AtLeastOnce, true, format!("{}", temp)).await?; debug!("Published MQTT update: {}", temp); if temp == self.current_temperature {