diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2a5ce83d..f6d967c6 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -60,7 +60,7 @@ jobs: uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: 1.66.0 + toolchain: 1.67.1 override: true components: rustfmt, clippy, rust-src - uses: Swatinem/rust-cache@v1 diff --git a/Cargo.toml b/Cargo.toml index bce02a10..d00b3ffe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "raft-engine" version = "0.4.1" authors = ["The TiKV Project Developers"] edition = "2018" -rust-version = "1.66.0" +rust-version = "1.67.1" description = "A persistent storage engine for Multi-Raft logs" readme = "README.md" repository = "https://github.com/tikv/raft-engine" @@ -95,8 +95,6 @@ nightly_group = ["nightly", "swap"] raft-proto = { git = "https://github.com/tikv/raft-rs", branch = "master" } protobuf = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } protobuf-codegen = { git = "https://github.com/pingcap/rust-protobuf", branch = "v2.8" } -# TODO: Use official grpc-rs once https://github.com/tikv/grpc-rs/pull/622 is merged. -grpcio = { git = "https://github.com/tabokie/grpc-rs", branch = "v0.10.x-win" } [workspace] members = ["stress", "ctl"] diff --git a/src/engine.rs b/src/engine.rs index 0d055296..1a09b397 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -172,7 +172,7 @@ where } perf_context!(log_write_duration).observe_since(now); if sync { - // As per trait protocol, this error should be retriable. But we panic anyway to + // As per trait protocol, sync error should be retriable. But we panic anyway to // save the trouble of propagating it to other group members. self.pipe_log.sync(LogQueue::Append).expect("pipe::sync()"); } diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index 8ba92592..9643bf61 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -43,6 +43,8 @@ pub struct LogFileWriter { capacity: usize, } +// All APIs provided by `LogFileWriter` are fail-safe, i.e. caller can continue +// using the same "writer" even if the previous operation failed. impl LogFileWriter { fn open( handle: Arc, @@ -67,7 +69,7 @@ impl LogFileWriter { } fn write_header(&mut self, format: LogFileFormat) -> IoResult<()> { - self.writer.seek(SeekFrom::Start(0))?; + self.writer.rewind()?; self.written = 0; let mut buf = Vec::with_capacity(LogFileFormat::encoded_len(format.version)); format.encode(&mut buf).unwrap(); @@ -119,7 +121,8 @@ impl LogFileWriter { pub fn sync(&mut self) -> IoResult<()> { let _t = StopWatch::new(&*LOG_SYNC_DURATION_HISTOGRAM); - self.handle.sync()?; + // Panic if sync fails, in case of data loss. + self.handle.sync().unwrap(); Ok(()) } diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 09bc1f42..43b3483a 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -177,8 +177,13 @@ impl SinglePipe { // Skip syncing directory in Windows. Refer to badger's discussion for more // detail: https://github.com/dgraph-io/badger/issues/699 + // + // Panic if sync calls fail, keep consistent with the behavior of + // `LogFileWriter::sync()`. #[cfg(not(windows))] - std::fs::File::open(PathBuf::from(&self.paths[path_id])).and_then(|d| d.sync_all())?; + std::fs::File::open(PathBuf::from(&self.paths[path_id])) + .and_then(|d| d.sync_all()) + .unwrap(); Ok(()) } @@ -321,12 +326,7 @@ impl SinglePipe { fail_point!("file_pipe_log::append"); let mut writable_file = self.writable_file.lock(); if writable_file.writer.offset() >= self.target_file_size { - if let Err(e) = self.rotate_imp(&mut writable_file) { - panic!( - "error when rotate [{:?}:{}]: {e}", - self.queue, writable_file.seq, - ); - } + self.rotate_imp(&mut writable_file)?; } let seq = writable_file.seq; @@ -359,9 +359,7 @@ impl SinglePipe { } let start_offset = writer.offset(); if let Err(e) = writer.write(bytes.as_bytes(&ctx), self.target_file_size) { - if let Err(te) = writer.truncate() { - panic!("error when truncate {seq} after error: {e}, get: {}", te); - } + writer.truncate()?; if is_no_space_err(&e) { // TODO: There exists several corner cases should be tackled if // `bytes.len()` > `target_file_size`. For example, @@ -372,12 +370,7 @@ impl SinglePipe { // - [3] Both main-dir and spill-dir have several recycled logs. // But as `bytes.len()` is always smaller than `target_file_size` in common // cases, this issue will be ignored temprorarily. - if let Err(e) = self.rotate_imp(&mut writable_file) { - panic!( - "error when rotate [{:?}:{}]: {e}", - self.queue, writable_file.seq - ); - } + self.rotate_imp(&mut writable_file)?; // If there still exists free space for this record, rotate the file // and return a special TryAgain Err (for retry) to the caller. return Err(Error::TryAgain(format!( @@ -403,15 +396,9 @@ impl SinglePipe { fn sync(&self) -> Result<()> { let mut writable_file = self.writable_file.lock(); - let seq = writable_file.seq; let writer = &mut writable_file.writer; - { - let _t = StopWatch::new(perf_context!(log_sync_duration)); - if let Err(e) = writer.sync() { - panic!("error when sync [{:?}:{seq}]: {e}", self.queue); - } - } - + let _t = StopWatch::new(perf_context!(log_sync_duration)); + writer.sync().map_err(Error::Io)?; Ok(()) } diff --git a/src/purge.rs b/src/purge.rs index b1183438..b88d462d 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -439,7 +439,7 @@ where )?; let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?; if sync { - self.pipe_log.sync(LogQueue::Rewrite)? + self.pipe_log.sync(LogQueue::Rewrite)?; } log_batch.finish_write(file_handle); self.memtables.apply_rewrite_writes( diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs index d24ab049..4383d7b8 100644 --- a/tests/failpoints/test_io_error.rs +++ b/tests/failpoints/test_io_error.rs @@ -124,8 +124,7 @@ fn test_file_write_error() { assert_eq!(engine.last_index(2).unwrap(), 1); } -#[test] -fn test_file_rotate_error() { +fn test_file_rotate_error(restart_after_failure: bool) { let dir = tempfile::Builder::new() .prefix("test_file_rotate_error") .tempdir() @@ -138,7 +137,7 @@ fn test_file_rotate_error() { let fs = Arc::new(ObfuscatedFileSystem::default()); let entry = vec![b'x'; 1024]; - let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + let mut engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); engine .write(&mut generate_batch(1, 1, 2, Some(&entry)), false) .unwrap(); @@ -160,27 +159,46 @@ fn test_file_rotate_error() { let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); }) .is_err()); - assert_eq!(engine.file_span(LogQueue::Append).1, 1); } + if restart_after_failure { + drop(engine); + engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + } + assert_eq!(engine.file_span(LogQueue::Append).1, 1); { // Fail to create new log file. let _f = FailGuard::new("default_fs::create::err", "return"); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); - }) - .is_err()); - assert_eq!(engine.file_span(LogQueue::Append).1, 1); + assert!(engine + .write(&mut generate_batch(1, 4, 5, Some(&entry)), false) + .is_err()); } + if restart_after_failure { + drop(engine); + engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + } + let num_files_before = std::fs::read_dir(&dir).unwrap().count(); { // Fail to write header of new log file. let _f = FailGuard::new("log_file::write::err", "1*off->return"); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); - }) - .is_err()); + assert!(engine + .write(&mut generate_batch(1, 4, 5, Some(&entry)), false) + .is_err()); + } + if restart_after_failure { + drop(engine); + engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); + // The new log file is added during recovery phase of restart. + assert_eq!(engine.file_span(LogQueue::Append).1, 2); + } else { assert_eq!(engine.file_span(LogQueue::Append).1, 1); } - { + // Although the header is not written, the file is still created. + assert_eq!( + std::fs::read_dir(&dir).unwrap().count() - num_files_before, + 1 + ); + if !restart_after_failure { + // If the engine restarted, the write does not require sync will succeed. // Fail to sync new log file. The old log file is already sync-ed at this point. let _f = FailGuard::new("log_fd::sync::err", "return"); assert!(catch_unwind_silent(|| { @@ -190,18 +208,39 @@ fn test_file_rotate_error() { assert_eq!(engine.file_span(LogQueue::Append).1, 1); } + // Only one log file should be created after all the incidents. + assert_eq!( + std::fs::read_dir(&dir).unwrap().count() - num_files_before, + 1 + ); // We can continue writing after the incidents. engine .write(&mut generate_batch(2, 1, 2, Some(&entry)), true) .unwrap(); - drop(engine); - let engine = Engine::open_with_file_system(cfg, fs).unwrap(); + if restart_after_failure { + drop(engine); + engine = Engine::open_with_file_system(cfg, fs).unwrap(); + } + assert_eq!( + std::fs::read_dir(&dir).unwrap().count() - num_files_before, + 1 + ); assert_eq!(engine.first_index(1).unwrap(), 1); assert_eq!(engine.last_index(1).unwrap(), 4); assert_eq!(engine.first_index(2).unwrap(), 1); assert_eq!(engine.last_index(2).unwrap(), 1); } +#[test] +fn test_file_rotate_error_without_restart() { + test_file_rotate_error(false); +} + +#[test] +fn test_file_rotate_error_with_restart() { + test_file_rotate_error(true); +} + #[test] fn test_concurrent_write_error() { let dir = tempfile::Builder::new() @@ -262,10 +301,8 @@ fn test_concurrent_write_error() { let _f2 = FailGuard::new("log_file::truncate::err", "return"); let entry_clone = entry.clone(); ctx.write_ext(move |e| { - catch_unwind_silent(|| { - e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false) - }) - .unwrap_err(); + e.write(&mut generate_batch(1, 11, 21, Some(&entry_clone)), false) + .unwrap_err(); }); // We don't test followers, their panics are hard to catch. ctx.join(); @@ -527,7 +564,7 @@ fn test_no_space_write_error() { cfg.dir = dir.path().to_str().unwrap().to_owned(); cfg.spill_dir = Some(spill_dir.path().to_str().unwrap().to_owned()); { - // Case 1: `Write` is abnormal for no space left, Engine should panic at + // Case 1: `Write` is abnormal for no space left, Engine should fail at // `rotate`. let cfg_err = Config { target_file_size: ReadableSize(1), @@ -535,12 +572,9 @@ fn test_no_space_write_error() { }; let engine = Engine::open(cfg_err).unwrap(); let _f = FailGuard::new("log_fd::write::no_space_err", "return"); - assert!(catch_unwind_silent(|| { - engine - .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) - .unwrap_err(); - }) - .is_err()); + assert!(engine + .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) + .is_err()); assert_eq!( 0, engine @@ -554,12 +588,9 @@ fn test_no_space_write_error() { let _f1 = FailGuard::new("log_fd::write::no_space_err", "2*return->off"); let _f2 = FailGuard::new("file_pipe_log::force_choose_dir", "return"); // The first write should fail, because all dirs run out of space for writing. - assert!(catch_unwind_silent(|| { - engine - .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) - .unwrap_err(); - }) - .is_err()); + assert!(engine + .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) + .is_err()); assert_eq!( 0, engine