From af5979e0c50d07da919094f9aecca03791cdb09b Mon Sep 17 00:00:00 2001 From: Connor1996 Date: Tue, 5 Sep 2023 14:31:27 +0800 Subject: [PATCH] rename and fix seqno update Signed-off-by: Connor1996 --- src/env/double_write.rs | 72 +++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/src/env/double_write.rs b/src/env/double_write.rs index 2ca93040..7d5bf436 100644 --- a/src/env/double_write.rs +++ b/src/env/double_write.rs @@ -197,8 +197,8 @@ pub struct HedgedFileSystem { sender: HedgedSender, - counter1: Arc, - counter2: Arc, + seqno1: Arc, + seqno2: Arc, handle1: Option>, handle2: Option>, @@ -211,9 +211,9 @@ impl HedgedFileSystem { pub fn new(base: Arc, path1: PathBuf, path2: PathBuf) -> Self { let (tx1, rx1) = unbounded::<(SeqTask, Callback)>(); let (tx2, rx2) = unbounded::<(SeqTask, Callback)>(); - let counter1 = Arc::new(AtomicU64::new(0)); - let counter2 = Arc::new(AtomicU64::new(0)); - let counter1_clone = counter1.clone(); + let seqno1 = Arc::new(AtomicU64::new(0)); + let seqno2 = Arc::new(AtomicU64::new(0)); + let seqno1_clone = seqno1.clone(); let fs1 = base.clone(); let handle1 = thread::spawn(move || { for (task, cb) in rx1 { @@ -223,13 +223,17 @@ impl HedgedFileSystem { fail_point!("double_write::thread1"); let seq = task.seq; let res = task.process(&fs1); - cb(res); if seq != 0 { - counter1_clone.store(seq, Ordering::Relaxed); + // seqno should be updated before the write callback is called, otherwise one + // read may be performed right after the write is finished. Then the read may be + // performed on the other disk not having the data because the seqno for this + // disk is not updated yet. + seqno1_clone.store(seq, Ordering::Relaxed); } + cb(res); } }); - let counter2_clone = counter2.clone(); + let seqno2_clone = seqno2.clone(); let fs2 = base.clone(); let handle2 = thread::spawn(move || { for (task, cb) in rx2 { @@ -238,10 +242,10 @@ impl HedgedFileSystem { } let seq = task.seq; let res = task.process(&fs2); - cb(res); if seq != 0 { - counter2_clone.store(seq, Ordering::Relaxed); + seqno2_clone.store(seq, Ordering::Relaxed); } + cb(res); } }); let sender = HedgedSender::new(tx1, tx2); @@ -250,8 +254,8 @@ impl HedgedFileSystem { path1, path2, sender, - counter1, - counter2, + seqno1, + seqno2, handle1: Some(handle1), handle2: Some(handle2), } @@ -457,8 +461,8 @@ impl HedgedFileSystem { self.sender.clone(), FutureHandle::new_owned(fd), FutureHandle::new(f2), - self.counter1.clone(), - self.counter2.clone(), + self.seqno1.clone(), + self.seqno2.clone(), ) }), res2 = f2 => res2.unwrap().map(|res| { @@ -468,8 +472,8 @@ impl HedgedFileSystem { self.sender.clone(), FutureHandle::new(f1), FutureHandle::new_owned(fd) , - self.counter1.clone(), - self.counter2.clone(), + self.seqno1.clone(), + self.seqno2.clone(), ) }), } @@ -677,8 +681,8 @@ pub struct HedgedHandle { handle1: Arc, handle2: Arc, - counter1: Arc, - counter2: Arc, + seqno1: Arc, + seqno2: Arc, thread1: Option>, thread2: Option>, @@ -690,8 +694,8 @@ impl HedgedHandle { mut sender: HedgedSender, handle1: FutureHandle, handle2: FutureHandle, - mut counter1: Arc, - mut counter2: Arc, + mut seqno1: Arc, + mut seqno2: Arc, ) -> Self { let mut thread1 = None; let mut thread2 = None; @@ -699,9 +703,9 @@ impl HedgedHandle { // use two separated threads for both wait let (tx1, rx1) = unbounded::<(SeqTask, Callback)>(); let (tx2, rx2) = unbounded::<(SeqTask, Callback)>(); - counter1 = Arc::new(AtomicU64::new(0)); - counter2 = Arc::new(AtomicU64::new(0)); - let counter1_clone = counter1.clone(); + seqno1 = Arc::new(AtomicU64::new(0)); + seqno2 = Arc::new(AtomicU64::new(0)); + let seqno1_clone = seqno1.clone(); thread1 = Some(thread::spawn(move || { for (task, cb) in rx1 { if let Task::Stop = task.inner { @@ -709,10 +713,10 @@ impl HedgedHandle { } let res = task.handle_process(); cb(res); - counter1_clone.fetch_add(1, Ordering::Relaxed); + seqno1_clone.fetch_add(1, Ordering::Relaxed); } })); - let counter2_clone = counter2.clone(); + let seqno2_clone = seqno2.clone(); thread2 = Some(thread::spawn(move || { for (task, cb) in rx2 { if let Task::Stop = task.inner { @@ -720,7 +724,7 @@ impl HedgedHandle { } let res = task.handle_process(); cb(res); - counter2_clone.fetch_add(1, Ordering::Relaxed); + seqno2_clone.fetch_add(1, Ordering::Relaxed); } })); sender = HedgedSender::new(tx1, tx2); @@ -731,8 +735,8 @@ impl HedgedHandle { sender, handle1: Arc::new(handle1), handle2: Arc::new(handle2), - counter1, - counter2, + seqno1, + seqno2, thread1, thread2, } @@ -741,9 +745,15 @@ impl HedgedHandle { fn read(&self, offset: usize, buf: &mut [u8]) -> IoResult { // TODO: read simultaneously from both disks // choose latest to perform read - let count1 = self.counter1.load(Ordering::Relaxed); - let count2 = self.counter2.load(Ordering::Relaxed); - match count1.cmp(&count2) { + + // Safety: the get for these two seqno is not necessary to be atomic + // Raft engine promises that the offset would be read only after the write is + // finished and memtable is updated. While the hedged file system promises that + // + // + let seq1 = self.seqno1.load(Ordering::Relaxed); + let seq2 = self.seqno2.load(Ordering::Relaxed); + match seq1.cmp(&seq2) { std::cmp::Ordering::Equal => { if let Some(fd) = self.handle1.try_get() { fd.read(offset, buf)