From 944677c79b01a1ac5b1e0637d4c1fe5a81716712 Mon Sep 17 00:00:00 2001 From: luanshaotong Date: Tue, 15 Aug 2023 10:44:36 +0800 Subject: [PATCH] recover mount point (#165) * recover mount point * fix clippy --- Cargo.lock | 55 +++++++++------- Cargo.toml | 2 +- src/client/daemon.rs | 147 ++++++++++++++++++++++++++++--------------- 3 files changed, 131 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c05b154..7a916c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -62,18 +62,18 @@ checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] name = "async-trait" -version = "0.1.61" +version = "0.1.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" +checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.28", ] [[package]] @@ -351,7 +351,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1144,7 +1144,7 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1206,7 +1206,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e97e3215779627f01ee256d2fad52f3d95e8e1c11e9fc6fd08f7cd455d5d5c78" dependencies = [ "proc-macro2", - "syn", + "syn 1.0.107", ] [[package]] @@ -1218,7 +1218,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "version_check", ] @@ -1235,9 +1235,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.49" +version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57a8eca9f9c4ffde41714334dee777596264c7825420f521abc92b5b5deb63a5" +checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9" dependencies = [ "unicode-ident", ] @@ -1269,7 +1269,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn", + "syn 1.0.107", "tempfile", "which", ] @@ -1284,7 +1284,7 @@ dependencies = [ "itertools", "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1299,9 +1299,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.23" +version = "1.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965" dependencies = [ "proc-macro2", ] @@ -1502,7 +1502,7 @@ checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1604,6 +1604,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "sync_wrapper" version = "0.1.2" @@ -1665,7 +1676,7 @@ checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1716,7 +1727,7 @@ checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1786,7 +1797,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1861,7 +1872,7 @@ checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] @@ -1983,7 +1994,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-shared", ] @@ -2005,7 +2016,7 @@ checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2152,7 +2163,7 @@ checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.107", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 303b690..f48f168 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ serde = { version = "1", features = ["derive"] } serde_yaml = "0.9.14" # tonic-health = "0.7.1" dashmap = "5.4.0" -async-trait = "0.1.59" +async-trait = "0.1.73" nix = "0.26.1" rocksdb = "0.19.0" bincode = "1.3.3" diff --git a/src/client/daemon.rs b/src/client/daemon.rs index 37fd18a..6205c09 100644 --- a/src/client/daemon.rs +++ b/src/client/daemon.rs @@ -34,13 +34,14 @@ pub struct SealfsFused { pub client: Arc, pub mount_points: DashMap, pub index_file: String, + pub mount_lock: tokio::sync::Mutex<()>, } // TODO: remove this // replace fuser with other fuse library which supports async. // better for performance too. -unsafe impl std::marker::Sync for SealfsFused {} -unsafe impl std::marker::Send for SealfsFused {} +unsafe impl Sync for SealfsFused {} +unsafe impl Send for SealfsFused {} impl SealfsFused { pub fn new(index_file: String, client: Arc) -> Self { @@ -48,6 +49,7 @@ impl SealfsFused { client, mount_points: DashMap::new(), index_file, + mount_lock: tokio::sync::Mutex::new(()), } } @@ -56,7 +58,8 @@ impl SealfsFused { mountpoint: String, volume_name: String, read_only: bool, - ) -> Result<(), i32> { + ) -> Result<(), String> { + let _lock = self.mount_lock.lock().await; let mount_mode = if read_only { MountOption::RO } else { @@ -74,7 +77,19 @@ impl SealfsFused { // check if already mounted if self.mount_points.contains_key(&mountpoint) { warn!("mountpoint {} already mounted", mountpoint); - return Err(CONNECTION_ERROR); + if self.mount_points.get(&mountpoint).unwrap().0 != volume_name { + return Err(format!( + "mountpoint {} already mounted with different volume", + mountpoint + )); + } + if self.mount_points.get(&mountpoint).unwrap().1 != read_only { + return Err(format!( + "mountpoint {} already mounted with different mode", + mountpoint + )); + } + return Ok(()); } match fuser::spawn_mount2( @@ -88,26 +103,18 @@ impl SealfsFused { .insert(mountpoint, (volume_name, read_only, session)); Ok(()) } - Err(e) => { - error!("mount error: {}", e); - Err(CONNECTION_ERROR) - } + Err(e) => Err(format!("mount error: {}", e)), } } - Err(e) => { - error!("mount error: {}", status_to_string(e)); - Err(e) - } + Err(e) => Err(format!("mount error: {}", status_to_string(e))), } } - pub fn unmount(&self, mountpoint: &str) -> Result<(), i32> { + pub async fn unmount(&self, mountpoint: &str) -> Result<(), String> { + let _lock = self.mount_lock.lock().await; match self.mount_points.remove(mountpoint) { Some(_) => Ok(()), - None => { - error!("mountpoint {} not found", mountpoint); - Err(CONNECTION_ERROR) - } + None => Err(format!("mountpoint {} not found", mountpoint)), } } @@ -121,58 +128,95 @@ impl SealfsFused { // remove old index file and sync mount points to index file pub fn sync_index_file(&self) { + // write to swap file first + let mut file = std::fs::File::create(format!("{}.swap", &self.index_file)).unwrap(); + for k in self.mount_points.iter() { + let line = format!("{}\n{}\n{}\n\n", k.key(), k.value().0, k.value().1); + file.write_all(line.as_bytes()).unwrap(); + } + // write a $ to indicate the end of file + file.write_all(b"$\n").unwrap(); + file.sync_all().unwrap(); + drop(file); + std::fs::remove_file(&self.index_file).unwrap_or(()); let mut file = std::fs::File::create(&self.index_file).unwrap(); for k in self.mount_points.iter() { let line = format!("{}\n{}\n{}\n\n", k.key(), k.value().0, k.value().1); file.write_all(line.as_bytes()).unwrap(); } + // write a $ to indicate the end of file + file.write_all(b"$\n").unwrap(); + file.sync_all().unwrap(); + drop(file); + std::fs::remove_file(format!("{}.swap", &self.index_file)).unwrap_or(()); } - // read index file and mount all volumes - pub async fn init(&self) -> Result<(), i32> { - let mut file = match std::fs::File::open(&self.index_file) { + pub fn read_index_file( + &self, + index_file_name: &str, + allow_nonexist: bool, + ) -> Result, String> { + let mut result = Vec::new(); + let mut file = match std::fs::File::open(index_file_name) { Ok(f) => f, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { - return Ok(()); + if allow_nonexist { + return Ok(result); + } + return Err(format!("index file {} not found", index_file_name)); } - error!("open index file {} error: {}", &self.index_file, e); - return Err(libc::EIO); + return Err(format!("open index file {} error: {}", index_file_name, e)); } }; let mut content = String::new(); match file.read_to_string(&mut content) { Ok(_) => (), Err(e) => { - error!("read index file {} error: {}", &self.index_file, e); - return Ok(()); + error!("read index file {} error: {}", index_file_name, e); + return Err(format!("read index file {} error: {}", index_file_name, e)); } } let lines: Vec<&str> = content.split('\n').collect(); - for i in 0..lines.len() / 4 { - let mountpoint = lines[i * 4].to_owned(); - let volume_name = lines[i * 4 + 1].to_owned(); - let read_only = lines[i * 4 + 2].parse::().unwrap(); - info!("mounting volume {} to {}", volume_name, mountpoint); - - // umount old mountpoint - if let Err(e) = std::process::Command::new("umount") - .arg(&mountpoint) - .output() - { - warn!("umount {} error: {}", &mountpoint, e); + for i in (0..lines.len()).step_by(4) { + if i + 3 >= lines.len() { + println!("{}", lines[i]); + if lines[i] == "$" { + break; + } + return Err(format!("index file {} format error", index_file_name)); } + result.push(( + lines[i].to_string(), + lines[i + 1].to_string(), + lines[i + 2].parse::().unwrap(), + )); + } + Ok(result) + } - match self.mount(mountpoint, volume_name.clone(), read_only).await { - Ok(_) => { - info!("mount success"); - } + // read index file and mount all volumes + pub async fn init(&self) -> Result<(), String> { + let volumes = { + match self.read_index_file(format!("{}.swap", &self.index_file).as_str(), false) { + Ok(result) => result, Err(e) => { - if e == libc::ENOENT { - error!("volume {} not found", volume_name); - continue; + warn!("read swap index file error: {}", e); + match self.read_index_file(&self.index_file, true) { + Ok(result) => result, + Err(e) => { + return Err(format!("read index file error: {}", e)); + } } + } + } + }; + + for (mountpoint, volume_name, read_only) in volumes { + match self.mount(mountpoint, volume_name.clone(), read_only).await { + Ok(_) => {} + Err(e) => { return Err(e); } } @@ -212,20 +256,23 @@ impl Handler for SealfsFused { self.sync_index_file(); Ok((0, 0, 0, 0, vec![], vec![])) } - Err(e) => Ok((e, 0, 0, 0, vec![], vec![])), + Err(e) => { + error!("mount error: {}", e); + Ok((libc::EIO, 0, 0, 0, vec![], vec![])) + } } } UMOUNT => { let mountpoint = std::str::from_utf8(&path).unwrap(); info!("unmounting volume {}", mountpoint); - match self.mount_points.remove(mountpoint) { - Some(_) => { + match self.unmount(mountpoint).await { + Ok(()) => { self.sync_index_file(); Ok((0, 0, 0, 0, vec![], vec![])) } - None => { - error!("mountpoint not found: {}", mountpoint); - Ok((libc::ENOENT, 0, 0, 0, vec![], vec![])) + Err(e) => { + error!("unmount error: {}", e); + Ok((libc::EIO, 0, 0, 0, vec![], vec![])) } } }