diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c225c18d5..39d8617b5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: uses: actions/checkout@v3 - name: Run cargo-tarpaulin - run: cargo tarpaulin -j 1 --out Xml + run: cargo tarpaulin --out Xml - name: Upload to codecov.io uses: codecov/codecov-action@v1.0.2 diff --git a/Cargo.lock b/Cargo.lock index 4b9510055..ccfb055c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,9 +511,9 @@ checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" [[package]] name = "js-sys" -version = "0.3.59" +version = "0.3.60" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "258451ab10b34f8af53416d1fdab72c22e805f0c92a1136d59470ec0b11138b2" +checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" dependencies = [ "wasm-bindgen", ] @@ -530,7 +530,7 @@ dependencies = [ [[package]] name = "laminarmq" -version = "0.0.2" +version = "0.0.4" dependencies = [ "async-stream", "async-trait", @@ -969,9 +969,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.82" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" +checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -979,9 +979,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.82" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "662cd44805586bd52971b9586b1df85cdbbd9112e4ef4d8f41559c334dc6ac3f" +checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" dependencies = [ "bumpalo", "log", @@ -994,9 +994,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.82" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b260f13d3012071dfb1512849c033b1925038373aea48ced3012c09df952c602" +checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1004,9 +1004,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.82" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5be8e654bdd9b79216c2929ab90721aa82faf65c48cdf08bdc4e7f51357b80da" +checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" dependencies = [ "proc-macro2", "quote", @@ -1017,9 +1017,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.82" +version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6598dd0bd3c7d51095ff6531a5b23e02acdc81804e30d8f07afb77b7215a140a" +checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" [[package]] name = "winapi" diff --git a/Cargo.toml b/Cargo.toml index 01b4e079a..25e1890eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,7 @@ license = "MIT" categories = ["web-programming"] keywords = ["message-queue", "distributed-systems", "segmented-log", "io-uring"] exclude = [".github/"] -version = "0.0.3" +version = "0.0.4" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/README.md b/README.md index 136310db9..1f505ee10 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ A scalable, distributed message queue powered by a segmented, partitioned, repli In order to use `laminarmq` as a library, add the following to your `Cargo.toml`: ```toml [dependencies] -laminarmq = "0.0.3" +laminarmq = "0.0.4" ``` The current implementation based on [`glommio`](https://docs.rs/glommio) runs only on linux. `glommio` requires diff --git a/src/commit_log/glommio_impl/segmented_log/store.rs b/src/commit_log/glommio_impl/segmented_log/store.rs index ba6e0b04b..3b9408f8d 100644 --- a/src/commit_log/glommio_impl/segmented_log/store.rs +++ b/src/commit_log/glommio_impl/segmented_log/store.rs @@ -202,6 +202,14 @@ impl Store { self.writer = stream_writer_with_buffer_size(writer, self.buffer_size); + let reader = DmaFile::open(backing_file_path.deref()) + .await + .map_err(StoreError::StorageError)?; + + drop(backing_file_path); + + self.reader = reader; + self.size = position; Ok(()) } diff --git a/src/commit_log/mod.rs b/src/commit_log/mod.rs index 97adf08b2..bb05f1e5f 100644 --- a/src/commit_log/mod.rs +++ b/src/commit_log/mod.rs @@ -87,7 +87,7 @@ where /// Remove expired storage used, if any. Default implementation simply returns with [`Ok(())`] /// /// ### Errors: - /// Possible errors arising in implementations could include errors during remooval of files. + /// Possible errors arising in implementations could include errors during removal of files. async fn remove_expired(&mut self, _expiry_duration: Duration) -> Result<(), Self::Error> { Ok(()) } diff --git a/src/commit_log/segmented_log/mod.rs b/src/commit_log/segmented_log/mod.rs index ca7d0ac89..0337077aa 100644 --- a/src/commit_log/segmented_log/mod.rs +++ b/src/commit_log/segmented_log/mod.rs @@ -59,6 +59,7 @@ where OffsetNotValidToAdvanceTo, } +#[cfg(not(tarpaulin_include))] impl Display for SegmentedLogError where T: Deref, diff --git a/src/commit_log/segmented_log/segment.rs b/src/commit_log/segmented_log/segment.rs index b7a30d0ae..a9946e5b5 100644 --- a/src/commit_log/segmented_log/segment.rs +++ b/src/commit_log/segmented_log/segment.rs @@ -33,6 +33,7 @@ where OffsetOutOfBounds, } +#[cfg(not(tarpaulin_include))] impl, S: Store> Display for SegmentError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { diff --git a/src/common/mod.rs b/src/common/mod.rs index f7fa49fe6..a59e78854 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -14,6 +14,7 @@ pub mod split { fn split_at(self, at: usize) -> Option<(Self, Self)>; } + #[cfg(not(tarpaulin_include))] impl SplitAt for Vec { fn split_at(mut self, at: usize) -> Option<(Self, Self)> { if at > self.len() { @@ -42,45 +43,3 @@ pub mod split { } } } - -pub mod borrow { - //! Module providing types making borrowed values easier to work with. - use bytes::Bytes; - use std::{borrow::Cow, ops::Deref}; - - /// Enumeration to generalize over [`bytes::Bytes`] and [`Cow`]`<[u8]>`. - #[derive(Debug, Clone)] - pub enum BytesCow<'a> { - Owned(Bytes), - Borrowed(Cow<'a, [u8]>), - } - - impl<'a> From> for BytesCow<'a> { - fn from(cow: Cow<'a, [u8]>) -> Self { - match cow { - Cow::Borrowed(_) => BytesCow::Borrowed(cow), - Cow::Owned(owned_bytes) => BytesCow::Owned(Bytes::from(owned_bytes)), - } - } - } - - impl<'a> From> for Cow<'a, [u8]> { - fn from(bytes_cow: BytesCow<'a>) -> Self { - match bytes_cow { - BytesCow::Owned(x) => Into::>::into(x).into(), - BytesCow::Borrowed(cow) => cow, - } - } - } - - impl<'a> Deref for BytesCow<'a> { - type Target = [u8]; - - fn deref(&self) -> &Self::Target { - match self { - BytesCow::Owned(x) => x.deref(), - BytesCow::Borrowed(x) => x.deref(), - } - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 221279e47..e89c234e3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,7 +27,7 @@ //! In order to use `laminarmq` as a library, add the following to your `Cargo.toml`: //! ```toml //! [dependencies] -//! laminarmq = "0.0.2" +//! laminarmq = "0.0.4" //! ``` //! //! The current implementation based on [`glommio`](https://docs.rs/glommio) runs only on linux. `glommio` requires @@ -211,6 +211,7 @@ pub mod commit_log; pub mod common; pub mod server; +#[cfg(not(tarpaulin_include))] pub mod prelude { //! Prelude module for [`laminarmq`](super) with common exports for convenience. diff --git a/src/server/glommio_impl/mod.rs b/src/server/glommio_impl/mod.rs index aa8c4c930..662853000 100644 --- a/src/server/glommio_impl/mod.rs +++ b/src/server/glommio_impl/mod.rs @@ -81,6 +81,8 @@ pub mod worker { } } +#[cfg(not(tarpaulin_include))] pub mod hyper_compat; + pub mod partition; pub mod processor; diff --git a/src/server/glommio_impl/processor/partition_consumer.rs b/src/server/glommio_impl/processor/partition_consumer.rs index fdc2e36d1..5283c43b3 100644 --- a/src/server/glommio_impl/processor/partition_consumer.rs +++ b/src/server/glommio_impl/processor/partition_consumer.rs @@ -171,16 +171,52 @@ mod tests { fn test_partition_consumer() { LocalExecutorBuilder::new(Placement::Unbound) .spawn(|| async move { - let partition_container = Rc::new(RwLock::new(HashMap::< - PartitionId, - Rc>, - >::new())); - let partition_id = PartitionId { topic: "some_topic".into(), partition_number: 0, }; + // test case with multiple Rc refs + let partition = Rc::new(RwLock::new(InMemPartition::new())); + + let _stub = partition.clone(); + + let remainder = PartitionConsumer::with_retries_and_wait_duration( + PartitionRemainder::Rc(partition), + partition_id.clone(), + InMemPartitionCreator, + ConsumeMethod::Remove, + 5, + Duration::from_nanos(1), + ) + .consume() + .await; + + assert!(matches!(remainder, Err(TaskError::PartitionLost(_)))); + + // test case with existing read lock + let partition = Rc::new(RwLock::new(InMemPartition::new())); + + let _stub = partition.read().await.unwrap(); + + let remainder = PartitionConsumer::with_retries_and_wait_duration( + PartitionRemainder::Rc(partition.clone()), + partition_id.clone(), + InMemPartitionCreator, + ConsumeMethod::Remove, + 5, + Duration::from_nanos(1), + ) + .consume() + .await; + + assert!(matches!(remainder, Err(TaskError::PartitionLost(_)))); + + let partition_container = Rc::new(RwLock::new(HashMap::< + PartitionId, + Rc>, + >::new())); + partition_container.write().await.unwrap().insert( partition_id.clone(), Rc::new(RwLock::new(InMemPartition::new())), @@ -193,11 +229,13 @@ mod tests { .remove(&partition_id) .unwrap(); - PartitionConsumer::new( + PartitionConsumer::with_retries_and_wait_duration( PartitionRemainder::Rc(partition), - partition_id, + partition_id.clone(), InMemPartitionCreator, ConsumeMethod::Remove, + 5, + Duration::from_nanos(1), ) .consume() .await diff --git a/src/server/glommio_impl/processor/single_node.rs b/src/server/glommio_impl/processor/single_node.rs index 0c935af91..cbf73534e 100644 --- a/src/server/glommio_impl/processor/single_node.rs +++ b/src/server/glommio_impl/processor/single_node.rs @@ -279,7 +279,7 @@ where #[cfg(test)] mod tests { - use std::{borrow::Cow, ops::Deref}; + use std::ops::Deref; use super::{ super::super::{ @@ -303,22 +303,21 @@ mod tests { }, Processor, }; + use bytes::Bytes; use glommio::{executor, Latency, LocalExecutorBuilder, Placement, Shares}; fn new_single_node_in_memory_partition_task( - worker_request: WorkerRequest>, + worker_request: WorkerRequest, ) -> ( Task< Partition, - Request>, - Response>, - ResponseSender>, Partition>, + Request, + Response, + ResponseSender, Partition>, >, - ResponseReceiver>, Partition>, + ResponseReceiver, Partition>, ) { - new_task::>, Response>>( - worker_request.into(), - ) + new_task::, Response>(worker_request.into()) } #[test] @@ -383,7 +382,7 @@ mod tests { new_single_node_in_memory_partition_task(WorkerRequest::Partition { partition: partition_id_1.clone(), request: PartitionRequest::Append { - record_bytes: sample_record_bytes.into(), + record_bytes: Bytes::from_static(sample_record_bytes), }, }); diff --git a/src/server/mod.rs b/src/server/mod.rs index bbbc447d0..b4f5d2e55 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -124,6 +124,7 @@ pub mod partition; pub mod router; pub mod worker; +#[cfg(not(tarpaulin_include))] pub mod tokio_compat; #[cfg(target_os = "linux")] diff --git a/src/server/partition/single_node/commit_log.rs b/src/server/partition/single_node/commit_log.rs index 309b781bf..1c65f5db9 100644 --- a/src/server/partition/single_node/commit_log.rs +++ b/src/server/partition/single_node/commit_log.rs @@ -22,6 +22,7 @@ where NotSupported, } +#[cfg(not(tarpaulin_include))] impl Display for PartitionError where M: serde::Serialize + serde::de::DeserializeOwned, @@ -37,6 +38,7 @@ where } } +#[cfg(not(tarpaulin_include))] impl Debug for PartitionError where M: serde::Serialize + serde::de::DeserializeOwned, diff --git a/src/server/partition/single_node/in_memory.rs b/src/server/partition/single_node/in_memory.rs index a27200a5b..8552de058 100644 --- a/src/server/partition/single_node/in_memory.rs +++ b/src/server/partition/single_node/in_memory.rs @@ -1,18 +1,15 @@ //! Module providing an in-memory partition implementation that stores records in a hash map. use crate::commit_log::{segmented_log::RecordMetadata, Record}; -use super::super::{ - super::{super::common::borrow::BytesCow, single_node::Response}, - single_node::PartitionRequest, - PartitionId, -}; +use super::super::{super::single_node::Response, single_node::PartitionRequest, PartitionId}; use async_trait::async_trait; -use std::{borrow::Cow, collections::HashMap, error::Error, fmt::Display}; +use bytes::Bytes; +use std::{collections::HashMap, error::Error, fmt::Display}; /// In-memory partition implementation based off of a [`HashMap`] #[derive(Debug)] pub struct Partition { - records: HashMap>, + records: HashMap, size: usize, } @@ -38,6 +35,7 @@ pub enum PartitionError { NotFound, } +#[cfg(not(tarpaulin_include))] impl Display for PartitionError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -52,8 +50,8 @@ impl Error for PartitionError {} #[async_trait(?Send)] impl super::super::Partition for Partition { type Error = PartitionError; - type Request = PartitionRequest>; - type Response = Response>; + type Request = PartitionRequest; + type Response = Response; async fn serve_idempotent( &self, @@ -73,7 +71,7 @@ impl super::super::Partition for Partition { offset, additional_metadata: (), }, - value: x.clone().into(), + value: x.clone(), }, next_offset, } @@ -89,7 +87,7 @@ impl super::super::Partition for Partition { let current_offset = self.size as u64; let record_size = record_bytes.len(); - self.records.insert(current_offset, record_bytes.into()); + self.records.insert(current_offset, record_bytes); self.size += record_size; @@ -198,7 +196,7 @@ mod tests { bytes_written, } = partition .serve(Request::Append { - record_bytes: record.as_bytes().into(), + record_bytes: bytes::Bytes::from_static(record.as_bytes()), }) .await .unwrap() diff --git a/src/server/router.rs b/src/server/router.rs index d6d6fa150..0e7f08834 100644 --- a/src/server/router.rs +++ b/src/server/router.rs @@ -107,6 +107,12 @@ pub mod single_node { } } + impl Default for Router { + fn default() -> Self { + Self::new() + } + } + #[async_trait::async_trait(?Send)] impl super::Router> for Router { async fn route( @@ -167,4 +173,153 @@ pub mod single_node { } } } + + #[cfg(test)] + mod tests { + use hyper::{Body, Request as HyperRequest}; + + use crate::server::{ + partition::single_node::DEFAULT_EXPIRY_DURATION, + router::{single_node::Router, Router as _}, + single_node::Request, + }; + + #[test] + fn test_router() { + futures_lite::future::block_on(async { + let router = Router::default(); + + if let Some(Request::Read { partition, offset }) = router + .route( + HyperRequest::get("/api/v1/topics/some_topic/partitions/68419/records/109") + .body(Body::empty()) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + assert_eq!(offset, 109); + } else { + assert!(false, "Read request not routed!"); + } + + if let Some(Request::Append { + partition, + record_bytes, + }) = router + .route( + HyperRequest::post("/api/v1/topics/some_topic/partitions/68419/records/") + .body(Body::from(bytes::Bytes::from_static(b"Hello World!"))) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + let bytes: &[u8] = b"Hello World!"; + assert_eq!(record_bytes, bytes); + } else { + assert!(false, "Append request not routed!") + } + + if let Some(Request::LowestOffset { partition }) = router + .route( + HyperRequest::get( + "/api/v1/topics/some_topic/partitions/68419/stat/lowest_offset", + ) + .body(Body::empty()) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + } else { + assert!(false, "LowestOffset request not routed!"); + } + + if let Some(Request::HighestOffset { partition }) = router + .route( + HyperRequest::get( + "/api/v1/topics/some_topic/partitions/68419/stat/highest_offset", + ) + .body(Body::empty()) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + } else { + assert!(false, "HighestOffset request not routed!"); + } + + if let Some(Request::RemoveExpired { + partition, + expiry_duration, + }) = router + .route( + HyperRequest::post( + "/api/v1/topics/some_topic/partitions/68419/remove_expired", + ) + .body(Body::empty()) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + assert_eq!(expiry_duration, DEFAULT_EXPIRY_DURATION); + } else { + assert!(false, "RemoveExpired request not routed!"); + } + + if let Some(Request::PartitionHierachy) = router + .route( + HyperRequest::get("/api/v1/hierachy") + .body(Body::empty()) + .unwrap(), + ) + .await + { + } else { + assert!(false, "PartitionHierachy request not routed!"); + } + + if let Some(Request::CreatePartition(partition)) = router + .route( + HyperRequest::post("/api/v1/topics/some_topic/partitions/68419") + .body(Body::empty()) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + } else { + assert!(false, "CreatePartition request not routed!"); + } + + if let Some(Request::RemovePartition(partition)) = router + .route( + HyperRequest::delete("/api/v1/topics/some_topic/partitions/68419") + .body(Body::empty()) + .unwrap(), + ) + .await + { + assert_eq!(partition.topic, "some_topic"); + assert_eq!(partition.partition_number, 68419); + } else { + assert!(false, "RemovePartition request not routed!"); + } + + assert!(router + .route(HyperRequest::get("/bad/uri").body(Body::empty()).unwrap()) + .await + .is_none()); + }); + } + } } diff --git a/src/server/worker.rs b/src/server/worker.rs index 56d876137..8002ca769 100644 --- a/src/server/worker.rs +++ b/src/server/worker.rs @@ -34,6 +34,7 @@ pub enum TaskError { LockAcqFailed, } +#[cfg(not(tarpaulin_include))] impl Display for TaskError

{ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self {