Skip to content

Commit

Permalink
Implement MQTT reconnection logic
Browse files Browse the repository at this point in the history
  • Loading branch information
daft-panda committed Sep 25, 2023
1 parent acba64d commit f6d54aa
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
2 changes: 1 addition & 1 deletion ebusd-thermostat/src/ebusd.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::HeaterSettings;
use anyhow::bail;
use log::{debug, trace};
use log::debug;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

Expand Down
40 changes: 24 additions & 16 deletions ebusd-thermostat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,9 @@ impl Default for HeaterSettings {
pub struct Thermostat {
ebusd: Ebusd,
ha_api: Api,
mqtt_client: AsyncClient,
mqtt_eventloop: EventLoop,
mqtt_host: String,
mqtt_username: String,
mqtt_password: String,
thermometer_entity: String,
prefs: TemperaturePreferences,
active_mode: HeaterSettings,
Expand Down Expand Up @@ -188,18 +189,13 @@ impl Thermostat {
let mut ebusd = Ebusd::new(ebusd_address).await?;
ebusd.define_message( "wi,BAI,SetModeOverride,Betriebsart,,08,B510,00,hcmode,,UCH,,,,flowtempdesired,,D1C,,,,hwctempdesired,,D1C,,,,hwcflowtempdesired,,UCH,,,,setmode1,,UCH,,,,disablehc,,BI0,,,,disablehwctapping,,BI1,,,,disablehwcload,,BI2,,,,setmode2,,UCH,,,,remoteControlHcPump,,BI0,,,,releaseBackup,,BI1,,,,releaseCooling,,BI2".to_string()).await?;

let mut mqttoptions = MqttOptions::new("ebusd-thermostat", mqtt_host, 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(mqtt_username, mqtt_password);

let (client, eventloop) = AsyncClient::new(mqttoptions, 10);

Ok(Self {
ebusd,
ha_api: api,
thermometer_entity,
mqtt_client: client,
mqtt_eventloop: eventloop,
mqtt_host,
mqtt_username,
mqtt_password,
prefs: TemperaturePreferences::default(),
active_mode: HeaterSettings::default(),
last_mode_set_time: None,
Expand All @@ -208,11 +204,22 @@ impl Thermostat {
})
}

pub async fn run(&mut self) -> anyhow::Result<()> {
self.mqtt_client
async fn mqtt_reconnect(&self) -> anyhow::Result<(AsyncClient, EventLoop)> {
let mut mqttoptions = MqttOptions::new("ebusd-thermostat", self.mqtt_host.clone(), 1883);
mqttoptions.set_keep_alive(Duration::from_secs(5));
mqttoptions.set_credentials(self.mqtt_username.clone(), self.mqtt_password.clone());

let (client, eventloop) = AsyncClient::new(mqttoptions, 10);

client
.subscribe("ebusd-thermostat/#", QoS::AtLeastOnce)
.await
.unwrap();
.await?;

Ok((client, eventloop))
}

pub async fn run(&mut self) -> anyhow::Result<()> {
let (mut client, mut mqtt_eventloop) = self.mqtt_reconnect().await?;

let mut temp_rx = self.temperature_changes().await?;

Expand All @@ -231,13 +238,14 @@ impl Thermostat {
}

select! {
event = self.mqtt_eventloop.poll() => {
event = mqtt_eventloop.poll() => {
match event {
Ok(ev) => {
self.handle_mqtt_message(ev).await?;
}
Err(e) => {
error!("MQTT error: {:?}", e);
(client, mqtt_eventloop) = self.mqtt_reconnect().await?;
}
}
}
Expand All @@ -246,7 +254,7 @@ impl Thermostat {
continue;
}
let temp = temp.unwrap();
self.mqtt_client.publish("ebusd-thermostat/temp", QoS::AtLeastOnce, true, format!("{}", temp)).await?;
client.publish("ebusd-thermostat/temp", QoS::AtLeastOnce, true, format!("{}", temp)).await?;
debug!("Published MQTT update: {}", temp);

if temp == self.current_temperature {
Expand Down

0 comments on commit f6d54aa

Please sign in to comment.