Skip to content

Commit

Permalink
rename and fix seqno update
Browse files Browse the repository at this point in the history
Signed-off-by: Connor1996 <[email protected]>
  • Loading branch information
Connor1996 committed Sep 5, 2023
1 parent f250521 commit af5979e
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions src/env/double_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,8 @@ pub struct HedgedFileSystem {

sender: HedgedSender,

counter1: Arc<AtomicU64>,
counter2: Arc<AtomicU64>,
seqno1: Arc<AtomicU64>,
seqno2: Arc<AtomicU64>,

handle1: Option<thread::JoinHandle<()>>,
handle2: Option<thread::JoinHandle<()>>,
Expand All @@ -211,9 +211,9 @@ impl HedgedFileSystem {
pub fn new(base: Arc<DefaultFileSystem>, path1: PathBuf, path2: PathBuf) -> Self {
let (tx1, rx1) = unbounded::<(SeqTask, Callback<TaskRes>)>();
let (tx2, rx2) = unbounded::<(SeqTask, Callback<TaskRes>)>();
let counter1 = Arc::new(AtomicU64::new(0));
let counter2 = Arc::new(AtomicU64::new(0));
let counter1_clone = counter1.clone();
let seqno1 = Arc::new(AtomicU64::new(0));
let seqno2 = Arc::new(AtomicU64::new(0));
let seqno1_clone = seqno1.clone();
let fs1 = base.clone();
let handle1 = thread::spawn(move || {
for (task, cb) in rx1 {
Expand All @@ -223,13 +223,17 @@ impl HedgedFileSystem {
fail_point!("double_write::thread1");
let seq = task.seq;
let res = task.process(&fs1);
cb(res);
if seq != 0 {
counter1_clone.store(seq, Ordering::Relaxed);
// seqno should be updated before the write callback is called, otherwise one
// read may be performed right after the write is finished. Then the read may be
// performed on the other disk not having the data because the seqno for this
// disk is not updated yet.
seqno1_clone.store(seq, Ordering::Relaxed);
}
cb(res);
}
});
let counter2_clone = counter2.clone();
let seqno2_clone = seqno2.clone();
let fs2 = base.clone();
let handle2 = thread::spawn(move || {
for (task, cb) in rx2 {
Expand All @@ -238,10 +242,10 @@ impl HedgedFileSystem {
}
let seq = task.seq;
let res = task.process(&fs2);
cb(res);
if seq != 0 {
counter2_clone.store(seq, Ordering::Relaxed);
seqno2_clone.store(seq, Ordering::Relaxed);
}
cb(res);
}
});
let sender = HedgedSender::new(tx1, tx2);
Expand All @@ -250,8 +254,8 @@ impl HedgedFileSystem {
path1,
path2,
sender,
counter1,
counter2,
seqno1,
seqno2,
handle1: Some(handle1),
handle2: Some(handle2),
}
Expand Down Expand Up @@ -457,8 +461,8 @@ impl HedgedFileSystem {
self.sender.clone(),
FutureHandle::new_owned(fd),
FutureHandle::new(f2),
self.counter1.clone(),
self.counter2.clone(),
self.seqno1.clone(),
self.seqno2.clone(),
)
}),
res2 = f2 => res2.unwrap().map(|res| {
Expand All @@ -468,8 +472,8 @@ impl HedgedFileSystem {
self.sender.clone(),
FutureHandle::new(f1),
FutureHandle::new_owned(fd) ,
self.counter1.clone(),
self.counter2.clone(),
self.seqno1.clone(),
self.seqno2.clone(),
)
}),
}
Expand Down Expand Up @@ -677,8 +681,8 @@ pub struct HedgedHandle {
handle1: Arc<FutureHandle>,
handle2: Arc<FutureHandle>,

counter1: Arc<AtomicU64>,
counter2: Arc<AtomicU64>,
seqno1: Arc<AtomicU64>,
seqno2: Arc<AtomicU64>,

thread1: Option<JoinHandle<()>>,
thread2: Option<JoinHandle<()>>,
Expand All @@ -690,37 +694,37 @@ impl HedgedHandle {
mut sender: HedgedSender,
handle1: FutureHandle,
handle2: FutureHandle,
mut counter1: Arc<AtomicU64>,
mut counter2: Arc<AtomicU64>,
mut seqno1: Arc<AtomicU64>,
mut seqno2: Arc<AtomicU64>,
) -> Self {
let mut thread1 = None;
let mut thread2 = None;
if strong_consistent {
// use two separated threads for both wait
let (tx1, rx1) = unbounded::<(SeqTask, Callback<TaskRes>)>();
let (tx2, rx2) = unbounded::<(SeqTask, Callback<TaskRes>)>();
counter1 = Arc::new(AtomicU64::new(0));
counter2 = Arc::new(AtomicU64::new(0));
let counter1_clone = counter1.clone();
seqno1 = Arc::new(AtomicU64::new(0));
seqno2 = Arc::new(AtomicU64::new(0));
let seqno1_clone = seqno1.clone();
thread1 = Some(thread::spawn(move || {
for (task, cb) in rx1 {
if let Task::Stop = task.inner {
break;
}
let res = task.handle_process();
cb(res);
counter1_clone.fetch_add(1, Ordering::Relaxed);
seqno1_clone.fetch_add(1, Ordering::Relaxed);
}
}));
let counter2_clone = counter2.clone();
let seqno2_clone = seqno2.clone();
thread2 = Some(thread::spawn(move || {
for (task, cb) in rx2 {
if let Task::Stop = task.inner {
break;
}
let res = task.handle_process();
cb(res);
counter2_clone.fetch_add(1, Ordering::Relaxed);
seqno2_clone.fetch_add(1, Ordering::Relaxed);
}
}));
sender = HedgedSender::new(tx1, tx2);
Expand All @@ -731,8 +735,8 @@ impl HedgedHandle {
sender,
handle1: Arc::new(handle1),
handle2: Arc::new(handle2),
counter1,
counter2,
seqno1,
seqno2,
thread1,
thread2,
}
Expand All @@ -741,9 +745,15 @@ impl HedgedHandle {
fn read(&self, offset: usize, buf: &mut [u8]) -> IoResult<usize> {
// TODO: read simultaneously from both disks
// choose latest to perform read
let count1 = self.counter1.load(Ordering::Relaxed);
let count2 = self.counter2.load(Ordering::Relaxed);
match count1.cmp(&count2) {

// Safety: the get for these two seqno is not necessary to be atomic
// Raft engine promises that the offset would be read only after the write is
// finished and memtable is updated. While the hedged file system promises that
//
//
let seq1 = self.seqno1.load(Ordering::Relaxed);
let seq2 = self.seqno2.load(Ordering::Relaxed);
match seq1.cmp(&seq2) {
std::cmp::Ordering::Equal => {
if let Some(fd) = self.handle1.try_get() {
fd.read(offset, buf)
Expand Down

0 comments on commit af5979e

Please sign in to comment.