From 6a4ede8dba7d9128311602d92ddfdb618e16365d Mon Sep 17 00:00:00 2001 From: Rafael Leite Date: Tue, 2 Jul 2024 23:45:54 -0700 Subject: [PATCH] Rework version conflict evaluation on Put --- src/cmd/get.rs | 22 ++++---- src/persistency/mod.rs | 40 +++++++------ src/persistency/storage/mod.rs | 100 +++++++++++++++++---------------- 3 files changed, 81 insertions(+), 81 deletions(-) diff --git a/src/cmd/get.rs b/src/cmd/get.rs index f5da516..7487d1c 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use crate::error::{Error, Result}; +use crate::error::Result; use crate::persistency::{Db, Metadata}; use crate::server::message::IntoMessage; use crate::utils::serde_utf8_bytes; @@ -40,17 +40,15 @@ impl Get { /// Executes the [`Get`] command using the specified [`Db`] instance pub async fn execute(self, db: Arc) -> Result> { - if let Some(resp) = db.get(self.key.clone(), self.replica).await? { - Ok(resp - .into_iter() - .map(|entry| GetResponse { - value: entry.value, - metadata: hex::encode(entry.metadata.serialize()), - }) - .collect()) - } else { - Err(Error::NotFound { key: self.key }) - } + Ok(db + .get(self.key.clone(), self.replica) + .await? + .into_iter() + .map(|entry| GetResponse { + value: entry.value, + metadata: hex::encode(entry.metadata.serialize()), + }) + .collect()) } /// returns the cmd id for [`Get`] diff --git a/src/persistency/mod.rs b/src/persistency/mod.rs index acf181d..0dfa333 100644 --- a/src/persistency/mod.rs +++ b/src/persistency/mod.rs @@ -355,7 +355,7 @@ impl Db { /// - this will mean merging [`VersionVector`]s so that the client receives the merged version alongside an array of objects /// 2. Handle integrity checks properly /// 3. Implement Read Repair - pub async fn get(&self, key: Bytes, replica: bool) -> Result>> { + pub async fn get(&self, key: Bytes, replica: bool) -> Result> { if replica { event!(Level::DEBUG, "Executing a replica GET"); let storage_guard = self.storage.lock().await; @@ -420,7 +420,7 @@ impl Db { } } - async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result>> { + async fn do_get(&self, key: Bytes, src_addr: Bytes) -> Result> { if self.owns_key(&src_addr)? { event!(Level::DEBUG, "Getting data from local storage"); let storage_guard = self.storage.lock().await; @@ -448,18 +448,17 @@ impl Db { let resp = client.get(key.clone(), true).await?; // FIXME: remove unwrap() - Ok(Some( - resp.into_iter() - .map(|entry| StorageEntry { - value: entry.value, - metadata: Metadata::deserialize( - self.own_state.pid(), - hex::decode(entry.metadata).unwrap().into(), - ) - .unwrap(), - }) - .collect(), - )) + Ok(resp + .into_iter() + .map(|entry| StorageEntry { + value: entry.value, + metadata: Metadata::deserialize( + self.own_state.pid(), + hex::decode(entry.metadata).unwrap().into(), + ) + .unwrap(), + }) + .collect()) } } @@ -551,7 +550,7 @@ mod tests { .await .unwrap(); - let mut get_result = db.get(key, replication).await.unwrap().unwrap(); + let mut get_result = db.get(key, replication).await.unwrap(); assert_eq!(get_result.len(), 1); let entry = get_result.remove(0); @@ -608,7 +607,7 @@ mod tests { .await .unwrap(); - let mut get_result = db.get(key, replication).await.unwrap().unwrap(); + let mut get_result = db.get(key, replication).await.unwrap(); let entry = get_result.remove(0); assert_eq!(entry.value, value); @@ -678,8 +677,7 @@ mod tests { .await .unwrap(); - // let (first_metadata, data) = db.get(key.clone(), replication).await.unwrap().unwrap(); - let mut entries = db.get(key.clone(), replication).await.unwrap().unwrap(); + let mut entries = db.get(key.clone(), replication).await.unwrap(); assert_eq!(entries.len(), 1); @@ -698,7 +696,7 @@ mod tests { .await .unwrap(); - let mut second_read_entries = db.get(key, replication).await.unwrap().unwrap(); + let mut second_read_entries = db.get(key, replication).await.unwrap(); assert_eq!(second_read_entries.len(), 1); let deserialized_first_metadata = first_entry.metadata; let second_entry = second_read_entries.remove(0); @@ -724,7 +722,7 @@ mod tests { .await .unwrap(); - let mut first_entries = db.get(key.clone(), replication).await.unwrap().unwrap(); + let mut first_entries = db.get(key.clone(), replication).await.unwrap(); assert_eq!(first_entries.len(), 1); let first_entry = first_entries.remove(0); @@ -767,7 +765,7 @@ mod tests { .await .unwrap(); - let mut first_entries = db.get(key.clone(), replication).await.unwrap().unwrap(); + let mut first_entries = db.get(key.clone(), replication).await.unwrap(); assert_eq!(first_entries.len(), 1); let first_entry = first_entries.remove(0); diff --git a/src/persistency/storage/mod.rs b/src/persistency/storage/mod.rs index 97ea65f..a07326d 100644 --- a/src/persistency/storage/mod.rs +++ b/src/persistency/storage/mod.rs @@ -3,7 +3,7 @@ use crate::{ storage_engine::{in_memory::InMemory, StorageEngine as StorageEngineTrait}, }; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use std::{mem::size_of, sync::Arc}; +use std::{collections::HashMap, mem::size_of, sync::Arc}; use tracing::{event, Level}; use super::{ @@ -75,41 +75,46 @@ impl Storage { } pub async fn put(&self, key: Bytes, entry: StorageEntry) -> Result> { - let current_versions = self.get(key.clone()).await?; + let current_versions = match self.get(key.clone()).await { + Ok(current_versions) => current_versions, + Err(Error::NotFound { .. }) => Vec::new(), + Err(err) => { + return Err(err); + } + }; event!( Level::INFO, "current versions - key: {:?} - {:?}", key, current_versions ); - let versions_to_store = if let Some(current_versions) = current_versions { - let mut entries_to_store = Vec::new(); - for existing_entry in current_versions { - if let super::MetadataEvaluation::Override = - metadata_evaluation(&entry.metadata, &existing_entry.metadata)? - { - entries_to_store.push(entry.clone()); - } else { - entries_to_store.push(existing_entry); - entries_to_store.push(entry.clone()); - } - } - entries_to_store.sort(); - entries_to_store.dedup(); + let mut version_map = HashMap::::new(); + version_map.insert( + entry.metadata.clone(), + StorageEntry { + metadata: entry.metadata.clone(), + value: entry.value, + }, + ); - entries_to_store - } else { - vec![entry] - }; + for existing_entry in current_versions { + if let super::MetadataEvaluation::Conflict = + metadata_evaluation(&entry.metadata, &existing_entry.metadata)? + { + version_map.insert(existing_entry.metadata.clone(), existing_entry); + } + } + + let entries_to_store = version_map.into_iter().map(|e| e.1).collect(); event!( Level::INFO, "versions that will be stored - key: {:?} - {:?}", key, - versions_to_store + entries_to_store ); - self.do_put(key, versions_to_store).await + self.do_put(key, entries_to_store).await } async fn do_put(&self, key: Bytes, items: Vec) -> Result> { @@ -165,34 +170,33 @@ impl Storage { Ok(res) } - pub async fn get(&self, key: Bytes) -> Result>> { - let metadata = self.metadata_engine.get(&key).await?; - if let Some(metadata) = metadata { - let data = self.data_engine.get(&key).await?.ok_or(Error::Logic { - reason: "Metadata entry found but no data entry found".to_string(), - })?; + pub async fn get(&self, key: Bytes) -> Result> { + let metadata = self + .metadata_engine + .get(&key) + .await? + .ok_or(Error::NotFound { key: key.clone() })?; + let data = self.data_engine.get(&key).await?.ok_or(Error::Logic { + reason: "Metadata entry found but no data entry found".to_string(), + })?; - let metadata_items = Self::unmarshall_entries(metadata)?; - let data_items = Self::unmarshall_entries(data)?; - - if metadata_items.len() != data_items.len() { - return Err(Error::Logic { - reason: "Data and Metadata items must have the same length".to_string(), - }); - } + let metadata_items = Self::unmarshall_entries(metadata)?; + let data_items = Self::unmarshall_entries(data)?; - let data_and_metadata_items: Vec = - std::iter::zip(metadata_items, data_items) - .map(|(m, d)| StorageEntry { - value: d, - metadata: Metadata::deserialize(self.pid, m).unwrap(), // TODO: unwrap() - }) - .collect(); - - Ok(Some(data_and_metadata_items)) - } else { - Ok(None) + if metadata_items.len() != data_items.len() { + return Err(Error::Logic { + reason: "Data and Metadata items must have the same length".to_string(), + }); } + + let data_and_metadata_items: Vec = std::iter::zip(metadata_items, data_items) + .map(|(m, d)| StorageEntry { + value: d, + metadata: Metadata::deserialize(self.pid, m).unwrap(), // TODO: unwrap() + }) + .collect(); + + Ok(data_and_metadata_items) } } @@ -241,7 +245,7 @@ mod tests { store.put(key.clone(), entry_pid_1).await.unwrap(); - let mut get_entries = store.get(key).await.unwrap().unwrap(); + let mut get_entries = store.get(key).await.unwrap(); assert_eq!(get_entries.len(), 2); let entry_0 = get_entries.remove(0);