diff --git a/src/engine.rs b/src/engine.rs index a5987f2f..88eaaa45 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -31,6 +31,30 @@ const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30); /// Max times for `write`. const MAX_WRITE_ATTEMPT: u64 = 2; +// pub struct HedgedEngine> +// where +// F: FileSystem, +// P: PipeLog, +// { +// inner: Engine, P>, +// fs: Arc>, +// } + +// impl HedgedEngine> +// where +// F: FileSystem, +// { + +// } + +// impl Deref for HedgedEngine> { +// type Target = Engine>; + +// fn deref(&self) -> &Self::Target { +// &self.inner +// } +// } + pub struct Engine> where F: FileSystem, @@ -66,23 +90,36 @@ impl Engine> { } } +pub fn open_with_hedged_file_system( + cfg: Config, + mut file_system: Arc, +) -> Result, FilePipeLog>>> { + let file_system = if let Some(sec_dir) = cfg.second_dir { + let fs = Arc::new(HedgedFileSystem::new( + file_system, + cfg.dir.into(), + sec_dir.into(), + )); + fs.bootstrap()?; + fs + } else { + panic!() + }; + Engine::open_with(cfg, file_system, vec![]) +} + +pub fn open_with_file_system( + cfg: Config, + mut file_system: Arc, +) -> Result>> { + Engine::open_with(cfg, file_system, vec![]) +} + impl Engine> where F: FileSystem, { - pub fn open_with_file_system( - cfg: Config, - mut file_system: Arc, - ) -> Result>> { - file_system = if let Some(sec_dir) = cfg.second_dir { - let fs = Arc::new(HedgedFileSystem::new(file_system, cfg.dir.into(), sec_dir.into())); - fs.bootstrap()?; - fs - }; - Self::open_with(cfg, file_system, vec![]) - } - - pub fn open_with( + fn open_with( mut cfg: Config, file_system: Arc, mut listeners: Vec>, @@ -743,8 +780,7 @@ pub(crate) mod tests { dir: sub_dir.to_str().unwrap().to_owned(), ..Default::default() }; - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); } #[test] @@ -762,11 +798,9 @@ pub(crate) mod tests { ..Default::default() }; - let engine = RaftLogEngine::open_with_file_system( - cfg.clone(), - Arc::new(ObfuscatedFileSystem::default()), - ) - .unwrap(); + let engine = + open_with_file_system(cfg.clone(), Arc::new(ObfuscatedFileSystem::default())) + .unwrap(); assert_eq!(engine.path(), dir.path().to_str().unwrap()); let data = vec![b'x'; entry_size]; for i in 10..20 { @@ -816,7 +850,7 @@ pub(crate) mod tests { target_file_size: ReadableSize(1), ..Default::default() }; - let engine = RaftLogEngine::open_with_file_system( + let engine = open_with_file_system( cfg.clone(), Arc::new(ObfuscatedFileSystem::default()), ) @@ -889,9 +923,7 @@ pub(crate) mod tests { ..Default::default() }; let rid = 1; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); engine .scan_messages::(rid, None, None, false, |_, _| { @@ -978,9 +1010,7 @@ pub(crate) mod tests { let mut delete_batch = LogBatch::default(); delete_batch.delete(rid, key.clone()); - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); assert_eq!( engine.get_message::(rid, &key).unwrap(), None @@ -1089,9 +1119,7 @@ pub(crate) mod tests { target_file_size: ReadableSize(1), ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 1024]; // rewrite:[1 ..10] @@ -1202,9 +1230,7 @@ pub(crate) mod tests { ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 1024]; for index in 0..100 { engine.append(1, index, index + 1, Some(&data)); @@ -1263,9 +1289,7 @@ pub(crate) mod tests { ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 1024]; // write 50 small entries into region 1~3, it should trigger force compact. for rid in 1..=3 { @@ -1318,9 +1342,7 @@ pub(crate) mod tests { purge_threshold: ReadableSize::kb(80), ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 1024]; // Put 100 entries into 10 regions. @@ -1382,9 +1404,7 @@ pub(crate) mod tests { dir: dir.path().to_str().unwrap().to_owned(), ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let mut log_batch = LogBatch::default(); let empty_entry = Entry::new(); @@ -1442,9 +1462,7 @@ pub(crate) mod tests { dir: dir.path().to_str().unwrap().to_owned(), ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 16]; let cases = [[false, false], [false, true], [true, true]]; for (i, writes) in cases.iter().enumerate() { @@ -1470,9 +1488,7 @@ pub(crate) mod tests { dir: dir.path().to_str().unwrap().to_owned(), ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 1024]; for rid in 1..21 { @@ -1503,9 +1519,7 @@ pub(crate) mod tests { target_file_size: ReadableSize(1), ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())) - .unwrap(); + let engine = open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default())).unwrap(); let data = vec![b'x'; 2 * 1024 * 1024]; for rid in 1..=3 { @@ -1663,7 +1677,7 @@ pub(crate) mod tests { ..Default::default() }; - let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); + let engine = open_with_file_system(cfg, fs.clone()).unwrap(); for bs in batches.iter_mut() { for batch in bs.iter_mut() { engine.write(batch, false).unwrap(); @@ -1724,7 +1738,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=50 { engine.append(rid, 1, 6, Some(&entry_data)); } @@ -1761,7 +1775,7 @@ pub(crate) mod tests { ) .unwrap(); - let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + let engine = open_with_file_system(cfg, fs).unwrap(); for rid in 1..25 { engine.scan_entries(rid, 1, 6, |_, _, d| { assert_eq!(d, &entry_data); @@ -1789,7 +1803,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=50 { engine.append(rid, 1, 6, Some(&entry_data)); } @@ -1823,7 +1837,7 @@ pub(crate) mod tests { ) .unwrap(); - let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + let engine = open_with_file_system(cfg, fs).unwrap(); for rid in 1..25 { if existing_emptied.contains(&rid) || incoming_emptied.contains(&rid) { continue; @@ -1870,7 +1884,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=50 { engine.append(rid, 1, 6, Some(&entry_data)); } @@ -1891,11 +1905,11 @@ pub(crate) mod tests { // Corrupt a log batch. f.set_len(f.metadata().unwrap().len() - 1).unwrap(); - RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + open_with_file_system(cfg.clone(), fs.clone()).unwrap(); // Corrupt the file header. f.set_len(1).unwrap(); - RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + open_with_file_system(cfg, fs).unwrap(); } #[test] @@ -1912,7 +1926,7 @@ pub(crate) mod tests { }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = RaftLogEngine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), fs.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } @@ -1920,7 +1934,7 @@ pub(crate) mod tests { assert!(RaftLogEngine::open(cfg.clone()).is_err()); - let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + let engine = open_with_file_system(cfg, fs).unwrap(); for rid in 1..10 { engine.scan_entries(rid, 1, 11, |_, _, d| { assert_eq!(d, &entry_data); @@ -1972,7 +1986,7 @@ pub(crate) mod tests { let fs = Arc::new(ObfuscatedFileSystem::default()); let rid = 1; - let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + let engine = open_with_file_system(cfg, fs).unwrap(); assert!(engine.is_empty()); engine.append(rid, 1, 11, Some(&entry_data)); assert!(!engine.is_empty()); @@ -2109,7 +2123,7 @@ pub(crate) mod tests { ..Default::default() }; let fs = Arc::new(DeleteMonitoredFileSystem::new()); - let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); + let engine = open_with_file_system(cfg, fs.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } @@ -2166,7 +2180,7 @@ pub(crate) mod tests { ..Default::default() }; let fs = Arc::new(DeleteMonitoredFileSystem::new()); - let engine = RaftLogEngine::open_with_file_system(cfg, fs.clone()).unwrap(); + let engine = open_with_file_system(cfg, fs.clone()).unwrap(); let reserved_start = *fs.reserved_metadata.lock().unwrap().first().unwrap(); for rid in 1..=10 { @@ -2274,14 +2288,14 @@ pub(crate) mod tests { assert!(cfg_v2.recycle_capacity() > 0); // Prepare files with format_version V1 { - let engine = RaftLogEngine::open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap(); + let engine = open_with_file_system(cfg_v1.clone(), fs.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 11, Some(&entry_data)); } } // Reopen the Engine with V2 and purge { - let engine = RaftLogEngine::open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap(); + let engine = open_with_file_system(cfg_v2.clone(), fs.clone()).unwrap(); let (start, _) = engine.file_span(LogQueue::Append); for rid in 6..=10 { engine.append(rid, 11, 20, Some(&entry_data)); @@ -2295,7 +2309,7 @@ pub(crate) mod tests { } // Reopen the Engine with V1 -> V2 and purge { - let engine = RaftLogEngine::open_with_file_system(cfg_v1, fs.clone()).unwrap(); + let engine = open_with_file_system(cfg_v1, fs.clone()).unwrap(); let (start, _) = engine.file_span(LogQueue::Append); for rid in 6..=10 { engine.append(rid, 20, 30, Some(&entry_data)); @@ -2309,7 +2323,7 @@ pub(crate) mod tests { assert_eq!(engine.file_span(LogQueue::Append).0, start); let file_count = engine.file_count(Some(LogQueue::Append)); drop(engine); - let engine = RaftLogEngine::open_with_file_system(cfg_v2, fs).unwrap(); + let engine = open_with_file_system(cfg_v2, fs).unwrap(); assert_eq!(engine.file_span(LogQueue::Append).0, start); assert_eq!(engine.file_count(Some(LogQueue::Append)), file_count); // Mark all regions obsolete. @@ -2340,7 +2354,7 @@ pub(crate) mod tests { enable_log_recycle: false, ..Default::default() }; - let engine = RaftLogEngine::open_with_file_system(cfg, file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg, file_system.clone()).unwrap(); let (start, _) = engine.file_span(LogQueue::Append); // Only one valid file left, the last one => active_file. assert_eq!(engine.file_count(Some(LogQueue::Append)), 1); @@ -2362,8 +2376,7 @@ pub(crate) mod tests { prefill_for_recycle: true, ..Default::default() }; - let engine = - RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); let (start, end) = engine.file_span(LogQueue::Append); // Only one valid file left, the last one => active_file. assert_eq!(start, end); @@ -2386,8 +2399,7 @@ pub(crate) mod tests { purge_threshold: ReadableSize(50), ..cfg }; - let engine = - RaftLogEngine::open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg_v2.clone(), file_system.clone()).unwrap(); assert_eq!(engine.file_span(LogQueue::Append), (start, end)); assert!(recycled_count > file_system.inner.file_count() - engine.file_count(None)); // Recycled files have filled the LogQueue::Append, purge_expired_files won't @@ -2411,7 +2423,7 @@ pub(crate) mod tests { prefill_for_recycle: false, ..cfg_v2 }; - let engine = RaftLogEngine::open_with_file_system(cfg_v3, file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg_v3, file_system.clone()).unwrap(); assert_eq!(file_system.inner.file_count(), engine.file_count(None)); } @@ -2432,7 +2444,7 @@ pub(crate) mod tests { let key = vec![b'x'; 2]; let value = vec![b'y'; 8]; - let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + let engine = open_with_file_system(cfg, fs).unwrap(); let mut data = HashSet::new(); let mut rid = 1; // Directly write to pipe log. @@ -2573,7 +2585,7 @@ pub(crate) mod tests { ..Default::default() }; let fs = Arc::new(ObfuscatedFileSystem::default()); - let engine = RaftLogEngine::open_with_file_system(cfg, fs).unwrap(); + let engine = open_with_file_system(cfg, fs).unwrap(); let value = vec![b'y'; 8]; let mut log_batch = LogBatch::default(); log_batch.put_unchecked(1, crate::make_internal_key(&[1]), value.clone()); @@ -2649,8 +2661,7 @@ pub(crate) mod tests { }; // Step 1: write data into the main directory. - let engine = - RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 10, Some(&entry_data)); } @@ -2664,7 +2675,7 @@ pub(crate) mod tests { purge_threshold: ReadableSize(40), ..cfg }; - let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system).unwrap(); + let engine = open_with_file_system(cfg_2, file_system).unwrap(); assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); for rid in 1..=10 { assert_eq!(engine.first_index(rid).unwrap(), 1); @@ -2713,8 +2724,7 @@ pub(crate) mod tests { }; // Step 1: write data into the main directory. - let engine = - RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 10, Some(&entry_data)); } @@ -2740,8 +2750,7 @@ pub(crate) mod tests { // abnormal case - Empty second dir { std::fs::remove_dir_all(sec_dir.path()).unwrap(); - let engine = - RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); } // abnormal case - Missing some append files in second dir @@ -2762,7 +2771,7 @@ pub(crate) mod tests { file_count += 1; } } - let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap(); + let engine = open_with_file_system(cfg, file_system).unwrap(); assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); } // abnormal case - Missing some rewrite files in second dir @@ -2783,7 +2792,7 @@ pub(crate) mod tests { file_count += 1; } } - let engine = RaftLogEngine::open_with_file_system(cfg, file_system).unwrap(); + let engine = open_with_file_system(cfg, file_system).unwrap(); assert_eq!(number_of_files(sec_dir.path()), number_of_files(dir.path())); } // abnormal case - Missing some reserve files in second dir @@ -2815,8 +2824,7 @@ pub(crate) mod tests { }; { // Step 1: write data into the main directory. - let engine = - RaftLogEngine::open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg.clone(), file_system.clone()).unwrap(); for rid in 1..=10 { engine.append(rid, 1, 10, Some(&entry_data)); } @@ -2852,7 +2860,7 @@ pub(crate) mod tests { purge_threshold: ReadableSize(40), ..cfg.clone() }; - let engine = RaftLogEngine::open_with_file_system(cfg_2, file_system.clone()).unwrap(); + let engine = open_with_file_system(cfg_2, file_system.clone()).unwrap(); assert!(number_of_files(spill_dir.path()) > 0); for rid in 1..=10 { assert_eq!(engine.first_index(rid).unwrap(), 1); @@ -2879,7 +2887,7 @@ pub(crate) mod tests { ..cfg }; drop(engine); - let engine = RaftLogEngine::open_with_file_system(cfg_3, file_system).unwrap(); + let engine = open_with_file_system(cfg_3, file_system).unwrap(); assert!(number_of_files(spill_dir.path()) > 0); for rid in 1..=10 { assert_eq!(engine.first_index(rid).unwrap(), 20); diff --git a/src/env/double_write.rs b/src/env/double_write.rs index a444a01e..ce9b2e82 100644 --- a/src/env/double_write.rs +++ b/src/env/double_write.rs @@ -1,7 +1,6 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. use crate::file_pipe_log::log_file::build_file_reader; -use crate::file_pipe_log::pipe::File; use crate::file_pipe_log::pipe_builder::FileName; use crate::file_pipe_log::reader::LogItemBatchFileReader; use crate::file_pipe_log::FileNameExt; @@ -12,7 +11,7 @@ use crate::{Error, Result}; use crossbeam::channel::unbounded; use crossbeam::channel::Sender; use fail::fail_point; -use log::{info, warn, Log}; +use log::{info, warn}; use std::fs; use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; use std::path::Path; @@ -23,8 +22,6 @@ use std::sync::Arc; use std::sync::RwLock; use std::thread; -use crate::env::default::LogFd; -use crate::env::DefaultFileSystem; use crate::env::{FileSystem, Handle, Permission, WriteExt}; use futures::channel::oneshot; use futures::executor::block_on; @@ -66,12 +63,12 @@ fn replace_path(path: &Path, from: &Path, to: &Path) -> PathBuf { } pub struct HedgedFileSystem { - inner: Arc, + base: Arc, path1: PathBuf, path2: PathBuf, - disk1: Sender<(Task, Callback)>, - disk2: Sender<(Task, Callback)>, + disk1: Sender<(Task, Callback)>, + disk2: Sender<(Task, Callback)>, handle1: Option>, handle2: Option>, @@ -81,33 +78,32 @@ pub struct HedgedFileSystem { // disks TODO: consider encryption impl HedgedFileSystem { - pub fn new(inner: Arc, path1: PathBuf, path2: PathBuf) -> Self { - let (tx1, rx1) = unbounded::<(Task, Callback)>(); - let (tx2, rx2) = unbounded::<(Task, Callback)>(); - let fs1 = inner.clone(); + pub fn new(base: Arc, path1: PathBuf, path2: PathBuf) -> Self { + let (tx1, rx1) = unbounded::<(Task, Callback)>(); + let (tx2, rx2) = unbounded::<(Task, Callback)>(); + let fs1 = base.clone(); let handle1 = thread::spawn(|| { for (task, cb) in rx1 { if task == Task::Stop { break; } fail_point!("double_write::thread1"); - let res = Self::handle(&fs1, task); + let res = Self::process(&fs1, task); cb(res); } }); - let fs2 = inner.clone(); + let fs2 = base.clone(); let handle2 = thread::spawn(|| { - let fs = DefaultFileSystem {}; for (task, cb) in rx2 { if task == Task::Stop { break; } - let res = Self::handle(&fs2, task); + let res = Self::process(&fs2, task); cb(res); } }); Self { - inner, + base, path1, path2, disk1: tx1, @@ -127,7 +123,8 @@ impl HedgedFileSystem { match count1.cmp(&count2) { std::cmp::Ordering::Equal => { - // TODO: still need to catch up + // still need to catch up + self.catch_up_diff(files1, files2); return Ok(()); } std::cmp::Ordering::Less => { @@ -200,6 +197,7 @@ impl HedgedFileSystem { // check file size is not enough, treat the last files differently considering // the recycle, always copy the last file + // TODO: only copy diff part if let Some(last_file) = fromFiles.append_file.last() { let to = replace_path( last_file.path.as_ref(), @@ -288,9 +286,8 @@ impl HedgedFileSystem { if let Some(f) = files.append_file.last() { let recovery_read_block_size = 1024; let mut reader = LogItemBatchFileReader::new(recovery_read_block_size); - // TODO: change file system - let handle = Arc::new(DefaultFileSystem {}.open(&f.path, Permission::ReadOnly)?); - let file_reader = build_file_reader(&DefaultFileSystem {}, handle)?; + let handle = Arc::new(self.base.open(&f.path, Permission::ReadOnly)?); + let file_reader = build_file_reader(self.base.as_ref(), handle)?; match reader.open( FileId { queue: LogQueue::Append, @@ -320,7 +317,7 @@ impl HedgedFileSystem { Ok(count) } - async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult { + async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult> { let (cb1, mut f1) = paired_future_callback(); let (cb2, mut f2) = paired_future_callback(); self.disk1.send((task1, cb1)).unwrap(); @@ -347,7 +344,7 @@ impl HedgedFileSystem { } #[inline] - fn handle(file_system: &F, task: Task) -> IoResult> { + fn process(file_system: &F, task: Task) -> IoResult> { match task { Task::Create(path) => file_system.create(path).map(Some), Task::Open { path, perm } => file_system.open(path, perm).map(Some), @@ -370,9 +367,9 @@ impl Drop for HedgedFileSystem { } impl FileSystem for HedgedFileSystem { - type Handle = HedgedHandle; - type Reader = HedgedReader; - type Writer = HedgedWriter; + type Handle = HedgedHandle; + type Reader = HedgedReader; + type Writer = HedgedWriter; fn create>(&self, path: P) -> IoResult { block_on(self.wait_handle( @@ -427,36 +424,46 @@ impl FileSystem for HedgedFileSystem { } fn new_writer(&self, handle: Arc) -> IoResult { - Ok(HedgedWriter::new(handle)) + Ok(handle.new_writer()) } } #[derive(Clone, PartialEq)] -enum FileTask { +enum FileTask { Truncate(usize), FileSize, Sync, Write { offset: usize, bytes: Vec }, Allocate { offset: usize, size: usize }, + NewWriter { fs: Arc }, Stop, } -pub struct HedgedHandle { +enum TaskRes { + Truncate, + FileSize(usize), + Sync, + Write(usize), + Allocate, + NewWriter(F::Writer), +} + +pub struct HedgedHandle { disk1: Sender<(FileTask, Callback)>, disk2: Sender<(FileTask, Callback)>, counter1: Arc, counter2: Arc, - fd1: Arc>>>, - fd2: Arc>>>, + fd1: Arc>>>, + fd2: Arc>>>, handle1: Option>, handle2: Option>, } -impl HedgedHandle { +impl HedgedHandle { pub fn new( - file1: Either>>, LogFd>, - file2: Either>>, LogFd>, + file1: Either>>, F::Handle>, + file2: Either>>, F::Handle>, ) -> Self { let (tx1, rx1) = unbounded::<(FileTask, Callback)>(); let (tx2, rx2) = unbounded::<(FileTask, Callback)>(); @@ -475,7 +482,7 @@ impl HedgedHandle { if task == FileTask::Stop { break; } - let res = Self::handle(&fd, task); + let res = Self::process(&fd, task); counter1.fetch_add(1, Ordering::Relaxed); cb(res); } @@ -491,7 +498,7 @@ impl HedgedHandle { if task == FileTask::Stop { break; } - let res = Self::handle(&fd, task); + let res = Self::process(&fd, task); counter2.fetch_add(1, Ordering::Relaxed); cb(res); } @@ -509,7 +516,9 @@ impl HedgedHandle { } } - fn resolve(file: Either>>, LogFd>) -> LogFd { + fn resolve( + file: Either>>, F::Handle>, + ) -> F::Handle { match file { Either::Left(f) => { // TODO: should we handle the second disk io error @@ -519,17 +528,38 @@ impl HedgedHandle { } } - fn handle(fd: &LogFd, task: FileTask) -> IoResult> { + fn process(fd: &Arc, task: FileTask) -> IoResult> { match task { - FileTask::Truncate(offset) => fd.truncate(offset).map(|_| None), - FileTask::FileSize => fd.file_size().map(Some), - FileTask::Sync => fd.sync().map(|_| None), - FileTask::Write { offset, bytes } => fd.write(offset, &bytes).map(Some), - FileTask::Allocate { offset, size } => fd.allocate(offset, size).map(|_| None), + FileTask::Truncate(offset) => fd.truncate(offset).map(TaskRes::Truncate), + FileTask::FileSize => fd.file_size().map(|s| TaskRes::FileSize(s)), + FileTask::Sync => fd.sync().map(TaskRes::Sync), + FileTask::Write { offset, bytes } => fd.write(offset, &bytes).map(|s| TaskRes::Write(s)), + FileTask::Allocate { offset, size } => fd.allocate(offset, size).map(TaskRes::Allocate), + FileTask::NewWriter { fs } => fs.new_writer(fd.clone()).map(|w| TaskRes::NewWriter(w)), FileTask::Stop => unreachable!(), } } + fn new_writer(&self) -> IoResult> { + let task1 = FileTask::NewWriter(self.base.clone()); + let task2 = FileTask::NewWriter(self.base.clone()); + let (cb1, mut f1) = paired_future_callback(); + let (cb2, mut f2) = paired_future_callback(); + self.disk1.send((task1, cb1)).unwrap(); + self.disk2.send((task2, cb2)).unwrap(); + + select! { + res1 = f1 => res1.unwrap().map(|w| HedgedWriter::new(self.disk1.clone(), self.disk2.clone(), + Either::Right(w.unwrap()), Either::Left(f2) )), + res2 = f2 => res2.unwrap().map(|w| HedgedHandle::new( + Either::Left(f1), Either::Right(w.unwrap()) )), + } + select! { + res1 = f1 => res1.unwrap().map(|_| ()), + res2 = f2 => res2.unwrap().map(|_| ()), + } + } + fn read(&self, offset: usize, buf: &mut [u8]) -> IoResult { // TODO: read simultaneously from both disks // choose latest to perform read @@ -579,7 +609,7 @@ impl HedgedHandle { } } -impl Drop for HedgedHandle { +impl Drop for HedgedHandle { fn drop(&mut self) { self.disk1.send((FileTask::Stop, Box::new(|_| {}))).unwrap(); self.disk2.send((FileTask::Stop, Box::new(|_| {}))).unwrap(); @@ -588,7 +618,7 @@ impl Drop for HedgedHandle { } } -impl Handle for HedgedHandle { +impl Handle for HedgedHandle { fn truncate(&self, offset: usize) -> IoResult<()> { block_on(self.wait_one(FileTask::Truncate(offset))).map(|_| ()) } @@ -602,45 +632,40 @@ impl Handle for HedgedHandle { } } -pub struct HedgedWriter { - inner: Arc, - offset: usize, +pub struct HedgedWriter { + disk1: Sender<(FileTask, Callback)>, + disk2: Sender<(FileTask, Callback)>, + writer1: Arc, + writer2: Arc, } -impl HedgedWriter { - pub fn new(handle: Arc) -> Self { +impl HedgedWriter { + pub fn new(disk1: Sender<(FileTask, Callback)>, disk2: Sender<(FileTask, Callback)>, writer1: Arc, writer2: Arc) -> Self { + Self { - inner: handle, - offset: 0, + handle, + } } } -impl Write for HedgedWriter { +impl Write for HedgedWriter { fn write(&mut self, buf: &[u8]) -> IoResult { - let len = self.inner.write(self.offset, buf)?; - self.offset += len; - Ok(len) } fn flush(&mut self) -> IoResult<()> { - Ok(()) } } -impl WriteExt for HedgedWriter { +impl WriteExt for HedgedWriter { fn truncate(&mut self, offset: usize) -> IoResult<()> { - self.inner.truncate(offset)?; - self.offset = offset; - Ok(()) } fn allocate(&mut self, offset: usize, size: usize) -> IoResult<()> { - self.inner.allocate(offset, size) } } -impl Seek for HedgedWriter { +impl Seek for HedgedWriter { fn seek(&mut self, pos: SeekFrom) -> IoResult { match pos { SeekFrom::Start(offset) => self.offset = offset as usize, @@ -651,13 +676,13 @@ impl Seek for HedgedWriter { } } -pub struct HedgedReader { - inner: Arc, +pub struct HedgedReader { + inner: Arc>, offset: usize, } -impl HedgedReader { - pub fn new(handle: Arc) -> Self { +impl HedgedReader { + pub fn new(handle: Arc>) -> Self { Self { inner: handle, offset: 0, @@ -665,7 +690,7 @@ impl HedgedReader { } } -impl Seek for HedgedReader { +impl Seek for HedgedReader { fn seek(&mut self, pos: SeekFrom) -> IoResult { match pos { SeekFrom::Start(offset) => self.offset = offset as usize, @@ -676,7 +701,7 @@ impl Seek for HedgedReader { } } -impl Read for HedgedReader { +impl Read for HedgedReader { fn read(&mut self, buf: &mut [u8]) -> IoResult { let len = self.inner.read(self.offset, buf)?; self.offset += len;