diff --git a/Cargo.lock b/Cargo.lock index 43cee425237..7a348d0a595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1429,7 +1429,7 @@ dependencies = [ [[package]] name = "common" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "base64 0.20.0", @@ -2801,7 +2801,7 @@ dependencies = [ [[package]] name = "extensions" -version = "0.2.0" +version = "0.2.1" dependencies = [ "dioxus", "libloading 0.7.4", @@ -4246,7 +4246,7 @@ dependencies = [ [[package]] name = "icons" -version = "0.2.0" +version = "0.2.1" dependencies = [ "dioxus", "dioxus-html", @@ -4694,7 +4694,7 @@ dependencies = [ [[package]] name = "kit" -version = "0.2.0" +version = "0.2.1" dependencies = [ "arboard", "base64 0.20.0", @@ -9719,7 +9719,7 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "uplink" -version = "0.2.0" +version = "0.2.1" dependencies = [ "anyhow", "arboard", diff --git a/common/src/upload_file_channel.rs b/common/src/upload_file_channel.rs index ef04934b2bf..dc215088b45 100644 --- a/common/src/upload_file_channel.rs +++ b/common/src/upload_file_channel.rs @@ -11,7 +11,7 @@ pub enum UploadFileAction { Cancelling, UploadFiles(Vec), Uploading((String, String, String)), - Finishing, + Finishing(PathBuf, bool), Finished(T), Error, } diff --git a/common/src/warp_runner/manager/commands/constellation_commands.rs b/common/src/warp_runner/manager/commands/constellation_commands.rs index 07464ca6726..aa28e1cbc4e 100644 --- a/common/src/warp_runner/manager/commands/constellation_commands.rs +++ b/common/src/warp_runner/manager/commands/constellation_commands.rs @@ -1,13 +1,14 @@ use std::{ ffi::OsStr, - io::Read, + io::{Read, Write}, path::{Path, PathBuf}, process::{Command, Stdio}, + sync::mpsc, }; use derive_more::Display; -use futures::{channel::oneshot, StreamExt}; +use futures::{channel::oneshot, stream, StreamExt}; use humansize::{format_size, DECIMAL}; use once_cell::sync::Lazy; use tempfile::TempDir; @@ -25,7 +26,7 @@ use warp::{ directory::Directory, file::File, item::{FormatType, Item, ItemType}, - Progression, + ConstellationProgressStream, Progression, }, error::Error, logging::tracing::log, @@ -70,7 +71,7 @@ pub enum ConstellationCmd { DownloadFile { file_name: String, local_path_to_save_file: PathBuf, - rsp: oneshot::Sender>, + rsp: oneshot::Sender>, }, #[display(fmt = "DeleteItems {{ item: {item:?} }} ")] DeleteItems { @@ -370,7 +371,9 @@ async fn upload_files(warp_storage: &mut warp_storage, files_path: Vec) }; let max_size_ipfs = warp_storage.max_size(); - 'files_parth_loop: for file_path in files_path.clone() { + let (tx, rx) = mpsc::channel(); + + for file_path in files_path.clone() { let mut filename = match file_path .file_name() .map(|file| file.to_string_lossy().to_string()) @@ -419,155 +422,181 @@ async fn upload_files(warp_storage: &mut warp_storage, files_path: Vec) filename = rename_if_duplicate(current_directory.clone(), filename.clone(), file); match warp_storage.put(&filename, &local_path).await { - Ok(mut upload_progress) => { - let mut previous_percentage: usize = 0; - let mut upload_process_started = false; - - while let Some(upload_progress) = upload_progress.next().await { - match upload_progress { - Progression::CurrentProgress { - name, - current, - total, - } => { - log::trace!("starting upload file action listener"); - if let Ok(received_tx) = CANCEL_FILE_UPLOADLISTENER - .rx - .clone() - .lock() - .await - .try_recv() - { - if received_tx { - let _ = tx_upload_file.send(UploadFileAction::Cancelling); - continue 'files_parth_loop; - } - } - if !upload_process_started { - upload_process_started = true; - log::info!("Starting upload for {name}"); - log::info!("0% completed -> written 0 bytes") - }; - - if let Some(total) = total { - let current_percentage = - (((current as f64) / (total as f64)) * 100.) as usize; - if previous_percentage != current_percentage { - previous_percentage = current_percentage; - let readable_current = format_size(current, DECIMAL); - let percentage_number = - ((current as f64) / (total as f64)) * 100.; - let _ = tx_upload_file.send(UploadFileAction::Uploading(( - format!("{}%", percentage_number as usize), - get_local_text("files.uploading-file"), - filename.clone(), - ))); - log::info!( - "{}% completed -> written {readable_current}", - percentage_number as usize - ) - } - } - } - Progression::ProgressComplete { name, total } => { - let total = total.unwrap_or_default(); - let readable_total = format_size(total, DECIMAL); - let _ = tx_upload_file.send(UploadFileAction::Uploading(( - "100%".into(), - get_local_text("files.uploading-file"), - filename.clone(), - ))); - log::info!("{name} has been uploaded with {}", readable_total); - } - Progression::ProgressFailed { - name, - last_size, - error, - } => { - log::info!( - "{name} failed to upload at {} MB due to: {}", - last_size.unwrap_or_default(), - error.unwrap_or_default() - ); - let _ = tx_upload_file.send(UploadFileAction::Error); - continue 'files_parth_loop; - } + Ok(upload_progress) => { + // Handle each upload on another thread + let mut warp_storage = warp_storage.clone(); + let res = tx.clone(); + tokio::spawn(async move { + handle_upload_progress( + &mut warp_storage, + upload_progress, + filename, + file_path.clone(), + ) + .await; + let _ = res.send(file_path); + }); + } + Err(error) => log::error!("Error when upload file: {:?}", error), + } + } + let mut warp_storage = warp_storage.clone(); + // Spawn a listener for when all files finished uploading + // Listener should automatically finish once all senders are dropped (aka done) + tokio::spawn(async move { + loop { + if rx.recv().is_err() { + // Sender all dropped + break; + } + } + let ret = match get_items_from_current_directory(&mut warp_storage) { + Ok(r) => UploadFileAction::Finished(r), + Err(_) => UploadFileAction::Error, + }; + + let _ = tx_upload_file.send(ret); + }); +} + +async fn handle_upload_progress( + warp_storage: &mut warp_storage, + mut upload_progress: ConstellationProgressStream, + filename: String, + file_path: PathBuf, +) { + let tx_upload_file = UPLOAD_FILE_LISTENER.tx.clone(); + let mut previous_percentage: usize = 0; + let mut upload_process_started = false; + + while let Some(upload_progress) = upload_progress.next().await { + match upload_progress { + Progression::CurrentProgress { + name, + current, + total, + } => { + log::trace!("starting upload file action listener"); + if let Ok(received_tx) = CANCEL_FILE_UPLOADLISTENER + .rx + .clone() + .lock() + .await + .try_recv() + { + if received_tx { + let _ = tx_upload_file.send(UploadFileAction::Cancelling); + break; } } - + if !upload_process_started { + upload_process_started = true; + log::info!("Starting upload for {name}"); + log::info!("0% completed -> written 0 bytes") + }; + + if let Some(total) = total { + let current_percentage = (((current as f64) / (total as f64)) * 100.) as usize; + if previous_percentage != current_percentage { + previous_percentage = current_percentage; + let readable_current = format_size(current, DECIMAL); + let percentage_number = ((current as f64) / (total as f64)) * 100.; + let _ = tx_upload_file.send(UploadFileAction::Uploading(( + format!("{}%", percentage_number as usize), + get_local_text("files.uploading-file"), + filename.clone(), + ))); + log::info!( + "{}% completed -> written {readable_current}", + percentage_number as usize + ) + } + // ConstellationProgressStream only ends (atm) when all files in the queue are done uploading + // This causes pending file count to not be updated which is way we send a message here too + if current_percentage == 100 { + let _ = tx_upload_file + .send(UploadFileAction::Finishing(file_path.clone(), false)); + } + } + } + Progression::ProgressComplete { name, total } => { + let total = total.unwrap_or_default(); + let readable_total = format_size(total, DECIMAL); let _ = tx_upload_file.send(UploadFileAction::Uploading(( "100%".into(), - get_local_text("files.checking-thumbnail"), + get_local_text("files.uploading-file"), filename.clone(), ))); + log::info!("{name} has been uploaded with {}", readable_total); + } + Progression::ProgressFailed { + name, + last_size, + error, + } => { + log::info!( + "{name} failed to upload at {} MB due to: {}", + last_size.unwrap_or_default(), + error.unwrap_or_default() + ); + let _ = tx_upload_file.send(UploadFileAction::Error); + break; + } + } + } - let video_formats = VIDEO_FILE_EXTENSIONS.to_vec(); - let doc_formats = DOC_EXTENSIONS.to_vec(); + let _ = tx_upload_file.send(UploadFileAction::Uploading(( + "100%".into(), + get_local_text("files.checking-thumbnail"), + filename.clone(), + ))); - let file_extension = std::path::Path::new(&filename) - .extension() - .and_then(OsStr::to_str) - .map(|s| format!(".{s}")) - .unwrap_or_default(); + let video_formats = VIDEO_FILE_EXTENSIONS.to_vec(); + let doc_formats = DOC_EXTENSIONS.to_vec(); - if video_formats.iter().any(|f| f == &file_extension) { - match set_thumbnail_if_file_is_video( - warp_storage, - filename.clone(), - file_path.clone(), - ) - .await - { - Ok(_) => { - log::info!("Video Thumbnail uploaded"); - let _ = tx_upload_file.send(UploadFileAction::Uploading(( - "100%".into(), - get_local_text("files.thumbnail-uploaded"), - filename.clone(), - ))); - } - Err(error) => { - log::error!("Not possible to update thumbnail for video: {:?}", error); - } - }; - } + let file_extension = std::path::Path::new(&filename) + .extension() + .and_then(OsStr::to_str) + .map(|s| format!(".{s}")) + .unwrap_or_default(); - if doc_formats.iter().any(|f| f == &file_extension) { - match set_thumbnail_if_file_is_document( - warp_storage, - filename.clone(), - file_path.clone(), - ) - .await - { - Ok(_) => { - log::info!("Document Thumbnail uploaded"); - let _ = tx_upload_file.send(UploadFileAction::Uploading(( - "100%".into(), - get_local_text("files.thumbnail-uploaded"), - filename.clone(), - ))); - } - Err(error) => { - log::error!( - "Not possible to update thumbnail for document: {:?}", - error - ); - } - }; - } - let _ = tx_upload_file.send(UploadFileAction::Finishing); - log::info!("{:?} file uploaded!", filename); + if video_formats.iter().any(|f| f == &file_extension) { + match set_thumbnail_if_file_is_video(warp_storage, filename.clone(), file_path.clone()) + .await + { + Ok(_) => { + log::info!("Video Thumbnail uploaded"); + let _ = tx_upload_file.send(UploadFileAction::Uploading(( + "100%".into(), + get_local_text("files.thumbnail-uploaded"), + filename.clone(), + ))); } - Err(error) => log::error!("Error when upload file: {:?}", error), - } + Err(error) => { + log::error!("Not possible to update thumbnail for video: {:?}", error); + } + }; } - let ret = match get_items_from_current_directory(warp_storage) { - Ok(r) => UploadFileAction::Finished(r), - Err(_) => UploadFileAction::Error, - }; - let _ = tx_upload_file.send(ret); + if doc_formats.iter().any(|f| f == &file_extension) { + match set_thumbnail_if_file_is_document(warp_storage, filename.clone(), file_path.clone()) + .await + { + Ok(_) => { + log::info!("Document Thumbnail uploaded"); + let _ = tx_upload_file.send(UploadFileAction::Uploading(( + "100%".into(), + get_local_text("files.thumbnail-uploaded"), + filename.clone(), + ))); + } + Err(error) => { + log::error!("Not possible to update thumbnail for document: {:?}", error); + } + }; + } + let _ = tx_upload_file.send(UploadFileAction::Finishing(file_path, true)); + log::info!("{:?} file uploaded!", filename); } fn rename_if_duplicate( @@ -720,16 +749,50 @@ async fn set_thumbnail_if_file_is_document( .map_err(anyhow::Error::from)? } +#[allow(clippy::expect_fun_call)] async fn download_file( warp_storage: &warp_storage, file_name: String, local_path_to_save_file: PathBuf, -) -> Result<(), Error> { - warp_storage - .get(&file_name, &local_path_to_save_file.to_string_lossy()) - .await?; - log::info!("{file_name} downloaded"); - Ok(()) +) -> Result { + let size = warp_storage + .current_directory()? + .get_item_by_path(&file_name) + .map(|d| d.size()) + .unwrap_or_default(); + let stream = warp_storage.get_stream(&file_name).await?; + let path = local_path_to_save_file.clone(); + let mut file = std::fs::File::create(local_path_to_save_file) + .expect(&format!("Couldn't create file {:?}", path.as_os_str())); + let name = file_name.clone(); + let name2 = file_name.clone(); + let stream = stream + .map(move |v| match v { + Ok(data) => { + let _ = file.write(&data); + Progression::CurrentProgress { + name: file_name.clone(), + current: file + .metadata() + .map(|d| d.len() as usize) + .unwrap_or_default(), + total: Some(size), + } + } + Err(e) => Progression::ProgressFailed { + name: file_name.clone(), + last_size: file.metadata().map(|d| d.len() as usize).ok(), + error: Some(format!("{}", e)), + }, + }) + .chain(stream::once(async move { + Progression::ProgressComplete { + name, + total: path.metadata().map(|d| d.len() as usize).ok(), + } + })); + log::info!("{name2} downloaded"); + Ok(stream.boxed()) } pub fn thumbnail_to_base64(file: &File) -> String { diff --git a/common/src/warp_runner/manager/commands/raygun_commands.rs b/common/src/warp_runner/manager/commands/raygun_commands.rs index e82128cb9dd..7cd18f8dc04 100644 --- a/common/src/warp_runner/manager/commands/raygun_commands.rs +++ b/common/src/warp_runner/manager/commands/raygun_commands.rs @@ -1,6 +1,6 @@ use chrono::{DateTime, Utc}; use derive_more::Display; -use futures::{channel::oneshot, StreamExt}; +use futures::channel::oneshot; use std::{ collections::{HashMap, HashSet}, ops::Range, @@ -13,22 +13,21 @@ use warp::{ error::Error, logging::tracing::log, raygun::{ - self, AttachmentKind, ConversationType, GroupSettings, Location, PinState, ReactionState, + self, AttachmentEventStream, ConversationType, GroupSettings, Location, PinState, + ReactionState, }, }; use crate::{ - state::{chats, identity, pending_message::PendingMessage, Friends}, + state::{chats, identity, Friends}, warp_runner::{ conv_stream, ui_adapter::{ self, conversation_to_chat, dids_to_identity, fetch_messages2, fetch_messages_between, fetch_messages_from_chat, fetch_pinned_messages_from_chat, get_uninitialized_identity, - MessageEvent, }, - Account, FetchMessagesConfig, FetchMessagesResponse, Messaging, WarpEvent, + Account, FetchMessagesConfig, FetchMessagesResponse, Messaging, }, - WARP_EVENT_CH, }; #[allow(clippy::large_enum_variant)] @@ -106,16 +105,14 @@ pub enum RayGunCmd { conv_id: Uuid, msg: Vec, attachments: Vec, - appended_msg_id: Option, - rsp: oneshot::Sender>, + rsp: oneshot::Sender, warp::error::Error>>, }, #[display(fmt = "SendMessageForSeveralChats")] SendMessageForSeveralChats { convs_id: Vec, msg: Vec, attachments: Vec, - appended_msg_id: Option, - rsp: oneshot::Sender>, + rsp: oneshot::Sender, warp::error::Error>>, }, #[display(fmt = "EditMessage")] EditMessage { @@ -144,7 +141,7 @@ pub enum RayGunCmd { reply_to: Uuid, msg: Vec, attachments: Vec, - rsp: oneshot::Sender>, + rsp: oneshot::Sender, warp::error::Error>>, }, // removes all direct conversations involving the recipient #[display(fmt = "RemoveDirectConvs")] @@ -274,43 +271,17 @@ pub async fn handle_raygun_cmd( conv_id, msg, attachments, - appended_msg_id: ui_id, rsp, } => { let r = if attachments.is_empty() { - messaging.send(conv_id, msg).await + messaging.send(conv_id, msg).await.map(|_| None) } else { //TODO: Pass stream off to attachment events match messaging .attach(conv_id, None, attachments.clone(), msg.clone()) .await { - Ok(mut stream) => loop { - let msg_clone = msg.clone(); - //let attachment_clone = attachments.clone(); - if let Some(kind) = stream.next().await { - match kind { - AttachmentKind::Pending(result) => { - break result; - } - AttachmentKind::AttachedProgress(progress) => { - if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message( - MessageEvent::AttachmentProgress { - progress, - conversation_id: conv_id, - msg: PendingMessage::for_compare( - msg_clone, - &attachments, - ui_id, - ), - }, - )) { - log::error!("failed to send warp_event: {e}"); - } - } - } - } - }, + Ok(stream) => Result::Ok(Some(stream)), Err(e) => Err(e), } }; @@ -321,53 +292,27 @@ pub async fn handle_raygun_cmd( convs_id, msg, attachments, - appended_msg_id: ui_id, rsp, } => { + let mut streams = vec![]; for chat_id in convs_id { - let _ = if attachments.is_empty() { - messaging.send(chat_id, msg.clone()).await + if attachments.is_empty() { + let _ = messaging.send(chat_id, msg.clone()).await; } else { //TODO: Pass stream off to attachment events match messaging .attach(chat_id, None, attachments.clone(), msg.clone()) .await { - Ok(mut stream) => loop { - let msg_clone = msg.clone(); - //let attachment_clone = attachments.clone(); - if let Some(kind) = stream.next().await { - match kind { - AttachmentKind::Pending(result) => { - break result; - } - AttachmentKind::AttachedProgress(progress) => { - if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message( - MessageEvent::AttachmentProgress { - progress, - conversation_id: chat_id, - msg: PendingMessage::for_compare( - msg_clone, - &attachments, - ui_id, - ), - }, - )) { - log::error!("failed to send warp_event: {e}"); - } - } - } - } - }, + Ok(stream) => streams.push((chat_id, stream)), Err(e) => { log::error!("Raygun: Send files to several chats: {}", e); - Err(e) } } }; } - let _ = rsp.send(Ok(())); + let _ = rsp.send(Ok(streams)); } RayGunCmd::EditMessage { conv_id, @@ -406,18 +351,13 @@ pub async fn handle_raygun_cmd( rsp, } => { let r = if attachments.is_empty() { - messaging.reply(conv_id, reply_to, msg).await + messaging.reply(conv_id, reply_to, msg).await.map(|_| None) } else { - //TODO: Pass stream off to attachment events match messaging .attach(conv_id, Some(reply_to), attachments, msg) .await { - Ok(mut stream) => loop { - if let Some(AttachmentKind::Pending(result)) = stream.next().await { - break result; - } - }, + Ok(stream) => Result::Ok(Some(stream)), Err(e) => Err(e), } }; diff --git a/ui/src/components/friends/friends_list/mod.rs b/ui/src/components/friends/friends_list/mod.rs index a4f1854bbfd..bcc4fbb448d 100644 --- a/ui/src/components/friends/friends_list/mod.rs +++ b/ui/src/components/friends/friends_list/mod.rs @@ -411,12 +411,11 @@ pub fn ShareFriendsModal(cx: Scope) -> Element { let warp_cmd_tx = WARP_CMD_CH.tx.clone(); while let Some((id, uuid)) = rx.next().await { let msg = vec![id.to_string()]; - let (tx, rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel(); let cmd = RayGunCmd::SendMessageForSeveralChats { convs_id: uuid, msg, attachments: Vec::new(), - appended_msg_id: None, rsp: tx, }; if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(cmd)) { diff --git a/ui/src/layouts/chats/presentation/chatbar/coroutines.rs b/ui/src/layouts/chats/presentation/chatbar/coroutines.rs index 03dace2560f..6f4da1c9c24 100644 --- a/ui/src/layouts/chats/presentation/chatbar/coroutines.rs +++ b/ui/src/layouts/chats/presentation/chatbar/coroutines.rs @@ -13,8 +13,9 @@ use futures::{channel::oneshot, StreamExt}; use uuid::Uuid; use warp::raygun::{self, Location}; -use crate::layouts::chats::data::{ - self, ChatProps, MsgChInput, TypingInfo, DEFAULT_MESSAGES_TO_TAKE, +use crate::{ + layouts::chats::data::{self, ChatProps, MsgChInput, TypingInfo, DEFAULT_MESSAGES_TO_TAKE}, + utils::async_task_queue::chat_upload_stream_handler, }; use super::TypingIndicator; @@ -23,8 +24,9 @@ pub fn get_msg_ch( cx: &Scoped<'_, ChatProps>, state: &UseSharedState, ) -> Coroutine { + let upload_streams = chat_upload_stream_handler(cx); use_coroutine(cx, |mut rx: UnboundedReceiver| { - to_owned![state]; + to_owned![state, upload_streams]; async move { let warp_cmd_tx = WARP_CMD_CH.tx.clone(); while let Some(MsgChInput { @@ -34,7 +36,7 @@ pub fn get_msg_ch( replying_to, }) = rx.next().await { - let (tx, rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel(); let attachments = state .read() .get_active_chat() @@ -45,15 +47,14 @@ pub fn get_msg_ch( Some(reply_to) => RayGunCmd::Reply { conv_id, reply_to, - msg, + msg: msg.clone(), attachments, rsp: tx, }, None => RayGunCmd::SendMessage { conv_id, - msg, + msg: msg.clone(), attachments, - appended_msg_id, rsp: tx, }, }; @@ -89,14 +90,24 @@ pub fn get_msg_ch( } let rsp = rx.await.expect("command canceled"); - if let Err(e) = rsp { - log::error!("failed to send message: {}", e); - state.write().decrement_outgoing_messages( + match rsp { + Ok(Some(attachment)) => upload_streams.write().append(( conv_id, - msg_clone, - attachment_files, + msg, + attachments, appended_msg_id, - ); + attachment, + )), + Err(e) => { + log::error!("failed to send message: {}", e); + state.write().decrement_outgoing_messages( + conv_id, + msg_clone, + attachment_files, + appended_msg_id, + ) + } + _ => {} } } } diff --git a/ui/src/layouts/chats/presentation/messages/coroutines.rs b/ui/src/layouts/chats/presentation/messages/coroutines.rs index d0ae2501799..8a2a5d45132 100644 --- a/ui/src/layouts/chats/presentation/messages/coroutines.rs +++ b/ui/src/layouts/chats/presentation/messages/coroutines.rs @@ -19,7 +19,7 @@ use crate::{ data::{self, ChatBehavior, ChatData, JsMsg, ScrollBtn, DEFAULT_MESSAGES_TO_TAKE}, scripts, }, - utils::download::get_download_path, + utils::{async_task_queue::download_stream_handler, download::get_download_path}, }; use super::{DownloadTracker, MessagesCommand}; @@ -442,8 +442,9 @@ pub fn handle_warp_commands( state: &UseSharedState, pending_downloads: &UseSharedState, ) -> Coroutine { + let download_streams = download_stream_handler(cx); let ch = use_coroutine(cx, |mut rx: UnboundedReceiver| { - to_owned![state, pending_downloads]; + to_owned![state, pending_downloads, download_streams]; async move { let warp_cmd_tx = WARP_CMD_CH.tx.clone(); while let Some(cmd) = rx.next().await { @@ -536,22 +537,10 @@ pub fn handle_warp_commands( let res = rx.await.expect("command canceled"); match res { - Ok(mut stream) => { - while let Some(p) = stream.next().await { - log::debug!("{p:?}"); - } - state.write().mutate(Action::AddToastNotification( - ToastNotification::init( - "".into(), - get_local_text_with_args( - "files.download-success", - vec![("file", file.name())], - ), - None, - 2, - ), - )); - on_finish.await + Ok(stream) => { + download_streams + .write() + .append((stream, file.name(), on_finish)); } Err(e) => { state.write().mutate(Action::AddToastNotification( diff --git a/ui/src/layouts/chats/presentation/quick_profile/mod.rs b/ui/src/layouts/chats/presentation/quick_profile/mod.rs index ab830ee4d9a..164ffc79183 100644 --- a/ui/src/layouts/chats/presentation/quick_profile/mod.rs +++ b/ui/src/layouts/chats/presentation/quick_profile/mod.rs @@ -229,12 +229,11 @@ pub fn QuickProfileContext<'a>(cx: Scope<'a, QuickProfileProps<'a>>) -> Element< None => return, }; let msg_vec = msg.clone(); - let (tx, rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel(); let cmd = RayGunCmd::SendMessage { conv_id: c, msg, attachments: Vec::new(), - appended_msg_id: uuid, rsp: tx, }; if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(cmd)) { diff --git a/ui/src/layouts/log_in/copy_seed_words.rs b/ui/src/layouts/log_in/copy_seed_words.rs index ae362aa5cc2..066a71598bd 100644 --- a/ui/src/layouts/log_in/copy_seed_words.rs +++ b/ui/src/layouts/log_in/copy_seed_words.rs @@ -73,7 +73,7 @@ fn SeedWords(cx: Scope, page: UseState, words: Vec) -> Elemen }, span { aria_label: "seed-word-value-{((idx * 2) + 1).to_string()}", - class: "val", vals.get(0).cloned().unwrap_or_default() + class: "val", vals.first().cloned().unwrap_or_default() } }, div { diff --git a/ui/src/layouts/storage/files_layout/mod.rs b/ui/src/layouts/storage/files_layout/mod.rs index 3d64350c537..707d149f244 100644 --- a/ui/src/layouts/storage/files_layout/mod.rs +++ b/ui/src/layouts/storage/files_layout/mod.rs @@ -12,7 +12,7 @@ use common::WARP_CMD_CH; use dioxus::prelude::*; use dioxus_desktop::use_window; use dioxus_router::prelude::use_navigator; -use futures::channel::oneshot; +use futures::{channel::oneshot, StreamExt}; use kit::elements::label::Label; use kit::{ elements::{ @@ -36,6 +36,7 @@ use crate::layouts::storage::files_layout::file_modal::get_file_modal; use crate::layouts::storage::send_files_layout::modal::SendFilesLayoutModal; use crate::layouts::storage::send_files_layout::SendFilesStartLocation; use crate::layouts::storage::shared_component::{FilesAndFolders, FilesBreadcumbs}; +use crate::utils::async_task_queue::chat_upload_stream_handler; use crate::utils::clipboard::clipboard_data::get_files_path_from_clipboard; use dioxus_html::input_data::keyboard_types::Code; use dioxus_html::input_data::keyboard_types::Modifiers; @@ -104,6 +105,46 @@ pub fn FilesLayout(cx: Scope<'_>) -> Element<'_> { let tx_cancel_file_upload = CANCEL_FILE_UPLOADLISTENER.tx.clone(); + let upload_streams = chat_upload_stream_handler(cx); + let send_ch = use_coroutine( + cx, + |mut rx: UnboundedReceiver<(Vec, Vec)>| { + to_owned![upload_streams, send_files_from_storage]; + async move { + let warp_cmd_tx = WARP_CMD_CH.tx.clone(); + while let Some((files_location, convs_id)) = rx.next().await { + let (tx, rx) = oneshot::channel(); + let msg = vec!["".to_owned()]; + let attachments = files_location; + if let Err(e) = + warp_cmd_tx.send(WarpCmd::RayGun(RayGunCmd::SendMessageForSeveralChats { + convs_id, + msg, + attachments: attachments.clone(), + rsp: tx, + })) + { + log::error!("Failed to send warp command: {}", e); + return; + } + if let Ok(Ok(streams)) = rx.await { + let mut to_append = upload_streams.write(); + for (chat, stream) in streams { + to_append.append(( + chat, + vec!["".to_owned()], + attachments.clone(), + None, + stream, + )) + } + } + send_files_from_storage.set(false); + } + } + }, + ); + cx.render(rsx!( if let Some(file) = storage_controller.read().show_file_modal.as_ref() { let file2 = file.clone(); @@ -273,21 +314,7 @@ pub fn FilesLayout(cx: Scope<'_>) -> Element<'_> { send_files_start_location: SendFilesStartLocation::Storage, files_pre_selected_to_send: files_pre_selected_to_send.read().clone(), on_send: move |(files_location, convs_id): (Vec, Vec)| { - let warp_cmd_tx = WARP_CMD_CH.tx.clone(); - let (tx, _) = oneshot::channel::>(); - let msg = vec!["".to_owned()]; - let attachments = files_location; - if let Err(e) = warp_cmd_tx.send(WarpCmd::RayGun(RayGunCmd::SendMessageForSeveralChats { - convs_id, - msg, - attachments, - appended_msg_id: None, - rsp: tx, - })) { - log::error!("Failed to send warp command: {}", e); - return; - } - send_files_from_storage.set(false); + send_ch.send((files_location, convs_id)); } }, FilesBreadcumbs { diff --git a/ui/src/layouts/storage/functions.rs b/ui/src/layouts/storage/functions.rs index 9da53be0287..5316d1723da 100644 --- a/ui/src/layouts/storage/functions.rs +++ b/ui/src/layouts/storage/functions.rs @@ -21,7 +21,10 @@ use std::{ffi::OsStr, path::PathBuf, rc::Rc, time::Duration}; use tokio::time::sleep; use warp::constellation::{directory::Directory, item::Item}; -use crate::{components::files::upload_progress_bar, utils::download::get_download_path}; +use crate::{ + components::files::upload_progress_bar, + utils::{async_task_queue::download_stream_handler, download::get_download_path}, +}; use super::files_layout::controller::{StorageController, UploadFileController}; @@ -230,8 +233,9 @@ pub fn init_coroutine<'a>( controller: &'a UseRef, state: &'a UseSharedState, ) -> &'a Coroutine { + let download_queue = download_stream_handler(cx); let ch = use_coroutine(cx, |mut rx: UnboundedReceiver| { - to_owned![controller, state]; + to_owned![controller, download_queue, state]; async move { let warp_cmd_tx = WARP_CMD_CH.tx.clone(); while let Some(cmd) = rx.next().await { @@ -338,7 +342,7 @@ pub fn init_coroutine<'a>( } => { let (local_path_to_save_file, on_finish) = get_download_path(local_path_to_save_file); - let (tx, rx) = oneshot::channel::>(); + let (tx, rx) = oneshot::channel(); if let Err(e) = warp_cmd_tx.send(WarpCmd::Constellation( ConstellationCmd::DownloadFile { @@ -364,19 +368,10 @@ pub fn init_coroutine<'a>( let rsp = rx.await.expect("command canceled"); match rsp { - Ok(_) => { - state.write().mutate(Action::AddToastNotification( - ToastNotification::init( - "".into(), - get_local_text_with_args( - "files.download-success", - vec![("file", file_name)], - ), - None, - 2, - ), - )); - on_finish.await + Ok(stream) => { + download_queue + .write() + .append((stream, file_name, on_finish)); } Err(error) => { state.write().mutate(Action::AddToastNotification( @@ -558,10 +553,12 @@ pub fn start_upload_file_listener( upload_progress_bar::change_progress_percentage(&window, progress.clone()); upload_progress_bar::change_progress_description(&window, msg); } - UploadFileAction::Finishing => { + UploadFileAction::Finishing(file, finish) => { *files_been_uploaded.write_silent() = true; - if !files_in_queue_to_upload.read().is_empty() { - files_in_queue_to_upload.with_mut(|i| i.remove(0)); + if !files_in_queue_to_upload.read().is_empty() + && (finish || files_in_queue_to_upload.read().len() > 1) + { + files_in_queue_to_upload.with_mut(|i| i.retain(|p| !p.eq(&file))); upload_progress_bar::update_files_queue_len( &window, files_in_queue_to_upload.read().len(), diff --git a/ui/src/lib.rs b/ui/src/lib.rs index 04218bcae6d..55ec98f0816 100644 --- a/ui/src/lib.rs +++ b/ui/src/lib.rs @@ -11,6 +11,7 @@ use common::icons::Icon as IconElement; use common::language::{get_local_text, get_local_text_with_args}; use common::notifications::{NotificationAction, NOTIFICATION_LISTENER}; use common::state::settings::GlobalShortcut; +use common::state::ToastNotification; use common::warp_runner::ui_adapter::MessageEvent; use common::warp_runner::WarpEvent; use common::{get_extras_dir, warp_runner, STATIC_ARGS, WARP_CMD_CH, WARP_EVENT_CH}; @@ -56,6 +57,7 @@ use crate::layouts::log_in::{AuthGuard, AuthPages}; use crate::layouts::settings::SettingsLayout; use crate::layouts::storage::files_layout::FilesLayout; use crate::misc_scripts::*; +use crate::utils::async_task_queue::{ListenerAction, ACTION_LISTENER}; use crate::utils::keyboard::KeyboardShortcuts; use dioxus_desktop::wry::application::event::Event as WryEvent; use dioxus_desktop::{use_wry_event_handler, DesktopService, PhysicalSize}; @@ -534,6 +536,29 @@ fn use_app_coroutines(cx: &ScopeState) -> Option<()> { } }); + // Listen to async tasks actions that should be handled on main thread + use_future(cx, (), |_| { + to_owned![state]; + async move { + let channel = ACTION_LISTENER.rx.clone(); + let mut ch = channel.lock().await; + while let Some(action) = ch.recv().await { + match action { + ListenerAction::ToastAction { + title, + content, + icon, + timeout, + } => { + state.write().mutate(Action::AddToastNotification( + ToastNotification::init(title, content, icon, timeout), + )); + } + } + } + } + }); + // clear toasts use_future(cx, (), |_| { to_owned![state]; diff --git a/ui/src/utils/async_task_queue.rs b/ui/src/utils/async_task_queue.rs new file mode 100644 index 00000000000..a9a04556b9b --- /dev/null +++ b/ui/src/utils/async_task_queue.rs @@ -0,0 +1,164 @@ +use common::{ + icons::outline::Shape as Icon, + language::get_local_text_with_args, + state::pending_message::PendingMessage, + warp_runner::{ui_adapter::MessageEvent, WarpEvent}, + WARP_EVENT_CH, +}; +use dioxus_core::ScopeState; +use dioxus_hooks::{use_ref, UseRef}; +use futures::{Future, StreamExt}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + Mutex, +}; + +use once_cell::sync::Lazy; +use std::sync::Arc; +use uuid::Uuid; +use warp::raygun::{AttachmentEventStream, AttachmentKind, Location}; + +pub enum ListenerAction { + ToastAction { + title: String, + content: String, + icon: Option, + timeout: u32, + }, +} + +pub struct ListenerChannel { + pub tx: UnboundedSender, + pub rx: Arc>>, +} + +// Channel for actions that should be done on the main thread +pub static ACTION_LISTENER: Lazy = Lazy::new(|| { + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + ListenerChannel { + tx, + rx: Arc::new(Mutex::new(rx)), + } +}); + +#[derive(Clone, PartialEq)] +pub struct AsyncRef { + inner_ref: Option>, +} + +impl AsyncRef { + /// Appends a value to this queue + pub fn append(&mut self, value: T) { + match self.inner_ref.as_mut() { + Some(current) => { + current.push(value); + } + None => self.inner_ref = Some(vec![value]), + }; + } +} + +/// Create a handler for an async queue +/// Everytime a value gets added to the queue the future will be spawned when it rerenders +pub fn async_queue( + cx: &ScopeState, + fut: impl Fn(T) -> Fut, +) -> &UseRef> +where + Fut: Future + Send + 'static, +{ + let queue_ref: &UseRef> = use_ref(cx, || AsyncRef { inner_ref: None }); + if let Some(queue) = queue_ref.write_silent().inner_ref.take() { + for entry in queue { + let future = fut(entry); + tokio::spawn(future); + } + } + queue_ref +} + +pub fn chat_upload_stream_handler( + cx: &ScopeState, +) -> &UseRef< + AsyncRef<( + Uuid, + Vec, + Vec, + Option, + AttachmentEventStream, + )>, +> { + async_queue( + cx, + |(conv_id, msg, attachments, appended_msg_id, mut stream): ( + Uuid, + Vec, + Vec, + Option, + AttachmentEventStream, + )| { + async move { + while let Some(kind) = stream.next().await { + match kind { + AttachmentKind::Pending(res) => { + if let Err(e) = res { + log::debug!("Error uploading file {}", e); + } + return; + } + AttachmentKind::AttachedProgress(progress) => { + if let Err(e) = WARP_EVENT_CH.tx.send(WarpEvent::Message( + MessageEvent::AttachmentProgress { + progress, + conversation_id: conv_id, + msg: PendingMessage::for_compare( + msg.clone(), + &attachments, + appended_msg_id, + ), + }, + )) { + log::error!("failed to send warp_event: {e}"); + } + } + } + } + } + }, + ) +} + +pub fn download_stream_handler( + cx: &ScopeState, +) -> &UseRef< + AsyncRef<( + warp::constellation::ConstellationProgressStream, + String, + std::pin::Pin + Send>>, + )>, +> { + async_queue( + cx, + |(mut stream, file, on_finish): ( + warp::constellation::ConstellationProgressStream, + String, + std::pin::Pin + Send>>, + )| { + async move { + while let Some(p) = stream.next().await { + log::debug!("download progress: {p:?}"); + } + let _ = ACTION_LISTENER.tx.send(ListenerAction::ToastAction { + title: "".into(), + content: get_local_text_with_args( + "files.download-success", + vec![("file", file)], + ), + icon: None, + timeout: 2, + }); + on_finish.await + } + }, + ) +} diff --git a/ui/src/utils/mod.rs b/ui/src/utils/mod.rs index f54177a052c..f3369ee1ef8 100644 --- a/ui/src/utils/mod.rs +++ b/ui/src/utils/mod.rs @@ -9,6 +9,7 @@ use kit::User as UserInfo; use crate::{window_manager::WindowManagerCmd, WINDOW_CMD_CH}; +pub mod async_task_queue; pub mod auto_updater; pub mod clipboard; pub mod download;