diff --git a/CHANGELOG.md b/CHANGELOG.md index 27b82c89a..92f4eac42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Unreleased +Fixed: + +- Connection timeout when UI is suspended on an offscreen workspace due to channel backpressure + # 2024.8 (2024-07-05) Added: diff --git a/data/src/stream.rs b/data/src/stream.rs index fb7bdc0af..2ea797c25 100644 --- a/data/src/stream.rs +++ b/data/src/stream.rs @@ -69,10 +69,21 @@ struct Stream { receiver: mpsc::Receiver, } -pub async fn run( +pub fn run( server: server::Entry, proxy: Option, - mut sender: mpsc::Sender, +) -> impl futures::Stream { + let (sender, receiver) = mpsc::unbounded(); + + let runner = stream::once(_run(server, proxy, sender)).map(|_| unreachable!()); + + stream::select(receiver, runner) +} + +async fn _run( + server: server::Entry, + proxy: Option, + sender: mpsc::UnboundedSender, ) -> Never { let server::Entry { server, config } = server; @@ -82,14 +93,12 @@ pub async fn run( let mut state = State::Disconnected { last_retry: None }; // Notify app of initial disconnected state - let _ = sender - .send(Update::Disconnected { - server: server.clone(), - is_initial, - error: None, - sent_time: Utc::now(), - }) - .await; + let _ = sender.unbounded_send(Update::Disconnected { + server: server.clone(), + is_initial, + error: None, + sent_time: Utc::now(), + }); loop { match &mut state { @@ -106,14 +115,12 @@ pub async fn run( Ok((stream, client)) => { log::info!("[{server}] connected"); - let _ = sender - .send(Update::Connected { - server: server.clone(), - client, - is_initial, - sent_time: Utc::now(), - }) - .await; + let _ = sender.unbounded_send(Update::Connected { + server: server.clone(), + client, + is_initial, + sent_time: Utc::now(), + }); is_initial = false; @@ -133,13 +140,11 @@ pub async fn run( log::warn!("[{server}] connection failed: {error}"); - let _ = sender - .send(Update::ConnectionFailed { - server: server.clone(), - error, - sent_time: Utc::now(), - }) - .await; + let _ = sender.unbounded_send(Update::ConnectionFailed { + server: server.clone(), + error, + sent_time: Utc::now(), + }); *last_retry = Some(Instant::now()); } @@ -185,14 +190,12 @@ pub async fn run( } proto::Command::ERROR(error) => { log::warn!("[{server}] disconnected: {error}"); - let _ = sender - .send(Update::Disconnected { - server: server.clone(), - is_initial, - error: Some(error), - sent_time: Utc::now(), - }) - .await; + let _ = sender.unbounded_send(Update::Disconnected { + server: server.clone(), + is_initial, + error: Some(error), + sent_time: Utc::now(), + }); state = State::Disconnected { last_retry: Some(Instant::now()), }; @@ -206,29 +209,26 @@ pub async fn run( } Input::IrcMessage(Err(e)) => { log::warn!("[{server}] disconnected: {e}"); - let _ = sender - .send(Update::Disconnected { - server: server.clone(), - is_initial, - error: Some(e.to_string()), - sent_time: Utc::now(), - }) - .await; + let _ = sender.unbounded_send(Update::Disconnected { + server: server.clone(), + is_initial, + error: Some(e.to_string()), + sent_time: Utc::now(), + }); state = State::Disconnected { last_retry: Some(Instant::now()), }; } Input::Batch(messages) => { let _ = sender - .send(Update::MessagesReceived(server.clone(), messages)) - .await; + .unbounded_send(Update::MessagesReceived(server.clone(), messages)); } Input::Send(message) => { if let Command::QUIT(reason) = &message.command { let reason = reason.clone(); let _ = stream.connection.send(message).await; - let _ = sender.send(Update::Quit(server.clone(), reason)).await; + let _ = sender.unbounded_send(Update::Quit(server.clone(), reason)); log::info!("[{server}] quit"); @@ -249,14 +249,12 @@ pub async fn run( } Input::PingTimeout => { log::warn!("[{server}] ping timeout"); - let _ = sender - .send(Update::Disconnected { - server: server.clone(), - is_initial, - error: Some("ping timeout".into()), - sent_time: Utc::now(), - }) - .await; + let _ = sender.unbounded_send(Update::Disconnected { + server: server.clone(), + is_initial, + error: Some("ping timeout".into()), + sent_time: Utc::now(), + }); state = State::Disconnected { last_retry: Some(Instant::now()), }; diff --git a/src/stream.rs b/src/stream.rs index c86076758..7934a5597 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -3,9 +3,5 @@ use data::{config, server}; use iced::{subscription, Subscription}; pub fn run(entry: server::Entry, proxy: Option) -> Subscription { - // Channel messages are batched every 50ms so channel size 10 ~= 500ms which - // app thread should more than easily keep up with - subscription::channel(entry.server.clone(), 10, move |sender| { - stream::run(entry, proxy, sender) - }) + subscription::run_with_id(entry.server.clone(), stream::run(entry, proxy)) }