Skip to content

Commit

Permalink
More documentation added.
Browse files Browse the repository at this point in the history
Now trying to be much more precise about the implementation.
Currently not making changes to the code itself. When issues are found while
writing the docs, a TODO is added instead to avoid cluttering the
documentation commits.
  • Loading branch information
rcmgleite committed May 28, 2024
1 parent 0ca6788 commit c278dea
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 10 deletions.
20 changes: 16 additions & 4 deletions src/cmd/cluster/heartbeat.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
//! Heartbeat [`crate::cmd::Command`]
//!
//! This command is issued as part of the Gossip protocol that propagates
//! cluster states to all cluster nodes.
//! Every heartbeat request marshals the node's own view of the cluster and sends it to X other nodes.
//! The receiving end of the command merges its own view of the cluster with the received view and consolidates it
//! by checking each individual [`crate::cluster::state::Node::tick`] field and always favoring the highest one.
//! See [`crate::cluster::state`] docs for more information.
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -13,14 +21,17 @@ pub struct Heartbeat {
}

impl Heartbeat {
/// Constructs a new heartbeat [`crate::cmd::Command`]
pub fn new(nodes: Vec<Node>) -> Self {
Self { nodes }
}

// Heartbeat flow
// 1. receive a heartbeat (possibly from a node that it doesn't know yet)
// 2. update its view of the ring state including the possibly new node
// 3. respond to the heartbeat with an ACK response
/// Executes a [`Heartbeat`] command
///
/// Heartbeat flow:
/// 1. receive a heartbeat (possibly from a node that it doesn't know yet)
/// 2. update its view of the ring state including the possibly new node
/// 3. respond to the heartbeat with an ACK response
pub async fn execute(self, db: Arc<Db>) -> Result<HeartbeatResponse> {
db.update_cluster_state(self.nodes)?;

Expand All @@ -44,6 +55,7 @@ impl IntoMessage for Heartbeat {
}
}

/// [Heartbeat] deserialized response payload
#[derive(Serialize, Deserialize)]
pub struct HeartbeatResponse {
message: String,
Expand Down
15 changes: 12 additions & 3 deletions src/cmd/cluster/join_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! [`JoinCluster`] [`crate::cmd::Command`]
//!
//! Every newly bootstrapped node that needs to join a cluster must receive this [`crate::server::message::Message`].
//! It carries the address of one existing cluster node so that the new node can establish a TCP connection to it and start receiving
//! cluster information from it.
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -7,6 +12,7 @@ use crate::{cluster::state::Node, db::Db, error::Result, server::message::IntoMe

pub const CMD_CLUSTER_JOIN_CLUSTER: u32 = 101;

/// JoinCluster deserialized [`crate::cmd::Command`]
#[derive(Serialize, Deserialize)]
pub struct JoinCluster {
known_cluster_node_addr: String,
Expand All @@ -19,9 +25,11 @@ impl JoinCluster {
}
}

// This cmd simply adds the provided target node to the cluster state.
// the background heartbeat process will take care of receiving ring state info
// from this node (eventually)
/// Executes a [`JoinCluster`] command.
///
/// This command simply adds the provided target node to the cluster state.
/// The background heartbeat process will take care of receiving ring state info
/// from this node (eventually). See [`crate::cluster::heartbeat`] docs for more information.
pub async fn execute(self, db: Arc<Db>) -> Result<JoinClusterResponse> {
let target_node = Node::new(Bytes::from(self.known_cluster_node_addr));

Expand All @@ -47,6 +55,7 @@ impl IntoMessage for JoinCluster {
}
}

/// Deserialized [`JoinCluster`] response payload.
#[derive(Serialize, Deserialize)]
pub struct JoinClusterResponse {
message: String,
Expand Down
1 change: 1 addition & 0 deletions src/cmd/cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
//! Defines commands that can only be issued while in Cluster mode
pub mod heartbeat;
pub mod join_cluster;
5 changes: 5 additions & 0 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Get [`crate::cmd::Command`]
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -17,10 +18,12 @@ pub struct Get {
}

impl Get {
/// Constructs a new [`Get`] instance
pub fn new(key: Bytes) -> Self {
Self { key }
}

/// Executes the [`Get`] command using the specified [`Db`] instance
pub async fn execute(self, db: Arc<Db>) -> Result<GetResponse> {
if let OwnsKeyResponse::False { addr } = db.owns_key(&self.key)? {
return Err(Error::InvalidRequest {
Expand All @@ -39,6 +42,7 @@ impl Get {
}
}

/// Returns [`GET_CMD`], the numeric command id associated with [`Get`].
pub fn cmd_id() -> u32 {
GET_CMD
}
Expand All @@ -54,6 +58,7 @@ impl IntoMessage for Get {
}
}

/// The struct that represents a [`Get`] response payload
#[derive(Serialize, Deserialize)]
pub struct GetResponse {
#[serde(with = "serde_utf8_bytes")]
Expand Down
14 changes: 12 additions & 2 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Module that contains all commands implemented by rldb.
pub mod cluster;
pub mod get;
pub mod ping;
Expand All @@ -24,8 +25,10 @@ use crate::{
server::message::Message,
};

// TODO: Note - we are mixing cluster and client commands here... it might be better to split them in the future.
// right now a cluster command issued against the client port will run normally which is a bit weird...
/// Command definition - this enum contains all commands implemented by rldb.
///
/// TODO: Note - we are mixing cluster and client commands here... it might be better to split them in the future.
/// right now a cluster command issued against the client port will run normally which is a bit weird...
pub enum Command {
Ping(PingCommand),
Get(GetCommand),
Expand All @@ -34,6 +37,7 @@ pub enum Command {
JoinCluster(JoinClusterCommand),
}

/// macro that tries to construct a specific [`Command`] from a [`Message`]
macro_rules! try_from_message_with_payload {
($message:expr, $t:ident) => {{
(|| {
Expand Down Expand Up @@ -63,6 +67,7 @@ macro_rules! try_from_message_with_payload {
}

impl Command {
/// Executes a given command by forwarding the [`Db`] instance provided
pub async fn execute(self, db: Arc<Db>) -> Message {
match self {
Command::Ping(cmd) => {
Expand Down Expand Up @@ -94,6 +99,10 @@ impl Command {
}
}

/// Tries to construct a [`Command`] from the provided [`Message`]
///
/// # Errors
/// returns an error if the payload doesn't conform with the specified [`Command`]
pub fn try_from_message(message: Message) -> Result<Command> {
match message.id {
PING_CMD => Ok(Command::Ping(ping::Ping)),
Expand All @@ -120,6 +129,7 @@ impl Command {
}
}

/// Serializes the given payload into its JSON representation.
///
/// The result is always wrapped in [`Some`]; the [`Option`] return type is kept
/// for the callers' convenience.
///
/// # Panics
/// Panics if `serde_json` fails to serialize `payload`.
pub(crate) fn serialize_response_payload<T: Serialize>(payload: T) -> Option<Bytes> {
    let json = serde_json::to_string(&payload).unwrap();
    Some(Bytes::from(json))
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/ping.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Ping [`crate::cmd::Command`]
use serde::{Deserialize, Serialize};

use crate::{error::Result, server::message::IntoMessage};
Expand All @@ -21,6 +22,7 @@ impl IntoMessage for Ping {
}
}

/// [`Ping`] response payload
#[derive(Serialize, Deserialize)]
pub struct PingResponse {
message: String,
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/put.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Put [`crate::cmd::Command`]
use std::sync::Arc;

use bytes::Bytes;
Expand All @@ -10,6 +11,7 @@ use crate::utils::serde_utf8_bytes;

pub const PUT_CMD: u32 = 3;

/// Struct that represents a deserialized Put payload
#[derive(Serialize, Deserialize)]
pub struct Put {
#[serde(with = "serde_utf8_bytes")]
Expand All @@ -19,10 +21,12 @@ pub struct Put {
}

impl Put {
/// Constructs a new [`Put`] [`crate::cmd::Command`]
pub fn new(key: Bytes, value: Bytes) -> Self {
Self { key, value }
}

/// Executes a [`Put`] [`crate::cmd::Command`]
pub async fn execute(self, db: Arc<Db>) -> Result<PutResponse> {
if let OwnsKeyResponse::False { addr } = db.owns_key(&self.key)? {
return Err(Error::InvalidRequest {
Expand Down Expand Up @@ -54,6 +58,7 @@ impl IntoMessage for Put {
}
}

/// [`Put`] response payload in its deserialized form.
#[derive(Serialize, Deserialize)]
pub struct PutResponse {
message: String,
Expand Down
23 changes: 22 additions & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! Module that contains the abstraction connecting the [`StorageEngine`] and [`State`] into a single interface.
//!
//! This interface is what a [`crate::cmd::Command`] has access to in order to execute its functionality.
use bytes::Bytes;
use std::sync::Arc;

Expand All @@ -6,11 +9,12 @@ use crate::{
error::Result,
};

/// Type alias for a shared [`crate::storage_engine::StorageEngine`] trait object.
///
/// Wrapping the trait object in an [`Arc`] makes the handle cloneable, and the
/// `Send + Sync + 'static` bounds are part of the aliased type.
pub type StorageEngine = Arc<dyn crate::storage_engine::StorageEngine + Send + Sync + 'static>;

/// Db is the abstraction that connects storage_engine and overall database state
/// in a single interface.
/// It exists mainly to hide [`StorageEngine`] and [`PartitioningScheme`] details so that they can
/// It exists mainly to hide [`StorageEngine`] and [`State`] details so that they can
/// be updated later on.
#[derive(Debug)]
pub struct Db {
Expand All @@ -21,27 +25,41 @@ pub struct Db {
cluster_state: Option<Arc<State>>,
}

/// Result of a key-ownership check performed by [`Db::owns_key`].
///
/// Possibly a bad idea, but an enum is used instead of a boolean to determine if a key is owned by a node or not.
/// This is mostly useful because the [`OwnsKeyResponse::False`] variant contains the address of the node
/// that actually holds the key, which is sent back to the client as part of the TCP response.
pub enum OwnsKeyResponse {
/// The key provided to [`Db::owns_key`] is owned by this node.
True,
/// The key provided to [`Db::owns_key`] is not owned by this node. The actual owner is returned in the `addr` field.
False { addr: Bytes },
}

impl Db {
/// Returns a new instance of [`Db`] with the provided [`StorageEngine`] and [`State`].
pub fn new(storage_engine: StorageEngine, cluster_state: Option<Arc<State>>) -> Self {
Self {
storage_engine,
cluster_state,
}
}

/// Writes the given key/value pair to the underlying [`StorageEngine`].
///
/// # Errors
/// Propagates any error returned by the storage engine.
///
/// TODO: Should the checks regarding ownership of keys/partitions be moved to this function
/// instead of delegated to the Put [`crate::cmd::Command`]
pub async fn put(&self, key: Bytes, value: Bytes) -> Result<()> {
    self.storage_engine.put(key, value).await?;
    Ok(())
}

/// Looks up the [`Bytes`] stored under the given key in the underlying [`StorageEngine`].
///
/// If the key is not found, [`Option::None`] is returned.
///
/// # Errors
/// Propagates any error returned by the storage engine.
pub async fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
    let value = self.storage_engine.get(key).await?;
    Ok(value)
}

/// Verifies if the key provided is owned by self.
pub fn owns_key(&self, key: &[u8]) -> Result<OwnsKeyResponse> {
if let Some(cluster_state) = self.cluster_state.as_ref() {
if cluster_state.owns_key(key)? {
Expand All @@ -56,6 +74,9 @@ impl Db {
}
}

/// Updates the cluster state based on the nodes provided.
///
/// This is used as part of the Gossip protocol to propagate cluster changes across all nodes
pub fn update_cluster_state(&self, nodes: Vec<Node>) -> Result<()> {
if let Some(cluster_state) = self.cluster_state.as_ref() {
Ok(cluster_state.merge_nodes(nodes)?)
Expand Down
3 changes: 3 additions & 0 deletions src/error/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! This module defines client/user visible errors that can be returned by rldb.

use std::fmt::Display;

use bytes::Bytes;
Expand All @@ -7,6 +9,7 @@ use crate::utils::serde_utf8_bytes;

/// [`std::result::Result`] alias whose error type is fixed to this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Error enum with all possible variants
#[derive(Debug, Serialize)]
pub enum Error {
NotFound {
Expand Down
1 change: 1 addition & 0 deletions src/server/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! This module contains the definition of a [`Message`] - the smallest unit of parseable bytes built for the rldb [`crate::server::Server`].
//!
//! When serialized, a [`Message`] looks like the following:
//!
//! [4 bytes - ID][4 bytes - length of payload][payload (dynamic size)]
Expand Down

0 comments on commit c278dea

Please sign in to comment.