From c214a4d0bb7454f2e872ae224978af2b261b4e61 Mon Sep 17 00:00:00 2001 From: qima Date: Tue, 24 Sep 2024 23:07:09 +0800 Subject: [PATCH] feat(register)!: network only store ops list BREAKING CHANGE! crdt MerkleReg only stored in client locally node side only store ops list (signed by owner) --- autonomi/src/client/registers.rs | 158 +++-- sn_client/src/api.rs | 97 ++- sn_client/src/register.rs | 267 +++---- sn_client/src/uploader/mod.rs | 4 +- sn_client/src/uploader/tests/setup.rs | 22 +- sn_client/src/uploader/upload.rs | 27 +- sn_networking/src/driver.rs | 4 +- sn_node/tests/storage_payments.rs | 21 + sn_registers/src/lib.rs | 1 + sn_registers/src/reg_crdt.rs | 103 ++- sn_registers/src/register.rs | 972 ++++---------------------- sn_registers/src/register_op.rs | 2 +- 12 files changed, 519 insertions(+), 1159 deletions(-) diff --git a/autonomi/src/client/registers.rs b/autonomi/src/client/registers.rs index ad279837e6..6e65389c2a 100644 --- a/autonomi/src/client/registers.rs +++ b/autonomi/src/client/registers.rs @@ -10,27 +10,22 @@ pub use bls::SecretKey as RegisterSecretKey; use sn_evm::Amount; use sn_evm::AttoTokens; -use sn_networking::GetRecordError; use sn_networking::VerificationKind; use sn_protocol::storage::RetryStrategy; pub use sn_registers::{Permissions as RegisterPermissions, RegisterAddress}; -use tracing::warn; use crate::client::data::PayError; use crate::client::Client; use bytes::Bytes; use evmlib::wallet::Wallet; use libp2p::kad::{Quorum, Record}; -use sn_networking::GetRecordCfg; -use sn_networking::NetworkError; -use sn_networking::PutRecordCfg; +use sn_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg}; use sn_protocol::storage::try_deserialize_record; use sn_protocol::storage::try_serialize_record; use sn_protocol::storage::RecordKind; use sn_protocol::NetworkAddress; -use sn_registers::Register as ClientRegister; -use sn_registers::SignedRegister; -use sn_registers::{EntryHash, Permissions}; +use sn_registers::Register as BaseRegister; +use sn_registers::{Permissions, RegisterCrdt, RegisterOp, SignedRegister}; use std::collections::BTreeSet; use xor_name::XorName; @@ -56,27 +51,67 @@ pub enum RegisterError { #[derive(Clone, Debug)] pub struct Register { - pub(crate) inner: SignedRegister, + signed_reg: SignedRegister, + crdt_reg: RegisterCrdt, } impl Register { pub fn address(&self) -> &RegisterAddress { - self.inner.address() + self.signed_reg.address() } /// Retrieve the current values of the register. There can be multiple values /// in case a register was updated concurrently. This is because of the nature /// of registers, which allows for network concurrency. pub fn values(&self) -> Vec { - self.inner - .clone() - .register() - .expect("register to be valid") + self.crdt_reg .read() .into_iter() .map(|(_hash, value)| value.into()) .collect() } + + fn new( + initial_value: Option, + name: XorName, + owner: RegisterSecretKey, + permissions: RegisterPermissions, + ) -> Result { + let pk = owner.public_key(); + + let base_register = BaseRegister::new(pk, name, permissions); + + let signature = owner.sign(base_register.bytes().map_err(RegisterError::Write)?); + let signed_reg = SignedRegister::new(base_register, signature, BTreeSet::new()); + + let crdt_reg = RegisterCrdt::new(*signed_reg.address()); + + let mut register = Register { + signed_reg, + crdt_reg, + }; + + if let Some(value) = initial_value { + register.write_atop(&value, &owner)?; + } + + Ok(register) + } + + fn write_atop(&mut self, entry: &[u8], owner: &RegisterSecretKey) -> Result<(), RegisterError> { + let children: BTreeSet<_> = self.crdt_reg.read().iter().map(|(hash, _)| *hash).collect(); + + let (_hash, address, crdt_op) = self + .crdt_reg + .write(entry.to_vec(), &children) + .map_err(RegisterError::Write)?; + + let op = RegisterOp::new(address, crdt_op, owner); + + let _ = self.signed_reg.add_op(op); + + Ok(()) + } } impl Client { @@ -99,9 +134,11 @@ impl Client { is_register: true, }; - let register = match self.network.get_record_from_network(key, &get_cfg).await { + let signed_reg = match self.network.get_record_from_network(key, &get_cfg).await { Ok(record) => { - try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)? + let signed_reg: SignedRegister = + try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?; + signed_reg } // manage forked register case Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => { @@ -128,58 +165,33 @@ impl Client { }; // Make sure the fetched record contains valid CRDT operations - register.verify().map_err(|err| { - error!("Failed to verify register {address:?} with error: {err}"); - RegisterError::FailedVerification - })?; + signed_reg + .verify() + .map_err(|_| RegisterError::FailedVerification)?; + + let mut crdt_reg = RegisterCrdt::new(*signed_reg.address()); + for op in signed_reg.ops() { + if let Err(err) = crdt_reg.apply_op(op.clone()) { + return Err(RegisterError::Write(err)); + } + } - Ok(Register { inner: register }) + Ok(Register { + signed_reg, + crdt_reg, + }) } /// Updates a Register on the network with a new value. This will overwrite existing value(s). pub async fn register_update( &self, - register: Register, + mut register: Register, new_value: Bytes, owner: RegisterSecretKey, ) -> Result<(), RegisterError> { - // Fetch the current register - let mut signed_register = register.inner; - let mut register = signed_register - .clone() - .register() - .map_err(|err| { - error!( - "Failed to get register from signed register as it failed verification: {err}" - ); - RegisterError::FailedVerification - })? - .clone(); + register.write_atop(&new_value, &owner)?; - info!("Updating register at addr: {}", register.address()); - - // Get all current branches - let children: BTreeSet = register.read().into_iter().map(|(e, _)| e).collect(); - - // Write the new value to all branches - let (_, op) = register - .write(new_value.into(), &children, &owner) - .map_err(|err| { - error!( - "Failed to write to register at addr: {} : {err}", - register.address() - ); - RegisterError::Write(err) - })?; - - // Apply the operation to the register - signed_register.add_op(op.clone()).map_err(|err| { - error!( - "Failed to add op to register at addr: {} : {err}", - register.address() - ); - RegisterError::Write(err) - })?; + let signed_register = register.signed_reg.clone(); // Prepare the record for network storage let record = Record { @@ -230,7 +242,7 @@ impl Client { let pk = owner.public_key(); let name = XorName::from_content_parts(&[name.as_bytes()]); let permissions = Permissions::new_with([pk]); - let register = ClientRegister::new(pk, name, permissions); + let register = Register::new(None, name, owner, permissions)?; let reg_xor = register.address().xorname(); // get cost to store register @@ -281,23 +293,14 @@ impl Client { permissions: RegisterPermissions, wallet: &Wallet, ) -> Result { - let pk = owner.public_key(); + info!("Creating register with name: {name}"); let name = XorName::from_content_parts(&[name.as_bytes()]); // Owner can write to the register. - let mut register = ClientRegister::new(pk, name, permissions); - let address = NetworkAddress::from_register_address(*register.address()); - - info!("Creating register at address: {address}"); + let register = Register::new(Some(value), name, owner, permissions)?; + let address = register.address(); - let entries = register - .read() - .into_iter() - .map(|(entry_hash, _value)| entry_hash) - .collect(); - - let _ = register.write(value.into(), &entries, &owner); - let reg_xor = register.address().xorname(); + let reg_xor = address.xorname(); debug!("Paying for register at address: {address}"); let (payment_proofs, _skipped) = self .pay(std::iter::once(reg_xor), wallet) @@ -317,13 +320,10 @@ impl Client { .to_peer_id_payee() .ok_or(RegisterError::InvalidQuote) .inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?; - let signed_register = register.clone().into_signed(&owner).map_err(|err| { - error!("Failed to sign register at address: {address} : {err}"); - RegisterError::CouldNotSign(err) - })?; + let signed_register = register.signed_reg.clone(); let record = Record { - key: address.to_record_key(), + key: NetworkAddress::from_register_address(*address).to_record_key(), value: try_serialize_record( &(proof, &signed_register), RecordKind::RegisterWithPayment, @@ -356,8 +356,6 @@ impl Client { error!("Failed to put record - register {address} to the network: {err}") })?; - Ok(Register { - inner: signed_register, - }) + Ok(register) } } diff --git a/sn_client/src/api.rs b/sn_client/src/api.rs index e13cdd21a0..54bf53f8a2 100644 --- a/sn_client/src/api.rs +++ b/sn_client/src/api.rs @@ -314,37 +314,6 @@ impl Client { /// /// [Signature] /// - /// # Example - /// ```no_run - /// use sn_client::{Client, Error}; - /// use bls::SecretKey; - /// - /// # #[tokio::main] - /// # async fn main() -> Result<(),Error>{ - /// use tracing::callsite::register; - /// use xor_name::XorName; - /// use sn_registers::Register; - /// use sn_protocol::messages::RegisterCmd; - /// let client = Client::new(SecretKey::random(), None, None, None).await?; - /// - /// // Set up register prerequisites - /// let mut rng = rand::thread_rng(); - /// let xorname = XorName::random(&mut rng); - /// let owner_sk = SecretKey::random(); - /// let owner_pk = owner_sk.public_key(); - /// - /// // set up register - /// let mut register = Register::new(owner_pk, xorname, Default::default()); - /// let mut register_clone = register.clone(); - /// - /// // Use of client.sign() with register through RegisterCmd::Create - /// let cmd = RegisterCmd::Create { - /// register, - /// signature: client.sign(register_clone.bytes()?), - /// }; - /// # Ok(()) - /// # } - /// ``` pub fn sign>(&self, data: T) -> Signature { self.signer.sign(data) } @@ -1105,10 +1074,27 @@ fn merge_register_records( mod tests { use std::collections::BTreeSet; - use sn_registers::Register; + use sn_registers::{Register, RegisterCrdt, RegisterOp}; use super::*; + fn write_atop( + signed_reg: &mut SignedRegister, + crdt_reg: &mut RegisterCrdt, + entry: &[u8], + owner: &SecretKey, + ) -> eyre::Result<()> { + let children: BTreeSet<_> = crdt_reg.read().iter().map(|(hash, _)| *hash).collect(); + + let (_hash, address, crdt_op) = crdt_reg.write(entry.to_vec(), &children)?; + + let op = RegisterOp::new(address, crdt_op, owner); + + signed_reg.add_op(op)?; + + Ok(()) + } + #[test] fn test_merge_register_records() -> eyre::Result<()> { let mut rng = rand::thread_rng(); @@ -1117,28 +1103,33 @@ mod tests { let owner_pk = owner_sk.public_key(); let address = RegisterAddress::new(meta, owner_pk); + let base_register = Register::new(owner_pk, meta, Default::default()); + let signature = owner_sk.sign(base_register.bytes()?); + // prepare registers - let mut register_root = Register::new(owner_pk, meta, Default::default()); - let (root_hash, _) = - register_root.write(b"root_entry".to_vec(), &BTreeSet::default(), &owner_sk)?; - let root = BTreeSet::from_iter(vec![root_hash]); - let signed_root = register_root.clone().into_signed(&owner_sk)?; - - let mut register1 = register_root.clone(); - let (_hash, op1) = register1.write(b"entry1".to_vec(), &root, &owner_sk)?; - let mut signed_register1 = signed_root.clone(); - signed_register1.add_op(op1)?; - - let mut register2 = register_root.clone(); - let (_hash, op2) = register2.write(b"entry2".to_vec(), &root, &owner_sk)?; - let mut signed_register2 = signed_root; - signed_register2.add_op(op2)?; - - let mut register_bad = Register::new(owner_pk, meta, Default::default()); - let (_hash, _op_bad) = - register_bad.write(b"bad_root".to_vec(), &BTreeSet::default(), &owner_sk)?; - let invalid_sig = register2.sign(&owner_sk)?; // steal sig from something else - let signed_register_bad = SignedRegister::new(register_bad, invalid_sig); + let mut register_root = SignedRegister::new(base_register, signature, BTreeSet::new()); + let mut crdt_reg_root = RegisterCrdt::new(address); + + write_atop( + &mut register_root, + &mut crdt_reg_root, + b"root_entry", + &owner_sk, + )?; + + let mut signed_register1 = register_root.clone(); + let mut crdt_reg1 = crdt_reg_root.clone(); + write_atop(&mut signed_register1, &mut crdt_reg1, b"entry1", &owner_sk)?; + + let mut signed_register2 = register_root.clone(); + let mut crdt_reg2 = crdt_reg_root.clone(); + write_atop(&mut signed_register2, &mut crdt_reg2, b"entry2", &owner_sk)?; + + let base_register_bad = Register::new(owner_pk, meta, Default::default()); + let bad_sk = SecretKey::random(); + let signature_bad = bad_sk.sign(base_register_bad.bytes()?); + let signed_register_bad = + SignedRegister::new(base_register_bad, signature_bad, BTreeSet::new()); // prepare records let record1 = Record { diff --git a/sn_client/src/register.rs b/sn_client/src/register.rs index 1b164a2f71..f657898bf6 100644 --- a/sn_client/src/register.rs +++ b/sn_client/src/register.rs @@ -15,36 +15,41 @@ use libp2p::{ }; use sn_networking::{GetRecordCfg, PutRecordCfg, VerificationKind}; use sn_protocol::{ - error::Error as ProtocolError, - messages::RegisterCmd, storage::{try_serialize_record, RecordKind, RetryStrategy}, NetworkAddress, }; -use sn_registers::{Entry, EntryHash, Permissions, Register, RegisterAddress, SignedRegister}; +use sn_registers::{ + Entry, EntryHash, Error as RegisterError, Permissions, Register, RegisterAddress, RegisterCrdt, + RegisterOp, SignedRegister, +}; use sn_transfers::{NanoTokens, Payment}; -use std::collections::{BTreeSet, HashSet, LinkedList}; +use std::collections::{BTreeSet, HashSet}; use xor_name::XorName; -/// Cached operations made to an offline Register instance are applied locally only, +/// Cached operations made to an offline RegisterCrdt instance are applied locally only, /// and accumulated until the user explicitly calls 'sync'. The user can /// switch back to sync with the network for every op by invoking `online` API. #[derive(Clone, custom_debug::Debug)] pub struct ClientRegister { #[debug(skip)] client: Client, - pub register: Register, - pub ops: LinkedList, // Cached operations. + register: Register, + /// CRDT data of the Register + crdt: RegisterCrdt, + /// Cached operations. + ops: BTreeSet, } impl ClientRegister { - fn create_register(client: Client, meta: XorName, perms: Permissions) -> Self { - let public_key = client.signer_pk(); - - let register = Register::new(public_key, meta, perms); + /// Create with specified meta and permission + pub fn create_register(client: Client, meta: XorName, perms: Permissions) -> Self { + let register = Register::new(client.signer_pk(), meta, perms); + let crdt = RegisterCrdt::new(*register.address()); Self { client, register, - ops: LinkedList::new(), + crdt, + ops: BTreeSet::new(), } } @@ -95,10 +100,12 @@ impl ClientRegister { /// ``` pub fn create_with_addr(client: Client, addr: RegisterAddress) -> Self { let register = Register::new(addr.owner(), addr.meta(), Permissions::default()); + let crdt = RegisterCrdt::new(addr); Self { client, register, - ops: LinkedList::new(), + crdt, + ops: BTreeSet::new(), } } @@ -158,13 +165,12 @@ impl ClientRegister { /// Retrieve a Register from the network to work on it offline. pub(super) async fn retrieve(client: Client, address: RegisterAddress) -> Result { - let register = Self::get_register_from_network(&client, address).await?; + let signed_register = Self::get_register_from_network(&client, address).await?; - Ok(Self { - client, - register, - ops: LinkedList::new(), - }) + let mut register = Self::create_with_addr(client, address); + register.merge(&signed_register); + + Ok(register) } /// Return type: [RegisterAddress] @@ -303,14 +309,17 @@ impl ClientRegister { /// # } /// ``` pub fn size(&self) -> u64 { - self.register.size() + self.crdt.size() } /// Return a value corresponding to the provided 'hash', if present. // No usages found in All Places pub fn get(&self, hash: EntryHash) -> Result<&Entry> { - let entry = self.register.get(hash)?; - Ok(entry) + if let Some(entry) = self.crdt.get(hash) { + Ok(entry) + } else { + Err(RegisterError::NoSuchEntry(hash).into()) + } } /// Read the last entry, or entries when there are branches, if the register is not empty. @@ -333,7 +342,7 @@ impl ClientRegister { /// # } /// ``` pub fn read(&self) -> BTreeSet<(EntryHash, Entry)> { - self.register.read() + self.crdt.read() } /// Write a new value onto the Register atop latest value. @@ -360,7 +369,7 @@ impl ClientRegister { /// # } /// ``` pub fn write(&mut self, entry: &[u8]) -> Result { - let children = self.register.read(); + let children = self.crdt.read(); if children.len() > 1 { return Err(Error::ContentBranchDetected(children)); } @@ -395,12 +404,8 @@ impl ClientRegister { /// # } /// ``` pub fn write_merging_branches(&mut self, entry: &[u8]) -> Result { - let children: BTreeSet = self - .register - .read() - .into_iter() - .map(|(hash, _)| hash) - .collect(); + let children: BTreeSet = + self.crdt.read().into_iter().map(|(hash, _)| hash).collect(); self.write_atop(entry, &children) } @@ -440,14 +445,13 @@ impl ClientRegister { let public_key = self.client.signer_pk(); self.register.check_user_permissions(public_key)?; - let (entry_hash, op) = self - .register - .write(entry.into(), children, self.client.signer())?; - let cmd = RegisterCmd::Edit(op); + let (hash, address, crdt_op) = self.crdt.write(entry.to_vec(), children)?; - self.ops.push_front(cmd); + let op = RegisterOp::new(address, crdt_op, self.client.signer()); - Ok(entry_hash) + let _ = self.ops.insert(op); + + Ok(hash) } // ********* Online methods ********* @@ -500,8 +504,7 @@ impl ClientRegister { let mut royalties_fees = NanoTokens::zero(); let reg_result = if verify_store { debug!("VERIFYING REGISTER STORED {:?}", self.address()); - - let res = if payment_info.is_some() { + if payment_info.is_some() { // we expect this to be a _fresh_ register. // It still could have been PUT previously, but we'll do a quick verification // instead of thorough one. @@ -510,28 +513,21 @@ impl ClientRegister { .await } else { self.client.verify_register_stored(*self.address()).await - }; - - // we need to keep the error here if verifying, so we can retry and pay for storage - // once more below - match res { - Ok(r) => Ok(r.register()?), - Err(error) => Err(error), } } else { Self::get_register_from_network(&self.client, addr).await }; - let remote_replica = match reg_result { - Ok(r) => r, + + match reg_result { + Ok(remote_replica) => { + self.merge(&remote_replica); + self.push(verify_store).await?; + } // any error here will result in a repayment of the register // TODO: be smart about this and only pay for storage if we need to Err(err) => { debug!("Failed to get register: {err:?}"); debug!("Creating Register as it doesn't exist at {addr:?}!"); - let cmd = RegisterCmd::Create { - register: self.register.clone(), - signature: self.client.sign(self.register.bytes()?), - }; // Let's check if the user has already paid for this address first if payment_info.is_none() { @@ -546,13 +542,11 @@ impl ClientRegister { payment_info = Some((payment, payee)); } - Self::publish_register(self.client.clone(), cmd, payment_info, verify_store) - .await?; - self.register.clone() + // The `creation register` has to come with `payment`. + // Hence it needs to be `published` to network separately. + self.publish_register(payment_info, verify_store).await?; } - }; - self.register.merge(&remote_replica)?; - self.push(verify_store).await?; + } Ok((storage_cost, royalties_fees)) } @@ -581,27 +575,14 @@ impl ClientRegister { /// ``` pub async fn push(&mut self, verify_store: bool) -> Result<()> { let ops_len = self.ops.len(); + let address = *self.address(); if ops_len > 0 { - let address = *self.address(); - debug!("Pushing {ops_len} cached Register cmds at {address}!"); - - // TODO: send them all concurrently - while let Some(cmd) = self.ops.pop_back() { - // We don't need to send the payment proofs here since - // these are all Register mutation cmds which don't require payment. - let result = - Self::publish_register(self.client.clone(), cmd.clone(), None, verify_store) - .await; - - if let Err(err) = result { - warn!("Did not push Register cmd on all nodes in the close group!: {err}"); - // We keep the cmd for next sync to retry - self.ops.push_back(cmd); - return Err(err); - } + if let Err(err) = self.publish_register(None, verify_store).await { + warn!("Failed to push register {address:?} to network!: {err}"); + return Err(err); } - debug!("Successfully pushed {ops_len} Register cmds at {address}!"); + debug!("Successfully pushed register {address:?} to network!"); } Ok(()) @@ -674,51 +655,20 @@ impl ClientRegister { self.push(verify_store).await } - /// Write a new value onto the Register atop the set of branches/entries - /// referenced by the provided list to their corresponding entry hash. - /// Note you can use `write_merging_branches` API if you - /// want to write atop of all exiting branches/entries instead. - /// - /// # Arguments - /// * 'entry' - u8 (i.e .as_bytes) - /// * 'children' - [BTreeSet]<[EntryHash]> - /// * 'verify_store' - Boolean - /// - /// Return type: - /// - /// # Example - /// ```no_run - /// # use sn_client::{Client, ClientRegister, Error}; - /// # use bls::SecretKey; - /// # use xor_name::XorName; - /// # #[tokio::main] - /// # async fn main() -> Result<(),Error>{ - /// # use std::collections::BTreeSet; - /// let mut rng = rand::thread_rng(); - /// let address = XorName::random(&mut rng); - /// let client = Client::new(SecretKey::random(), None, None, None).await?; - /// let entry = "Entry".as_bytes(); - /// let tree_set = BTreeSet::new(); - /// // Use of the 'write_atop_online': - /// let mut binding = ClientRegister::create(client, address); - /// let mut register = binding.write_atop_online(entry,&tree_set,false); - /// # Ok(()) - /// # } - /// ``` - pub async fn write_atop_online( - &mut self, - entry: &[u8], - children: &BTreeSet, - verify_store: bool, - ) -> Result<()> { - self.write_atop(entry, children)?; - self.push(verify_store).await - } - /// Access the underlying MerkleReg (e.g. for access to history) /// NOTE: This API is unstable and may be removed in the future pub fn merkle_reg(&self) -> &MerkleReg { - self.register.merkle_reg() + self.crdt.merkle_reg() + } + + /// Returns the local ops list + pub fn ops_list(&self) -> &BTreeSet { + &self.ops + } + + /// Log the crdt DAG in tree structured view + pub fn log_update_history(&self) -> String { + self.crdt.log_update_history() } // ********* Private helpers ********* @@ -761,44 +711,22 @@ impl ClientRegister { /// Publish a `Register` command on the network. /// If `verify_store` is true, it will verify the Register was stored on the network. /// Optionally contains the Payment and the PeerId that we paid to. - pub(crate) async fn publish_register( - client: Client, - cmd: RegisterCmd, + pub async fn publish_register( + &self, payment: Option<(Payment, PeerId)>, verify_store: bool, ) -> Result<()> { - let cmd_dst = cmd.dst(); - debug!("Querying existing Register for cmd: {cmd_dst:?}"); - let network_reg = client.get_signed_register_from_network(cmd.dst()).await; - - debug!("Publishing Register cmd: {cmd_dst:?}"); - let register = match cmd { - RegisterCmd::Create { - register, - signature, - } => { - if let Ok(existing_reg) = network_reg { - if existing_reg.owner() != register.owner() { - return Err(ProtocolError::RegisterAlreadyClaimed(existing_reg.owner()))?; - } - } - SignedRegister::new(register, signature) - } - RegisterCmd::Edit(op) => { - let mut reg = network_reg?; - reg.add_op(op)?; - reg - } - }; + let client = self.client.clone(); + let signed_reg = self.get_signed_reg()?; - let network_address = NetworkAddress::from_register_address(*register.address()); + let network_address = NetworkAddress::from_register_address(*self.register.address()); let key = network_address.to_record_key(); let (record, payee) = match payment { Some((payment, payee)) => { let record = Record { key: key.clone(), value: try_serialize_record( - &(payment, ®ister), + &(payment, &signed_reg), RecordKind::RegisterWithPayment, )? .to_vec(), @@ -810,7 +738,7 @@ impl ClientRegister { None => { let record = Record { key: key.clone(), - value: try_serialize_record(®ister, RecordKind::Register)?.to_vec(), + value: try_serialize_record(&signed_reg, RecordKind::Register)?.to_vec(), publisher: None, expires: None, }; @@ -829,7 +757,7 @@ impl ClientRegister { ( Some(Record { key, - value: try_serialize_record(®ister, RecordKind::Register)?.to_vec(), + value: try_serialize_record(&signed_reg, RecordKind::Register)?.to_vec(), publisher: None, expires: None, }), @@ -858,13 +786,48 @@ impl ClientRegister { } /// Retrieve a `Register` from the Network. - async fn get_register_from_network( + pub async fn get_register_from_network( client: &Client, address: RegisterAddress, - ) -> Result { + ) -> Result { debug!("Retrieving Register from: {address}"); - let reg = client.get_signed_register_from_network(address).await?; - reg.verify_with_address(address)?; - Ok(reg.register()?) + let signed_reg = client.get_signed_register_from_network(address).await?; + signed_reg.verify_with_address(address)?; + Ok(signed_reg) + } + + /// Merge a network fetched copy with the local one. + /// Note the `get_register_from_network` already verified + /// * the fetched register is the same (address) as to the local one + /// * the ops of the fetched copy are all signed by the owner + pub fn merge(&mut self, signed_reg: &SignedRegister) { + debug!("Merging Register of: {:?}", self.register.address()); + + // Take out the difference between local ops and fetched ops + // note the `difference` functions gives entry that: in a but not in b + let diff: Vec<_> = signed_reg.ops().difference(&self.ops).cloned().collect(); + + // Apply the new ops to local + for op in diff { + // in case of deploying error, record then continue to next + if let Err(err) = self.crdt.apply_op(op.clone()) { + error!( + "Apply op to local Register {:?} failed with {err:?}", + self.register.address() + ); + } else { + let _ = self.ops.insert(op); + } + } + } + + /// Generate SignedRegister from local copy, so that can be published to network + fn get_signed_reg(&self) -> Result { + let signature = self.client.sign(self.register.bytes()?); + Ok(SignedRegister::new( + self.register.clone(), + signature, + self.ops.clone(), + )) } } diff --git a/sn_client/src/uploader/mod.rs b/sn_client/src/uploader/mod.rs index 8b8d6005fa..c3495b99ab 100644 --- a/sn_client/src/uploader/mod.rs +++ b/sn_client/src/uploader/mod.rs @@ -18,7 +18,7 @@ use sn_protocol::{ storage::{Chunk, ChunkAddress, RetryStrategy}, NetworkAddress, }; -use sn_registers::{Register, RegisterAddress}; +use sn_registers::{RegisterAddress, SignedRegister}; use sn_transfers::{NanoTokens, WalletApi}; use std::{ collections::{BTreeMap, BTreeSet}, @@ -420,7 +420,7 @@ impl UploadItem { #[derive(Debug)] enum TaskResult { GetRegisterFromNetworkOk { - remote_register: Register, + remote_register: SignedRegister, }, GetRegisterFromNetworkErr(XorName), PushRegisterOk { diff --git a/sn_client/src/uploader/tests/setup.rs b/sn_client/src/uploader/tests/setup.rs index 328489c24d..59f9005c4a 100644 --- a/sn_client/src/uploader/tests/setup.rs +++ b/sn_client/src/uploader/tests/setup.rs @@ -22,7 +22,7 @@ use libp2p_identity::Keypair; use rand::thread_rng; use sn_networking::{NetworkBuilder, PayeeQuote}; use sn_protocol::{storage::RetryStrategy, NetworkAddress}; -use sn_registers::{Register, RegisterAddress}; +use sn_registers::{Permissions, RegisterAddress, SignedRegister}; use sn_transfers::{MainSecretKey, NanoTokens, PaymentQuote, WalletApi}; use std::{ collections::{BTreeMap, VecDeque}, @@ -50,7 +50,7 @@ impl UploaderInterface for TestUploader { fn submit_get_register_task( &mut self, - _client: Client, + client: Client, reg_addr: RegisterAddress, _task_result_sender: mpsc::Sender, ) { @@ -67,12 +67,10 @@ impl UploaderInterface for TestUploader { match step { TestSteps::GetRegisterOk => { handle.spawn(async move { - let reg = Register::test_new_from_address(reg_addr); - + let remote_register = + SignedRegister::test_new_from_address(reg_addr, client.signer()); task_result_sender - .send(TaskResult::GetRegisterFromNetworkOk { - remote_register: reg, - }) + .send(TaskResult::GetRegisterFromNetworkOk { remote_register }) .await .expect("Failed to send task result"); }); @@ -449,9 +447,13 @@ pub fn get_dummy_registers(num: usize, client: Client) -> Vec { let mut rng = thread_rng(); let mut registers = Vec::with_capacity(num); for _ in 0..num { - let mut client_reg = ClientRegister::create(client.clone(), XorName::random(&mut rng)); - // test_new_from_address that is used during get_register, uses AnyoneCanWrite permission, so use the same here - client_reg.register = Register::test_new_from_address(*client_reg.address()); + // test_new_from_address that is used during get_register, + // uses AnyoneCanWrite permission, so use the same here + let client_reg = ClientRegister::create_register( + client.clone(), + XorName::random(&mut rng), + Permissions::AnyoneCanWrite, + ); registers.push(client_reg); } diff --git a/sn_client/src/uploader/upload.rs b/sn_client/src/uploader/upload.rs index 0fdc4280de..857c9fc31c 100644 --- a/sn_client/src/uploader/upload.rs +++ b/sn_client/src/uploader/upload.rs @@ -20,11 +20,10 @@ use itertools::Either; use libp2p::PeerId; use sn_networking::PayeeQuote; use sn_protocol::{ - messages::RegisterCmd, storage::{Chunk, RetryStrategy}, NetworkAddress, }; -use sn_registers::{Register, RegisterAddress}; +use sn_registers::{RegisterAddress, SignedRegister}; use sn_transfers::{NanoTokens, WalletApi}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, @@ -275,8 +274,7 @@ pub(super) async fn start_upload( .get_mut(&xorname) .ok_or(ClientError::UploadableItemNotFound(xorname))?; if let UploadItem::Register { reg, .. } = reg { - // todo: not error out here - reg.register.merge(&remote_register)?; + reg.merge(&remote_register); uploader.pending_to_push_register.push(xorname); } } @@ -938,10 +936,8 @@ impl InnerUploader { // ====== Logic ====== - async fn get_register(client: Client, reg_addr: RegisterAddress) -> Result { - let reg = client.verify_register_stored(reg_addr).await?; - let reg = reg.register()?; - Ok(reg) + async fn get_register(client: Client, reg_addr: RegisterAddress) -> Result { + client.verify_register_stored(reg_addr).await } async fn push_register(upload_item: UploadItem, verify_store: bool) -> Result { @@ -1029,19 +1025,8 @@ impl InnerUploader { trace!("Client upload completed for chunk: {xorname:?}"); } UploadItem::Register { address: _, reg } => { - let signature = client.sign(reg.register.bytes()?); - trace!("Client upload started for register: {xorname:?}"); - - ClientRegister::publish_register( - client, - RegisterCmd::Create { - register: reg.register, - signature, - }, - Some((payment, payee)), - verify_store, - ) - .await?; + reg.publish_register(Some((payment, payee)), verify_store) + .await?; trace!("Client upload completed for register: {xorname:?}"); } } diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index d440109764..ec716cb4df 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -167,8 +167,8 @@ impl GetRecordCfg { } }; - // Only compare root values of the register - target_register.base_register().read() == fetched_register.base_register().read() + target_register.base_register() == fetched_register.base_register() + && target_register.ops() == fetched_register.ops() } else { target_record == record } diff --git a/sn_node/tests/storage_payments.rs b/sn_node/tests/storage_payments.rs index 6e11295cbd..23fe9c53b0 100644 --- a/sn_node/tests/storage_payments.rs +++ b/sn_node/tests/storage_payments.rs @@ -374,6 +374,21 @@ // ), // ); +// println!( +// "current retrieved register entry length is {}", +// retrieved_reg.read().len() +// ); +// println!("current expected entry length is {}", register.read().len()); + +// println!( +// "current retrieved register ops length is {}", +// retrieved_reg.ops_list().len() +// ); +// println!( +// "current local cached ops length is {}", +// register.ops_list().len() +// ); + // // TODO adapt to evm // // let _ = wallet_client // // .mut_wallet() @@ -391,6 +406,12 @@ // Err(ClientError::Protocol(ProtocolError::RegisterNotFound(addr))) if *addr == address // )); +// println!("Current fetched register is {:?}", retrieved_reg.address()); +// println!( +// "Fetched register has update history of {}", +// retrieved_reg.log_update_history() +// ); + // let random_entry = rng.gen::<[u8; 32]>().to_vec(); // register.write(&random_entry)?; diff --git a/sn_registers/src/lib.rs b/sn_registers/src/lib.rs index 2fb85cd71f..e9cc34e4f0 100644 --- a/sn_registers/src/lib.rs +++ b/sn_registers/src/lib.rs @@ -19,6 +19,7 @@ pub use self::{ error::Error, metadata::{Entry, EntryHash}, permissions::Permissions, + reg_crdt::RegisterCrdt, register::{Register, SignedRegister}, register_op::RegisterOp, }; diff --git a/sn_registers/src/reg_crdt.rs b/sn_registers/src/reg_crdt.rs index 844b3bfce3..f93002aefc 100644 --- a/sn_registers/src/reg_crdt.rs +++ b/sn_registers/src/reg_crdt.rs @@ -9,17 +9,21 @@ use crate::{error::Result, Entry, EntryHash, Error, RegisterAddress, RegisterOp}; use crdts::merkle_reg::Node as MerkleDagEntry; -use crdts::{merkle_reg::MerkleReg, CmRDT, CvRDT}; +use crdts::{ + merkle_reg::{Hash as CrdtHash, MerkleReg}, + CmRDT, CvRDT, +}; use serde::{Deserialize, Serialize}; use std::{ - collections::BTreeSet, + collections::{BTreeSet, HashSet}, fmt::{self, Debug, Display, Formatter}, hash::Hash, }; +use xor_name::XorName; /// Register data type as a CRDT with Access Control #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash, PartialOrd)] -pub(crate) struct RegisterCrdt { +pub struct RegisterCrdt { /// Address on the network of this piece of data address: RegisterAddress, /// CRDT to store the actual data, i.e. the items of the Register. @@ -41,7 +45,7 @@ impl Display for RegisterCrdt { impl RegisterCrdt { /// Constructs a new '`RegisterCrdtImpl`'. - pub(crate) fn new(address: RegisterAddress) -> Self { + pub fn new(address: RegisterAddress) -> Self { Self { address, data: MerkleReg::new(), @@ -49,23 +53,23 @@ impl RegisterCrdt { } /// Returns the address. - pub(crate) fn address(&self) -> &RegisterAddress { + pub fn address(&self) -> &RegisterAddress { &self.address } /// Merge another register into this one. - pub(crate) fn merge(&mut self, other: Self) { + pub fn merge(&mut self, other: Self) { self.data.merge(other.data); } /// Returns total number of items in the register. - pub(crate) fn size(&self) -> u64 { + pub fn size(&self) -> u64 { (self.data.num_nodes() + self.data.num_orphans()) as u64 } /// Write a new entry to the `RegisterCrdt`, returning the hash /// of the entry and the CRDT operation without a signature - pub(crate) fn write( + pub fn write( &mut self, entry: Entry, children: &BTreeSet, @@ -81,7 +85,7 @@ impl RegisterCrdt { } /// Apply a remote data CRDT operation to this replica of the `RegisterCrdtImpl`. - pub(crate) fn apply_op(&mut self, op: RegisterOp) -> Result<()> { + pub fn apply_op(&mut self, op: RegisterOp) -> Result<()> { // Let's first check the op is validly signed. // Note: Perms and valid sig for the op are checked at the upper Register layer. @@ -100,12 +104,12 @@ impl RegisterCrdt { } /// Get the entry corresponding to the provided `hash` if it exists. - pub(crate) fn get(&self, hash: EntryHash) -> Option<&Entry> { + pub fn get(&self, hash: EntryHash) -> Option<&Entry> { self.data.node(hash.0).map(|node| &node.value) } /// Read current entries (multiple entries occur on concurrent writes). - pub(crate) fn read(&self) -> BTreeSet<(EntryHash, Entry)> { + pub fn read(&self) -> BTreeSet<(EntryHash, Entry)> { self.data .read() .hashes_and_nodes() @@ -124,9 +128,84 @@ impl RegisterCrdt { /// Access the underlying MerkleReg (e.g. for access to history) /// NOTE: This API is unstable and may be removed in the future - pub(crate) fn merkle_reg(&self) -> &MerkleReg { + pub fn merkle_reg(&self) -> &MerkleReg { &self.data } + + /// Log the structure of the MerkleReg as a tree view. + /// This is actually being the `update history` of the register. + pub fn log_update_history(&self) -> String { + let mut output = "MerkleReg Structure:\n".to_string(); + output = format!( + "{output}Total entries: {}\n", + self.data.num_nodes() + self.data.num_orphans() + ); + + // Find root nodes (entries with no parents) + let roots: Vec<_> = self.data.read().hashes().into_iter().collect(); + + // Print the tree starting from each root + for (i, root) in roots.iter().enumerate() { + let mut visited = HashSet::new(); + Self::print_tree( + root, + &self.data, + &mut output, + "", + i == roots.len() - 1, + &mut visited, + ); + } + + output + } + + // Helper function to recursively print the MerkleReg tree + fn print_tree( + hash: &CrdtHash, + merkle_reg: &MerkleReg, + output: &mut String, + prefix: &str, + is_last: bool, + visited: &mut HashSet, + ) { + let pretty_hash = format!("{}", XorName::from_content(hash)); + if !visited.insert(*hash) { + *output = format!( + "{}{prefix}{}* {pretty_hash} (cycle detected)\n", + output, + if is_last { "└── " } else { "├── " }, + ); + return; + } + + let entry = if let Some(node) = merkle_reg.node(*hash) { + format!("value: {}", XorName::from_content(&node.value)) + } else { + "value: None".to_string() + }; + *output = format!( + "{}{prefix}{}{pretty_hash}: {entry}\n", + output, + if is_last { "└── " } else { "├── " }, + ); + + let children: Vec<_> = merkle_reg.children(*hash).hashes().into_iter().collect(); + let new_prefix = format!("{prefix}{} ", if is_last { " " } else { "│" }); + + for (i, child) in children.iter().enumerate() { + Self::print_tree( + child, + merkle_reg, + output, + &new_prefix, + i == children.len() - 1, + visited, + ); + } + + visited.remove(hash); + } } #[cfg(test)] diff --git a/sn_registers/src/register.rs b/sn_registers/src/register.rs index 366f73ef0e..2bfda88aa3 100644 --- a/sn_registers/src/register.rs +++ b/sn_registers/src/register.rs @@ -6,15 +6,12 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{ - error::Result, reg_crdt::RegisterCrdt, Entry, EntryHash, Error, Permissions, RegisterAddress, - RegisterOp, -}; - -use bls::{PublicKey, SecretKey, Signature}; -use crdts::merkle_reg::{Hash, MerkleReg}; +use crate::{error::Result, Error, Permissions, RegisterAddress, RegisterOp}; +#[cfg(feature = "test-utils")] +use bls::SecretKey; +use bls::{PublicKey, Signature}; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeSet, HashSet}; +use std::collections::BTreeSet; use xor_name::XorName; /// Arbitrary maximum size of a register entry. @@ -26,8 +23,8 @@ const MAX_REG_NUM_ENTRIES: u16 = 1024; /// A Register on the SAFE Network #[derive(Clone, Eq, PartialEq, PartialOrd, Hash, Serialize, Deserialize, Debug)] pub struct Register { - /// CRDT data of the Register - crdt: RegisterCrdt, + /// contains the info of meta (XorName) and owner (PublicKey) + address: RegisterAddress, /// Permissions of the Register /// Depending on the permissions, the owner can allow other users to write to the register /// Everyone can always read the Register because all data is public @@ -39,8 +36,8 @@ pub struct Register { #[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Hash)] pub struct SignedRegister { /// the base register we had at creation - base_register: Register, - /// signature over the above by the owner + register: Register, + /// signature over the above register by the owner signature: Signature, /// operations to apply on this register, /// they contain a signature of the writer @@ -49,24 +46,29 @@ pub struct SignedRegister { impl SignedRegister { /// Create a new SignedRegister - pub fn new(base_register: Register, signature: Signature) -> Self { + pub fn new(register: Register, signature: Signature, ops: BTreeSet) -> Self { Self { - base_register, + register, signature, - ops: BTreeSet::new(), + ops, } } /// Return the base register. This is the register before any operations have been applied. pub fn base_register(&self) -> &Register { - &self.base_register + &self.register } /// Verfies a SignedRegister pub fn verify(&self) -> Result<()> { - let bytes = self.base_register.bytes()?; + let reg_size = self.ops.len(); + if reg_size >= MAX_REG_NUM_ENTRIES as usize { + return Err(Error::TooManyEntries(reg_size)); + } + + let bytes = self.register.bytes()?; if !self - .base_register + .register .owner() .verify(&self.signature, bytes.as_slice()) { @@ -74,13 +76,20 @@ impl SignedRegister { } for op in &self.ops { - self.base_register.check_register_op(op)?; + self.register.check_register_op(op)?; + let size = op.crdt_op.value.len(); + if size > MAX_REG_ENTRY_SIZE { + return Err(Error::EntryTooBig { + size, + max: MAX_REG_ENTRY_SIZE, + }); + } } Ok(()) } pub fn verify_with_address(&self, address: RegisterAddress) -> Result<()> { - if self.base_register.address() != &address { + if self.register.address() != &address { return Err(Error::InvalidRegisterAddress { requested: Box::new(address), got: Box::new(*self.address()), @@ -89,19 +98,9 @@ impl SignedRegister { self.verify() } - /// Return the Register after applying all the operations - pub fn register(self) -> Result { - let mut register = self.base_register; - for op in self.ops { - register.apply_op(op)?; - } - Ok(register) - } - /// Merge two SignedRegisters pub fn merge(&mut self, other: &Self) -> Result<()> { - self.base_register - .verify_is_mergeable(&other.base_register)?; + self.register.verify_is_mergeable(&other.register)?; self.ops.extend(other.ops.clone()); Ok(()) } @@ -109,8 +108,7 @@ impl SignedRegister { /// Merge two SignedRegisters but verify the incoming content /// Significantly slower than merge, use when you want to trust but verify the `other` pub fn verified_merge(&mut self, other: &Self) -> Result<()> { - self.base_register - .verify_is_mergeable(&other.base_register)?; + self.register.verify_is_mergeable(&other.register)?; other.verify()?; self.ops.extend(other.ops.clone()); Ok(()) @@ -118,89 +116,80 @@ impl SignedRegister { /// Return the address. pub fn address(&self) -> &RegisterAddress { - self.base_register.address() + self.register.address() } /// Return the owner of the data. pub fn owner(&self) -> PublicKey { - self.base_register.owner() + self.register.owner() } /// Check and add an Op to the SignedRegister pub fn add_op(&mut self, op: RegisterOp) -> Result<()> { - self.base_register.check_register_op(&op)?; + let reg_size = self.ops.len(); + if reg_size >= MAX_REG_NUM_ENTRIES as usize { + return Err(Error::TooManyEntries(reg_size)); + } + + let size = op.crdt_op.value.len(); + if size > MAX_REG_ENTRY_SIZE { + return Err(Error::EntryTooBig { + size, + max: MAX_REG_ENTRY_SIZE, + }); + } + + self.register.check_register_op(&op)?; self.ops.insert(op); Ok(()) } - /// Access the underlying MerkleReg (e.g. for access to history) - /// NOTE: This API is unstable and may be removed in the future - pub fn merkle_reg(&self) -> &MerkleReg { - self.base_register.merkle_reg() + /// Returns the reference to the ops list + pub fn ops(&self) -> &BTreeSet { + &self.ops + } + + /// Used in tests. + #[cfg(feature = "test-utils")] + pub fn test_new_from_address(address: RegisterAddress, owner: &SecretKey) -> Self { + let base_register = Register { + address, + permissions: Permissions::AnyoneCanWrite, + }; + let bytes = if let Ok(bytes) = base_register.bytes() { + bytes + } else { + panic!("Failed to serialize register {base_register:?}"); + }; + let signature = owner.sign(bytes); + Self::new(base_register, signature, BTreeSet::new()) } } impl Register { /// Create a new Register pub fn new(owner: PublicKey, meta: XorName, mut permissions: Permissions) -> Self { - let address = RegisterAddress { meta, owner }; permissions.add_writer(owner); Self { - crdt: RegisterCrdt::new(address), + address: RegisterAddress { meta, owner }, permissions, } } - /// Sign a Register and return the signature, makes sure the signer is the owner in the process - pub fn sign(&self, secret_key: &SecretKey) -> Result { - if self.owner() != secret_key.public_key() { - return Err(Error::InvalidSecretKey); - } - let bytes = self.bytes()?; - let signature = secret_key.sign(bytes); - Ok(signature) - } - /// Returns a bytes version of the Register used for signing /// Use this API when you want to sign a Register withtout providing a secret key to the Register API pub fn bytes(&self) -> Result> { rmp_serde::to_vec(self).map_err(|_| Error::SerialisationFailed) } - /// Sign a Register into a SignedRegister - pub fn into_signed(self, secret_key: &SecretKey) -> Result { - let signature = self.sign(secret_key)?; - Ok(SignedRegister::new(self, signature)) - } - /// Return the address. pub fn address(&self) -> &RegisterAddress { - self.crdt.address() + &self.address } /// Return the owner of the data. pub fn owner(&self) -> PublicKey { - self.address().owner() - } - - /// Return the number of items held in the register - pub fn size(&self) -> u64 { - self.crdt.size() - } - - /// Return a value corresponding to the provided 'hash', if present. - pub fn get(&self, hash: EntryHash) -> Result<&Entry> { - self.crdt.get(hash).ok_or(Error::NoSuchEntry(hash)) - } - - /// Read the last entry, or entries when there are branches, if the register is not empty. - pub fn read(&self) -> BTreeSet<(EntryHash, Entry)> { - self.crdt.read() - } - - /// Returns the children of an entry, along with their corresponding entry hashes - pub fn children(&self, hash: &EntryHash) -> BTreeSet<(EntryHash, Entry)> { - self.crdt.children(hash) + self.address.owner() } /// Return the permission. @@ -208,37 +197,6 @@ impl Register { &self.permissions } - /// Write an entry to the Register, returning the generated - /// CRDT operation so the caller can sign and broadcast it to other replicas, - /// along with the hash of the entry just written. - pub fn write( - &mut self, - entry: Entry, - children: &BTreeSet, - signer: &SecretKey, - ) -> Result<(EntryHash, RegisterOp)> { - self.check_entry_and_reg_sizes(&entry)?; - // check permissions before writing on the underlying CRDT - self.check_user_permissions(signer.public_key())?; - let (hash, address, crdt_op) = self.crdt.write(entry, children)?; - let op = RegisterOp::new(address, crdt_op, signer); - Ok((hash, op)) - } - - /// Apply a signed data CRDT operation. - pub fn apply_op(&mut self, op: RegisterOp) -> Result<()> { - self.check_entry_and_reg_sizes(&op.crdt_op.value)?; - self.check_register_op(&op)?; - self.crdt.apply_op(op) - } - - /// Merge another Register into this one. - pub fn merge(&mut self, other: &Self) -> Result<()> { - self.verify_is_mergeable(other)?; - self.crdt.merge(other.crdt.clone()); - Ok(()) - } - /// Check if a register op is valid for our current register pub fn check_register_op(&self, op: &RegisterOp) -> Result<()> { if self.permissions.can_anyone_write() { @@ -261,107 +219,6 @@ impl Register { } } - /// Access the underlying MerkleReg (e.g. for access to history) - /// NOTE: This API is unstable and may be removed in the future - pub fn merkle_reg(&self) -> &MerkleReg { - self.crdt.merkle_reg() - } - - /// Log the structure of the MerkleReg within this Register's CRDT as a tree view. - /// This is actually being the `update history` of the register. - pub fn log_update_history(&self) -> String { - let mut output = "MerkleReg Structure:\n".to_string(); - let merkle_reg = self.crdt.merkle_reg(); - output = format!( - "{output}Total entries: {}\n", - merkle_reg.num_nodes() + merkle_reg.num_orphans() - ); - - // Find root nodes (entries with no parents) - let roots: Vec<_> = merkle_reg.read().hashes().into_iter().collect(); - - // Print the tree starting from each root - for (i, root) in roots.iter().enumerate() { - let mut visited = HashSet::new(); - Self::print_tree( - root, - merkle_reg, - &mut output, - "", - i == roots.len() - 1, - &mut visited, - ); - } - - output - } - - // Helper function to recursively print the MerkleReg tree - fn print_tree( - hash: &Hash, - merkle_reg: &MerkleReg, - output: &mut String, - prefix: &str, - is_last: bool, - visited: &mut HashSet, - ) { - let pretty_hash = format!("{}", XorName::from_content(hash)); - if !visited.insert(*hash) { - *output = format!( - "{}{prefix}{}* {pretty_hash} (cycle detected)\n", - output, - if is_last { "└── " } else { "├── " }, - ); - return; - } - - let entry = if let Some(node) = merkle_reg.node(*hash) { - format!("value: {}", XorName::from_content(&node.value)) - } else { - "value: None".to_string() - }; - *output = format!( - "{}{prefix}{}{pretty_hash}: {entry}\n", - output, - if is_last { "└── " } else { "├── " }, - ); - - let children: Vec<_> = merkle_reg.children(*hash).hashes().into_iter().collect(); - let new_prefix = format!("{prefix}{} ", if is_last { " " } else { "│" }); - - for (i, child) in children.iter().enumerate() { - Self::print_tree( - child, - merkle_reg, - output, - &new_prefix, - i == children.len() - 1, - visited, - ); - } - - visited.remove(hash); - } - - // Private helper to check the given Entry's size is within define limit, - // as well as check the Register hasn't already reached the maximum number of entries. - fn check_entry_and_reg_sizes(&self, entry: &Entry) -> Result<()> { - let size = entry.len(); - if size > MAX_REG_ENTRY_SIZE { - return Err(Error::EntryTooBig { - size, - max: MAX_REG_ENTRY_SIZE, - }); - } - - let reg_size = self.crdt.size(); - if reg_size >= MAX_REG_NUM_ENTRIES.into() { - return Err(Error::TooManyEntries(reg_size as usize)); - } - - Ok(()) - } - // Private helper to check if this Register is mergeable with another fn verify_is_mergeable(&self, other: &Self) -> Result<()> { if self.address() != other.address() || self.permissions != other.permissions { @@ -369,30 +226,17 @@ impl Register { } Ok(()) } - - /// Used in tests. - #[cfg(feature = "test-utils")] - pub fn test_new_from_address(address: RegisterAddress) -> Self { - Register { - crdt: RegisterCrdt::new(address), - permissions: Permissions::AnyoneCanWrite, - } - } } #[cfg(test)] mod tests { - use crate::RegisterOp; + use crate::{RegisterCrdt, RegisterOp}; - use super::{ - EntryHash, Error, Permissions, Register, RegisterAddress, Result, MAX_REG_NUM_ENTRIES, - }; + use super::*; use bls::SecretKey; - use eyre::Context; - use proptest::prelude::*; - use rand::{rngs::OsRng, seq::SliceRandom, thread_rng, Rng}; - use std::{collections::BTreeSet, sync::Arc}; + use rand::{thread_rng, Rng}; + use std::collections::BTreeSet; use xor_name::XorName; #[test] @@ -408,111 +252,61 @@ mod tests { assert_eq!(*register.address(), address); } - #[test] - fn register_generate_entry_hash() -> eyre::Result<()> { - let authority_sk = SecretKey::random(); - let authority = authority_sk.public_key(); - - let meta: XorName = xor_name::rand::random(); - - let mut replica1 = Register::new(authority, meta, Permissions::default()); - let mut replica2 = Register::new(authority, meta, Permissions::default()); - - // Different item from same replica's root shall having different entry_hash - let item1 = random_register_entry(); - let item2 = random_register_entry(); - let (entry_hash1_1, _) = replica1.write(item1.clone(), &BTreeSet::new(), &authority_sk)?; - let (entry_hash1_2, _) = replica1.write(item2, &BTreeSet::new(), &authority_sk)?; - assert!(entry_hash1_1 != entry_hash1_2); - - // Same item from different replica's root shall remain same - let (entry_hash2_1, _) = replica2.write(item1, &BTreeSet::new(), &authority_sk)?; - assert_eq!(entry_hash1_1, entry_hash2_1); - - let mut parents = BTreeSet::new(); - // Different item from different replica with same parents shall be different - let _ = parents.insert(entry_hash1_1); - let item3 = random_register_entry(); - let item4 = random_register_entry(); - let (entry_hash1_1_3, _) = replica1.write(item3, &parents, &authority_sk)?; - let (entry_hash2_1_4, _) = replica2.write(item4, &parents, &authority_sk)?; - assert!(entry_hash1_1_3 != entry_hash2_1_4); - - Ok(()) - } - #[test] fn register_permissions() -> eyre::Result<()> { let owner_sk = SecretKey::random(); let owner = owner_sk.public_key(); - let other_user_sk = SecretKey::random(); - let other_user = other_user_sk.public_key(); + let user_sk_1 = SecretKey::random(); + let other_user = user_sk_1.public_key(); + let user_sk_2 = SecretKey::random(); let meta: XorName = xor_name::rand::random(); - let item = random_register_entry(); + let address = RegisterAddress { meta, owner }; // Create replicas where anyone can write to them, including the owner ofc - let mut replica1 = Register::new(owner, meta, Permissions::new_anyone_can_write()); - let mut replica2 = replica1.clone(); - let mut signed_replica3 = replica1.clone().into_signed(&owner_sk)?; - // ...owner and the other user can both write to them - let (_, op1) = replica1.write(item.clone(), &BTreeSet::new(), &owner_sk)?; - let (_, op2) = replica1.write(item.clone(), &BTreeSet::new(), &other_user_sk)?; - replica2.apply_op(op1)?; - replica2.apply_op(op2)?; - signed_replica3.verified_merge(&replica2.into_signed(&owner_sk)?)?; + let mut signed_reg_1 = create_reg_replica_with( + meta, + Some(owner_sk.clone()), + Some(Permissions::new_anyone_can_write()), + ); + // ...owner and any other users can both write to them + let op = generate_random_op(address, &owner_sk)?; + assert!(signed_reg_1.add_op(op).is_ok()); + let op = generate_random_op(address, &user_sk_1)?; + assert!(signed_reg_1.add_op(op).is_ok()); + let op = generate_random_op(address, &user_sk_2)?; + assert!(signed_reg_1.add_op(op).is_ok()); // Create replicas allowing both the owner and other user to write to them - let mut replica1 = Register::new(owner, meta, Permissions::new_with([other_user])); - let mut replica2 = replica1.clone(); - let mut signed_replica3 = replica1.clone().into_signed(&owner_sk)?; - // ...owner and the other user can both write to them - let (_, op1) = replica1.write(item.clone(), &BTreeSet::new(), &owner_sk)?; - let (_, op2) = replica1.write(item.clone(), &BTreeSet::new(), &other_user_sk)?; - replica2.apply_op(op1)?; - replica2.apply_op(op2)?; - signed_replica3.verified_merge(&replica2.into_signed(&owner_sk)?)?; + let mut signed_reg_2 = create_reg_replica_with( + meta, + Some(owner_sk.clone()), + Some(Permissions::new_with([other_user])), + ); + // ...owner and the other user can both write to them, others shall fail + let op = generate_random_op(address, &owner_sk)?; + assert!(signed_reg_2.add_op(op).is_ok()); + let op = generate_random_op(address, &user_sk_1)?; + assert!(signed_reg_2.add_op(op).is_ok()); + let op = generate_random_op(address, &user_sk_2)?; + assert!(signed_reg_2.add_op(op).is_err()); // Create replicas with the owner as the only allowed to write - let mut replica1 = Register::new(owner, meta, Permissions::default()); - let mut replica2 = replica1.clone(); + let mut signed_reg_3 = create_reg_replica_with(meta, Some(owner_sk.clone()), None); // ...owner can write to them - let (_, op) = replica1.write(item.clone(), &BTreeSet::new(), &owner_sk)?; - replica2.apply_op(op.clone())?; + let op = generate_random_op(address, &owner_sk)?; + assert!(signed_reg_3.add_op(op).is_ok()); // ...whilst other user cannot write to them - let res = replica1.write(item.clone(), &BTreeSet::new(), &other_user_sk); + let op = generate_random_op(address, &user_sk_1)?; + let res = signed_reg_3.add_op(op); assert!( matches!(&res, Err(err) if err == &Error::AccessDenied(other_user)), "Unexpected result: {res:?}" ); - let (_, address, crdt_op) = replica1.crdt.write(item.clone(), &BTreeSet::new())?; - let op_signed_by_other_user = RegisterOp::new(address, crdt_op, &other_user_sk); - let res = replica2.apply_op(op_signed_by_other_user); - assert!( - matches!(&res, Err(err) if err == &Error::AccessDenied(other_user)), - "Unexpected result: {res:?}" - ); - - // Create Registers with different permissions to write - let mut reg1 = Register::new(owner, meta, Permissions::default()); - let mut reg2 = Register::new(owner, meta, Permissions::new_with([other_user])); - // ...owner can write to both of them, the other user only to one of them - reg1.write(item.clone(), &BTreeSet::new(), &owner_sk)?; - reg2.write(item.clone(), &BTreeSet::new(), &owner_sk)?; - reg2.write(item.clone(), &BTreeSet::new(), &other_user_sk)?; - // ...but they cannot be merged due to different permissions sets - let res1 = reg1.merge(®2); - let res2 = reg2.merge(®1); - assert!( - matches!(&res1, Err(err) if err == &Error::DifferentBaseRegister), - "Unexpected result: {res1:?}" - ); - assert_eq!(res1, res2); - let mut signed_reg1 = reg1.into_signed(&owner_sk)?; - let mut signed_reg2 = reg2.into_signed(&owner_sk)?; - let res1 = signed_reg1.verified_merge(&signed_reg2); - let res2 = signed_reg2.verified_merge(&signed_reg1); + // Registers with different permission can not be merged + let res1 = signed_reg_1.merge(&signed_reg_2); + let res2 = signed_reg_2.merge(&signed_reg_1); assert!( matches!(&res1, Err(err) if err == &Error::DifferentBaseRegister), "Unexpected result: {res1:?}" @@ -522,85 +316,6 @@ mod tests { Ok(()) } - #[test] - fn register_concurrent_write_ops() -> eyre::Result<()> { - let authority_sk1 = SecretKey::random(); - let authority1 = authority_sk1.public_key(); - let authority_sk2 = SecretKey::random(); - let authority2 = authority_sk2.public_key(); - - let meta: XorName = xor_name::rand::random(); - - // We'll have 'authority1' as the owner in both replicas and - // grant permissions for Write to 'authority2' in both replicas too - let perms = Permissions::new_with([authority1, authority2]); - - // Instantiate the same Register on two replicas - let mut replica1 = Register::new(authority_sk1.public_key(), meta, perms); - let mut replica2 = replica1.clone(); - - // And let's write an item to replica1 with autority1 - let item1 = random_register_entry(); - let (_, op1) = replica1.write(item1, &BTreeSet::new(), &authority_sk1)?; - - // Let's assert current state on both replicas - assert_eq!(replica1.size(), 1); - assert_eq!(replica2.size(), 0); - - // Concurrently write another item with authority2 on replica2 - let item2 = random_register_entry(); - let (_, op2) = replica2.write(item2, &BTreeSet::new(), &authority_sk2)?; - - // Item should be writed on replica2 - assert_eq!(replica2.size(), 1); - - // Write operations are now broadcasted and applied to both replicas - replica1.apply_op(op2)?; - replica2.apply_op(op1)?; - - // Let's assert data convergence on both replicas - verify_data_convergence(&[replica1, replica2], 2)?; - - Ok(()) - } - - #[test] - fn register_get_by_hash() -> eyre::Result<()> { - let (sk, register) = &mut create_reg_replicas(1)[0]; - - let entry1 = random_register_entry(); - let entry2 = random_register_entry(); - let entry3 = random_register_entry(); - - let (entry1_hash, _) = register.write(entry1.clone(), &BTreeSet::new(), sk)?; - - // this creates a fork since entry1 is not set as child of entry2 - let (entry2_hash, _) = register.write(entry2.clone(), &BTreeSet::new(), sk)?; - - // we'll write entry2 but having the entry1 and entry2 as children, - // i.e. solving the fork created by them - let children = [entry1_hash, entry2_hash].into_iter().collect(); - - let (entry3_hash, _) = register.write(entry3.clone(), &children, sk)?; - - assert_eq!(register.size(), 3); - - let first_entry = register.get(entry1_hash)?; - assert_eq!(first_entry, &entry1); - - let second_entry = register.get(entry2_hash)?; - assert_eq!(second_entry, &entry2); - - let third_entry = register.get(entry3_hash)?; - assert_eq!(third_entry, &entry3); - - let non_existing_hash = EntryHash::default(); - let entry_not_found = register.get(non_existing_hash); - assert_eq!(entry_not_found, Err(Error::NoSuchEntry(non_existing_hash))); - - Ok(()) - } - #[test] fn register_query_public_perms() -> eyre::Result<()> { let meta = xor_name::rand::random(); @@ -627,21 +342,27 @@ mod tests { // check register 1 is public assert_eq!(replica1.owner(), authority_pk1); - assert_eq!(replica1.check_user_permissions(owner1), Ok(())); - assert_eq!(replica1.check_user_permissions(owner2), Ok(())); - assert_eq!(replica1.check_user_permissions(random_user), Ok(())); - assert_eq!(replica1.check_user_permissions(random_user2), Ok(())); + assert_eq!(replica1.register.check_user_permissions(owner1), Ok(())); + assert_eq!(replica1.register.check_user_permissions(owner2), Ok(())); + assert_eq!( + replica1.register.check_user_permissions(random_user), + Ok(()) + ); + assert_eq!( + replica1.register.check_user_permissions(random_user2), + Ok(()) + ); // check register 2 has only owner1 and owner2 write allowed assert_eq!(replica2.owner(), authority_pk2); - assert_eq!(replica2.check_user_permissions(owner1), Ok(())); - assert_eq!(replica2.check_user_permissions(owner2), Ok(())); + assert_eq!(replica2.register.check_user_permissions(owner1), Ok(())); + assert_eq!(replica2.register.check_user_permissions(owner2), Ok(())); assert_eq!( - replica2.check_user_permissions(random_user), + replica2.register.check_user_permissions(random_user), Err(Error::AccessDenied(random_user)) ); assert_eq!( - replica2.check_user_permissions(random_user2), + replica2.register.check_user_permissions(random_user2), Err(Error::AccessDenied(random_user2)) ); @@ -654,25 +375,20 @@ mod tests { // one replica will allow write ops to anyone let authority_sk1 = SecretKey::random(); + let owner = authority_sk1.public_key(); let perms1 = Permissions::new_anyone_can_write(); + let address = RegisterAddress { meta, owner }; - let mut replica = create_reg_replica_with(meta, Some(authority_sk1), Some(perms1)); + let mut replica = create_reg_replica_with(meta, Some(authority_sk1.clone()), Some(perms1)); for _ in 0..MAX_REG_NUM_ENTRIES { - let (_hash, _op) = replica - .write( - random_register_entry(), - &BTreeSet::new(), - &SecretKey::random(), - ) - .context("Failed to write register entry")?; + let op = generate_random_op(address, &authority_sk1)?; + assert!(replica.add_op(op).is_ok()); } - let excess_entry = replica.write( - random_register_entry(), - &BTreeSet::new(), - &SecretKey::random(), - ); + let op = generate_random_op(address, &authority_sk1)?; + + let excess_entry = replica.add_op(op); match excess_entry { Err(Error::TooManyEntries(size)) => { @@ -693,14 +409,18 @@ mod tests { meta: XorName, perms: Option, count: usize, - ) -> Vec<(SecretKey, Register)> { - let replicas: Vec<(SecretKey, Register)> = (0..count) + ) -> Vec<(SecretKey, SignedRegister)> { + let replicas: Vec<(SecretKey, SignedRegister)> = (0..count) .map(|_| { let authority_sk = authority_sk.clone().unwrap_or_else(SecretKey::random); let authority = authority_sk.public_key(); let perms = perms.clone().unwrap_or_default(); let register = Register::new(authority, meta, perms); - (authority_sk, register) + + let signature = authority_sk.sign(register.bytes().unwrap()); + let signed_reg = SignedRegister::new(register, signature, Default::default()); + + (authority_sk, signed_reg) }) .collect(); @@ -708,424 +428,24 @@ mod tests { replicas } - fn create_reg_replicas(count: usize) -> Vec<(SecretKey, Register)> { - let meta = xor_name::rand::random(); - - gen_reg_replicas(None, meta, None, count) - } - fn create_reg_replica_with( meta: XorName, authority_sk: Option, perms: Option, - ) -> Register { + ) -> SignedRegister { let replicas = gen_reg_replicas(authority_sk, meta, perms, 1); replicas[0].1.clone() } - // verify data convergence on a set of replicas and with the expected length - fn verify_data_convergence(replicas: &[Register], expected_size: u64) -> Result<()> { - // verify all replicas have the same and expected size - for r in replicas { - assert_eq!(r.size(), expected_size); - } - - // now verify that the items are the same in all replicas - let r0 = &replicas[0]; - for r in replicas { - assert_eq!(r.crdt, r0.crdt); - } - - Ok(()) - } - - // Generate a vec of Register replicas of some length, with corresponding vec of keypairs for signing, and the overall owner of the register - fn generate_replicas( - max_quantity: usize, - ) -> impl Strategy, Arc)>> { - let xorname = xor_name::rand::random(); - - let owner_sk = Arc::new(SecretKey::random()); - let owner = owner_sk.public_key(); - let perms = Permissions::new_anyone_can_write(); - - (1..max_quantity + 1).prop_map(move |quantity| { - let mut replicas = Vec::with_capacity(quantity); - for _ in 0..quantity { - let replica = Register::new(owner, xorname, perms.clone()); - - replicas.push(replica); - } - - Ok((replicas, Arc::clone(&owner_sk))) - }) - } - - // Generate a Register entry - fn generate_reg_entry() -> impl Strategy> { - "\\PC*".prop_map(|s| s.into_bytes()) - } - - // Generate a vec of Register entries - fn generate_dataset(max_quantity: usize) -> impl Strategy>> { - prop::collection::vec(generate_reg_entry(), 1..max_quantity + 1) - } - - // Generates a vec of Register entries each with a value suggesting - // the delivery chance of the op that gets created with the entry - fn generate_dataset_and_probability( - max_quantity: usize, - ) -> impl Strategy, u8)>> { - prop::collection::vec((generate_reg_entry(), any::()), 1..max_quantity + 1) - } - - proptest! { - #[test] - fn proptest_reg_doesnt_crash_with_random_data( - _data in generate_reg_entry() - ) { - // Instantiate the same Register on two replicas - let meta = xor_name::rand::random(); - let owner_sk = SecretKey::random(); - let perms = Default::default(); - - let mut replicas = gen_reg_replicas( - Some(owner_sk.clone()), - meta, - Some(perms), - 2); - let (_, mut replica1) = replicas.remove(0); - let (_, mut replica2) = replicas.remove(0); - - // Write an item on replicas - let (_, op) = replica1.write(random_register_entry(), &BTreeSet::new(), &owner_sk)?; - replica2.apply_op(op)?; - - verify_data_convergence(&[replica1, replica2], 1)?; - } - - #[test] - fn proptest_reg_converge_with_many_random_data( - dataset in generate_dataset(1000) - ) { - // Instantiate the same Register on two replicas - let meta = xor_name::rand::random(); - let owner_sk = SecretKey::random(); - let perms = Default::default(); - - // Instantiate the same Register on two replicas - let mut replicas = gen_reg_replicas( - Some(owner_sk.clone()), - meta, - Some(perms), - 2); - let (_, mut replica1) = replicas.remove(0); - let (_, mut replica2) = replicas.remove(0); - - let dataset_length = dataset.len() as u64; - - // insert our data at replicas - let mut children = BTreeSet::new(); - for _data in dataset { - // Write an item on replica1 - let (hash, op) = replica1.write(random_register_entry(), &children, &owner_sk)?; - // now apply that op to replica 2 - replica2.apply_op(op)?; - children = vec![hash].into_iter().collect(); - } - - verify_data_convergence(&[replica1, replica2], dataset_length)?; - } - - #[test] - fn proptest_reg_converge_with_many_random_data_random_entry_children( - dataset in generate_dataset(1000) - ) { - // Instantiate the same Register on two replicas - let meta = xor_name::rand::random(); - let owner_sk = SecretKey::random(); - let perms = Default::default(); - - // Instantiate the same Register on two replicas - let mut replicas = gen_reg_replicas( - Some(owner_sk.clone()), - meta, - Some(perms), - 2); - let (_, mut replica1) = replicas.remove(0); - let (_, mut replica2) = replicas.remove(0); - - let dataset_length = dataset.len() as u64; - - // insert our data at replicas - let mut list_of_hashes = Vec::new(); - let mut rng = thread_rng(); - for _data in dataset { - // choose a random set of children - let num_of_children: usize = rng.gen(); - let children = list_of_hashes.choose_multiple(&mut OsRng, num_of_children).cloned().collect(); - - // Write an item on replica1 using the randomly generated set of children - let (hash, op) = replica1.write(random_register_entry(), &children, &owner_sk)?; - - // now apply that op to replica 2 - replica2.apply_op(op)?; - list_of_hashes.push(hash); - } - - verify_data_convergence(&[replica1, replica2], dataset_length)?; - } - - #[test] - fn proptest_reg_converge_with_many_random_data_across_arbitrary_number_of_replicas( - dataset in generate_dataset(500), - res in generate_replicas(50) - ) { - let (mut replicas, owner_sk) = res?; - let dataset_length = dataset.len() as u64; - - // insert our data at replicas - let mut children = BTreeSet::new(); - for _data in dataset { - // first generate an op from one replica... - let (hash, op)= replicas[0].write(random_register_entry(), &children, &owner_sk)?; - - // then apply this to all replicas - for replica in &mut replicas { - replica.apply_op(op.clone())?; - } - children = vec![hash].into_iter().collect(); - } - - verify_data_convergence(&replicas, dataset_length)?; - - } - - #[test] - fn proptest_converge_with_shuffled_op_set_across_arbitrary_number_of_replicas( - dataset in generate_dataset(100), - res in generate_replicas(500) - ) { - let (mut replicas, owner_sk) = res?; - let dataset_length = dataset.len() as u64; - - // generate an ops set from one replica - let mut ops = vec![]; - - let mut children = BTreeSet::new(); - for _data in dataset { - let (hash, op) = replicas[0].write(random_register_entry(), &children, &owner_sk)?; - ops.push(op); - children = vec![hash].into_iter().collect(); - } - - // now we randomly shuffle ops and apply at each replica - for replica in &mut replicas { - let mut ops = ops.clone(); - ops.shuffle(&mut OsRng); - - for op in ops { - replica.apply_op(op)?; - } - } - - verify_data_convergence(&replicas, dataset_length)?; - } - - #[test] - fn proptest_converge_with_shuffled_ops_from_many_replicas_across_arbitrary_number_of_replicas( - dataset in generate_dataset(1000), - res in generate_replicas(7) - ) { - let (mut replicas, owner_sk) = res?; - let dataset_length = dataset.len() as u64; - - // generate an ops set using random replica for each data - let mut ops = vec![]; - let mut children = BTreeSet::new(); - for _data in dataset { - if let Some(replica) = replicas.choose_mut(&mut OsRng) - { - let (hash, op) = replica.write(random_register_entry(), &children, &owner_sk)?; - ops.push(op); - children = vec![hash].into_iter().collect(); - } - } - - let opslen = ops.len() as u64; - prop_assert_eq!(dataset_length, opslen); - - // now we randomly shuffle ops and apply at each replica - for replica in &mut replicas { - let mut ops = ops.clone(); - ops.shuffle(&mut OsRng); - - for op in ops { - replica.apply_op(op)?; - } - } - - verify_data_convergence(&replicas, dataset_length)?; - } - - #[test] - fn proptest_dropped_data_can_be_reapplied_and_we_converge( - dataset in generate_dataset_and_probability(1000), - ) { - // Instantiate the same Register on two replicas - let meta = xor_name::rand::random(); - let owner_sk = SecretKey::random(); - let perms = Default::default(); - - // Instantiate the same Register on two replicas - let mut replicas = gen_reg_replicas( - Some(owner_sk.clone()), - meta, - Some(perms), - 2); - let (_, mut replica1) = replicas.remove(0); - let (_, mut replica2) = replicas.remove(0); - - let dataset_length = dataset.len() as u64; - - let mut ops = vec![]; - let mut children = BTreeSet::new(); - for (_data, delivery_chance) in dataset { - let (hash, op)= replica1.write(random_register_entry(), &children, &owner_sk)?; - - ops.push((op, delivery_chance)); - children = vec![hash].into_iter().collect(); - } - - for (op, delivery_chance) in ops.clone() { - if delivery_chance < u8::MAX / 3 { - replica2.apply_op(op)?; - } - } - - // here we statistically should have dropped some messages - if dataset_length > 50 { - assert_ne!(replica2.size(), replica1.size()); - } - - // reapply all ops - for (op, _) in ops { - replica2.apply_op(op)?; - } - - // now we converge - verify_data_convergence(&[replica1, replica2], dataset_length)?; - } - - #[test] - fn proptest_converge_with_shuffled_ops_from_many_while_dropping_some_at_random( - dataset in generate_dataset_and_probability(1000), - res in generate_replicas(7), - ) { - let (mut replicas, owner_sk) = res?; - let dataset_length = dataset.len() as u64; - - // generate an ops set using random replica for each data - let mut ops = vec![]; - let mut children = BTreeSet::new(); - for (_data, delivery_chance) in dataset { - // a random index within the replicas range - let index: usize = OsRng.gen_range(0..replicas.len()); - let replica = &mut replicas[index]; - - let (hash, op)=replica.write(random_register_entry(), &children, &owner_sk)?; - ops.push((op, delivery_chance)); - children = vec![hash].into_iter().collect(); - } - - let opslen = ops.len() as u64; - prop_assert_eq!(dataset_length, opslen); - - // now we randomly shuffle ops and apply at each replica - for replica in &mut replicas { - let mut ops = ops.clone(); - ops.shuffle(&mut OsRng); - - for (op, delivery_chance) in ops.clone() { - if delivery_chance > u8::MAX / 3 { - replica.apply_op(op)?; - } - } - - // reapply all ops, simulating lazy messaging filling in the gaps - for (op, _) in ops { - replica.apply_op(op)?; - } - } - - verify_data_convergence(&replicas, dataset_length)?; - } - - #[test] - fn proptest_converge_with_shuffled_ops_including_bad_ops_which_error_and_are_not_applied( - dataset in generate_dataset(10), - bogus_dataset in generate_dataset(10), // should be same number as dataset - gen_replicas_result in generate_replicas(10), - - ) { - let (mut replicas, owner_sk) = gen_replicas_result?; - let dataset_length = dataset.len(); - let bogus_dataset_length = bogus_dataset.len(); - let number_replicas = replicas.len(); - - // generate the real ops set using random replica for each data - let mut ops = vec![]; - let mut children = BTreeSet::new(); - for _data in dataset { - if let Some(replica) = replicas.choose_mut(&mut OsRng) - { - let (hash, op)=replica.write(random_register_entry(), &children, &owner_sk)?; - ops.push(op); - children = vec![hash].into_iter().collect(); - } - } - - // set up a replica that has nothing to do with the rest, random xor... different owner... - let xorname = xor_name::rand::random(); - let random_owner_sk = SecretKey::random(); - let mut bogus_replica = Register::new(random_owner_sk.public_key(), xorname, Permissions::default()); - - // add bogus ops from bogus replica + bogus data - let mut children = BTreeSet::new(); - for _data in bogus_dataset { - let (hash, bogus_op) = bogus_replica.write(random_register_entry(), &children, &random_owner_sk)?; - bogus_replica.apply_op(bogus_op.clone())?; - ops.push(bogus_op); - children = vec![hash].into_iter().collect(); - } - - let opslen = ops.len(); - prop_assert_eq!(dataset_length + bogus_dataset_length, opslen); - - let mut err_count = vec![]; - // now we randomly shuffle ops and apply at each replica - for replica in &mut replicas { - let mut ops = ops.clone(); - ops.shuffle(&mut OsRng); - - for op in ops { - match replica.apply_op(op) { - Ok(_) => {}, - // record all errors to check this matches bogus data - Err(error) => {err_count.push(error)}, - } - } - } - - // check we get an error per bogus datum per replica - assert_eq!(err_count.len(), bogus_dataset_length * number_replicas); - - verify_data_convergence(&replicas, dataset_length as u64)?; - } - } - fn random_register_entry() -> Vec { let random_bytes = thread_rng().gen::<[u8; 32]>(); random_bytes.to_vec() } + + fn generate_random_op(address: RegisterAddress, writer_sk: &SecretKey) -> Result { + let mut crdt_reg = RegisterCrdt::new(address); + let item = random_register_entry(); + let (_hash, addr, crdt_op) = crdt_reg.write(item, &BTreeSet::new())?; + Ok(RegisterOp::new(addr, crdt_op, writer_sk)) + } } diff --git a/sn_registers/src/register_op.rs b/sn_registers/src/register_op.rs index 936529cdf1..455d26b43d 100644 --- a/sn_registers/src/register_op.rs +++ b/sn_registers/src/register_op.rs @@ -39,7 +39,7 @@ impl std::hash::Hash for RegisterOp { impl RegisterOp { /// Create a new RegisterOp - pub(crate) fn new( + pub fn new( address: RegisterAddress, crdt_op: MerkleDagEntry, signer: &SecretKey,