Skip to content

Commit

Permalink
Minor refactor on Message and Server
Browse files Browse the repository at this point in the history
  • Loading branch information
rcmgleite committed Jul 16, 2024
1 parent 487373f commit b96e5a9
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 99 deletions.
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,5 @@ tracing = "0.1.36"
tracing-bunyan-formatter = "0.3.3"
tracing-opentelemetry = "0.19.0"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
strum = "0.26.3"
strum_macros = "0.26.4"
1 change: 0 additions & 1 deletion src/bin/db_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ enum Commands {
key: Bytes,
},
#[command()]
// FIXME: Allow metadata to be passed as optional
Put {
#[arg(short)]
port: u16,
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::{
cluster::state::Node, cmd::CLUSTER_CLUSTER_STATE_CMD, error::Result, persistency::Db,
cluster::state::Node, cmd::CommandId, error::Result, persistency::Db,
server::message::IntoMessage,
};

Expand All @@ -29,13 +29,13 @@ impl ClusterState {
})
}

pub fn cmd_id() -> u32 {
CLUSTER_CLUSTER_STATE_CMD
pub fn cmd_id() -> CommandId {
CommandId::ClusterState
}
}

impl IntoMessage for ClusterState {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
Self::cmd_id()
}

Expand All @@ -51,7 +51,7 @@ pub struct ClusterStateResponse {
}

impl IntoMessage for Result<ClusterStateResponse> {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
ClusterState::cmd_id()
}

Expand Down
10 changes: 5 additions & 5 deletions src/cmd/cluster/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
use tracing::instrument;

use crate::{
cluster::state::Node, cmd::CLUSTER_HEARTBEAT_CMD, error::Result, persistency::Db,
cluster::state::Node, cmd::CommandId, error::Result, persistency::Db,
server::message::IntoMessage,
};

Expand Down Expand Up @@ -43,13 +43,13 @@ impl Heartbeat {
})
}

pub fn cmd_id() -> u32 {
CLUSTER_HEARTBEAT_CMD
pub fn cmd_id() -> CommandId {
CommandId::Heartbeat
}
}

impl IntoMessage for Heartbeat {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
Self::cmd_id()
}

Expand All @@ -65,7 +65,7 @@ pub struct HeartbeatResponse {
}

impl IntoMessage for Result<HeartbeatResponse> {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
Heartbeat::cmd_id()
}

Expand Down
10 changes: 5 additions & 5 deletions src/cmd/cluster/join_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
use tracing::instrument;

use crate::{
cluster::state::Node, cmd::CLUSTER_JOIN_CLUSTER_CMD, error::Result, persistency::Db,
cluster::state::Node, cmd::CommandId, error::Result, persistency::Db,
server::message::IntoMessage,
};

Expand Down Expand Up @@ -43,13 +43,13 @@ impl JoinCluster {
})
}

pub fn cmd_id() -> u32 {
CLUSTER_JOIN_CLUSTER_CMD
pub fn cmd_id() -> CommandId {
CommandId::JoinCluster
}
}

impl IntoMessage for JoinCluster {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
Self::cmd_id()
}

Expand All @@ -65,7 +65,7 @@ pub struct JoinClusterResponse {
}

impl IntoMessage for Result<JoinClusterResponse> {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
JoinCluster::cmd_id()
}

Expand Down
10 changes: 5 additions & 5 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::server::message::IntoMessage;
use crate::utils::serde_utf8_bytes;

use super::types::Context;
use super::GET_CMD;
use super::CommandId;

#[derive(Debug, Serialize, Deserialize)]
pub struct Get {
Expand Down Expand Up @@ -55,13 +55,13 @@ impl Get {
}

/// returns the cmd id for [`Get`]
pub fn cmd_id() -> u32 {
GET_CMD
pub fn cmd_id() -> CommandId {
CommandId::Get
}
}

impl IntoMessage for Get {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
Self::cmd_id()
}

Expand All @@ -83,7 +83,7 @@ pub struct GetResponse {
}

impl IntoMessage for Result<GetResponse> {
fn id(&self) -> u32 {
fn cmd_id(&self) -> CommandId {
Get::cmd_id()
}

Expand Down
91 changes: 52 additions & 39 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use get::Get as GetCommand;
use ping::Ping as PingCommand;
use put::Put as PutCommand;
use replication_get::ReplicationGet as ReplicationGetCommand;
use strum_macros::FromRepr;
use tracing::{event, instrument, Level};

use crate::{
Expand All @@ -30,15 +31,34 @@ use crate::{
server::message::Message,
};

/// Command ids used to figure out the layout of the payload to be parsed from a [`Message`]
pub(crate) const GET_CMD: u32 = 1;
pub(crate) const REPLICATION_GET_CMD: u32 = 2;
pub(crate) const PUT_CMD: u32 = 3;
pub(crate) const PING_CMD: u32 = 4;
#[derive(Debug, FromRepr, PartialEq, Eq)]
#[repr(u8)]
pub enum CommandId {
NotSet = 0,
Ping = 1,
Get = 2,
Put = 3,
ReplicationGet = 4,

// cluster commands
Heartbeat = 5,
JoinCluster = 6,
ClusterState = 7,
}

pub(crate) const CLUSTER_HEARTBEAT_CMD: u32 = 101;
pub(crate) const CLUSTER_JOIN_CLUSTER_CMD: u32 = 102;
pub(crate) const CLUSTER_CLUSTER_STATE_CMD: u32 = 103;
impl TryFrom<u8> for CommandId {
type Error = Error;

fn try_from(v: u8) -> std::result::Result<Self, Self::Error> {
if let Some(s) = Self::from_repr(v) {
Ok(s)
} else {
Err(Error::InvalidRequest(InvalidRequest::UnrecognizedCommand {
id: v,
}))
}
}
}

/// Command definition - this enum contains all commands implemented by rldb.
///
Expand All @@ -59,11 +79,11 @@ pub enum Command {
macro_rules! try_from_message_with_payload {
($message:expr, $t:ident) => {{
(|| {
if $message.id != $t::cmd_id() {
if $message.cmd_id != $t::cmd_id() {
return Err(Error::InvalidRequest(
InvalidRequest::UnableToConstructCommandFromMessage {
expected_id: $t::cmd_id(),
got: $message.id,
expected_id: $t::cmd_id() as u8,
got: $message.cmd_id as u8,
},
));
}
Expand Down Expand Up @@ -101,43 +121,40 @@ impl Command {
/// returns an error if the payload doesn't conform with the specified [`Command`]
#[instrument(level = "info")]
pub fn try_from_message(message: Message) -> Result<Command> {
match message.id {
PING_CMD => Ok(Command::Ping(ping::Ping)),
GET_CMD => Ok(Command::Get(try_from_message_with_payload!(
match message.cmd_id {
CommandId::Ping => Ok(Command::Ping(ping::Ping)),
CommandId::Get => Ok(Command::Get(try_from_message_with_payload!(
message, GetCommand
)?)),
REPLICATION_GET_CMD => Ok(Command::ReplicationGet(try_from_message_with_payload!(
message,
ReplicationGetCommand
)?)),
PUT_CMD => Ok(Command::Put(try_from_message_with_payload!(
CommandId::ReplicationGet => Ok(Command::ReplicationGet(
try_from_message_with_payload!(message, ReplicationGetCommand)?,
)),
CommandId::Put => Ok(Command::Put(try_from_message_with_payload!(
message, PutCommand
)?)),
CLUSTER_HEARTBEAT_CMD => Ok(Command::Heartbeat(try_from_message_with_payload!(
CommandId::Heartbeat => Ok(Command::Heartbeat(try_from_message_with_payload!(
message,
HeartbeatCommand
)?)),
CLUSTER_JOIN_CLUSTER_CMD => Ok(Command::JoinCluster(try_from_message_with_payload!(
CommandId::JoinCluster => Ok(Command::JoinCluster(try_from_message_with_payload!(
message,
JoinClusterCommand
)?)),
CLUSTER_CLUSTER_STATE_CMD => Ok(Command::ClusterState(try_from_message_with_payload!(
CommandId::ClusterState => Ok(Command::ClusterState(try_from_message_with_payload!(
message,
ClusterStateCommand
)?)),
_ => {
event!(Level::WARN, "Unrecognized command: {}", message.id);
Err(Error::InvalidRequest(InvalidRequest::UnrecognizedCommand {
id: message.id,
}))
CommandId::NotSet => {
event!(Level::WARN, "CommandId not set (or set to zero)");
Err(Error::InvalidRequest(InvalidRequest::CommandIdNotSet))
}
}
}
}

#[cfg(test)]
mod tests {
use super::Command;
use super::{Command, CommandId};
use crate::cmd::get::Get;
use crate::cmd::put::Put;
use crate::error::{Error, InvalidRequest};
Expand All @@ -149,7 +166,7 @@ mod tests {
fn invalid_request_mixed_message_id() {
let put_cmd = Get::new(Bytes::from("foo"));
let mut message = Message::from(put_cmd);
message.id = Put::cmd_id();
message.cmd_id = Put::cmd_id();

let err = Command::try_from_message(message).err().unwrap();
match err {
Expand All @@ -161,24 +178,20 @@ mod tests {
}

#[test]
fn invalid_request_unrecognized_command() {
fn invalid_request_command_id_not_set() {
let put_cmd = Put::new(
Bytes::from("foo"),
Value::new_unchecked(Bytes::from("bar")),
None,
);
let mut message = Message::from(put_cmd);
message.id = 99999;
message.cmd_id = CommandId::NotSet;

let err = Command::try_from_message(message).err().unwrap();
match err {
Error::InvalidRequest(InvalidRequest::UnrecognizedCommand { id }) => {
assert_eq!(id, 99999);
}
_ => {
panic!("Unexpected error: {}", err);
}
}
assert!(matches!(
err,
Error::InvalidRequest(InvalidRequest::CommandIdNotSet)
));
}

#[test]
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tracing::instrument;

use crate::{error::Result, server::message::IntoMessage};

use super::PING_CMD;
use super::CommandId;

#[derive(Debug, Serialize)]
pub struct Ping;
Expand All @@ -20,8 +20,8 @@ impl Ping {
}

impl IntoMessage for Ping {
fn id(&self) -> u32 {
PING_CMD
fn cmd_id(&self) -> CommandId {
CommandId::Ping
}
}

Expand All @@ -32,8 +32,8 @@ pub struct PingResponse {
}

impl IntoMessage for Result<PingResponse> {
fn id(&self) -> u32 {
PING_CMD
fn cmd_id(&self) -> CommandId {
CommandId::Ping
}

fn payload(&self) -> Option<bytes::Bytes> {
Expand Down
Loading

0 comments on commit b96e5a9

Please sign in to comment.