From be9ad751c8b0291b3e5b868085a4affaf8e72443 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 30 Sep 2022 14:46:11 +0530 Subject: [PATCH 1/4] Detect unexpected eof while reading record header in store.read(). Make tests check records read before sync --- src/commit_log/glommio_impl/store.rs | 16 +++++++++------- src/commit_log/mod.rs | 7 ++++++- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/src/commit_log/glommio_impl/store.rs b/src/commit_log/glommio_impl/store.rs index 334205fe9..e50fca936 100644 --- a/src/commit_log/glommio_impl/store.rs +++ b/src/commit_log/glommio_impl/store.rs @@ -345,16 +345,18 @@ mod tests { record_positions_and_sizes.push(store.append(&record).await.unwrap()); } - let mut read_records = Vec::with_capacity(RECORDS.len()); + let mut records_read_before_sync = Vec::with_capacity(RECORDS.len()); - for (position, _written_bytes) in &record_positions_and_sizes[0..RECORDS.len() / 2] - { - let (record, _next_record_offset) = store.read(position.clone()).await.unwrap(); - read_records.push(record); + for (position, _written_bytes) in &record_positions_and_sizes { + if let Ok((record, _next_record_offset)) = store.read(position.clone()).await { + records_read_before_sync.push(record); + } else { + break; + } } - for i in 0..RECORDS.len() / 2 { - assert_eq!(read_records[i].deref(), RECORDS[i]); + for i in 0..records_read_before_sync.len() { + assert_eq!(records_read_before_sync[i].deref(), RECORDS[i]); } let mut i = 0; diff --git a/src/commit_log/mod.rs b/src/commit_log/mod.rs index 9ca819978..6176a569f 100644 --- a/src/commit_log/mod.rs +++ b/src/commit_log/mod.rs @@ -164,6 +164,7 @@ pub mod store { /// └─────────────────────────┴────────────────────────┴───────────────────────┘ /// │─────────────── RecordHeader ─────────────────────│ /// ``` + #[derive(Debug)] pub struct RecordHeader { /// checksum computed from the bytes in the record. pub checksum: u32, @@ -204,7 +205,11 @@ pub mod store { let checksum = cursor.read_u32::()?; let length = cursor.read_u32::()?; - Ok(Self { checksum, length }) + if checksum == 0 && length == 0 { + Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof)) + } else { + Ok(Self { checksum, length }) + } } /// Serializes this given record header to an owned byte array. From 5032df8a5cf40c82dff2f1abcb60be36d2aea0f7 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 12 Oct 2022 19:50:34 +0530 Subject: [PATCH 2/4] Rough commit with duplicate entitites for supporting Cow<'_, &[u8]> version of record value. 
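Note on PATCH 1 above: before this change, `RecordHeader` deserialization returned whatever bytes sat at the requested position, so reading past the last synced record could hand back a bogus zero-length record. The patch instead treats an all-zero `(checksum, length)` header as an unexpected end-of-file, and the glommio store test now stops at the first record that fails to read back before sync. A minimal standalone sketch of that header check follows; the `byteorder` imports and the `LittleEndian` type parameter are assumptions, since the generic argument to `read_u32` is not visible in the hunk above.

```rust
use std::io::{Cursor, ErrorKind};

use byteorder::{LittleEndian, ReadBytesExt}; // endianness assumed, not taken from the patch

pub struct RecordHeader {
    pub checksum: u32,
    pub length: u32,
}

impl RecordHeader {
    /// Deserializes a record header from the given bytes.
    pub fn from_bytes(bytes: &[u8]) -> std::io::Result<Self> {
        let mut cursor = Cursor::new(bytes);

        let checksum = cursor.read_u32::<LittleEndian>()?;
        let length = cursor.read_u32::<LittleEndian>()?;

        // An all-zero header means the read landed past the last valid record
        // (e.g. in unsynced or zero-filled space), so surface it as EOF rather
        // than returning a zero-length record.
        if checksum == 0 && length == 0 {
            Err(std::io::Error::from(ErrorKind::UnexpectedEof))
        } else {
            Ok(Self { checksum, length })
        }
    }
}
```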
--- src/commit_log/glommio_impl/segment.rs | 125 +++++++++++++++++++++++++ src/commit_log/mod.rs | 118 ++++++++++++++++++++++- 2 files changed, 242 insertions(+), 1 deletion(-) diff --git a/src/commit_log/glommio_impl/segment.rs b/src/commit_log/glommio_impl/segment.rs index 75af05a63..b041fc106 100644 --- a/src/commit_log/glommio_impl/segment.rs +++ b/src/commit_log/glommio_impl/segment.rs @@ -204,4 +204,129 @@ mod tests { .unwrap(); local_ex.join().unwrap(); } + + #[test] + fn test_segment_reads_reflect_appends_() { + let test_file_path = + PathBuf::from(test_file_path_string("test_segment_reads_reflect_appends_")); + + if test_file_path.exists() { + fs::remove_file(&test_file_path).unwrap(); + } + + let local_ex = LocalExecutorBuilder::new(Placement::Fixed(1)) + .spawn(move || async move { + const RECORD_VALUE: &[u8] = b"Hello World!"; + let record = Record { + value: Vec::from(RECORD_VALUE), + offset: 0, + }; + let record_representation_size = bincoded_serialized_record_size(&record).unwrap(); + let expected_segment_size: u64 = 2 * record_representation_size; + + let mut segment = Segment::new( + test_file_path.clone(), + 0, + SegmentConfig { + store_buffer_size: 512, + max_store_bytes: expected_segment_size, + }, + ) + .await + .unwrap(); + + assert!(matches!( + segment.read_(segment.next_offset()).await, + Err(SegmentError::OffsetOutOfBounds) + )); + + let offset_1 = segment.append_(RECORD_VALUE).await.unwrap(); + assert_eq!(offset_1, 0); + assert_eq!(segment.next_offset(), record_representation_size); + + assert!(matches!( + segment.read(segment.next_offset()).await, + Err(SegmentError::OffsetOutOfBounds) + )); + + assert!(matches!( + segment.advance_to_offset(segment.next_offset()), + Ok(_) + )); + + let offset_2 = segment.append_(RECORD_VALUE).await.unwrap(); + assert_eq!(offset_2, record_representation_size); + + assert_eq!(segment.size(), expected_segment_size); + assert!(segment.is_maxed()); + + // close segment to ensure that the records are presisted + segment.close().await.unwrap(); + + let mut segment = Segment::new( + test_file_path.clone(), + 0, + SegmentConfig { + store_buffer_size: 512, + max_store_bytes: expected_segment_size, + }, + ) + .await + .unwrap(); + + assert_eq!(segment.size(), expected_segment_size); + assert!(segment.is_maxed()); + + assert!(matches!( + segment.append_(RECORD_VALUE).await, + Err(SegmentError::SegmentMaxed) + )); + + let (record_1, record_1_next_record_offset) = + segment.read_(offset_1).await.unwrap(); + assert_eq!(record_1.offset, offset_1); + assert_eq!(record_1.value, RECORD_VALUE); + assert_eq!(record_1_next_record_offset, offset_2); + + let (record_2, record_2_next_record_offset) = + segment.read_(offset_2).await.unwrap(); + assert_eq!(record_2.offset, offset_2); + assert_eq!(record_2.value, RECORD_VALUE); + assert_eq!(record_2_next_record_offset, segment.next_offset()); + + // read at invalid loacation + assert!(matches!( + segment.read_(offset_2 + 1).await, + Err(SegmentError::StoreError(_)) + )); + + assert!(matches!( + segment.read_(segment.next_offset()).await, + Err(SegmentError::OffsetOutOfBounds) + )); + + let records = vec![record_1, record_2]; + + let mut segment_scanner = + crate::commit_log::segment::SegmentScanner_::new(&segment).unwrap(); + let mut i = 0; + while let Some(record) = segment_scanner.next().await { + assert_eq!(record, records[i]); + + i += 1; + } + assert_eq!(i, records.len()); + + assert!(matches!( + segment.advance_to_offset(segment.next_offset() + 1), + Err(SegmentError::OffsetBeyondCapacity) + )); + + 
segment.remove().await.unwrap(); + + assert!(!test_file_path.exists()); + }) + .unwrap(); + local_ex.join().unwrap(); + } } diff --git a/src/commit_log/mod.rs b/src/commit_log/mod.rs index 6176a569f..c7fde2f19 100644 --- a/src/commit_log/mod.rs +++ b/src/commit_log/mod.rs @@ -11,6 +11,8 @@ //! In the context of `laminarmq` this module is intended to provide the storage for individual //! partitions in a topic. +use std::borrow::Cow; + use async_trait::async_trait; /// Represents a record in a [`CommitLog`]. @@ -24,6 +26,16 @@ pub struct Record { pub offset: u64, } +#[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq, Eq, Clone)] +pub struct Record_<'a> { + /// Value stored in this record entry. The value itself might be serialized bytes of some other + /// form of record. + pub value: Cow<'a, [u8]>, + + /// Offset at which this record is stored in the log. + pub offset: u64, +} + /// Abstraction for representing all types that can asynchronously linearly scanned for items. #[async_trait(?Send)] pub trait Scanner { @@ -248,7 +260,7 @@ pub mod segment { use super::{ store::{Store, StoreScanner}, - Record, Scanner, + Record, Record_, Scanner, }; /// Error type used for operations on a [`Segment`]. @@ -470,6 +482,31 @@ pub mod segment { Ok(current_offset) } + pub async fn append_(&mut self, record_bytes: &[u8]) -> Result> { + if self.is_maxed() { + return Err(SegmentError::SegmentMaxed); + } + + let current_offset = self.next_offset; + let record = Record_ { + value: record_bytes.into(), + offset: current_offset, + }; + + let bincoded_record = + bincode::serialize(&record).map_err(|_x| SegmentError::SerializationError)?; + + let (_, bytes_written) = self + .store + .append(&bincoded_record) + .await + .map_err(SegmentError::StoreError)?; + + self.next_offset += bytes_written as u64; + + Ok(current_offset) + } + pub fn offset_within_bounds(&self, offset: u64) -> bool { offset < self.next_offset() } @@ -511,6 +548,27 @@ pub mod segment { Ok((record, self.base_offset() + next_record_position)) } + pub async fn read_(&self, offset: u64) -> Result<(Record_, u64), SegmentError> { + if !self.offset_within_bounds(offset) { + return Err(SegmentError::OffsetOutOfBounds); + } + + let position = self + .store_position(offset) + .ok_or(SegmentError::OffsetBeyondCapacity)?; + + let (record_bytes, next_record_position) = self + .store + .read(position) + .await + .map_err(SegmentError::StoreError)?; + + let record: Record_ = bincode::deserialize(&record_bytes) + .map_err(|_x| SegmentError::SerializationError)?; + + Ok((record, self.base_offset() + next_record_position)) + } + /// Advances this [`Segment`] instance's `next_offset` value to the given value. /// This method simply returns [`Ok`] if `new_next_offset <= next_offset`. /// @@ -623,6 +681,64 @@ pub mod segment { } } + pub struct SegmentScanner_<'a, T, S> + where + T: Deref, + S: Store, + { + store_scanner: StoreScanner<'a, T, S>, + } + + impl<'a, T, S> SegmentScanner_<'a, T, S> + where + T: Deref, + S: Store, + { + /// Creates a new [`SegmentScanner`] that starts reading from the given segments `base_offset`. + pub fn new(segment: &'a Segment) -> Result> { + Self::with_offset(segment, segment.base_offset()) + } + + /// Creates a new [`SegmentScanner`] that starts reading from the given offset. 
+ /// + /// ## Errors + /// - [`SegmentError::OffsetOutOfBounds`] if the given `offset >= segment.next_offset()` + /// - [`SegmentError::OffsetBeyondCapacity`] if the given offset doesn't map to a valid + /// location on the [`Segment`] instances underlying store. + pub fn with_offset( + segment: &'a Segment, + offset: u64, + ) -> Result> { + if !segment.offset_within_bounds(offset) { + return Err(SegmentError::OffsetOutOfBounds); + } + + Ok(Self { + store_scanner: StoreScanner::with_position( + segment.store(), + segment + .store_position(offset) + .ok_or(SegmentError::OffsetBeyondCapacity)?, + ), + }) + } + } + + #[async_trait(?Send)] + impl<'a, T, S> Scanner for SegmentScanner_<'a, T, S> + where + T: Deref + Unpin, + S: Store, + { + type Item = super::Record_<'a>; + + async fn next(&mut self) -> Option { + self.store_scanner + .next() + .await + .and_then(|record_bytes| bincode::deserialize(&record_bytes).ok()) + } + } pub mod config { //! Module providing types for configuring [`Segment`](super::Segment) instances. use serde::{Deserialize, Serialize}; From 841cf4b4e4c0107f223817190acf594e7a5fce55 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Wed, 12 Oct 2022 20:00:17 +0530 Subject: [PATCH 3/4] Increased expiry duration to correctly detect expired segments. --- src/commit_log/glommio_impl/segmented_log.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/commit_log/glommio_impl/segmented_log.rs b/src/commit_log/glommio_impl/segmented_log.rs index a8aae149f..0df7bd81f 100644 --- a/src/commit_log/glommio_impl/segmented_log.rs +++ b/src/commit_log/glommio_impl/segmented_log.rs @@ -212,7 +212,7 @@ mod tests { log.append(&mut record).await.unwrap(); } - let expiry_duration = Duration::from_millis(100); + let expiry_duration = Duration::from_millis(200); glommio::timer::sleep(expiry_duration).await; From 97ebb13d627a58889adc12d8c714836238a62aa9 Mon Sep 17 00:00:00 2001 From: Arindam Das Date: Fri, 14 Oct 2022 11:03:06 +0530 Subject: [PATCH 4/4] Use a Copy-on-write pointer instead of an owned vec in Record for storing data. --- README.md | 3 +- src/commit_log/glommio_impl/mod.rs | 2 +- src/commit_log/glommio_impl/segment.rs | 139 +------------------ src/commit_log/glommio_impl/segmented_log.rs | 34 +++-- src/commit_log/mod.rs | 134 ++---------------- 5 files changed, 35 insertions(+), 277 deletions(-) diff --git a/README.md b/README.md index 2698d4abc..e04dbcdd4 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,8 @@ In order to use `laminarmq` as a library, add the following to your `Cargo.toml` [dependencies] laminarmq = "0.0.2" ``` -Refer to [API Documentation](https://arindas.github.io/laminarmq/laminarmq/) for more details. +Refer to latest git [API Documentation](https://arindas.github.io/laminarmq/laminarmq/) +or [Crate Documentation](https://docs.rs/laminarmq) for more details. ## Planned Architecture This section presents a brief overview on the different aspects of our message queue. This is only an outline of diff --git a/src/commit_log/glommio_impl/mod.rs b/src/commit_log/glommio_impl/mod.rs index 7f9e55eee..905c21ca2 100644 --- a/src/commit_log/glommio_impl/mod.rs +++ b/src/commit_log/glommio_impl/mod.rs @@ -46,7 +46,7 @@ //! .unwrap(); //! //! log.remove().await.unwrap(); -//! assert!(!PathBuf::from(&storage_dir_path,).exists()); +//! assert!(!PathBuf::from(&storage_dir_path).exists()); //! }) //! .unwrap(); //! 
local_ex.join().unwrap(); diff --git a/src/commit_log/glommio_impl/segment.rs b/src/commit_log/glommio_impl/segment.rs index b041fc106..0010b3579 100644 --- a/src/commit_log/glommio_impl/segment.rs +++ b/src/commit_log/glommio_impl/segment.rs @@ -84,9 +84,9 @@ mod tests { } #[test] - fn test_segment_reads_reflect_appends() { + fn test_segment_reads_reflect_appends_() { let test_file_path = - PathBuf::from(test_file_path_string("test_segment_reads_reflect_appends")); + PathBuf::from(test_file_path_string("test_segment_reads_reflect_appends_")); if test_file_path.exists() { fs::remove_file(&test_file_path).unwrap(); @@ -95,8 +95,8 @@ mod tests { let local_ex = LocalExecutorBuilder::new(Placement::Fixed(1)) .spawn(move || async move { const RECORD_VALUE: &[u8] = b"Hello World!"; - let mut record = Record { - value: Vec::from(RECORD_VALUE), + let record = Record { + value: RECORD_VALUE.into(), offset: 0, }; let record_representation_size = bincoded_serialized_record_size(&record).unwrap(); @@ -118,7 +118,7 @@ mod tests { Err(SegmentError::OffsetOutOfBounds) )); - let offset_1 = segment.append(&mut record).await.unwrap(); + let offset_1 = segment.append(RECORD_VALUE).await.unwrap(); assert_eq!(offset_1, 0); assert_eq!(segment.next_offset(), record_representation_size); @@ -132,7 +132,7 @@ mod tests { Ok(_) )); - let offset_2 = segment.append(&mut record).await.unwrap(); + let offset_2 = segment.append(RECORD_VALUE).await.unwrap(); assert_eq!(offset_2, record_representation_size); assert_eq!(segment.size(), expected_segment_size); @@ -156,7 +156,7 @@ mod tests { assert!(segment.is_maxed()); assert!(matches!( - segment.append(&mut record).await, + segment.append(RECORD_VALUE).await, Err(SegmentError::SegmentMaxed) )); @@ -181,11 +181,6 @@ mod tests { Err(SegmentError::OffsetOutOfBounds) )); - assert!(matches!( - segment.advance_to_offset(segment.next_offset() + 1), - Err(SegmentError::OffsetBeyondCapacity) - )); - let records = vec![record_1, record_2]; let mut segment_scanner = SegmentScanner::new(&segment).unwrap(); @@ -197,126 +192,6 @@ mod tests { } assert_eq!(i, records.len()); - segment.remove().await.unwrap(); - - assert!(!test_file_path.exists()); - }) - .unwrap(); - local_ex.join().unwrap(); - } - - #[test] - fn test_segment_reads_reflect_appends_() { - let test_file_path = - PathBuf::from(test_file_path_string("test_segment_reads_reflect_appends_")); - - if test_file_path.exists() { - fs::remove_file(&test_file_path).unwrap(); - } - - let local_ex = LocalExecutorBuilder::new(Placement::Fixed(1)) - .spawn(move || async move { - const RECORD_VALUE: &[u8] = b"Hello World!"; - let record = Record { - value: Vec::from(RECORD_VALUE), - offset: 0, - }; - let record_representation_size = bincoded_serialized_record_size(&record).unwrap(); - let expected_segment_size: u64 = 2 * record_representation_size; - - let mut segment = Segment::new( - test_file_path.clone(), - 0, - SegmentConfig { - store_buffer_size: 512, - max_store_bytes: expected_segment_size, - }, - ) - .await - .unwrap(); - - assert!(matches!( - segment.read_(segment.next_offset()).await, - Err(SegmentError::OffsetOutOfBounds) - )); - - let offset_1 = segment.append_(RECORD_VALUE).await.unwrap(); - assert_eq!(offset_1, 0); - assert_eq!(segment.next_offset(), record_representation_size); - - assert!(matches!( - segment.read(segment.next_offset()).await, - Err(SegmentError::OffsetOutOfBounds) - )); - - assert!(matches!( - segment.advance_to_offset(segment.next_offset()), - Ok(_) - )); - - let offset_2 = 
segment.append_(RECORD_VALUE).await.unwrap(); - assert_eq!(offset_2, record_representation_size); - - assert_eq!(segment.size(), expected_segment_size); - assert!(segment.is_maxed()); - - // close segment to ensure that the records are presisted - segment.close().await.unwrap(); - - let mut segment = Segment::new( - test_file_path.clone(), - 0, - SegmentConfig { - store_buffer_size: 512, - max_store_bytes: expected_segment_size, - }, - ) - .await - .unwrap(); - - assert_eq!(segment.size(), expected_segment_size); - assert!(segment.is_maxed()); - - assert!(matches!( - segment.append_(RECORD_VALUE).await, - Err(SegmentError::SegmentMaxed) - )); - - let (record_1, record_1_next_record_offset) = - segment.read_(offset_1).await.unwrap(); - assert_eq!(record_1.offset, offset_1); - assert_eq!(record_1.value, RECORD_VALUE); - assert_eq!(record_1_next_record_offset, offset_2); - - let (record_2, record_2_next_record_offset) = - segment.read_(offset_2).await.unwrap(); - assert_eq!(record_2.offset, offset_2); - assert_eq!(record_2.value, RECORD_VALUE); - assert_eq!(record_2_next_record_offset, segment.next_offset()); - - // read at invalid loacation - assert!(matches!( - segment.read_(offset_2 + 1).await, - Err(SegmentError::StoreError(_)) - )); - - assert!(matches!( - segment.read_(segment.next_offset()).await, - Err(SegmentError::OffsetOutOfBounds) - )); - - let records = vec![record_1, record_2]; - - let mut segment_scanner = - crate::commit_log::segment::SegmentScanner_::new(&segment).unwrap(); - let mut i = 0; - while let Some(record) = segment_scanner.next().await { - assert_eq!(record, records[i]); - - i += 1; - } - assert_eq!(i, records.len()); - assert!(matches!( segment.advance_to_offset(segment.next_offset() + 1), Err(SegmentError::OffsetBeyondCapacity) diff --git a/src/commit_log/glommio_impl/segmented_log.rs b/src/commit_log/glommio_impl/segmented_log.rs index 0df7bd81f..3aaff4869 100644 --- a/src/commit_log/glommio_impl/segmented_log.rs +++ b/src/commit_log/glommio_impl/segmented_log.rs @@ -92,8 +92,8 @@ mod tests { } #[test] - fn test_log_reads_reflect_writes() { - const STORAGE_DIR_PATH: &str = "/tmp/laminarmq_log_test_log_reads_reflect_writes"; + fn test_log_reads_reflect_writes_() { + const STORAGE_DIR_PATH: &str = "/tmp/laminarmq_log_test_log_reads_reflect_writes_"; if Path::new(STORAGE_DIR_PATH).exists() { fs::remove_dir_all(STORAGE_DIR_PATH).unwrap(); } @@ -101,8 +101,8 @@ mod tests { let local_ex = LocalExecutorBuilder::new(Placement::Unbound) .spawn(move || async move { const RECORD_VALUE: &[u8] = b"Hello world!"; - let mut record = Record { - value: RECORD_VALUE.to_vec(), + let record = Record { + value: RECORD_VALUE.into(), offset: 0, }; let record_size = bincoded_serialized_record_size(&record).unwrap(); @@ -121,7 +121,7 @@ mod tests { .await .unwrap(); - let offset_0 = log.append(&mut record).await.unwrap(); + let offset_0 = log.append(RECORD_VALUE).await.unwrap(); assert_eq!(offset_0, log_config.initial_offset); // not enough bytes written to trigger sync matches!(log.read(offset_0).await, Err(LogError::SegmentError(_))); @@ -132,13 +132,13 @@ mod tests { for _ in 1..NUM_RECORDS { assert!(log.is_write_segment_maxed().unwrap()); // this write will trigger log rotation - let curr_offset = log.append(&mut record).await.unwrap(); + let curr_offset = log.append(RECORD_VALUE).await.unwrap(); let (record, next_record_offset) = log.read(prev_offset).await.unwrap(); assert_eq!( record, Record { - value: RECORD_VALUE.to_vec(), + value: RECORD_VALUE.into(), offset: prev_offset } ); 
@@ -182,8 +182,8 @@ mod tests { let local_ex = LocalExecutorBuilder::new(Placement::Unbound) .spawn(move || async move { const RECORD_VALUE: &[u8] = b"Hello world!"; - let mut record = Record { - value: RECORD_VALUE.to_vec(), + let record = Record { + value: RECORD_VALUE.into(), offset: 0, }; let record_size = bincoded_serialized_record_size(&record).unwrap(); @@ -209,7 +209,7 @@ mod tests { for _ in 0..NUM_RECORDS / 2 { // this write will trigger log rotation base_offset_of_first_non_expired_segment = - log.append(&mut record).await.unwrap(); + log.append(RECORD_VALUE).await.unwrap(); } let expiry_duration = Duration::from_millis(200); @@ -217,7 +217,7 @@ mod tests { glommio::timer::sleep(expiry_duration).await; for _ in NUM_RECORDS / 2..NUM_RECORDS { - log.append(&mut record).await.unwrap(); + log.append(RECORD_VALUE).await.unwrap(); } log.remove_expired_segments(expiry_duration).await.unwrap(); @@ -256,8 +256,8 @@ mod tests { let local_ex = LocalExecutorBuilder::new(Placement::Unbound) .spawn(move || async move { const RECORD_VALUE: &[u8] = b"Hello world!"; - let mut record = Record { - value: RECORD_VALUE.to_vec(), + let record = Record { + value: RECORD_VALUE.into(), offset: 0, }; let record_size = bincoded_serialized_record_size(&record).unwrap(); @@ -280,15 +280,14 @@ mod tests { .await .unwrap(); - log_0.append(&mut record).await.unwrap(); // record written but not guranteed to be - // synced + log_0.append(RECORD_VALUE).await.unwrap(); // record written but not guranteed to be synced assert!(matches!( log_1.advance_to_offset(log_0.highest_offset()).await, Err(LogError::OffsetNotValidToAdvanceTo) )); - log_0.append(&mut record).await.unwrap(); // first segment rotation + log_0.append(RECORD_VALUE).await.unwrap(); // first segment rotation let highest_offset_2 = log_0.highest_offset(); assert!(matches!( @@ -296,8 +295,7 @@ mod tests { Err(LogError::OffsetNotValidToAdvanceTo) )); - log_0.append(&mut record).await.unwrap(); // second log rotation; 2nd segment - // synced + log_0.append(RECORD_VALUE).await.unwrap(); // second log rotation; 2nd segment synced log_1.advance_to_offset(highest_offset_2).await.unwrap(); diff --git a/src/commit_log/mod.rs b/src/commit_log/mod.rs index c7fde2f19..6c4639375 100644 --- a/src/commit_log/mod.rs +++ b/src/commit_log/mod.rs @@ -17,17 +17,7 @@ use async_trait::async_trait; /// Represents a record in a [`CommitLog`]. #[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq, Eq, Clone)] -pub struct Record { - /// Value stored in this record entry. The value itself might be serialized bytes of some other - /// form of record. - pub value: Vec, - - /// Offset at which this record is stored in the log. - pub offset: u64, -} - -#[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq, Eq, Clone)] -pub struct Record_<'a> { +pub struct Record<'a> { /// Value stored in this record entry. The value itself might be serialized bytes of some other /// form of record. pub value: Cow<'a, [u8]>, @@ -260,7 +250,7 @@ pub mod segment { use super::{ store::{Store, StoreScanner}, - Record, Record_, Scanner, + Record, Scanner, }; /// Error type used for operations on a [`Segment`]. @@ -456,39 +446,13 @@ pub mod segment { /// given record instance. /// - [`SegmentError::StoreError`] if there was an error during writing to the underlying /// [`Store`](super::store::Store) instance. 
- pub async fn append(&mut self, record: &mut Record) -> Result> { - if self.is_maxed() { - return Err(SegmentError::SegmentMaxed); - } - - let old_record_offset = record.offset; - - let current_offset = self.next_offset; - record.offset = current_offset; - - let bincoded_record = - bincode::serialize(record).map_err(|_x| SegmentError::SerializationError)?; - - let (_, bytes_written) = self - .store - .append(&bincoded_record) - .await - .map_err(SegmentError::StoreError)?; - - self.next_offset += bytes_written as u64; - - record.offset = old_record_offset; - - Ok(current_offset) - } - - pub async fn append_(&mut self, record_bytes: &[u8]) -> Result> { + pub async fn append(&mut self, record_bytes: &[u8]) -> Result> { if self.is_maxed() { return Err(SegmentError::SegmentMaxed); } let current_offset = self.next_offset; - let record = Record_ { + let record = Record { value: record_bytes.into(), offset: current_offset, }; @@ -548,27 +512,6 @@ pub mod segment { Ok((record, self.base_offset() + next_record_position)) } - pub async fn read_(&self, offset: u64) -> Result<(Record_, u64), SegmentError> { - if !self.offset_within_bounds(offset) { - return Err(SegmentError::OffsetOutOfBounds); - } - - let position = self - .store_position(offset) - .ok_or(SegmentError::OffsetBeyondCapacity)?; - - let (record_bytes, next_record_position) = self - .store - .read(position) - .await - .map_err(SegmentError::StoreError)?; - - let record: Record_ = bincode::deserialize(&record_bytes) - .map_err(|_x| SegmentError::SerializationError)?; - - Ok((record, self.base_offset() + next_record_position)) - } - /// Advances this [`Segment`] instance's `next_offset` value to the given value. /// This method simply returns [`Ok`] if `new_next_offset <= next_offset`. /// @@ -671,66 +614,7 @@ pub mod segment { T: Deref + Unpin, S: Store, { - type Item = super::Record; - - async fn next(&mut self) -> Option { - self.store_scanner - .next() - .await - .and_then(|record_bytes| bincode::deserialize(&record_bytes).ok()) - } - } - - pub struct SegmentScanner_<'a, T, S> - where - T: Deref, - S: Store, - { - store_scanner: StoreScanner<'a, T, S>, - } - - impl<'a, T, S> SegmentScanner_<'a, T, S> - where - T: Deref, - S: Store, - { - /// Creates a new [`SegmentScanner`] that starts reading from the given segments `base_offset`. - pub fn new(segment: &'a Segment) -> Result> { - Self::with_offset(segment, segment.base_offset()) - } - - /// Creates a new [`SegmentScanner`] that starts reading from the given offset. - /// - /// ## Errors - /// - [`SegmentError::OffsetOutOfBounds`] if the given `offset >= segment.next_offset()` - /// - [`SegmentError::OffsetBeyondCapacity`] if the given offset doesn't map to a valid - /// location on the [`Segment`] instances underlying store. - pub fn with_offset( - segment: &'a Segment, - offset: u64, - ) -> Result> { - if !segment.offset_within_bounds(offset) { - return Err(SegmentError::OffsetOutOfBounds); - } - - Ok(Self { - store_scanner: StoreScanner::with_position( - segment.store(), - segment - .store_position(offset) - .ok_or(SegmentError::OffsetBeyondCapacity)?, - ), - }) - } - } - - #[async_trait(?Send)] - impl<'a, T, S> Scanner for SegmentScanner_<'a, T, S> - where - T: Deref + Unpin, - S: Store, - { - type Item = super::Record_<'a>; + type Item = super::Record<'a>; async fn next(&mut self) -> Option { self.store_scanner @@ -784,7 +668,7 @@ pub trait CommitLog { } /// Appends a new [`Record`] at the end of this [`CommitLog`]. 
- async fn append(&mut self, record: &mut Record) -> Result; + async fn append(&mut self, record_bytes: &[u8]) -> Result; /// Reads the [`Record`] at the given offset, along with the offset of the next record from /// this [`CommitLog`]. @@ -843,7 +727,7 @@ impl<'a, Log: CommitLog> LogScanner<'a, Log> { #[async_trait(?Send)] impl<'a, Log: CommitLog> Scanner for LogScanner<'a, Log> { - type Item = Record; + type Item = Record<'a>; /// Linearly scans and reads the next record in the [`CommitLog`] instance asynchronously. If /// the read operation fails, it searches for the next readable offset by seeking with the @@ -1387,7 +1271,7 @@ pub mod segmented_log { { type Error = SegmentedLogError; - async fn append(&mut self, record: &mut Record) -> Result { + async fn append(&mut self, record_bytes: &[u8]) -> Result { while self.is_write_segment_maxed()? { self.rotate_new_write_segment().await?; } @@ -1395,7 +1279,7 @@ pub mod segmented_log { self.write_segment .as_mut() .ok_or(SegmentedLogError::WriteSegmentLost)? - .append(record) + .append(record_bytes) .await .map_err(SegmentedLogError::SegmentError) }
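Taken together, PATCH 2 and PATCH 4 drop the owned `Vec<u8>` from `Record` in favour of a copy-on-write slice, and `CommitLog::append` now takes the raw record bytes (`log.append(RECORD_VALUE).await`) instead of a `&mut Record`, as the updated glommio tests exercise. The sketch below illustrates what the `Cow<'_, [u8]>` value buys callers; the struct mirrors the `Record<'a>` definition from the final diff (minus the serde derives), while the `main` function and its values are illustrative only.

```rust
use std::borrow::Cow;

/// Mirrors the post-PATCH-4 record shape: the value is a copy-on-write slice,
/// so a record can borrow the caller's (or a deserializer's) buffer directly,
/// while owned bytes can still be moved in when needed.
#[derive(Debug, PartialEq, Eq)]
pub struct Record<'a> {
    pub value: Cow<'a, [u8]>,
    pub offset: u64,
}

fn main() {
    const RECORD_VALUE: &[u8] = b"Hello World!";

    // Borrowed value: no allocation, the record just points at the caller's bytes.
    let borrowed = Record {
        value: RECORD_VALUE.into(),
        offset: 0,
    };

    // Owned value: used when the record must outlive the buffer it was read from.
    let owned = Record {
        value: Cow::Owned(RECORD_VALUE.to_vec()),
        offset: 0,
    };

    // Both compare equal by content, independent of ownership.
    assert_eq!(borrowed, owned);
    assert_eq!(borrowed.value.as_ref(), RECORD_VALUE);
}
```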