diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock
index d12e8b176e..0ea7027609 100644
--- a/application/apps/indexer/Cargo.lock
+++ b/application/apps/indexer/Cargo.lock
@@ -1916,6 +1916,7 @@ dependencies = [
  "serde_json",
  "serialport",
  "sources",
+ "tempfile",
  "thiserror",
  "tokio",
  "tokio-stream",
diff --git a/application/apps/indexer/session/Cargo.toml b/application/apps/indexer/session/Cargo.toml
index 6c7417aa53..1b09d68794 100644
--- a/application/apps/indexer/session/Cargo.toml
+++ b/application/apps/indexer/session/Cargo.toml
@@ -33,3 +33,4 @@ walkdir = "2.3"
 
 [dev-dependencies]
 lazy_static.workspace = true
+tempfile.workspace = true
diff --git a/application/apps/indexer/session/src/state/mod.rs b/application/apps/indexer/session/src/state/mod.rs
index 7dce1944f6..2572fe6513 100644
--- a/application/apps/indexer/session/src/state/mod.rs
+++ b/application/apps/indexer/session/src/state/mod.rs
@@ -771,7 +771,6 @@ pub async fn run(
             }
             Api::ShutdownWithError => {
                 debug!("shutdown state loop with error for testing");
-                state.session_file.cleanup()?;
                 return Err(NativeError {
                     severity: Severity::ERROR,
                     kind: NativeErrorKind::Io,
@@ -780,7 +779,15 @@ pub async fn run(
             }
         }
     }
-    state.session_file.cleanup()?;
     debug!("task is finished");
     Ok(())
 }
+
+impl Drop for SessionState {
+    fn drop(&mut self) {
+        // Ensure session files are cleaned up by calling the cleanup function on drop.
+        if let Err(err) = self.session_file.cleanup() {
+            log::error!("Cleaning up session files failed. Error: {err:#?}");
+        }
+    }
+}
diff --git a/application/apps/indexer/session/src/state/session_file.rs b/application/apps/indexer/session/src/state/session_file.rs
index 7c53fdafce..85853a99a5 100644
--- a/application/apps/indexer/session/src/state/session_file.rs
+++ b/application/apps/indexer/session/src/state/session_file.rs
@@ -20,6 +20,7 @@ use tokio_util::sync::CancellationToken;
 use uuid::Uuid;
 
 pub const FLUSH_DATA_IN_MS: u128 = 500;
+pub const SESSION_FILE_EXTENSION: &str = "session";
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub struct GrabbedElement {
@@ -93,7 +94,7 @@ impl SessionFile {
             filename
         } else {
             let streams = paths::get_streams_dir()?;
-            let filename = streams.join(format!("{}.session", Uuid::new_v4()));
+            let filename = streams.join(format!("{}.{SESSION_FILE_EXTENSION}", Uuid::new_v4()));
             debug!("Session file setup: {}", filename.to_string_lossy());
             self.writer = Some(BufWriter::new(File::create(&filename).map_err(|e| {
                 NativeError {
@@ -301,19 +302,55 @@ impl SessionFile {
         })
     }
 
+    /// Cleans up the temporarily generated files for the session and its attachments,
+    /// if they exist, for non-linked sessions.
     pub fn cleanup(&mut self) -> Result<(), NativeError> {
         if self.writer.is_none() {
+            // Session is linked. No temporary files have been generated.
            return Ok(());
         }
+
+        // Remove the session's main file.
         let filename = self.filename()?;
         debug!("cleaning up files: {:?}", filename);
         if filename.exists() {
-            std::fs::remove_file(filename).map_err(|e| NativeError {
+            std::fs::remove_file(&filename).map_err(|e| NativeError {
                 severity: Severity::ERROR,
                 kind: NativeErrorKind::Io,
-                message: Some(e.to_string()),
+                message: Some(format!(
+                    "Removing session main file failed. Error: {e}. Path: {}",
+                    filename.display()
+                )),
             })?;
         }
+
+        // Remove the attachments directory if it exists.
+        let attachments_dir = filename
+            .to_str()
+            .and_then(|file| file.strip_suffix(&format!(".{SESSION_FILE_EXTENSION}")))
+            .map(PathBuf::from)
+            .ok_or_else(|| NativeError {
+                severity: Severity::ERROR,
+                kind: NativeErrorKind::Io,
+                message: Some("Session file name isn't valid UTF-8".into()),
+            })?;
+
+        if attachments_dir.exists() {
+            debug!(
+                "Cleaning up attachments directory: {}",
+                attachments_dir.display()
+            );
+
+            std::fs::remove_dir_all(&attachments_dir).map_err(|err| NativeError {
+                severity: Severity::ERROR,
+                kind: NativeErrorKind::Io,
+                message: Some(format!(
+                    "Removing attachments directory failed. Error: {err}. Path: {}",
+                    attachments_dir.display()
+                )),
+            })?
+        }
+
         Ok(())
     }
 }
diff --git a/application/apps/indexer/session/src/unbound/cleanup.rs b/application/apps/indexer/session/src/unbound/cleanup.rs
new file mode 100644
index 0000000000..e6c39ab100
--- /dev/null
+++ b/application/apps/indexer/session/src/unbound/cleanup.rs
@@ -0,0 +1,106 @@
+use std::{
+    fs, io,
+    path::Path,
+    time::{Duration, SystemTime},
+};
+
+use crate::{
+    events::{NativeError, NativeErrorKind},
+    paths::get_streams_dir,
+    progress::Severity,
+};
+
+/// Iterates through the chipmunk temporary directory and removes the entries that are
+/// older than two months.
+pub fn cleanup_temp_dir() -> Result<(), NativeError> {
+    let tmp_dir = get_streams_dir()?;
+
+    const TWO_MONTHS_SECONDS: u64 = 60 * 60 * 24 * 60;
+    let modified_limit = SystemTime::now()
+        .checked_sub(Duration::from_secs(TWO_MONTHS_SECONDS))
+        .ok_or_else(|| NativeError {
+            severity: Severity::ERROR,
+            kind: NativeErrorKind::Io,
+            message: Some(String::from(
+                "Error while calculating modification time limit",
+            )),
+        })?;
+
+    cleanup_dir(&tmp_dir, modified_limit)?;
+
+    Ok(())
+}
+
+// Clean up files and directories within the given path that have a modified time older
+// than the given modification date limit.
+fn cleanup_dir(path: &Path, modified_date_limit: SystemTime) -> io::Result<()> {
+    if !path.exists() {
+        return Ok(());
+    }
+
+    fs::read_dir(path)?
+        .flat_map(Result::ok)
+        .filter(|p| {
+            p.metadata()
+                .is_ok_and(|meta| meta.modified().is_ok_and(|date| date < modified_date_limit))
+        })
+        .map(|entry| entry.path())
+        .try_for_each(|path| {
+            if path.is_dir() {
+                fs::remove_dir_all(path)
+            } else if path.is_file() {
+                fs::remove_file(path)
+            } else {
+                Ok(())
+            }
+        })
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        fs::{self, File},
+        thread,
+        time::{Duration, SystemTime},
+    };
+
+    use super::cleanup_dir;
+
+    #[test]
+    fn test_cleanup_dir() {
+        // Create a temporary directory with some entries.
+        let tempdir = tempfile::tempdir().unwrap();
+        let temp_path = tempdir.path();
+
+        let dir = temp_path.join("dir");
+        fs::create_dir(&dir).unwrap();
+        let sub_file = dir.join("sub_file");
+        _ = File::create(&sub_file).unwrap();
+        let file = temp_path.join("file");
+        _ = File::create(&file).unwrap();
+
+        let entries = [dir, sub_file, file];
+
+        // Make sure there is a difference between the modification times and the current time.
+        thread::sleep(Duration::from_millis(50));
+
+        // Cleaning up with a timestamp one hour in the past must not remove anything.
+        let past = SystemTime::now()
+            .checked_sub(Duration::from_secs(3600))
+            .unwrap();
+        cleanup_dir(&temp_path, past).unwrap();
+        for entry in &entries {
+            assert!(entry.exists());
+        }
+
+        // Cleaning up with the current time must remove all files and directories.
+        cleanup_dir(&temp_path, SystemTime::now()).unwrap();
+
+        // The temp directory itself shouldn't be removed.
+        assert!(temp_path.exists());
+
+        for entry in entries {
+            assert!(!entry.exists());
+        }
+    }
+}
diff --git a/application/apps/indexer/session/src/unbound/mod.rs b/application/apps/indexer/session/src/unbound/mod.rs
index 34cb1d7ed0..205ec7fb0c 100644
--- a/application/apps/indexer/session/src/unbound/mod.rs
+++ b/application/apps/indexer/session/src/unbound/mod.rs
@@ -1,4 +1,5 @@
 pub mod api;
+mod cleanup;
 pub mod commands;
 mod signal;
 
@@ -10,6 +11,7 @@ use crate::{
         signal::Signal,
     },
 };
+use cleanup::cleanup_temp_dir;
 use log::{debug, error, warn};
 use std::collections::HashMap;
 use tokio::{
@@ -117,6 +119,16 @@ impl UnboundSession {
             finished.cancel();
             debug!("Unbound session is down");
         });
+
+        // Call cleanup here because this function should be called once when chipmunk starts.
+        // Run the cleanup on a separate thread to avoid startup latency in case the temporary
+        // directory has a lot of entries to clean up.
+        tokio::task::spawn_blocking(|| {
+            if let Err(err) = cleanup_temp_dir() {
+                log::error!("Error while cleaning up temporary directory. Error: {err:?}");
+            }
+        });
+
         Ok(())
     }
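
Note on the attachment cleanup above: it relies on the convention that a session file named <uuid>.session in the streams directory keeps its attachments in a sibling directory named <uuid>, i.e. the same path with the ".session" suffix stripped. The standalone sketch below illustrates only that path derivation; the helper name and the example path are hypothetical and are not part of the patch or of chipmunk's API.

use std::path::{Path, PathBuf};

/// Hypothetical helper mirroring the strip_suffix logic in SessionFile::cleanup():
/// derive the attachments directory for a session file by dropping the ".session"
/// extension. Returns None if the path is not valid UTF-8 or lacks the suffix.
fn attachments_dir_for(session_file: &Path) -> Option<PathBuf> {
    session_file
        .to_str()
        .and_then(|s| s.strip_suffix(".session"))
        .map(PathBuf::from)
}

fn main() {
    // Example path for illustration only; real session files live in the streams directory.
    let session = Path::new("/tmp/streams/123e4567-e89b-12d3-a456-426614174000.session");
    let attachments = attachments_dir_for(session);
    assert_eq!(
        attachments.as_deref(),
        Some(Path::new("/tmp/streams/123e4567-e89b-12d3-a456-426614174000"))
    );
    println!("attachments dir: {}", attachments.unwrap().display());
}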