Merge pull request #2 from arindas/develop
Merge development updates for refactors.
arindas authored Oct 14, 2022
2 parents 7347765 + 813e8fc commit ca0f228
Showing 6 changed files with 62 additions and 56 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -29,7 +29,8 @@ In order to use `laminarmq` as a library, add the following to your `Cargo.toml`
[dependencies]
laminarmq = "0.0.2"
```
Refer to [API Documentation](https://arindas.github.io/laminarmq/laminarmq/) for more details.
Refer to the latest git [API Documentation](https://arindas.github.io/laminarmq/laminarmq/)
or [Crate Documentation](https://docs.rs/laminarmq) for more details.

## Planned Architecture
This section presents a brief overview of the different aspects of our message queue. This is only an outline of
2 changes: 1 addition & 1 deletion src/commit_log/glommio_impl/mod.rs
@@ -46,7 +46,7 @@
//! .unwrap();
//!
//! log.remove().await.unwrap();
//! assert!(!PathBuf::from(&storage_dir_path,).exists());
//! assert!(!PathBuf::from(&storage_dir_path).exists());
//! })
//! .unwrap();
//! local_ex.join().unwrap();
24 changes: 12 additions & 12 deletions src/commit_log/glommio_impl/segment.rs
@@ -84,9 +84,9 @@ mod tests {
}

#[test]
fn test_segment_reads_reflect_appends() {
fn test_segment_reads_reflect_appends_() {
let test_file_path =
PathBuf::from(test_file_path_string("test_segment_reads_reflect_appends"));
PathBuf::from(test_file_path_string("test_segment_reads_reflect_appends_"));

if test_file_path.exists() {
fs::remove_file(&test_file_path).unwrap();
@@ -95,8 +95,8 @@ mod tests {
let local_ex = LocalExecutorBuilder::new(Placement::Fixed(1))
.spawn(move || async move {
const RECORD_VALUE: &[u8] = b"Hello World!";
let mut record = Record {
value: Vec::from(RECORD_VALUE),
let record = Record {
value: RECORD_VALUE.into(),
offset: 0,
};
let record_representation_size = bincoded_serialized_record_size(&record).unwrap();
@@ -118,7 +118,7 @@
Err(SegmentError::OffsetOutOfBounds)
));

let offset_1 = segment.append(&mut record).await.unwrap();
let offset_1 = segment.append(RECORD_VALUE).await.unwrap();
assert_eq!(offset_1, 0);
assert_eq!(segment.next_offset(), record_representation_size);

@@ -132,7 +132,7 @@
Ok(_)
));

let offset_2 = segment.append(&mut record).await.unwrap();
let offset_2 = segment.append(RECORD_VALUE).await.unwrap();
assert_eq!(offset_2, record_representation_size);

assert_eq!(segment.size(), expected_segment_size);
@@ -156,7 +156,7 @@
assert!(segment.is_maxed());

assert!(matches!(
segment.append(&mut record).await,
segment.append(RECORD_VALUE).await,
Err(SegmentError::SegmentMaxed)
));

@@ -181,11 +181,6 @@
Err(SegmentError::OffsetOutOfBounds)
));

assert!(matches!(
segment.advance_to_offset(segment.next_offset() + 1),
Err(SegmentError::OffsetBeyondCapacity)
));

let records = vec![record_1, record_2];

let mut segment_scanner = SegmentScanner::new(&segment).unwrap();
@@ -197,6 +192,11 @@
}
assert_eq!(i, records.len());

assert!(matches!(
segment.advance_to_offset(segment.next_offset() + 1),
Err(SegmentError::OffsetBeyondCapacity)
));

segment.remove().await.unwrap();

assert!(!test_file_path.exists());
36 changes: 17 additions & 19 deletions src/commit_log/glommio_impl/segmented_log.rs
@@ -92,17 +92,17 @@ mod tests {
}

#[test]
fn test_log_reads_reflect_writes() {
const STORAGE_DIR_PATH: &str = "/tmp/laminarmq_log_test_log_reads_reflect_writes";
fn test_log_reads_reflect_writes_() {
const STORAGE_DIR_PATH: &str = "/tmp/laminarmq_log_test_log_reads_reflect_writes_";
if Path::new(STORAGE_DIR_PATH).exists() {
fs::remove_dir_all(STORAGE_DIR_PATH).unwrap();
}

let local_ex = LocalExecutorBuilder::new(Placement::Unbound)
.spawn(move || async move {
const RECORD_VALUE: &[u8] = b"Hello world!";
let mut record = Record {
value: RECORD_VALUE.to_vec(),
let record = Record {
value: RECORD_VALUE.into(),
offset: 0,
};
let record_size = bincoded_serialized_record_size(&record).unwrap();
@@ -121,7 +121,7 @@ mod tests {
.await
.unwrap();

let offset_0 = log.append(&mut record).await.unwrap();
let offset_0 = log.append(RECORD_VALUE).await.unwrap();
assert_eq!(offset_0, log_config.initial_offset);
// not enough bytes written to trigger sync
matches!(log.read(offset_0).await, Err(LogError::SegmentError(_)));
@@ -132,13 +132,13 @@ mod tests {
for _ in 1..NUM_RECORDS {
assert!(log.is_write_segment_maxed().unwrap());
// this write will trigger log rotation
let curr_offset = log.append(&mut record).await.unwrap();
let curr_offset = log.append(RECORD_VALUE).await.unwrap();

let (record, next_record_offset) = log.read(prev_offset).await.unwrap();
assert_eq!(
record,
Record {
value: RECORD_VALUE.to_vec(),
value: RECORD_VALUE.into(),
offset: prev_offset
}
);
@@ -182,8 +182,8 @@ mod tests {
let local_ex = LocalExecutorBuilder::new(Placement::Unbound)
.spawn(move || async move {
const RECORD_VALUE: &[u8] = b"Hello world!";
let mut record = Record {
value: RECORD_VALUE.to_vec(),
let record = Record {
value: RECORD_VALUE.into(),
offset: 0,
};
let record_size = bincoded_serialized_record_size(&record).unwrap();
@@ -209,15 +209,15 @@ mod tests {
for _ in 0..NUM_RECORDS / 2 {
// this write will trigger log rotation
base_offset_of_first_non_expired_segment =
log.append(&mut record).await.unwrap();
log.append(RECORD_VALUE).await.unwrap();
}

let expiry_duration = Duration::from_millis(100);
let expiry_duration = Duration::from_millis(200);

glommio::timer::sleep(expiry_duration).await;

for _ in NUM_RECORDS / 2..NUM_RECORDS {
log.append(&mut record).await.unwrap();
log.append(RECORD_VALUE).await.unwrap();
}

log.remove_expired_segments(expiry_duration).await.unwrap();
@@ -256,8 +256,8 @@ mod tests {
let local_ex = LocalExecutorBuilder::new(Placement::Unbound)
.spawn(move || async move {
const RECORD_VALUE: &[u8] = b"Hello world!";
let mut record = Record {
value: RECORD_VALUE.to_vec(),
let record = Record {
value: RECORD_VALUE.into(),
offset: 0,
};
let record_size = bincoded_serialized_record_size(&record).unwrap();
Expand All @@ -280,24 +280,22 @@ mod tests {
.await
.unwrap();

log_0.append(&mut record).await.unwrap(); // record written but not guaranteed to be
// synced
log_0.append(RECORD_VALUE).await.unwrap(); // record written but not guaranteed to be synced

assert!(matches!(
log_1.advance_to_offset(log_0.highest_offset()).await,
Err(LogError::OffsetNotValidToAdvanceTo)
));

log_0.append(&mut record).await.unwrap(); // first segment rotation
log_0.append(RECORD_VALUE).await.unwrap(); // first segment rotation
let highest_offset_2 = log_0.highest_offset();

assert!(matches!(
log_1.advance_to_offset(highest_offset_2).await,
Err(LogError::OffsetNotValidToAdvanceTo)
));

log_0.append(&mut record).await.unwrap(); // second log rotation; 2nd segment
// synced
log_0.append(RECORD_VALUE).await.unwrap(); // second log rotation; 2nd segment synced

log_1.advance_to_offset(highest_offset_2).await.unwrap();

16 changes: 9 additions & 7 deletions src/commit_log/glommio_impl/store.rs
@@ -345,16 +345,18 @@ mod tests {
record_positions_and_sizes.push(store.append(&record).await.unwrap());
}

let mut read_records = Vec::with_capacity(RECORDS.len());
let mut records_read_before_sync = Vec::with_capacity(RECORDS.len());

for (position, _written_bytes) in &record_positions_and_sizes[0..RECORDS.len() / 2]
{
let (record, _next_record_offset) = store.read(position.clone()).await.unwrap();
read_records.push(record);
for (position, _written_bytes) in &record_positions_and_sizes {
if let Ok((record, _next_record_offset)) = store.read(position.clone()).await {
records_read_before_sync.push(record);
} else {
break;
}
}

for i in 0..RECORDS.len() / 2 {
assert_eq!(read_records[i].deref(), RECORDS[i]);
for i in 0..records_read_before_sync.len() {
assert_eq!(records_read_before_sync[i].deref(), RECORDS[i]);
}

let mut i = 0;
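The rewritten read loop above no longer assumes that exactly half of the records are readable before a sync; it reads positions in order and stops at the first failure. Below is a minimal, self-contained sketch of that pattern; `MockStore` is an illustrative stand-in, not the crate's `Store` type.

```rust
// Illustrative stand-in for a store whose tail records are not yet synced.
struct MockStore {
    synced: usize,
    records: Vec<Vec<u8>>,
}

impl MockStore {
    fn read(&self, index: usize) -> Result<&[u8], ()> {
        if index < self.synced {
            Ok(&self.records[index])
        } else {
            Err(()) // unsynced records are not readable yet
        }
    }
}

fn main() {
    let store = MockStore {
        synced: 2,
        records: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
    };

    let mut records_read_before_sync = Vec::new();
    for i in 0..store.records.len() {
        match store.read(i) {
            Ok(bytes) => records_read_before_sync.push(bytes.to_vec()),
            Err(_) => break, // stop at the first read failure
        }
    }

    // compare only the prefix that was actually readable
    assert_eq!(records_read_before_sync.len(), 2);
}
```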
37 changes: 21 additions & 16 deletions src/commit_log/mod.rs
@@ -11,14 +11,16 @@
//! In the context of `laminarmq` this module is intended to provide the storage for individual
//! partitions in a topic.

use std::borrow::Cow;

use async_trait::async_trait;

/// Represents a record in a [`CommitLog`].
#[derive(serde::Deserialize, serde::Serialize, Debug, PartialEq, Eq, Clone)]
pub struct Record {
pub struct Record<'a> {
/// Value stored in this record entry. The value itself might be serialized bytes of some other
/// form of record.
pub value: Vec<u8>,
pub value: Cow<'a, [u8]>,

/// Offset at which this record is stored in the log.
pub offset: u64,
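This is the central refactor: `Record` now holds a `Cow<'a, [u8]>`, so a record can either borrow its value bytes or own them. A minimal sketch of both construction forms, using a mirror of the new struct purely for illustration:

```rust
use std::borrow::Cow;

// Mirror of the new definition, for illustration only.
struct Record<'a> {
    value: Cow<'a, [u8]>,
    offset: u64,
}

fn main() {
    const RECORD_VALUE: &[u8] = b"Hello World!";

    // &[u8] converts to Cow::Borrowed: no copy is made. This is what
    // `RECORD_VALUE.into()` does in the updated tests.
    let borrowed = Record { value: RECORD_VALUE.into(), offset: 0 };

    // Vec<u8> converts to Cow::Owned: used when the bytes must outlive
    // their source, e.g. records deserialized from the store.
    let owned = Record { value: Cow::Owned(RECORD_VALUE.to_vec()), offset: 0 };

    assert_eq!(borrowed.value, owned.value);
    assert_eq!(borrowed.offset, owned.offset);
}
```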
@@ -164,6 +166,7 @@ pub mod store {
/// └─────────────────────────┴────────────────────────┴───────────────────────┘
/// │─────────────── RecordHeader ─────────────────────│
/// ```
#[derive(Debug)]
pub struct RecordHeader {
/// checksum computed from the bytes in the record.
pub checksum: u32,
@@ -204,7 +207,11 @@ pub mod store {
let checksum = cursor.read_u32::<LittleEndian>()?;
let length = cursor.read_u32::<LittleEndian>()?;

Ok(Self { checksum, length })
if checksum == 0 && length == 0 {
Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof))
} else {
Ok(Self { checksum, length })
}
}

/// Serializes this given record header to an owned byte array.
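The new branch in `from_io` treats an all-zero `(checksum, length)` pair as the end of valid data rather than a legitimate empty header, presumably because pre-allocated but unwritten file space reads back as zeroes. A runnable sketch of the same check, using the `byteorder` API the surrounding code already calls:

```rust
use byteorder::{LittleEndian, ReadBytesExt};
use std::io::{Cursor, Error, ErrorKind};

// Parses a (checksum, length) header; a fully zeroed header now signals
// unwritten space instead of parsing as a valid empty record.
fn parse_header(bytes: &[u8]) -> std::io::Result<(u32, u32)> {
    let mut cursor = Cursor::new(bytes);
    let checksum = cursor.read_u32::<LittleEndian>()?;
    let length = cursor.read_u32::<LittleEndian>()?;
    if checksum == 0 && length == 0 {
        Err(Error::from(ErrorKind::UnexpectedEof))
    } else {
        Ok((checksum, length))
    }
}

fn main() {
    // eight zero bytes: previously a "valid" empty header, now an error
    assert!(parse_header(&[0u8; 8]).is_err());

    // little-endian checksum = 1, length = 12
    let valid = [1, 0, 0, 0, 12, 0, 0, 0];
    assert_eq!(parse_header(&valid).unwrap(), (1, 12));
}
```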
@@ -439,18 +446,19 @@ pub mod segment {
/// given record instance.
/// - [`SegmentError::StoreError`] if there was an error during writing to the underlying
/// [`Store`](super::store::Store) instance.
pub async fn append(&mut self, record: &mut Record) -> Result<u64, SegmentError<T, S>> {
pub async fn append(&mut self, record_bytes: &[u8]) -> Result<u64, SegmentError<T, S>> {
if self.is_maxed() {
return Err(SegmentError::SegmentMaxed);
}

let old_record_offset = record.offset;

let current_offset = self.next_offset;
record.offset = current_offset;
let record = Record {
value: record_bytes.into(),
offset: current_offset,
};

let bincoded_record =
bincode::serialize(record).map_err(|_x| SegmentError::SerializationError)?;
bincode::serialize(&record).map_err(|_x| SegmentError::SerializationError)?;

let (_, bytes_written) = self
.store
Expand All @@ -460,8 +468,6 @@ pub mod segment {

self.next_offset += bytes_written as u64;

record.offset = old_record_offset;

Ok(current_offset)
}
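This signature change is why every `append(&mut record)` call site in the tests above became `append(RECORD_VALUE)`: the segment now constructs the `Record` internally around the caller's bytes, so callers no longer lend out a mutable record whose offset is temporarily rewritten and restored. A simplified, self-contained sketch of the new shape; the in-memory `Vec<u8>` store and the size accounting are stand-ins for the crate's `Store` and bincode serialization:

```rust
use std::borrow::Cow;

struct Record<'a> {
    value: Cow<'a, [u8]>,
    offset: u64,
}

// In-memory stand-in for a segment and its backing store.
struct Segment {
    next_offset: u64,
    store: Vec<u8>,
}

impl Segment {
    fn append(&mut self, record_bytes: &[u8]) -> u64 {
        let current_offset = self.next_offset;

        // the Record is created here, borrowing the caller's bytes
        let record = Record {
            value: record_bytes.into(),
            offset: current_offset,
        };

        // stand-in for bincode serialization and the store append
        self.store.extend_from_slice(&record.offset.to_le_bytes());
        self.store.extend_from_slice(&record.value);
        self.next_offset += 8 + record.value.len() as u64;

        current_offset
    }
}

fn main() {
    let mut segment = Segment { next_offset: 0, store: Vec::new() };
    let offset_1 = segment.append(b"Hello World!"); // 8 + 12 bytes written
    let offset_2 = segment.append(b"Hello World!");
    assert_eq!((offset_1, offset_2), (0, 20));
}
```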

@@ -608,7 +614,7 @@ pub mod segment {
T: Deref<Target = [u8]> + Unpin,
S: Store<T>,
{
type Item = super::Record;
type Item = super::Record<'a>;

async fn next(&mut self) -> Option<Self::Item> {
self.store_scanner
Expand All @@ -617,7 +623,6 @@ pub mod segment {
.and_then(|record_bytes| bincode::deserialize(&record_bytes).ok())
}
}

pub mod config {
//! Module providing types for configuring [`Segment`](super::Segment) instances.
use serde::{Deserialize, Serialize};
@@ -663,7 +668,7 @@ pub trait CommitLog {
}

/// Appends a new [`Record`] at the end of this [`CommitLog`].
async fn append(&mut self, record: &mut Record) -> Result<u64, Self::Error>;
async fn append(&mut self, record_bytes: &[u8]) -> Result<u64, Self::Error>;

/// Reads the [`Record`] at the given offset, along with the offset of the next record from
/// this [`CommitLog`].
@@ -722,7 +727,7 @@ impl<'a, Log: CommitLog> LogScanner<'a, Log> {

#[async_trait(?Send)]
impl<'a, Log: CommitLog> Scanner for LogScanner<'a, Log> {
type Item = Record;
type Item = Record<'a>;

/// Linearly scans and reads the next record in the [`CommitLog`] instance asynchronously. If
/// the read operation fails, it searches for the next readable offset by seeking with the
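The doc comment above is cut off by the diff, but it describes the scanner's recovery strategy: when a read fails, seek forward to the next readable offset instead of ending the scan. A hedged sketch of that idea follows; the fixed `seek_step` probing and the bounds check are assumptions for illustration, not the crate's exact logic.

```rust
// Linear scan with seek-on-failure: `read_at` is any fallible positional
// read returning (record bytes, offset of the next record).
fn next_record<F>(read_at: F, mut offset: u64, highest: u64, seek_step: u64) -> Option<(Vec<u8>, u64)>
where
    F: Fn(u64) -> Result<(Vec<u8>, u64), ()>,
{
    while offset < highest {
        if let Ok((record, next_offset)) = read_at(offset) {
            return Some((record, next_offset));
        }
        offset += seek_step; // probe past the unreadable region and retry
    }
    None
}

fn main() {
    // a single 4-byte record lives at offset 3; everything else is unreadable
    let read_at = |offset: u64| {
        if offset == 3 { Ok((b"data".to_vec(), 7)) } else { Err(()) }
    };
    assert_eq!(next_record(read_at, 0, 10, 1), Some((b"data".to_vec(), 7)));
    assert_eq!(next_record(read_at, 4, 10, 1), None);
}
```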
@@ -1266,15 +1271,15 @@ pub mod segmented_log {
{
type Error = SegmentedLogError<T, S>;

async fn append(&mut self, record: &mut Record) -> Result<u64, Self::Error> {
async fn append(&mut self, record_bytes: &[u8]) -> Result<u64, Self::Error> {
while self.is_write_segment_maxed()? {
self.rotate_new_write_segment().await?;
}

self.write_segment
.as_mut()
.ok_or(SegmentedLogError::WriteSegmentLost)?
.append(record)
.append(record_bytes)
.await
.map_err(SegmentedLogError::SegmentError)
}
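The rotation loop is unchanged apart from the new parameter: an append first rotates to a fresh write segment while the current one is maxed, then delegates to it. A minimal sketch of that control flow with illustrative in-memory types and an assumed fixed byte capacity per segment:

```rust
struct Segment {
    bytes: Vec<u8>,
    capacity: usize,
}

impl Segment {
    fn is_maxed(&self) -> bool {
        self.bytes.len() >= self.capacity
    }

    fn append(&mut self, record_bytes: &[u8]) {
        self.bytes.extend_from_slice(record_bytes);
    }
}

struct SegmentedLog {
    segments: Vec<Segment>,
    segment_capacity: usize,
}

impl SegmentedLog {
    fn append(&mut self, record_bytes: &[u8]) {
        // rotate while the write segment is absent or full, as above
        while self.segments.last().map_or(true, Segment::is_maxed) {
            self.segments.push(Segment {
                bytes: Vec::new(),
                capacity: self.segment_capacity,
            });
        }
        self.segments.last_mut().unwrap().append(record_bytes);
    }
}

fn main() {
    // capacity 12 == one "Hello World!" record per segment before it is maxed
    let mut log = SegmentedLog { segments: Vec::new(), segment_capacity: 12 };
    for _ in 0..4 {
        log.append(b"Hello World!");
    }
    assert_eq!(log.segments.len(), 4);
}
```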