Skip to content

Commit

Permalink
Merge pull request #6 from arindas/develop
Browse files Browse the repository at this point in the history
Improve code coverage and remove unused modules
  • Loading branch information
arindas authored Dec 28, 2022
2 parents dfc2cfa + f1ae883 commit a778f81
Show file tree
Hide file tree
Showing 18 changed files with 255 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
uses: actions/checkout@v3

- name: Run cargo-tarpaulin
run: cargo tarpaulin -j 1 --out Xml
run: cargo tarpaulin --out Xml

- name: Upload to codecov.io
uses: codecov/[email protected]
Expand Down
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license = "MIT"
categories = ["web-programming"]
keywords = ["message-queue", "distributed-systems", "segmented-log", "io-uring"]
exclude = [".github/"]
version = "0.0.3"
version = "0.0.4"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ A scalable, distributed message queue powered by a segmented, partitioned, repli
In order to use `laminarmq` as a library, add the following to your `Cargo.toml`:
```toml
[dependencies]
laminarmq = "0.0.3"
laminarmq = "0.0.4"
```

The current implementation based on [`glommio`](https://docs.rs/glommio) runs only on linux. `glommio` requires
Expand Down
8 changes: 8 additions & 0 deletions src/commit_log/glommio_impl/segmented_log/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ impl Store {

self.writer = stream_writer_with_buffer_size(writer, self.buffer_size);

let reader = DmaFile::open(backing_file_path.deref())
.await
.map_err(StoreError::StorageError)?;

drop(backing_file_path);

self.reader = reader;

self.size = position;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/commit_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
/// Remove expired storage used, if any. Default implementation simply returns with [`Ok(())`]
///
/// ### Errors:
/// Possible errors arising in implementations could include errors during remooval of files.
/// Possible errors arising in implementations could include errors during removal of files.
async fn remove_expired(&mut self, _expiry_duration: Duration) -> Result<(), Self::Error> {
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions src/commit_log/segmented_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ where
OffsetNotValidToAdvanceTo,
}

#[cfg(not(tarpaulin_include))]
impl<T, S> Display for SegmentedLogError<T, S>
where
T: Deref<Target = [u8]>,
Expand Down
1 change: 1 addition & 0 deletions src/commit_log/segmented_log/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ where
OffsetOutOfBounds,
}

#[cfg(not(tarpaulin_include))]
impl<T: Deref<Target = [u8]>, S: Store<T>> Display for SegmentError<T, S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
43 changes: 1 addition & 42 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod split {
fn split_at(self, at: usize) -> Option<(Self, Self)>;
}

#[cfg(not(tarpaulin_include))]
impl<T> SplitAt<T> for Vec<T> {
fn split_at(mut self, at: usize) -> Option<(Self, Self)> {
if at > self.len() {
Expand Down Expand Up @@ -42,45 +43,3 @@ pub mod split {
}
}
}

pub mod borrow {
//! Module providing types making borrowed values easier to work with.
use bytes::Bytes;
use std::{borrow::Cow, ops::Deref};

/// Enumeration to generalize over [`bytes::Bytes`] and [`Cow`]`<[u8]>`.
#[derive(Debug, Clone)]
pub enum BytesCow<'a> {
Owned(Bytes),
Borrowed(Cow<'a, [u8]>),
}

impl<'a> From<Cow<'a, [u8]>> for BytesCow<'a> {
fn from(cow: Cow<'a, [u8]>) -> Self {
match cow {
Cow::Borrowed(_) => BytesCow::Borrowed(cow),
Cow::Owned(owned_bytes) => BytesCow::Owned(Bytes::from(owned_bytes)),
}
}
}

impl<'a> From<BytesCow<'a>> for Cow<'a, [u8]> {
fn from(bytes_cow: BytesCow<'a>) -> Self {
match bytes_cow {
BytesCow::Owned(x) => Into::<Vec<u8>>::into(x).into(),
BytesCow::Borrowed(cow) => cow,
}
}
}

impl<'a> Deref for BytesCow<'a> {
type Target = [u8];

fn deref(&self) -> &Self::Target {
match self {
BytesCow::Owned(x) => x.deref(),
BytesCow::Borrowed(x) => x.deref(),
}
}
}
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! In order to use `laminarmq` as a library, add the following to your `Cargo.toml`:
//! ```toml
//! [dependencies]
//! laminarmq = "0.0.2"
//! laminarmq = "0.0.4"
//! ```
//!
//! The current implementation based on [`glommio`](https://docs.rs/glommio) runs only on linux. `glommio` requires
Expand Down Expand Up @@ -211,6 +211,7 @@ pub mod commit_log;
pub mod common;
pub mod server;

#[cfg(not(tarpaulin_include))]
pub mod prelude {
//! Prelude module for [`laminarmq`](super) with common exports for convenience.

Expand Down
2 changes: 2 additions & 0 deletions src/server/glommio_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub mod worker {
}
}

#[cfg(not(tarpaulin_include))]
pub mod hyper_compat;

pub mod partition;
pub mod processor;
52 changes: 45 additions & 7 deletions src/server/glommio_impl/processor/partition_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,52 @@ mod tests {
fn test_partition_consumer() {
LocalExecutorBuilder::new(Placement::Unbound)
.spawn(|| async move {
let partition_container = Rc::new(RwLock::new(HashMap::<
PartitionId,
Rc<RwLock<InMemPartition>>,
>::new()));

let partition_id = PartitionId {
topic: "some_topic".into(),
partition_number: 0,
};

// test case with multiple Rc refs
let partition = Rc::new(RwLock::new(InMemPartition::new()));

let _stub = partition.clone();

let remainder = PartitionConsumer::with_retries_and_wait_duration(
PartitionRemainder::Rc(partition),
partition_id.clone(),
InMemPartitionCreator,
ConsumeMethod::Remove,
5,
Duration::from_nanos(1),
)
.consume()
.await;

assert!(matches!(remainder, Err(TaskError::PartitionLost(_))));

// test case with existing read lock
let partition = Rc::new(RwLock::new(InMemPartition::new()));

let _stub = partition.read().await.unwrap();

let remainder = PartitionConsumer::with_retries_and_wait_duration(
PartitionRemainder::Rc(partition.clone()),
partition_id.clone(),
InMemPartitionCreator,
ConsumeMethod::Remove,
5,
Duration::from_nanos(1),
)
.consume()
.await;

assert!(matches!(remainder, Err(TaskError::PartitionLost(_))));

let partition_container = Rc::new(RwLock::new(HashMap::<
PartitionId,
Rc<RwLock<InMemPartition>>,
>::new()));

partition_container.write().await.unwrap().insert(
partition_id.clone(),
Rc::new(RwLock::new(InMemPartition::new())),
Expand All @@ -193,11 +229,13 @@ mod tests {
.remove(&partition_id)
.unwrap();

PartitionConsumer::new(
PartitionConsumer::with_retries_and_wait_duration(
PartitionRemainder::Rc(partition),
partition_id,
partition_id.clone(),
InMemPartitionCreator,
ConsumeMethod::Remove,
5,
Duration::from_nanos(1),
)
.consume()
.await
Expand Down
19 changes: 9 additions & 10 deletions src/server/glommio_impl/processor/single_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ where

#[cfg(test)]
mod tests {
use std::{borrow::Cow, ops::Deref};
use std::ops::Deref;

use super::{
super::super::{
Expand All @@ -303,22 +303,21 @@ mod tests {
},
Processor,
};
use bytes::Bytes;
use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares};

fn new_single_node_in_memory_partition_task(
worker_request: WorkerRequest<Cow<'static, [u8]>>,
worker_request: WorkerRequest<Bytes>,
) -> (
Task<
Partition,
Request<Cow<'static, [u8]>>,
Response<Cow<'static, [u8]>>,
ResponseSender<Response<Cow<'static, [u8]>>, Partition>,
Request<Bytes>,
Response<Bytes>,
ResponseSender<Response<Bytes>, Partition>,
>,
ResponseReceiver<Response<Cow<'static, [u8]>>, Partition>,
ResponseReceiver<Response<Bytes>, Partition>,
) {
new_task::<Partition, Request<Cow<'static, [u8]>>, Response<Cow<'static, [u8]>>>(
worker_request.into(),
)
new_task::<Partition, Request<Bytes>, Response<Bytes>>(worker_request.into())
}

#[test]
Expand Down Expand Up @@ -383,7 +382,7 @@ mod tests {
new_single_node_in_memory_partition_task(WorkerRequest::Partition {
partition: partition_id_1.clone(),
request: PartitionRequest::Append {
record_bytes: sample_record_bytes.into(),
record_bytes: Bytes::from_static(sample_record_bytes),
},
});

Expand Down
1 change: 1 addition & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ pub mod partition;
pub mod router;
pub mod worker;

#[cfg(not(tarpaulin_include))]
pub mod tokio_compat;

#[cfg(target_os = "linux")]
Expand Down
2 changes: 2 additions & 0 deletions src/server/partition/single_node/commit_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ where
NotSupported,
}

#[cfg(not(tarpaulin_include))]
impl<M, T, CL> Display for PartitionError<M, T, CL>
where
M: serde::Serialize + serde::de::DeserializeOwned,
Expand All @@ -37,6 +38,7 @@ where
}
}

#[cfg(not(tarpaulin_include))]
impl<M, T, CL> Debug for PartitionError<M, T, CL>
where
M: serde::Serialize + serde::de::DeserializeOwned,
Expand Down
Loading

0 comments on commit a778f81

Please sign in to comment.