Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce quorum trait #4

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions src/client/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ pub enum Error {
/// Variant returned if the client was unable to interpret the server response
InvalidServerResponse { reason: String },
/// Variant returned when either PUT or GET quorums are not met
QuorumNotReached {
operation: String,
required: usize,
got: usize,
},
QuorumNotReached { operation: String, reason: String },
/// Error for GET requests when the key doesn't exist
NotFound {
#[serde(with = "serde_utf8_bytes")]
Expand Down
1 change: 1 addition & 0 deletions src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
pub mod error;
pub mod heartbeat;
pub mod partitioning;
pub mod quorum;
pub mod state;
191 changes: 191 additions & 0 deletions src/cluster/quorum/min_required_replicas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
//! MinRequiredReplicas is a simple quorum algorithm that returns [`Evaluation::Reached`] once the number of
//! successful operations reported through [`Quorum::update`] reaches the configured minimum.
use super::{Evaluation, OperationStatus, Quorum, QuorumResult};
use crate::cluster::error::{Error, Result};
use std::hash::Hash;

/// Quorum tracker implementing [`Quorum`] by counting successes against a required minimum.
///
/// Every reported success and failure is retained so that they can be handed back once the quorum is finished.
#[derive(Debug)]
pub struct MinRequiredReplicas<T, E> {
/// Number of replicas that were not yet evaluated. This is initially set to the total number of replicas
/// that exist
remaining_replicas: usize,
/// Number of replicas that need to succeed in order for the quorum to be met
required_successes: usize,
/// All successful values received so far, in the order they were reported
successes: Vec<T>,
/// All failures received so far, in the order they were reported
failures: Vec<E>,
/// Current state of this quorum instance
current_state: Evaluation,
}

impl<T, E> MinRequiredReplicas<T, E> {
    /// Constructs a new instance of [`MinRequiredReplicas`].
    ///
    /// * `n_replicas` - total number of replicas expected to report a result.
    /// * `required_successes` - number of successful results needed for the quorum to be reached.
    ///
    /// The returned instance starts in the [`Evaluation::NotReached`] state with no recorded results.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Logic`] when `required_successes` is greater than `n_replicas`, since such a
    /// quorum could never be reached. Equal values are accepted.
    pub fn new(n_replicas: usize, required_successes: usize) -> Result<Self> {
        if n_replicas < required_successes {
            // The message mirrors the actual check: equality is allowed.
            return Err(Error::Logic {
                reason: format!(
                    "n_replicas ({}) needs to be greater than or equal to required_successes ({}).",
                    n_replicas, required_successes
                ),
            });
        }

        Ok(Self {
            remaining_replicas: n_replicas,
            required_successes,
            successes: Default::default(),
            failures: Default::default(),
            current_state: Evaluation::NotReached,
        })
    }
}

impl<T: Eq + Hash, E: std::error::Error> Quorum<T, E> for MinRequiredReplicas<T, E> {
fn update(&mut self, operation_status: OperationStatus<T, E>) -> Result<Evaluation> {
if self.remaining_replicas == 0 {
return Err(Error::Logic { reason: "Calling `update` on MinRequiredReplicas Quorum more times than the total number of replicas. This is a bug".to_string() });
}

match operation_status {
OperationStatus::Success(item) => {
self.successes.push(item);
}
OperationStatus::Failure(err) => {
self.failures.push(err);
}
}

self.remaining_replicas -= 1;

// no point trying to evaluate again given the quorum is already unreachable
if self.current_state == Evaluation::Unreachable {
return Ok(self.current_state);
}

self.current_state = if self.successes.len() >= self.required_successes {
Evaluation::Reached
} else if self.remaining_replicas + self.successes.len() < self.required_successes {
Evaluation::Unreachable
} else {
Evaluation::NotReached
};

Ok(self.current_state)
}

fn finish(self) -> QuorumResult<T, E> {
QuorumResult {
evaluation: self.current_state,
successes: self.successes,
failures: self.failures,
}
}
}

#[cfg(test)]
mod tests {
    use super::MinRequiredReplicas;
    use crate::{
        cluster::quorum::{Evaluation, OperationStatus, Quorum},
        error::Error,
    };

    /// Concrete quorum type exercised by every test in this module.
    type TestQuorum = MinRequiredReplicas<(), Error>;

    /// Builds a successful operation status carrying the unit value.
    fn success() -> OperationStatus<(), Error> {
        OperationStatus::Success(())
    }

    /// Builds a failed operation status carrying a dummy error.
    fn failure() -> OperationStatus<(), Error> {
        OperationStatus::Failure(Error::Generic {
            reason: "fake".to_string(),
        })
    }

    #[test]
    fn test_quorum_reached() {
        let mut q = TestQuorum::new(3, 2).unwrap();
        assert_eq!(q.update(success()).unwrap(), Evaluation::NotReached);
        assert_eq!(q.update(success()).unwrap(), Evaluation::Reached);

        // Reaching the quorum does not stop callers from reporting further results.
        assert_eq!(q.update(failure()).unwrap(), Evaluation::Reached);

        let outcome = q.finish();
        assert_eq!(outcome.evaluation, Evaluation::Reached);
        assert_eq!(outcome.successes.len(), 2);
        assert_eq!(outcome.failures.len(), 1);
    }

    #[test]
    fn test_quorum_not_reached() {
        let mut q = TestQuorum::new(3, 2).unwrap();

        // Three consecutive failures: the second one already makes the quorum unreachable.
        let expected_states = [
            Evaluation::NotReached,
            Evaluation::Unreachable,
            Evaluation::Unreachable,
        ];
        for expected in expected_states.iter() {
            assert_eq!(q.update(failure()).unwrap(), *expected);
        }

        let outcome = q.finish();
        assert_eq!(outcome.evaluation, Evaluation::Unreachable);
        assert!(outcome.successes.is_empty());
        assert_eq!(outcome.failures.len(), 3);
    }

    #[test]
    fn test_failed_to_construct() {
        let err = TestQuorum::new(2, 3).err().unwrap();
        assert!(
            matches!(err, crate::cluster::error::Error::Logic { .. }),
            "Unexpected err {}",
            err
        );
    }

    #[test]
    fn test_call_update_more_times_than_allowed() {
        let mut q = TestQuorum::new(3, 2).unwrap();
        assert_eq!(q.update(success()).unwrap(), Evaluation::NotReached);
        assert_eq!(q.update(success()).unwrap(), Evaluation::Reached);
        assert_eq!(q.update(success()).unwrap(), Evaluation::Reached);

        // A fourth update exceeds the three configured replicas and must be rejected.
        let err = q.update(success()).err().unwrap();
        assert!(
            matches!(err, crate::cluster::error::Error::Logic { .. }),
            "Unexpected err {}",
            err
        );
    }
}
44 changes: 44 additions & 0 deletions src/cluster/quorum/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//! Module that contains quorum algorithm implementations

pub mod min_required_replicas;
use crate::cluster::error::Result;

/// State of a quorum, as returned by [`Quorum::update`] and carried by [`QuorumResult`].
///
/// The variants carry no data: the successful values and the failures collected along the way are
/// returned separately in the [`QuorumResult`] produced by [`Quorum::finish`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Evaluation {
    /// Quorum was reached
    Reached,
    /// Quorum was not reached yet but may be reached in the future
    NotReached,
    /// Given the current state, it's impossible to reach quorum no matter how many more operations succeed
    Unreachable,
}

/// Value returned by [`Quorum::finish`], bundling the quorum state with everything that was collected.
#[derive(Debug)]
pub struct QuorumResult<T, E> {
/// State of the quorum at the time it was finished
pub evaluation: Evaluation,
/// Values of the operations that were reported as successful
pub successes: Vec<T>,
/// Errors of the operations that were reported as failed
pub failures: Vec<E>,
}

/// Argument passed to the [`Quorum::update`] function to mark an operation as either a Success or a Failure
#[derive(Debug)]
pub enum OperationStatus<T, E> {
    /// The operation succeeded and produced the given value
    Success(T),
    /// The operation failed with the given error
    Failure(E),
}

/// Trait that defines the Quorum interface.
///
/// `T` is the value carried by a successful operation and `E` the error carried by a failed one.
pub trait Quorum<T, E: std::error::Error> {
/// Updates the Quorum internal state with either a success or a failure and returns the
/// resulting [`Evaluation`]. Implementations may return an error when the call itself is invalid.
fn update(&mut self, operation_status: OperationStatus<T, E>) -> Result<Evaluation>;

/// Consumes the quorum and returns a [`QuorumResult`] holding the final [`Evaluation`] together
/// with the successes and failures that were reported.
fn finish(self) -> QuorumResult<T, E>;
}
60 changes: 27 additions & 33 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Get [`crate::cmd::Command`]
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -10,6 +10,8 @@ use tracing::{event, Level};

use crate::client::db_client::DbClient;
use crate::client::Client;
use crate::cluster::quorum::min_required_replicas::MinRequiredReplicas;
use crate::cluster::quorum::{Evaluation, OperationStatus, Quorum};
use crate::db::{Db, OwnsKeyResponse};
use crate::error::{Error, Result};
use crate::server::message::IntoMessage;
Expand Down Expand Up @@ -50,6 +52,9 @@ impl Get {
} else {
event!(Level::INFO, "executing a non-replica GET");
if let Some(quorum_config) = db.quorum_config() {
let mut quorum =
MinRequiredReplicas::new(quorum_config.replicas, quorum_config.reads)?;

let mut futures = FuturesUnordered::new();
let preference_list = db.preference_list(&self.key)?;
event!(Level::INFO, "GET preference_list {:?}", preference_list);
Expand All @@ -59,12 +64,10 @@ impl Get {

// TODO: we are waiting for all nodes on the preference list to return either error or success
// this will cause latency issues and it's not necessary.. fix it later
let mut results = Vec::new();
let mut errors = Vec::new();
while let Some(res) = futures.next().await {
match res {
Ok(res) => {
results.push(res);
let _ = quorum.update(OperationStatus::Success(res));
}
Err(err) => {
event!(
Expand All @@ -73,38 +76,30 @@ impl Get {
err
);

errors.push(err);
let _ = quorum.update(OperationStatus::Failure(err));
}
}
}

event!(Level::INFO, "raw results: {:?}", results);
// TODO: very cumbersome logic... will have to make this better later
let mut successes = 0;
let mut result_freq: HashMap<Bytes, usize> = HashMap::new();
for result in results {
*result_freq.entry(result).or_default() += 1;
}
event!(Level::INFO, "quorum: {:?}", quorum);

event!(Level::WARN, "result_freq: {:?}", result_freq);
let quorum_result = quorum.finish();
match quorum_result.evaluation {
Evaluation::Reached => Ok(quorum_result.successes[0].clone()),
Evaluation::NotReached | Evaluation::Unreachable => {
if quorum_result.failures.iter().all(|err| err.is_not_found()) {
return Err(Error::NotFound { key: self.key });
}

for (res, freq) in result_freq {
if freq >= quorum_config.reads {
return Ok(GetResponse { value: res });
Err(Error::QuorumNotReached {
operation: "Get".to_string(),
reason: format!(
"Unable to execute {} successful GETs",
quorum_config.reads
),
})
}

successes = freq;
}

if errors.iter().all(|err| err.is_not_found()) {
return Err(Error::NotFound { key: self.key });
}

Err(Error::QuorumNotReached {
operation: "Get".to_string(),
required: quorum_config.reads,
got: successes,
})
} else {
let value = db.get(&self.key).await?;
if let Some(value) = value {
Expand All @@ -116,7 +111,7 @@ impl Get {
}
}

async fn do_get(key: Bytes, db: Arc<Db>, src_addr: Bytes) -> Result<Bytes> {
async fn do_get(key: Bytes, db: Arc<Db>, src_addr: Bytes) -> Result<GetResponse> {
if let OwnsKeyResponse::True = db.owns_key(&src_addr)? {
event!(
Level::INFO,
Expand All @@ -125,7 +120,7 @@ impl Get {
);
let res = db.get(&key).await?;
if let Some(res) = res {
Ok(res)
Ok(GetResponse { value: res })
} else {
Err(Error::NotFound { key })
}
Expand All @@ -150,8 +145,7 @@ impl Get {
event!(Level::INFO, "connecting to node node: {:?}", src_addr);
client.connect().await?;

let resp = client.get(key.clone(), true).await?;
Ok(resp.value)
Ok(client.get(key.clone(), true).await?)
}
}

Expand All @@ -172,7 +166,7 @@ impl IntoMessage for Get {
}

/// The struct that represents a [`Get`] response payload
#[derive(Debug, Serialize, Deserialize)]
#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
pub struct GetResponse {
#[serde(with = "serde_utf8_bytes")]
pub value: Bytes,
Expand Down
Loading
Loading