Skip to content

Commit

Permalink
add recover case and fix canceled rx
Browse files Browse the repository at this point in the history
Signed-off-by: Connor1996 <[email protected]>
  • Loading branch information
Connor1996 committed Sep 13, 2023
1 parent cef9add commit 135ae19
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 39 deletions.
124 changes: 87 additions & 37 deletions src/env/hedged.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::thread::JoinHandle;
use crate::env::default::LogFd;
use crate::env::DefaultFileSystem;
use crate::env::{FileSystem, Handle, Permission, WriteExt};
use futures::channel::oneshot;
use futures::channel::oneshot::{self, Canceled};
use futures::executor::block_on;
use futures::{join, select};

Expand All @@ -48,6 +48,7 @@ struct SeqTask {
seq: u64,
}

#[derive(Clone)]
enum Task {
Create(PathBuf),
Open {
Expand Down Expand Up @@ -111,31 +112,35 @@ impl SeqTask {
Ok(TaskRes::Snapshot((self.seq, files)))
}
Task::Stop | Task::Pause => unreachable!(),
_ => self.handle_process(),
_ => self.handle_process(file_system),
}
}

fn handle_process(self) -> IoResult<TaskRes> {
fn handle_process(self, file_system: &DefaultFileSystem) -> IoResult<TaskRes> {
match self.inner {
Task::Truncate { handle, offset } => {
handle.get().truncate(offset).map(|_| TaskRes::Truncate)
}
Task::FileSize(handle) => handle.get().file_size().map(|s| TaskRes::FileSize(s)),
Task::Sync(handle) => handle.get().sync().map(|_| TaskRes::Sync),
Task::Truncate { handle, offset } => handle
.get(file_system)
.truncate(offset)
.map(|_| TaskRes::Truncate),
Task::FileSize(handle) => handle
.get(file_system)
.file_size()
.map(|s| TaskRes::FileSize(s)),
Task::Sync(handle) => handle.get(file_system).sync().map(|_| TaskRes::Sync),
Task::Write {
handle,
offset,
bytes,
} => handle
.get()
.get(file_system)
.write(offset, &bytes)
.map(|s| TaskRes::Write(s)),
Task::Allocate {
handle,
offset,
size,
} => handle
.get()
.get(file_system)
.allocate(offset, size)
.map(|_| TaskRes::Allocate),
_ => unreachable!(),
Expand Down Expand Up @@ -222,7 +227,15 @@ fn replace_path(path: &Path, from: &Path, to: &Path) -> PathBuf {

// Let's say the average entry size is 100B; then the total size of the log in
// the channel is 1GB.
const ABORT_THRESHOLD: usize = 10000;
const PAUSE_THRESHOLD: usize = 10000;

/// Returns the channel-length threshold that the sender compares each disk's
/// channel length against when deciding whether to pause that disk.
///
/// Normally this is `PAUSE_THRESHOLD`. The `hedged::pause_threshold` fail
/// point can override it: its argument is parsed as a `usize`, and the call
/// panics if the argument is missing or is not a valid number.
fn get_pause_threshold() -> usize {
// When the fail point fires, the closure's value is returned from this
// function instead of the constant below (presumably only in builds with
// fail points enabled; confirm against the `fail` crate setup).
fail_point!("hedged::pause_threshold", |s| s
.unwrap()
.parse::<usize>()
.unwrap());
PAUSE_THRESHOLD
}

// Make sure the task is sent to both disks' channels atomically; otherwise the
// ordering of the tasks in the two disks' channels will not be the same.
Expand All @@ -233,7 +246,7 @@ struct HedgedSenderInner {
disk1: Sender<(SeqTask, Callback<TaskRes>)>,
disk2: Sender<(SeqTask, Callback<TaskRes>)>,
seq: u64,
state: RecoveryState,
state: State,
}

impl HedgedSender {
Expand All @@ -245,10 +258,14 @@ impl HedgedSender {
disk1,
disk2,
seq: 0,
state: RecoveryState::Normal,
state: State::Normal,
})))
}

/// Returns a copy of the sender's current state.
///
/// Takes the inner mutex for the duration of the read and panics if the
/// mutex has been poisoned.
fn state(&self) -> State {
    let guard = self.0.lock().unwrap();
    guard.state.clone()
}

fn send(&self, task1: Task, task2: Task, cb1: Callback<TaskRes>, cb2: Callback<TaskRes>) {
if matches!(task1, Task::Pause | Task::Snapshot) {
unreachable!();
Expand All @@ -264,15 +281,15 @@ impl HedgedSender {
inner: task2,
seq: inner.seq,
};
if matches!(inner.state, RecoveryState::Normal) {
let check1 = inner.disk1.len() > ABORT_THRESHOLD;
let check2 = inner.disk2.len() > ABORT_THRESHOLD;
if matches!(inner.state, State::Normal) {
let check1 = inner.disk1.len() > get_pause_threshold();
let check2 = inner.disk2.len() > get_pause_threshold();
match (check1, check2) {
(true, true) => {
panic!("Both channels of disk1 and disk2 are full")
}
(true, false) => {
inner.state = RecoveryState::Paused1;
inner.state = State::Paused1;
inner
.disk1
.send((
Expand All @@ -285,7 +302,7 @@ impl HedgedSender {
.unwrap();
}
(false, true) => {
inner.state = RecoveryState::Paused2;
inner.state = State::Paused2;
inner
.disk2
.send((
Expand All @@ -300,10 +317,10 @@ impl HedgedSender {
_ => {}
}
}
if !matches!(inner.state, RecoveryState::Paused1) {
if !matches!(inner.state, State::Paused1) {
inner.disk1.send((task1, cb1)).unwrap();
}
if !matches!(inner.state, RecoveryState::Paused2) {
if !matches!(inner.state, State::Paused2) {
inner.disk2.send((task2, cb2)).unwrap();
}
}
Expand All @@ -316,24 +333,25 @@ impl HedgedSender {
seq: inner.seq,
};
match inner.state {
RecoveryState::Paused1 => {
State::Paused1 => {
inner.disk2.send((task, cb)).unwrap();
}
RecoveryState::Paused2 => {
State::Paused2 => {
inner.disk1.send((task, cb)).unwrap();
}
_ => unreachable!(),
}
inner.state = RecoveryState::Recovering;
inner.state = State::Recovering;
}

fn finish_snapshot(&self) {
let mut inner = self.0.lock().unwrap();
inner.state = RecoveryState::Normal;
inner.state = State::Normal;
}
}

enum RecoveryState {
#[derive(Debug, PartialEq, Clone)]
pub enum State {
Normal,
Paused1, /* When the length of channel of disk1 reaches threshold, a
* `Pause` task is sent and no more later task will be sent
Expand Down Expand Up @@ -426,7 +444,7 @@ impl TaskRunner {
continue;
}
if self.id == 1 {
fail_point!("double_write::thread1");
fail_point!("hedged::task_runner::thread1");
}
let seq = task.seq;
assert_ne!(seq, 0);
Expand Down Expand Up @@ -498,6 +516,10 @@ impl HedgedFileSystem {
}
}

pub fn state(&self) -> State {
self.sender.state()
}

fn catch_up_diff(
fs: &Arc<DefaultFileSystem>,
mut from_files: Files,
Expand Down Expand Up @@ -677,7 +699,7 @@ impl HedgedFileSystem {
async fn wait_handle(&self, task1: Task, task2: Task) -> IoResult<HedgedHandle> {
let (cb1, mut f1) = paired_future_callback();
let (cb2, mut f2) = paired_future_callback();
self.sender.send(task1, task2, cb1, cb2);
self.sender.send(task1.clone(), task2.clone(), cb1, cb2);

let resolve = |res: TaskRes| -> (LogFd, bool) {
match res {
Expand All @@ -690,20 +712,22 @@ impl HedgedFileSystem {
res1 = f1 => res1.unwrap().map(|res| {
let (fd, is_for_rewrite) = resolve(res);
HedgedHandle::new(
self.base.clone(),
is_for_rewrite,
self.sender.clone(),
FutureHandle::new_owned(fd),
FutureHandle::new(f2),
FutureHandle::new(f2, task2),
self.seqno1.clone(),
self.seqno2.clone(),
)
}),
res2 = f2 => res2.unwrap().map(|res| {
let (fd, is_for_rewrite) = resolve(res);
HedgedHandle::new(
self.base.clone(),
is_for_rewrite,
self.sender.clone(),
FutureHandle::new(f1),
FutureHandle::new(f1, task1),
FutureHandle::new_owned(fd) ,
self.seqno1.clone(),
self.seqno2.clone(),
Expand Down Expand Up @@ -865,6 +889,7 @@ impl FileSystem for HedgedFileSystem {

pub struct FutureHandle {
inner: UnsafeCell<Either<oneshot::Receiver<IoResult<TaskRes>>, Arc<LogFd>>>,
task: Option<Task>,
}

unsafe impl Send for FutureHandle {}
Expand All @@ -878,27 +903,47 @@ unsafe impl Send for FutureHandle {}
unsafe impl Sync for FutureHandle {}

impl FutureHandle {
fn new(rx: oneshot::Receiver<IoResult<TaskRes>>) -> Self {
fn new(rx: oneshot::Receiver<IoResult<TaskRes>>, task: Task) -> Self {
Self {
inner: UnsafeCell::new(Either::Left(rx)),
task: Some(task),
}
}
/// Builds a handle around a file descriptor that is already available.
///
/// The descriptor is stored directly in the `Right` variant, and no task is
/// recorded.
fn new_owned(h: LogFd) -> Self {
    let fd = Arc::new(h);
    Self {
        inner: UnsafeCell::new(Either::Right(fd)),
        task: None,
    }
}

fn get(&self) -> Arc<LogFd> {
fn get(&self, file_system: &DefaultFileSystem) -> Arc<LogFd> {
let mut set = false;
let fd = match unsafe { &mut *self.inner.get() } {
Either::Left(rx) => {
set = true;
// TODO: should we handle the second disk io error
match block_on(rx).unwrap().unwrap() {
TaskRes::Open { fd, .. } => Arc::new(fd),
TaskRes::Create { fd, .. } => Arc::new(fd),
_ => unreachable!(),
match block_on(rx) {
Err(Canceled) => {
// Canceled is caused by the task is dropped when in paused state,
// so we should retry the task now
match self.task.as_ref().unwrap() {
Task::Create(path) => {
// has been already created, so just open
let fd = file_system.open(path, Permission::ReadWrite).unwrap(); // TODO: handle error
Arc::new(fd)
}
Task::Open { path, perm } => {
let fd = file_system.open(path, *perm).unwrap(); // TODO: handle error
Arc::new(fd)
}
_ => unreachable!(),
}
}
Ok(res) => match res.unwrap() {
TaskRes::Open { fd, .. } => Arc::new(fd),
TaskRes::Create { fd, .. } => Arc::new(fd),
_ => unreachable!(),
},
}
}
Either::Right(w) => w.clone(),
Expand Down Expand Up @@ -953,6 +998,7 @@ pub struct HedgedHandle {

impl HedgedHandle {
fn new(
base: Arc<DefaultFileSystem>,
strong_consistent: bool,
mut sender: HedgedSender,
handle1: FutureHandle,
Expand All @@ -972,23 +1018,27 @@ impl HedgedHandle {
seqno1 = Arc::new(AtomicU64::new(0));
seqno2 = Arc::new(AtomicU64::new(0));
let seqno1_clone = seqno1.clone();
let fs1 = base.clone();
thread1 = Some(thread::spawn(move || {
for (task, cb) in rx1 {
if let Task::Stop = task.inner {
break;
}
let res = task.handle_process();
assert!(!matches!(task.inner, Task::Pause | Task::Snapshot));
let res = task.handle_process(&fs1);
seqno1_clone.fetch_add(1, Ordering::Relaxed);
cb(res);
}
}));
let seqno2_clone = seqno2.clone();
let fs2 = base;
thread2 = Some(thread::spawn(move || {
for (task, cb) in rx2 {
if let Task::Stop = task.inner {
break;
}
let res = task.handle_process();
assert!(!matches!(task.inner, Task::Pause | Task::Snapshot));
let res = task.handle_process(&fs2);
seqno2_clone.fetch_add(1, Ordering::Relaxed);
cb(res);
}
Expand Down
2 changes: 2 additions & 0 deletions src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ pub use default::DefaultFileSystem;
pub use hedged::HedgedFileSystem;
pub use obfuscated::ObfuscatedFileSystem;

pub use hedged::State;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Permission {
ReadOnly,
Expand Down
Loading

0 comments on commit 135ae19

Please sign in to comment.