diff --git a/src/env/double_write.rs b/src/env/double_write.rs index 14a077d8..f3e2d339 100644 --- a/src/env/double_write.rs +++ b/src/env/double_write.rs @@ -23,7 +23,6 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Mutex; -use std::sync::RwLock; use std::thread; use std::thread::JoinHandle; @@ -71,12 +70,12 @@ enum Task { size: usize, }, Pause, - Snapshot(PathBuf), + Snapshot, Stop, } impl SeqTask { - fn process(self, file_system: &DefaultFileSystem) -> IoResult { + fn process(self, file_system: &DefaultFileSystem, path: &PathBuf) -> IoResult { match self.inner { Task::Create(path) => file_system.create(&path).map(|h| TaskRes::Create { fd: h, @@ -90,7 +89,7 @@ impl SeqTask { Task::Rename { src_path, dst_path } => file_system .rename(src_path, dst_path) .map(|_| TaskRes::Rename), - Task::Snapshot(path) => { + Task::Snapshot => { let mut files = HedgedFileSystem::get_files(&path).unwrap(); // TODO: handle error files.append_files = files .append_files @@ -103,7 +102,7 @@ impl SeqTask { .into_iter() .map(|f| f.into_handle(file_system)) .collect(); - Ok(TaskRes::Snapshot(files)) + Ok(TaskRes::Snapshot((self.seq, files))) } Task::Stop | Task::Pause => unreachable!(), _ => self.handle_process(), @@ -148,7 +147,7 @@ enum TaskRes { Sync, Write(usize), Allocate, - Snapshot(Files), + Snapshot((u64, Files)), Stop, } @@ -228,29 +227,24 @@ struct HedgedSenderInner { disk1: Sender<(SeqTask, Callback)>, disk2: Sender<(SeqTask, Callback)>, seq: u64, - state: Arc>, + state: RecoveryState, } impl HedgedSender { fn new( disk1: Sender<(SeqTask, Callback)>, disk2: Sender<(SeqTask, Callback)>, - state: Arc>, ) -> Self { Self(Arc::new(Mutex::new(HedgedSenderInner { disk1, disk2, seq: 0, - state, + state: RecoveryState::Normal, }))) } - fn state(&self) -> Arc> { - self.0.lock().unwrap().state.clone() - } - fn send(&self, task1: Task, task2: Task, cb1: Callback, cb2: Callback) { - if matches!(task1, Task::Pause | Task::Snapshot(_)) { + if matches!(task1, Task::Pause | Task::Snapshot) { unreachable!(); } @@ -264,8 +258,7 @@ impl HedgedSender { inner: task2, seq: inner.seq, }; - let state = inner.state.read().unwrap(); - if matches!(*state, RecoveryState::Normal) { + if matches!(inner.state, RecoveryState::Normal) { let check1 = inner.disk1.len() > ABORT_THRESHOLD; let check2 = inner.disk2.len() > ABORT_THRESHOLD; match (check1, check2) { @@ -273,7 +266,7 @@ impl HedgedSender { panic!("Both channels of disk1 and disk2 are full") } (true, false) => { - *inner.state.write().unwrap() = RecoveryState::Paused1; + inner.state = RecoveryState::Paused1; inner .disk1 .send(( @@ -286,7 +279,7 @@ impl HedgedSender { .unwrap(); } (false, true) => { - *inner.state.write().unwrap() = RecoveryState::Paused2; + inner.state = RecoveryState::Paused2; inner .disk2 .send(( @@ -301,47 +294,36 @@ impl HedgedSender { _ => {} } } - if !matches!( - *state, - RecoveryState::Paused1 | RecoveryState::WaitRecover1(_) - ) { + if !matches!(inner.state, RecoveryState::Paused1) { inner.disk1.send((task1, cb1)).unwrap(); } - if !matches!( - *state, - RecoveryState::Paused2 | RecoveryState::WaitRecover2(_) - ) { + if !matches!(inner.state, RecoveryState::Paused2) { inner.disk2.send((task2, cb2)).unwrap(); } } - fn send_snapshot(&self, index: u8, task: Task, cb: Callback) { - assert!(matches!(task, Task::Snapshot(_))); - - let inner = self.0.lock().unwrap(); - if index == 1 { - inner - .disk1 - .send(( - SeqTask { - inner: task, - seq: 0, - }, - cb, - )) - .unwrap(); - } else { - inner - .disk2 - .send(( - SeqTask { - inner: task, - seq: 0, - }, - cb, - )) - .unwrap(); + fn send_snapshot(&self, cb: Callback) { + let mut inner = self.0.lock().unwrap(); + inner.seq += 1; + let task = SeqTask { + inner: Task::Snapshot, + seq: inner.seq, + }; + match inner.state { + RecoveryState::Paused1 => { + inner.disk1.send((task, cb)).unwrap(); + } + RecoveryState::Paused2 => { + inner.disk2.send((task, cb)).unwrap(); + } + _ => unreachable!(), } + inner.state = RecoveryState::Recovering; + } + + fn finish_snapshot(&self) { + let mut inner = self.0.lock().unwrap(); + inner.state = RecoveryState::Normal; } } @@ -351,8 +333,6 @@ enum RecoveryState { * `Pause` task is sent and no more later task will be sent * to disk1 */ Paused2, // no more task will be sent to disk2 - WaitRecover1(oneshot::Sender<()>), - WaitRecover2(oneshot::Sender<()>), Recovering, } @@ -369,8 +349,6 @@ pub struct HedgedFileSystem { thread1: Option>, thread2: Option>, - - state: Arc>, } struct TaskRunner { @@ -378,8 +356,8 @@ struct TaskRunner { path: PathBuf, fs: Arc, rx: Receiver<(SeqTask, Callback)>, + sender: HedgedSender, seqno: Arc, - state: Arc>, } impl TaskRunner { @@ -388,16 +366,16 @@ impl TaskRunner { path: PathBuf, fs: Arc, rx: Receiver<(SeqTask, Callback)>, + sender: HedgedSender, seqno: Arc, - state: Arc>, ) -> Self { Self { id, path, fs, rx, + sender, seqno, - state, } } @@ -405,35 +383,66 @@ impl TaskRunner { thread::Builder::new() .name(format!("raft-engine-disk{}", self.id)) .spawn(move || { + let mut last_seq = 0; + let mut snap_seq = None; for (task, cb) in self.rx { if let Task::Stop = task.inner { cb(Ok(TaskRes::Stop)); break; } if let Task::Pause = task.inner { - let (tx, rx) = oneshot::channel(); - *self.state.write().unwrap() = if self.id == 1 { - RecoveryState::WaitRecover1(tx) - } else { - RecoveryState::WaitRecover2(tx) - }; - let _ = block_on(rx); - // indicate the pause is done - // do not update seqno for pause task + // Encountering `Pause`, indicate the disk may not slow anymore + let (cb, f) = paired_future_callback(); + self.sender.send_snapshot(cb); + let to_files = HedgedFileSystem::get_files(&self.path).unwrap(); // TODO: handle error + let from_files = block_on(f) + .unwrap() + .map(|res| { + if let TaskRes::Snapshot((seq, files)) = res { + snap_seq = Some(seq); + files + } else { + unreachable!() + } + }) + .unwrap(); // TODO: handle error + + // Snapshot doesn't include the file size, so it would copy more data than + // the data seen at the time of snapshot. But it's okay, as the data is + // written with specific offset, so the data written + // of no necessity will be overwritten by the latter writes. + // Exclude rewrite files because rewrite files are always synced. + HedgedFileSystem::catch_up_diff(&self.fs, from_files, to_files, true); + + self.sender.finish_snapshot(); + self.seqno.store(snap_seq.unwrap(), Ordering::Relaxed); + last_seq = snap_seq.unwrap(); continue; } if self.id == 1 { fail_point!("double_write::thread1"); } let seq = task.seq; - let res = task.process(&self.fs); + assert_ne!(seq, 0); + if let Some(snap) = snap_seq.as_ref() { + // the change already included in the snapshot + if seq + 1 < *snap { + } else if seq + 1 == *snap { + snap_seq = None; + } else { + panic!("seqno {} is larger than snapshot seqno {}", seq, *snap); + } + continue; + } + + assert_eq!(last_seq + 1, seq); + last_seq = seq; + let res = task.process(&self.fs, &self.path); // seqno should be updated before the write callback is called, otherwise one // read may be performed right after the write is finished. Then the read may be // performed on the other disk not having the data because the seqno for this // disk is not updated yet. - if seq != 0 { - self.seqno.store(seq, Ordering::Relaxed); - } + self.seqno.store(seq, Ordering::Relaxed); cb(res); } }) @@ -447,7 +456,8 @@ impl HedgedFileSystem { pub fn new(base: Arc, path1: PathBuf, path2: PathBuf) -> Self { let (tx1, rx1) = unbounded::<(SeqTask, Callback)>(); let (tx2, rx2) = unbounded::<(SeqTask, Callback)>(); - let state = Arc::new(RwLock::new(RecoveryState::Normal)); + let sender = HedgedSender::new(tx1, tx2); + let seqno1 = Arc::new(AtomicU64::new(0)); let seqno2 = Arc::new(AtomicU64::new(0)); let runner1 = TaskRunner::new( @@ -455,20 +465,19 @@ impl HedgedFileSystem { path1.clone(), base.clone(), rx1, + sender.clone(), seqno1.clone(), - state.clone(), ); let runner2 = TaskRunner::new( 2, path2.clone(), base.clone(), rx2, + sender.clone(), seqno2.clone(), - state.clone(), ); let thread1 = runner1.spawn(); let thread2 = runner2.spawn(); - let sender = HedgedSender::new(tx1, tx2, state.clone()); Self { base, path1, @@ -478,12 +487,11 @@ impl HedgedFileSystem { seqno2, thread1: Some(thread1), thread2: Some(thread2), - state, } } fn catch_up_diff( - &self, + fs: &Arc, mut from_files: Files, mut to_files: Files, skip_rewrite: bool, @@ -519,7 +527,7 @@ impl HedgedFileSystem { from_files.prefix.as_ref(), to_files.prefix.as_ref(), ); - f1.copy(&self.base, &to)?; + f1.copy(fs, &to)?; iter1.next(); } (None, Some(f2)) => { @@ -538,7 +546,7 @@ impl HedgedFileSystem { from_files.prefix.as_ref(), to_files.prefix.as_ref(), ); - f1.copy(&self.base, &to)?; + f1.copy(fs, &to)?; } iter1.next(); iter2.next(); @@ -549,7 +557,7 @@ impl HedgedFileSystem { from_files.prefix.as_ref(), to_files.prefix.as_ref(), ); - f1.copy(&self.base, &to)?; + f1.copy(fs, &to)?; iter1.next(); } std::cmp::Ordering::Greater => { @@ -761,68 +769,28 @@ impl RecoverExt for HedgedFileSystem { match count1.cmp(&count2) { std::cmp::Ordering::Equal => { // still need to catch up, but only diff - self.catch_up_diff(files1, files2, false)?; + HedgedFileSystem::catch_up_diff(&self.base, files1, files2, false)?; return Ok(()); } std::cmp::Ordering::Less => { - self.catch_up_diff(files2, files1, false)?; + HedgedFileSystem::catch_up_diff(&self.base, files2, files1, false)?; } std::cmp::Ordering::Greater => { - self.catch_up_diff(files1, files2, false)?; + HedgedFileSystem::catch_up_diff(&self.base, files1, files2, false)?; } } Ok(()) } fn need_recover(&self) -> bool { - // in wait recover state, the task should be still dropped - let res = matches!( - *self.state.read().unwrap(), - RecoveryState::WaitRecover1(_) | RecoveryState::WaitRecover2(_) - ); - if res { - // in recovering stat, the task can keep sending - *self.state.write().unwrap() = RecoveryState::Recovering; - } - res + false } fn is_in_recover(&self) -> bool { - matches!(*self.state.read().unwrap(), RecoveryState::Recovering) + false } - fn trigger_recover(&self) { - // let (cb, f) = paired_future_callback(); - // let to_files = match *self.state.read().unwrap() { - // RecoveryState::WaitRecover1(tx) => { - // self.sender - // .send_snapshot(1, Task::Snapshot(self.path2.clone()), cb); - // self.get_files(&self.path1).unwrap() // TODO: handle error - // } - // RecoveryState::WaitRecover2(tx) => { - // self.sender - // .send_snapshot(2, Task::Snapshot(self.path1.clone()), cb); - // self.get_files(&self.path2).unwrap() - // } - // _ => unreachable!(), - // }; - - // let from_files = block_on(f).unwrap().map(|res| { - // if let TaskRes::Snapshot(files) = res { - // files - // } else { - // unreachable!() - // } - // }).unwrap(); // TODO: handle error - - // TODO: async - // exclude rewrite files because rewrite files are always synced - // self.catch_up_diff(from_files, to_files, true); - - // when - *self.state.write().unwrap() = RecoveryState::Normal; - // tx.send(()).unwrap(); - } + fn trigger_recover(&self) {} } impl FileSystem for HedgedFileSystem { @@ -1017,7 +985,7 @@ impl HedgedHandle { cb(res); } })); - sender = HedgedSender::new(tx1, tx2, sender.state()); + sender = HedgedSender::new(tx1, tx2); } Self {