Return error instead of panicking if rewriting fails (#343)
* Return error instead of panicking if rewriting fails

Signed-off-by: v01dstar <[email protected]>

* Update rust version

Signed-off-by: v01dstar <[email protected]>

* Update rust version in github workflow

Signed-off-by: v01dstar <[email protected]>

* Update src/file_pipe_log/pipe.rs

Co-authored-by: lucasliang <[email protected]>
Signed-off-by: v01dstar <[email protected]>

* Update src/file_pipe_log/pipe.rs

Co-authored-by: lucasliang <[email protected]>
Signed-off-by: v01dstar <[email protected]>

* Address comments, fix test cases

Signed-off-by: v01dstar <[email protected]>

* Fix format error

Signed-off-by: v01dstar <[email protected]>

* Move panic inside

Signed-off-by: Yang Zhang <[email protected]>

* Fix clippy

Signed-off-by: Yang Zhang <[email protected]>

* Propagate error if writing header fails

Signed-off-by: Yang Zhang <[email protected]>

* Adjust write header fail expectation, from panic to error

Signed-off-by: v01dstar <[email protected]>

* Panic if write header fails since we do not truncate

Signed-off-by: v01dstar <[email protected]>

* Failure other than sync should be returned

Signed-off-by: v01dstar <[email protected]>

* Address comments

Signed-off-by: Yang Zhang <[email protected]>

* Fix test failures

Signed-off-by: Yang Zhang <[email protected]>

* Change test expectations

Signed-off-by: Yang Zhang <[email protected]>

* Address comments

Signed-off-by: Yang Zhang <[email protected]>

* Fix format

Signed-off-by: Yang Zhang <[email protected]>

* Revert sync() signature

Signed-off-by: Yang Zhang <[email protected]>

* Add more details to rotate test

Signed-off-by: Yang Zhang <[email protected]>

* Fix style

Signed-off-by: Yang Zhang <[email protected]>

* Address comments

Signed-off-by: Yang Zhang <[email protected]>

* Address comments

Signed-off-by: Yang Zhang <[email protected]>

* Fix clippy

Signed-off-by: Yang Zhang <[email protected]>

* Trigger Github actions

Signed-off-by: Yang Zhang <[email protected]>

---------

Signed-off-by: v01dstar <[email protected]>
Signed-off-by: Yang Zhang <[email protected]>
Co-authored-by: lucasliang <[email protected]>
v01dstar and LykxSassinator authored Dec 7, 2023
1 parent 385182b commit e8de5d7
Showing 7 changed files with 84 additions and 65 deletions.
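
Before the per-file diffs, here is a minimal, self-contained sketch of the behavioral change this commit makes. The `Writer` and `rotate` names below are illustrative stand-ins, not the actual raft-engine types; only the error-handling pattern mirrors the change: append paths that previously called `panic!` when rotation failed now propagate the error to the caller with `?`.

```rust
use std::io;

// Hypothetical stand-ins for the pipe types touched by this commit; only the
// error-handling pattern mirrors the real change.
struct Writer {
    out_of_space: bool,
}

impl Writer {
    fn rotate(&mut self) -> io::Result<()> {
        // Pretend rotation can fail, e.g. because creating the next file fails.
        if self.out_of_space {
            return Err(io::Error::new(io::ErrorKind::Other, "rotate failed"));
        }
        Ok(())
    }

    // Before: `if let Err(e) = self.rotate() { panic!(...) }`.
    // After: the error is propagated to the caller instead of aborting.
    fn append(&mut self, _bytes: &[u8]) -> io::Result<()> {
        self.rotate()?;
        Ok(())
    }
}

fn main() {
    let mut w = Writer { out_of_space: true };
    // The caller now observes an Err instead of a process panic.
    assert!(w.append(b"entry").is_err());
}
```
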
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
@@ -60,7 +60,7 @@ jobs:
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.66.0
toolchain: 1.67.1
override: true
components: rustfmt, clippy, rust-src
- uses: Swatinem/rust-cache@v1
4 changes: 1 addition & 3 deletions Cargo.toml
@@ -3,7 +3,7 @@ name = "raft-engine"
version = "0.4.1"
authors = ["The TiKV Project Developers"]
edition = "2018"
rust-version = "1.66.0"
rust-version = "1.67.1"
description = "A persistent storage engine for Multi-Raft logs"
readme = "README.md"
repository = "https://github.com/tikv/raft-engine"
@@ -95,8 +95,6 @@ nightly_group = ["nightly", "swap"]
raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" }
protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" }
# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged.
grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" }

[workspace]
members = ["stress", "ctl"]
2 changes: 1 addition & 1 deletion src/engine.rs
@@ -172,7 +172,7 @@ where
}
perf_context!(log_write_duration).observe_since(now);
if sync {
// As per trait protocol, this error should be retriable. But we panic anyway to
// As per trait protocol, sync error should be retriable. But we panic anyway to
// save the trouble of propagating it to other group members.
self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()");
}
7 changes: 5 additions & 2 deletions src/file_pipe_log/log_file.rs
@@ -43,6 +43,8 @@ pub struct LogFileWriter<F: FileSystem> {
capacity: usize,
}

// All APIs provided by `LogFileWriter` are fail-safe, i.e. caller can continue
// using the same "writer" even if the previous operation failed.
impl<F: FileSystem> LogFileWriter<F> {
fn open(
handle: Arc<F::Handle>,
@@ -67,7 +69,7 @@ impl<F: FileSystem> LogFileWriter<F> {
}

fn write_header(&mut self, format: LogFileFormat) -> IoResult<()> {
self.writer.seek(SeekFrom::Start(0))?;
self.writer.rewind()?;
self.written = 0;
let mut buf = Vec::with_capacity(LogFileFormat::encoded_len(format.version));
format.encode(&mut buf).unwrap();
@@ -119,7 +121,8 @@ impl<F: FileSystem> LogFileWriter<F> {

pub fn sync(&mut self) -> IoResult<()> {
let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM);
self.handle.sync()?;
// Panic if sync fails, in case of data loss.
self.handle.sync().unwrap();
Ok(())
}

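
A rough sketch of the write/sync policy encoded in the `log_file.rs` changes above (illustrative only; `FileWriter` and its fields are made up, not the raft-engine API): write failures come back as `io::Result` errors so the same writer can keep being used, while a failed fsync is treated as fatal because durability can no longer be guaranteed.

```rust
use std::io;

// Illustrative writer standing in for `LogFileWriter`; not the real raft-engine API.
struct FileWriter {
    data: Vec<u8>,
    fail_write: bool,
    fail_sync: bool,
}

impl FileWriter {
    // Write failures are returned: the caller may truncate, rotate, or retry,
    // and can keep using the same writer afterwards (the "fail-safe" contract
    // noted in the comment above).
    fn write(&mut self, bytes: &[u8]) -> io::Result<()> {
        if self.fail_write {
            return Err(io::Error::new(io::ErrorKind::Other, "injected write error"));
        }
        self.data.extend_from_slice(bytes);
        Ok(())
    }

    // A failed fsync is treated as fatal, mirroring `handle.sync().unwrap()`
    // in the diff: once durability is in doubt, continuing risks silent data loss.
    fn sync(&self) {
        if self.fail_sync {
            panic!("fsync failed; cannot guarantee durability");
        }
    }
}

fn main() {
    let mut w = FileWriter { data: Vec::new(), fail_write: true, fail_sync: false };
    assert!(w.write(b"entry").is_err()); // recoverable: the writer stays usable
    w.fail_write = false;
    w.write(b"entry").unwrap();
    w.sync(); // would panic if fail_sync were true
}
```
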
35 changes: 11 additions & 24 deletions src/file_pipe_log/pipe.rs
@@ -177,8 +177,13 @@ impl<F: FileSystem> SinglePipe<F> {

// Skip syncing directory in Windows. Refer to badger's discussion for more
// detail: https://github.com/dgraph-io/badger/issues/699
//
// Panic if sync calls fail, keep consistent with the behavior of
// `LogFileWriter::sync()`.
#[cfg(not(windows))]
std::fs::File::open(PathBuf::from(&self.paths[path_id])).and_then(|d| d.sync_all())?;
std::fs::File::open(PathBuf::from(&self.paths[path_id]))
.and_then(|d| d.sync_all())
.unwrap();
Ok(())
}

@@ -321,12 +326,7 @@ impl<F: FileSystem> SinglePipe<F> {
fail_point!("file_pipe_log::append");
let mut writable_file = self.writable_file.lock();
if writable_file.writer.offset() >= self.target_file_size {
if let Err(e) = self.rotate_imp(&mut writable_file) {
panic!(
"error when rotate [{:?}:{}]: {e}",
self.queue, writable_file.seq,
);
}
self.rotate_imp(&mut writable_file)?;
}

let seq = writable_file.seq;
@@ -359,9 +359,7 @@ impl<F: FileSystem> SinglePipe<F> {
}
let start_offset = writer.offset();
if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) {
if let Err(te) = writer.truncate() {
panic!("error when truncate {seq} after error: {e}, get: {}", te);
}
writer.truncate()?;
if is_no_space_err(&e) {
// TODO: There exist several corner cases that should be tackled if
// `bytes.len()` > `target_file_size`. For example,
@@ -372,12 +370,7 @@ impl<F: FileSystem> SinglePipe<F> {
// - [3] Both main-dir and spill-dir have several recycled logs.
// But as `bytes.len()` is always smaller than `target_file_size` in common
// cases, this issue will be ignored temporarily.
if let Err(e) = self.rotate_imp(&mut writable_file) {
panic!(
"error when rotate [{:?}:{}]: {e}",
self.queue, writable_file.seq
);
}
self.rotate_imp(&mut writable_file)?;
// If there still exists free space for this record, rotate the file
// and return a special TryAgain Err (for retry) to the caller.
return Err(Error::TryAgain(format!(
@@ -403,15 +396,9 @@ impl<F: FileSystem> SinglePipe<F> {

fn sync(&self) -> Result<()> {
let mut writable_file = self.writable_file.lock();
let seq = writable_file.seq;
let writer = &mut writable_file.writer;
{
let _t = StopWatch::new(perf_context!(log_sync_duration));
if let Err(e) = writer.sync() {
panic!("error when sync [{:?}:{seq}]: {e}", self.queue);
}
}

let _t = StopWatch::new(perf_context!(log_sync_duration));
writer.sync().map_err(Error::Io)?;
Ok(())
}

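
The `append` path in `pipe.rs` above now truncates the partial write, rotates on a no-space error, and returns a `TryAgain` error for the caller to retry instead of panicking inside the pipe. A simplified sketch of that contract follows; only the `TryAgain` variant name matches raft-engine, the other types are stand-ins.

```rust
// Simplified sketch of the retry contract in `SinglePipe::append` above.
#[derive(Debug)]
enum Error {
    TryAgain(String),
}

struct Pipe {
    current_file_full: bool,
}

impl Pipe {
    fn append(&mut self, _bytes: &[u8]) -> Result<u64, Error> {
        if self.current_file_full {
            // No space left in the active file: rotate to a fresh file and ask
            // the caller to retry, instead of panicking inside the pipe.
            self.current_file_full = false; // "rotate"
            return Err(Error::TryAgain("no space in current log file".to_owned()));
        }
        Ok(0) // offset of the written record
    }
}

fn main() {
    let mut pipe = Pipe { current_file_full: true };
    let offset = match pipe.append(b"entry") {
        Err(Error::TryAgain(_)) => pipe.append(b"entry").unwrap(), // retry once
        Ok(off) => off,
    };
    assert_eq!(offset, 0);
}
```
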
2 changes: 1 addition & 1 deletion src/purge.rs
@@ -439,7 +439,7 @@ where
)?;
let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
if sync {
self.pipe_log.sync(LogQueue::Rewrite)?
self.pipe_log.sync(LogQueue::Rewrite)?;
}
log_batch.finish_write(file_handle);
self.memtables.apply_rewrite_writes(
97 changes: 64 additions & 33 deletions tests/failpoints/test_io_error.rs
@@ -124,8 +124,7 @@ fn test_file_write_error() {
assert_eq!(engine.last_index(2).unwrap(), 1);
}

#[test]
fn test_file_rotate_error() {
fn test_file_rotate_error(restart_after_failure: bool) {
let dir = tempfile::Builder::new()
.prefix("test_file_rotate_error")
.tempdir()
@@ -138,7 +137,7 @@ fn test_file_rotate_error() {
let fs = Arc::new(ObfuscatedFileSystem::default());
let entry = vec![b'x'; 1024];

let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
let mut engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
engine
.write(&mut generate_batch(1, 1, 2, Some(&entry)), false)
.unwrap();
@@ -160,27 +159,46 @@ fn test_file_rotate_error() {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
}
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
{
// Fail to create new log file.
let _f = FailGuard::new("default_fs::create::err", "return");
assert!(catch_unwind_silent(|| {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
assert!(engine
.write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
.is_err());
}
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
}
let num_files_before = std::fs::read_dir(&dir).unwrap().count();
{
// Fail to write header of new log file.
let _f = FailGuard::new("log_file::write::err", "1*off->return");
assert!(catch_unwind_silent(|| {
let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false);
})
.is_err());
assert!(engine
.write(&mut generate_batch(1, 4, 5, Some(&entry)), false)
.is_err());
}
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap();
// The new log file is added during recovery phase of restart.
assert_eq!(engine.file_span(LogQueue::Append).1, 2);
} else {
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}
{
// Although the header is not written, the file is still created.
assert_eq!(
std::fs::read_dir(&dir).unwrap().count() - num_files_before,
1
);
if !restart_after_failure {
// If the engine has restarted, a write that does not require sync would succeed.
// Fail to sync new log file. The old log file is already sync-ed at this point.
let _f = FailGuard::new("log_fd::sync::err", "return");
assert!(catch_unwind_silent(|| {
@@ -190,18 +208,39 @@ fn test_file_rotate_error() {
assert_eq!(engine.file_span(LogQueue::Append).1, 1);
}

// Only one log file should be created after all the incidents.
assert_eq!(
std::fs::read_dir(&dir).unwrap().count() - num_files_before,
1
);
// We can continue writing after the incidents.
engine
.write(&mut generate_batch(2, 1, 2, Some(&entry)), true)
.unwrap();
drop(engine);
let engine = Engine::open_with_file_system(cfg, fs).unwrap();
if restart_after_failure {
drop(engine);
engine = Engine::open_with_file_system(cfg, fs).unwrap();
}
assert_eq!(
std::fs::read_dir(&dir).unwrap().count() - num_files_before,
1
);
assert_eq!(engine.first_index(1).unwrap(), 1);
assert_eq!(engine.last_index(1).unwrap(), 4);
assert_eq!(engine.first_index(2).unwrap(), 1);
assert_eq!(engine.last_index(2).unwrap(), 1);
}

#[test]
fn test_file_rotate_error_without_restart() {
test_file_rotate_error(false);
}

#[test]
fn test_file_rotate_error_with_restart() {
test_file_rotate_error(true);
}

#[test]
fn test_concurrent_write_error() {
let dir = tempfile::Builder::new()
Expand Down Expand Up @@ -262,10 +301,8 @@ fn test_concurrent_write_error() {
let _f2 = FailGuard::new("log_file::truncate::err", "return");
let entry_clone = entry.clone();
ctx.write_ext(move |e| {
catch_unwind_silent(|| {
e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false)
})
.unwrap_err();
e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false)
.unwrap_err();
});
// We don't test followers, their panics are hard to catch.
ctx.join();
Expand Down Expand Up @@ -527,20 +564,17 @@ fn test_no_space_write_error() {
cfg.dir = dir.path().to_str().unwrap().to_owned();
cfg.spill_dir = Some(spill_dir.path().to_str().unwrap().to_owned());
{
// Case 1: `Write` is abnormal for no space left, Engine should panic at
// Case 1: `Write` is abnormal for no space left, Engine should fail at
// `rotate`.
let cfg_err = Config {
target_file_size: ReadableSize(1),
..cfg.clone()
};
let engine = Engine::open(cfg_err).unwrap();
let _f = FailGuard::new("log_fd::write::no_space_err", "return");
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.unwrap_err();
})
.is_err());
assert!(engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.is_err());
assert_eq!(
0,
engine
Expand All @@ -554,12 +588,9 @@ fn test_no_space_write_error() {
let _f1 = FailGuard::new("log_fd::write::no_space_err", "2*return->off");
let _f2 = FailGuard::new("file_pipe_log::force_choose_dir", "return");
// The first write should fail, because all dirs run out of space for writing.
assert!(catch_unwind_silent(|| {
engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.unwrap_err();
})
.is_err());
assert!(engine
.write(&mut generate_batch(2, 11, 21, Some(&entry)), true)
.is_err());
assert_eq!(
0,
engine
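
The test updates above follow one pattern: failures that used to be observed as panics via `catch_unwind_silent` are now asserted directly on the returned `Result`. A small standalone illustration, using `std::panic::catch_unwind` in place of the test helper and a made-up `faulty_write` function standing in for `engine.write(...)`:

```rust
use std::panic;

// An operation with an injected failure, standing in for `engine.write(...)`.
fn faulty_write() -> Result<(), &'static str> {
    Err("injected I/O error")
}

fn main() {
    // Old expectation: the failure surfaced as a panic.
    let panicked = panic::catch_unwind(|| {
        faulty_write().unwrap();
    })
    .is_err();
    assert!(panicked);

    // New expectation: the failure is an ordinary error value, no panic involved.
    assert!(faulty_write().is_err());
}
```
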
