Skip to content

Commit

Permalink
Remove StorageEngine trait
Browse files Browse the repository at this point in the history
For now, we rely on the Storage mod. Don't know the right interface to
enable us to swap storage engines easily yet
  • Loading branch information
rcmgleite committed Jul 20, 2024
1 parent aa2e155 commit 5f9cda6
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 313 deletions.
8 changes: 1 addition & 7 deletions src/client/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@ use crate::{
types::{Context, SerializedContext},
},
error::{Error, Result},
persistency::{
partitioning::consistent_hashing::ConsistentHashing, storage::Value,
storage_engine::in_memory::InMemory, Db,
},
persistency::{partitioning::consistent_hashing::ConsistentHashing, storage::Value, Db},
server::config::Quorum,
test_utils::fault::{Fault, When},
};
Expand Down Expand Up @@ -186,12 +183,9 @@ impl ClientFactory for MockClientFactory {
])
.unwrap();

let storage_engine = Arc::new(InMemory::default());

let mut guard = self.dbs.lock().unwrap();
let db = guard.entry(addr.clone()).or_insert(Db::new(
own_addr,
storage_engine,
cluster_state,
Arc::new(MockClientFactoryBuilder::new().build()),
));
Expand Down
8 changes: 0 additions & 8 deletions src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::fmt::Display;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::persistency::storage_engine::Error as StorageEngineError;
use crate::utils::serde_utf8_bytes;

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -77,12 +76,6 @@ impl From<std::io::Error> for Error {
}
}

impl From<StorageEngineError> for Error {
fn from(err: StorageEngineError) -> Self {
Self::Internal(Internal::StorageEngine(err))
}
}

impl From<serde_json::Error> for Error {
fn from(value: serde_json::Error) -> Self {
Self::InvalidJsonPayload {
Expand All @@ -95,7 +88,6 @@ impl From<serde_json::Error> for Error {
pub enum Internal {
Logic { reason: String },
Unknown { reason: String },
StorageEngine(StorageEngineError),
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
20 changes: 4 additions & 16 deletions src/persistency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ use futures::{stream::FuturesUnordered, StreamExt};
use partitioning::consistent_hashing::murmur3_hash;
use quorum::{min_required_replicas::MinRequiredReplicas, Evaluation, OperationStatus, Quorum};
use std::sync::Arc;
use storage::{Storage, StorageEngine, StorageEntry, Value};
use storage::{Storage, StorageEntry, Value};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{event, instrument, Level};
use versioning::version_vector::{ProcessId, VersionVector};

pub mod partitioning;
pub mod quorum;
pub mod storage;
pub mod storage_engine;
pub mod versioning;

use crate::{
Expand Down Expand Up @@ -119,18 +118,14 @@ impl Db {
/// Returns a new instance of [`Db`] with the provided [`StorageEngine`] and [`ClusterState`].
pub fn new(
own_addr: Bytes,
storage_engine: StorageEngine,
cluster_state: Arc<ClusterState>,
client_factory: ClientFactory,
) -> Self {
let own_state = State::Active {
shared: Arc::new(Shared::new(&own_addr)),
};
Self {
storage: Arc::new(AsyncMutex::new(Storage::new(
storage_engine,
own_state.pid(),
))),
storage: Arc::new(AsyncMutex::new(Storage::new(own_state.pid()))),
cluster_state,
own_state,
client_factory,
Expand Down Expand Up @@ -271,7 +266,7 @@ impl Db {

#[instrument(name = "persistency::local_put", level = "info", skip(self))]
async fn local_put(&self, key: Bytes, value: Value, context: SerializedContext) -> Result<()> {
let storage_guard = self.storage.lock().await;
let mut storage_guard = self.storage.lock().await;
storage_guard.put(key, value, context).await?;

// FIXME: return type
Expand Down Expand Up @@ -464,8 +459,7 @@ mod tests {
error::{Error, InvalidRequest},
persistency::{
partitioning::consistent_hashing::ConsistentHashing, process_id, storage::Value,
storage_engine::in_memory::InMemory, versioning::version_vector::VersionVectorOrd, Db,
VersionVector,
versioning::version_vector::VersionVectorOrd, Db, VersionVector,
},
server::config::Quorum,
utils::generate_random_ascii_string,
Expand Down Expand Up @@ -497,13 +491,10 @@ mod tests {
])
.unwrap();

let storage_engine = Arc::new(InMemory::default());

(
own_local_addr.clone(),
Arc::new(Db::new(
own_local_addr,
storage_engine,
cluster_state,
Arc::new(MockClientFactoryBuilder::new().build()),
)),
Expand Down Expand Up @@ -604,11 +595,8 @@ mod tests {
.unwrap(),
);

let storage_engine = Arc::new(InMemory::default());

let db = Arc::new(Db::new(
own_local_addr,
storage_engine,
cluster_state,
Arc::new(MockClientFactoryBuilder::new().build()),
));
Expand Down
53 changes: 24 additions & 29 deletions src/persistency/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,33 @@
use crate::{
cmd::types::SerializedContext,
error::{Error, InvalidRequest, Result},
persistency::storage_engine::{in_memory::InMemory, StorageEngine as StorageEngineTrait},
utils::{generate_random_ascii_string, serde_utf8_bytes},
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use crc32c::crc32c;
use serde::{Deserialize, Serialize};
use std::{mem::size_of, sync::Arc};
use std::{collections::HashMap, mem::size_of};
use tracing::{event, instrument, Level};

use super::versioning::version_vector::{ProcessId, VersionVector, VersionVectorOrd};

/// type alias to the [`StorageEngine`] that makes it clonable and [`Send`]
pub type StorageEngine = Arc<dyn StorageEngineTrait + Send + Sync + 'static>;
pub type Store = HashMap<Bytes, Bytes>;

/// Since I don't know what the inner [`StorageEngine`] API should look like to enable easy integration with [`crate::persistency::VersionVector`],
/// this intermediate type works as a facade between [`StorageEngine`] and [`crate::persistency::Db`]. This might disappear once I finally
/// come up with a proper [`StorageEngine`] API.
///
/// [`Storage`] is a facade that provides the APIs for something similar to a multi-map - ie: enables a client to store
/// multiples values associated with a single key. This is required for a database that uses leaderless replication
/// as conflicts might occur due to concurrent puts happening on different nodes.
/// When a conflict is detected, the put still succeeds but both values are stored.
/// A followup PUT with the appropriate [`SerializedContext`] is required to resolve the conflict.
#[derive(Debug)]
pub struct Storage {
data_engine: StorageEngine,
data_store: Store,
/// Metadata Storage engine - currently hardcoded to be in-memory
/// The idea of having a separate storage engine just for metadata is do that we can avoid
/// adding framing layers to the data we want to store to append/prepend the metadata.
/// Also, when we do PUTs, we have to validate metadata (version) prior to storing the data,
/// and without a separate storage engine for metadata, we would always require a GET before a PUT,
/// which introduces more overhead than needed.
metadata_engine: InMemory,
metadata_store: Store,
pid: ProcessId,
}

Expand Down Expand Up @@ -117,17 +111,17 @@ pub(crate) fn version_evaluation(
}

impl Storage {
pub fn new(data_engine: StorageEngine, pid: ProcessId) -> Self {
pub fn new(pid: ProcessId) -> Self {
Self {
data_engine,
metadata_engine: InMemory::default(),
data_store: Default::default(),
metadata_store: Default::default(),
pid,
}
}

#[instrument(name = "storage::put", level = "info", skip(self))]
pub async fn put(
&self,
&mut self,
key: Bytes,
value: Value,
context: SerializedContext,
Expand Down Expand Up @@ -174,7 +168,7 @@ impl Storage {
}

#[instrument(name = "storage::do_put", level = "info", skip(self))]
async fn do_put(&self, key: Bytes, items: Vec<StorageEntry>) -> Result<Vec<StorageEntry>> {
async fn do_put(&mut self, key: Bytes, items: Vec<StorageEntry>) -> Result<Vec<StorageEntry>> {
let mut data_buf = BytesMut::new();
let mut version_buf = BytesMut::new();
data_buf.put_u32(items.len() as u32);
Expand All @@ -192,8 +186,8 @@ impl Storage {

// since the metadata is what tells us that data exists, let's store it second, only if the data was stored
// successfully
self.data_engine.put(key.clone(), data_buf.freeze()).await?;
self.metadata_engine.put(key, version_buf.freeze()).await?;
self.data_store.insert(key.clone(), data_buf.freeze());
self.metadata_store.insert(key, version_buf.freeze());

Ok(items)
}
Expand Down Expand Up @@ -233,13 +227,18 @@ impl Storage {
#[instrument(level = "info", skip(self))]
pub async fn get(&self, key: Bytes) -> Result<Vec<StorageEntry>> {
let version = self
.metadata_engine
.metadata_store
.get(&key)
.ok_or(Error::NotFound { key: key.clone() })?
.clone();

let data = self
.data_store
.get(&key)
.await?
.ok_or(Error::NotFound { key: key.clone() })?;
let data = self.data_engine.get(&key).await?.ok_or(Error::Logic {
reason: "Metadata entry found but no data entry found".to_string(),
})?;
.ok_or(Error::Logic {
reason: "Metadata entry found but no data entry found".to_string(),
})?
.clone();

let metadata_items = Self::unmarshall_entries(version)?;
let data_items = Self::unmarshall_entries(data)?;
Expand All @@ -266,18 +265,14 @@ mod tests {
use super::Storage;
use crate::{
cmd::types::Context,
persistency::{
storage::Value, storage_engine::in_memory::InMemory,
versioning::version_vector::VersionVector,
},
persistency::{storage::Value, versioning::version_vector::VersionVector},
};
use bytes::Bytes;
use std::sync::Arc;

// stores the same key twice with conflicting versions and makes sure both are stored
#[tokio::test]
async fn test_storage_conflict() {
let store = Storage::new(Arc::new(InMemory::default()), 0);
let mut store = Storage::new(0);

let key = Bytes::from("key");
let value_pid_0 = Value::new_unchecked(Bytes::from("value 0"));
Expand Down
Loading

0 comments on commit 5f9cda6

Please sign in to comment.