Skip to content

Commit

Permalink
Hide mutex usage from Db layer and push it down to the Storage layer
Browse files Browse the repository at this point in the history
This hides a bit the implementation details of the Storage layer from
the Db layer, which ultimately might help once we decide to swap the
Storage implementation from InMemory to something else
  • Loading branch information
rcmgleite committed Jul 21, 2024
1 parent fae49f6 commit a96ceb6
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 77 deletions.
4 changes: 2 additions & 2 deletions src/client/db_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl DbClient {
fn get_conn_mut(&mut self) -> Result<&mut TcpStream> {
match &mut self.state {
DbClientState::Connected { connection } => Ok(connection),
DbClientState::Disconnected { .. } => Err(Error::Logic {
DbClientState::Disconnected { .. } => Err(Error::ClientMisuse {
reason: "You must call `connect` before any other method for DbClient".to_string(),
}),
}
Expand Down Expand Up @@ -75,7 +75,7 @@ impl Client for DbClient {
};
}
DbClientState::Connected { .. } => {
return Err(Error::Logic {
return Err(Error::ClientMisuse {
reason: "called `connect` twice on a DbClient".to_string(),
});
}
Expand Down
10 changes: 5 additions & 5 deletions src/cluster/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//!
//! Currently, the state is updated via a gossip protocol in which nodes exchange their view of the cluster
//! to every other host. For more info, see the docs for [`super::heartbeat`].
use crate::error::{Error, Result};
use crate::error::{Error, Internal, Result};
use crate::persistency::partitioning::PartitioningScheme;
use crate::server::config::Quorum;
use crate::utils::serde_utf8_bytes;
Expand Down Expand Up @@ -121,9 +121,9 @@ impl State {
if let Ok(guard) = self.inner.lock() {
Ok(guard)
} else {
Err(Error::Logic {
Err(Error::Internal(Internal::Logic {
reason: "Unable to acquire lock".to_string(),
})
}))
}
}

Expand Down Expand Up @@ -202,10 +202,10 @@ impl State {
inner_guard
.nodes
.get(&node_key)
.ok_or(Error::Logic {
.ok_or(Error::Internal(Internal::Logic {
reason: "Unable to find node inside RingState. This should never happen."
.to_string(),
})
}))
.cloned()
}

Expand Down
17 changes: 12 additions & 5 deletions src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,31 @@ pub enum Error {
/// returned by the server when the request is invalid for any reason -> this means the client has to fix something
InvalidRequest(InvalidRequest),
/// returned during server bootstrap if any configuration is invalid
InvalidServerConfig { reason: String },
InvalidServerConfig {
reason: String,
},
/// Internal error that should be opaque to an external client.. Since today we use the same error type
/// for internal errors and client errors this is a bit moot
Internal(Internal),
/// Self explanatory
Io { reason: String },
Io {
reason: String,
},
/// Error returned either in PUT or GET when quorum is not met
QuorumNotReached {
operation: String,
reason: String,
errors: Vec<Error>,
},
/// Logic is a type of error that signifies a bug in the database.
Logic { reason: String },
/// Error returned when a cluster has a single node and tries to heartbeat to self
SingleNodeCluster,
/// Returned when any invalid payload is returned/received by the server
InvalidJsonPayload { reason: String },
InvalidJsonPayload {
reason: String,
},
ClientMisuse {
reason: String,
},
}

impl Error {
Expand Down
28 changes: 10 additions & 18 deletions src/persistency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use partitioning::consistent_hashing::murmur3_hash;
use quorum::{min_required_replicas::MinRequiredReplicas, Evaluation, OperationStatus, Quorum};
use std::sync::Arc;
use storage::{Storage, StorageEntry, Value};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{event, instrument, Level};
use versioning::version_vector::{ProcessId, VersionVector};

Expand All @@ -47,9 +46,8 @@ pub type ClientFactory = Arc<dyn Factory + Send + Sync>;
/// be updated later on..
#[derive(Clone)]
pub struct Db {
/// TODO/FIXME: This is bad because it means that every operations locks
/// the entire Storage.
storage: Arc<AsyncMutex<Storage>>,
/// Storage engine
storage: Storage,
/// Cluster state.
cluster_state: Arc<ClusterState>,
/// Own state
Expand Down Expand Up @@ -124,7 +122,7 @@ impl Db {
shared: Arc::new(Shared::new(&own_addr)),
};
Self {
storage: Arc::new(AsyncMutex::new(Storage::new(own_state.pid()))),
storage: Storage::new(own_state.pid()),
cluster_state,
own_state,
client_factory,
Expand Down Expand Up @@ -251,8 +249,7 @@ impl Db {

#[instrument(name = "persistency::local_put", level = "info", skip(self))]
async fn local_put(&self, key: Bytes, value: Value, context: SerializedContext) -> Result<()> {
let mut storage_guard = self.storage.lock().await;
storage_guard.put(key, value, context).await?;
self.storage.put(key, value, context)?;

// FIXME: return type
Ok(())
Expand Down Expand Up @@ -300,22 +297,18 @@ impl Db {
Ok(())
}

/// Retrieves the [`Bytes`] associated with the given key.
/// Retrieves [`StorageEntry`]s associated with the given key.
///
/// If the key is not found, [Option::None] is returned
/// If the key is not found, [`Error::NotFound`] is returned.
///
/// # TODOs
/// 1. Handle version conflicts
/// - we have to deal with the case in which we have multiple versions of the same data encountered
/// - this will mean merging [`VersionVector`]s so that the client receives the merged version alongside an array of objects
/// 2. Handle integrity checks properly
/// 3. Implement Read Repair
/// 1. Handle integrity checks properly
/// 2. Implement Read Repair
#[instrument(level = "info")]
pub async fn get(&self, key: Bytes, replica: bool) -> Result<Vec<StorageEntry>> {
if replica {
event!(Level::DEBUG, "Executing a replica GET");
let storage_guard = self.storage.lock().await;
Ok(storage_guard.get(key.clone()).await?)
Ok(self.storage.get(key.clone())?)
} else {
event!(Level::DEBUG, "executing a non-replica GET");
let quorum_config = self.cluster_state.quorum_config();
Expand Down Expand Up @@ -384,8 +377,7 @@ impl Db {
async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result<Vec<StorageEntry>> {
if self.owns_key(&src_addr)? {
event!(Level::DEBUG, "Getting data from local storage");
let storage_guard = self.storage.lock().await;
Ok(storage_guard.get(key.clone()).await?)
Ok(self.storage.get(key.clone())?)
} else {
event!(
Level::DEBUG,
Expand Down
4 changes: 2 additions & 2 deletions src/persistency/partitioning/consistent_hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ impl PartitioningScheme for ConsistentHashing {
impl ConsistentHashing {
fn key_owner_index(&self, key: &[u8]) -> Result<usize> {
if self.nodes.is_empty() {
return Err(Error::Logic {
return Err(Error::Internal(Internal::Logic {
reason: "Can't ask for owner if no nodes are present".to_string(),
});
}));
}

let key_hash = (self.hash_fn)(key);
Expand Down
8 changes: 4 additions & 4 deletions src/persistency/quorum/min_required_replicas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
};

use super::{Evaluation, OperationStatus, Quorum, QuorumResult};
use crate::error::{Error, Result};
use crate::error::{Error, Internal, Result};

/// Definition of a MinRequiredReplicas [`Quorum`] type
#[derive(Debug)]
Expand All @@ -34,12 +34,12 @@ impl<T, E> MinRequiredReplicas<T, E> {
/// # Error
pub fn new(required_successes: usize) -> Result<Self> {
if required_successes < 1 {
return Err(Error::Logic {
return Err(Error::Internal(Internal::Logic {
reason: format!(
"required_success has to be greated than 1. got{}",
required_successes
),
});
}));
}

Ok(Self {
Expand Down Expand Up @@ -171,7 +171,7 @@ mod tests {
fn test_failed_to_construct() {
let err = MinRequiredReplicas::<(), Error>::new(0).err().unwrap();
match err {
crate::error::Error::Logic { .. } => { /* noop */ }
crate::error::Error::Internal(Internal::Logic { .. }) => { /* noop */ }
_ => {
panic!("Unexpected err {}", err);
}
Expand Down
Loading

0 comments on commit a96ceb6

Please sign in to comment.