Skip to content

Commit

Permalink
Rework version conflict evaluation on Put
Browse files Browse the repository at this point in the history
  • Loading branch information
rcmgleite committed Jul 3, 2024
1 parent ce8ba68 commit 6a4ede8
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 81 deletions.
22 changes: 10 additions & 12 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use serde::{Deserialize, Serialize};

use crate::error::{Error, Result};
use crate::error::Result;
use crate::persistency::{Db, Metadata};
use crate::server::message::IntoMessage;
use crate::utils::serde_utf8_bytes;
Expand Down Expand Up @@ -40,17 +40,15 @@ impl Get {

/// Executes the [`Get`] command using the specified [`Db`] instance
pub async fn execute(self, db: Arc<Db>) -> Result<Vec<GetResponse>> {
if let Some(resp) = db.get(self.key.clone(), self.replica).await? {
Ok(resp
.into_iter()
.map(|entry| GetResponse {
value: entry.value,
metadata: hex::encode(entry.metadata.serialize()),
})
.collect())
} else {
Err(Error::NotFound { key: self.key })
}
Ok(db
.get(self.key.clone(), self.replica)
.await?
.into_iter()
.map(|entry| GetResponse {
value: entry.value,
metadata: hex::encode(entry.metadata.serialize()),
})
.collect())
}

/// returns the cmd id for [`Get`]
Expand Down
40 changes: 19 additions & 21 deletions src/persistency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl Db {
/// - this will mean merging [`VersionVector`]s so that the client receives the merged version alongside an array of objects
/// 2. Handle integrity checks properly
/// 3. Implement Read Repair
pub async fn get(&self, key: Bytes, replica: bool) -> Result<Option<Vec<StorageEntry>>> {
pub async fn get(&self, key: Bytes, replica: bool) -> Result<Vec<StorageEntry>> {
if replica {
event!(Level::DEBUG, "Executing a replica GET");
let storage_guard = self.storage.lock().await;
Expand Down Expand Up @@ -420,7 +420,7 @@ impl Db {
}
}

async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result<Option<Vec<StorageEntry>>> {
async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result<Vec<StorageEntry>> {
if self.owns_key(&src_addr)? {
event!(Level::DEBUG, "Getting data from local storage");
let storage_guard = self.storage.lock().await;
Expand Down Expand Up @@ -448,18 +448,17 @@ impl Db {

let resp = client.get(key.clone(), true).await?;
// FIXME: remove unwrap()
Ok(Some(
resp.into_iter()
.map(|entry| StorageEntry {
value: entry.value,
metadata: Metadata::deserialize(
self.own_state.pid(),
hex::decode(entry.metadata).unwrap().into(),
)
.unwrap(),
})
.collect(),
))
Ok(resp
.into_iter()
.map(|entry| StorageEntry {
value: entry.value,
metadata: Metadata::deserialize(
self.own_state.pid(),
hex::decode(entry.metadata).unwrap().into(),
)
.unwrap(),
})
.collect())
}
}

Expand Down Expand Up @@ -551,7 +550,7 @@ mod tests {
.await
.unwrap();

let mut get_result = db.get(key, replication).await.unwrap().unwrap();
let mut get_result = db.get(key, replication).await.unwrap();
assert_eq!(get_result.len(), 1);

let entry = get_result.remove(0);
Expand Down Expand Up @@ -608,7 +607,7 @@ mod tests {
.await
.unwrap();

let mut get_result = db.get(key, replication).await.unwrap().unwrap();
let mut get_result = db.get(key, replication).await.unwrap();

let entry = get_result.remove(0);
assert_eq!(entry.value, value);
Expand Down Expand Up @@ -678,8 +677,7 @@ mod tests {
.await
.unwrap();

// let (first_metadata, data) = db.get(key.clone(), replication).await.unwrap().unwrap();
let mut entries = db.get(key.clone(), replication).await.unwrap().unwrap();
let mut entries = db.get(key.clone(), replication).await.unwrap();

assert_eq!(entries.len(), 1);

Expand All @@ -698,7 +696,7 @@ mod tests {
.await
.unwrap();

let mut second_read_entries = db.get(key, replication).await.unwrap().unwrap();
let mut second_read_entries = db.get(key, replication).await.unwrap();
assert_eq!(second_read_entries.len(), 1);
let deserialized_first_metadata = first_entry.metadata;
let second_entry = second_read_entries.remove(0);
Expand All @@ -724,7 +722,7 @@ mod tests {
.await
.unwrap();

let mut first_entries = db.get(key.clone(), replication).await.unwrap().unwrap();
let mut first_entries = db.get(key.clone(), replication).await.unwrap();

assert_eq!(first_entries.len(), 1);
let first_entry = first_entries.remove(0);
Expand Down Expand Up @@ -767,7 +765,7 @@ mod tests {
.await
.unwrap();

let mut first_entries = db.get(key.clone(), replication).await.unwrap().unwrap();
let mut first_entries = db.get(key.clone(), replication).await.unwrap();

assert_eq!(first_entries.len(), 1);
let first_entry = first_entries.remove(0);
Expand Down
100 changes: 52 additions & 48 deletions src/persistency/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
storage_engine::{in_memory::InMemory, StorageEngine as StorageEngineTrait},
};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::{mem::size_of, sync::Arc};
use std::{collections::HashMap, mem::size_of, sync::Arc};
use tracing::{event, Level};

use super::{
Expand Down Expand Up @@ -75,41 +75,46 @@ impl Storage {
}

pub async fn put(&self, key: Bytes, entry: StorageEntry) -> Result<Vec<StorageEntry>> {
let current_versions = self.get(key.clone()).await?;
let current_versions = match self.get(key.clone()).await {
Ok(current_versions) => current_versions,
Err(Error::NotFound { .. }) => Vec::new(),
Err(err) => {
return Err(err);
}
};
event!(
Level::INFO,
"current versions - key: {:?} - {:?}",
key,
current_versions
);
let versions_to_store = if let Some(current_versions) = current_versions {
let mut entries_to_store = Vec::new();
for existing_entry in current_versions {
if let super::MetadataEvaluation::Override =
metadata_evaluation(&entry.metadata, &existing_entry.metadata)?
{
entries_to_store.push(entry.clone());
} else {
entries_to_store.push(existing_entry);
entries_to_store.push(entry.clone());
}
}

entries_to_store.sort();
entries_to_store.dedup();
let mut version_map = HashMap::<Metadata, StorageEntry>::new();
version_map.insert(
entry.metadata.clone(),
StorageEntry {
metadata: entry.metadata.clone(),
value: entry.value,
},
);

entries_to_store
} else {
vec![entry]
};
for existing_entry in current_versions {
if let super::MetadataEvaluation::Conflict =
metadata_evaluation(&entry.metadata, &existing_entry.metadata)?
{
version_map.insert(existing_entry.metadata.clone(), existing_entry);
}
}

let entries_to_store = version_map.into_iter().map(|e| e.1).collect();

event!(
Level::INFO,
"versions that will be stored - key: {:?} - {:?}",
key,
versions_to_store
entries_to_store
);
self.do_put(key, versions_to_store).await
self.do_put(key, entries_to_store).await
}

async fn do_put(&self, key: Bytes, items: Vec<StorageEntry>) -> Result<Vec<StorageEntry>> {
Expand Down Expand Up @@ -165,34 +170,33 @@ impl Storage {
Ok(res)
}

pub async fn get(&self, key: Bytes) -> Result<Option<Vec<StorageEntry>>> {
let metadata = self.metadata_engine.get(&key).await?;
if let Some(metadata) = metadata {
let data = self.data_engine.get(&key).await?.ok_or(Error::Logic {
reason: "Metadata entry found but no data entry found".to_string(),
})?;
pub async fn get(&self, key: Bytes) -> Result<Vec<StorageEntry>> {
let metadata = self
.metadata_engine
.get(&key)
.await?
.ok_or(Error::NotFound { key: key.clone() })?;
let data = self.data_engine.get(&key).await?.ok_or(Error::Logic {
reason: "Metadata entry found but no data entry found".to_string(),
})?;

let metadata_items = Self::unmarshall_entries(metadata)?;
let data_items = Self::unmarshall_entries(data)?;

if metadata_items.len() != data_items.len() {
return Err(Error::Logic {
reason: "Data and Metadata items must have the same length".to_string(),
});
}
let metadata_items = Self::unmarshall_entries(metadata)?;
let data_items = Self::unmarshall_entries(data)?;

let data_and_metadata_items: Vec<StorageEntry> =
std::iter::zip(metadata_items, data_items)
.map(|(m, d)| StorageEntry {
value: d,
metadata: Metadata::deserialize(self.pid, m).unwrap(), // TODO: unwrap()
})
.collect();

Ok(Some(data_and_metadata_items))
} else {
Ok(None)
if metadata_items.len() != data_items.len() {
return Err(Error::Logic {
reason: "Data and Metadata items must have the same length".to_string(),
});
}

let data_and_metadata_items: Vec<StorageEntry> = std::iter::zip(metadata_items, data_items)
.map(|(m, d)| StorageEntry {
value: d,
metadata: Metadata::deserialize(self.pid, m).unwrap(), // TODO: unwrap()
})
.collect();

Ok(data_and_metadata_items)
}
}

Expand Down Expand Up @@ -241,7 +245,7 @@ mod tests {

store.put(key.clone(), entry_pid_1).await.unwrap();

let mut get_entries = store.get(key).await.unwrap().unwrap();
let mut get_entries = store.get(key).await.unwrap();

assert_eq!(get_entries.len(), 2);
let entry_0 = get_entries.remove(0);
Expand Down

0 comments on commit 6a4ede8

Please sign in to comment.