Skip to content

Commit

Permalink
Merge pull request #2136 from maqi/refactor_register
Browse files Browse the repository at this point in the history
feat(register)!: network only store ops list
  • Loading branch information
maqi authored Oct 10, 2024
2 parents e6c49b4 + c214a4d commit dd34375
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 1,159 deletions.
158 changes: 78 additions & 80 deletions autonomi/src/client/registers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,22 @@
pub use bls::SecretKey as RegisterSecretKey;
use sn_evm::Amount;
use sn_evm::AttoTokens;
use sn_networking::GetRecordError;
use sn_networking::VerificationKind;
use sn_protocol::storage::RetryStrategy;
pub use sn_registers::{Permissions as RegisterPermissions, RegisterAddress};
use tracing::warn;

use crate::client::data::PayError;
use crate::client::Client;
use bytes::Bytes;
use evmlib::wallet::Wallet;
use libp2p::kad::{Quorum, Record};
use sn_networking::GetRecordCfg;
use sn_networking::NetworkError;
use sn_networking::PutRecordCfg;
use sn_networking::{GetRecordCfg, GetRecordError, NetworkError, PutRecordCfg};
use sn_protocol::storage::try_deserialize_record;
use sn_protocol::storage::try_serialize_record;
use sn_protocol::storage::RecordKind;
use sn_protocol::NetworkAddress;
use sn_registers::Register as ClientRegister;
use sn_registers::SignedRegister;
use sn_registers::{EntryHash, Permissions};
use sn_registers::Register as BaseRegister;
use sn_registers::{Permissions, RegisterCrdt, RegisterOp, SignedRegister};
use std::collections::BTreeSet;
use xor_name::XorName;

Expand All @@ -56,27 +51,67 @@ pub enum RegisterError {

#[derive(Clone, Debug)]
pub struct Register {
pub(crate) inner: SignedRegister,
signed_reg: SignedRegister,
crdt_reg: RegisterCrdt,
}

impl Register {
pub fn address(&self) -> &RegisterAddress {
self.inner.address()
self.signed_reg.address()
}

/// Retrieve the current values of the register. There can be multiple values
/// in case a register was updated concurrently. This is because of the nature
/// of registers, which allows for network concurrency.
pub fn values(&self) -> Vec<Bytes> {
self.inner
.clone()
.register()
.expect("register to be valid")
self.crdt_reg
.read()
.into_iter()
.map(|(_hash, value)| value.into())
.collect()
}

fn new(
initial_value: Option<Bytes>,
name: XorName,
owner: RegisterSecretKey,
permissions: RegisterPermissions,
) -> Result<Register, RegisterError> {
let pk = owner.public_key();

let base_register = BaseRegister::new(pk, name, permissions);

let signature = owner.sign(base_register.bytes().map_err(RegisterError::Write)?);
let signed_reg = SignedRegister::new(base_register, signature, BTreeSet::new());

let crdt_reg = RegisterCrdt::new(*signed_reg.address());

let mut register = Register {
signed_reg,
crdt_reg,
};

if let Some(value) = initial_value {
register.write_atop(&value, &owner)?;
}

Ok(register)
}

fn write_atop(&mut self, entry: &[u8], owner: &RegisterSecretKey) -> Result<(), RegisterError> {
let children: BTreeSet<_> = self.crdt_reg.read().iter().map(|(hash, _)| *hash).collect();

let (_hash, address, crdt_op) = self
.crdt_reg
.write(entry.to_vec(), &children)
.map_err(RegisterError::Write)?;

let op = RegisterOp::new(address, crdt_op, owner);

let _ = self.signed_reg.add_op(op);

Ok(())
}
}

impl Client {
Expand All @@ -99,9 +134,11 @@ impl Client {
is_register: true,
};

let register = match self.network.get_record_from_network(key, &get_cfg).await {
let signed_reg = match self.network.get_record_from_network(key, &get_cfg).await {
Ok(record) => {
try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?
let signed_reg: SignedRegister =
try_deserialize_record(&record).map_err(|_| RegisterError::Serialization)?;
signed_reg
}
// manage forked register case
Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { result_map })) => {
Expand All @@ -128,58 +165,33 @@ impl Client {
};

// Make sure the fetched record contains valid CRDT operations
register.verify().map_err(|err| {
error!("Failed to verify register {address:?} with error: {err}");
RegisterError::FailedVerification
})?;
signed_reg
.verify()
.map_err(|_| RegisterError::FailedVerification)?;

let mut crdt_reg = RegisterCrdt::new(*signed_reg.address());
for op in signed_reg.ops() {
if let Err(err) = crdt_reg.apply_op(op.clone()) {
return Err(RegisterError::Write(err));
}
}

Ok(Register { inner: register })
Ok(Register {
signed_reg,
crdt_reg,
})
}

/// Updates a Register on the network with a new value. This will overwrite existing value(s).
pub async fn register_update(
&self,
register: Register,
mut register: Register,
new_value: Bytes,
owner: RegisterSecretKey,
) -> Result<(), RegisterError> {
// Fetch the current register
let mut signed_register = register.inner;
let mut register = signed_register
.clone()
.register()
.map_err(|err| {
error!(
"Failed to get register from signed register as it failed verification: {err}"
);
RegisterError::FailedVerification
})?
.clone();
register.write_atop(&new_value, &owner)?;

info!("Updating register at addr: {}", register.address());

// Get all current branches
let children: BTreeSet<EntryHash> = register.read().into_iter().map(|(e, _)| e).collect();

// Write the new value to all branches
let (_, op) = register
.write(new_value.into(), &children, &owner)
.map_err(|err| {
error!(
"Failed to write to register at addr: {} : {err}",
register.address()
);
RegisterError::Write(err)
})?;

// Apply the operation to the register
signed_register.add_op(op.clone()).map_err(|err| {
error!(
"Failed to add op to register at addr: {} : {err}",
register.address()
);
RegisterError::Write(err)
})?;
let signed_register = register.signed_reg.clone();

// Prepare the record for network storage
let record = Record {
Expand Down Expand Up @@ -230,7 +242,7 @@ impl Client {
let pk = owner.public_key();
let name = XorName::from_content_parts(&[name.as_bytes()]);
let permissions = Permissions::new_with([pk]);
let register = ClientRegister::new(pk, name, permissions);
let register = Register::new(None, name, owner, permissions)?;
let reg_xor = register.address().xorname();

// get cost to store register
Expand Down Expand Up @@ -281,23 +293,14 @@ impl Client {
permissions: RegisterPermissions,
wallet: &Wallet,
) -> Result<Register, RegisterError> {
let pk = owner.public_key();
info!("Creating register with name: {name}");
let name = XorName::from_content_parts(&[name.as_bytes()]);

// Owner can write to the register.
let mut register = ClientRegister::new(pk, name, permissions);
let address = NetworkAddress::from_register_address(*register.address());

info!("Creating register at address: {address}");
let register = Register::new(Some(value), name, owner, permissions)?;
let address = register.address();

let entries = register
.read()
.into_iter()
.map(|(entry_hash, _value)| entry_hash)
.collect();

let _ = register.write(value.into(), &entries, &owner);
let reg_xor = register.address().xorname();
let reg_xor = address.xorname();
debug!("Paying for register at address: {address}");
let (payment_proofs, _skipped) = self
.pay(std::iter::once(reg_xor), wallet)
Expand All @@ -317,13 +320,10 @@ impl Client {
.to_peer_id_payee()
.ok_or(RegisterError::InvalidQuote)
.inspect_err(|err| error!("Failed to get payee from payment proof: {err}"))?;
let signed_register = register.clone().into_signed(&owner).map_err(|err| {
error!("Failed to sign register at address: {address} : {err}");
RegisterError::CouldNotSign(err)
})?;
let signed_register = register.signed_reg.clone();

let record = Record {
key: address.to_record_key(),
key: NetworkAddress::from_register_address(*address).to_record_key(),
value: try_serialize_record(
&(proof, &signed_register),
RecordKind::RegisterWithPayment,
Expand Down Expand Up @@ -356,8 +356,6 @@ impl Client {
error!("Failed to put record - register {address} to the network: {err}")
})?;

Ok(Register {
inner: signed_register,
})
Ok(register)
}
}
97 changes: 44 additions & 53 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,37 +314,6 @@ impl Client {
///
/// [Signature]
///
/// # Example
/// ```no_run
/// use sn_client::{Client, Error};
/// use bls::SecretKey;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(),Error>{
/// use tracing::callsite::register;
/// use xor_name::XorName;
/// use sn_registers::Register;
/// use sn_protocol::messages::RegisterCmd;
/// let client = Client::new(SecretKey::random(), None, None, None).await?;
///
/// // Set up register prerequisites
/// let mut rng = rand::thread_rng();
/// let xorname = XorName::random(&mut rng);
/// let owner_sk = SecretKey::random();
/// let owner_pk = owner_sk.public_key();
///
/// // set up register
/// let mut register = Register::new(owner_pk, xorname, Default::default());
/// let mut register_clone = register.clone();
///
/// // Use of client.sign() with register through RegisterCmd::Create
/// let cmd = RegisterCmd::Create {
/// register,
/// signature: client.sign(register_clone.bytes()?),
/// };
/// # Ok(())
/// # }
/// ```
pub fn sign<T: AsRef<[u8]>>(&self, data: T) -> Signature {
self.signer.sign(data)
}
Expand Down Expand Up @@ -1105,10 +1074,27 @@ fn merge_register_records(
mod tests {
use std::collections::BTreeSet;

use sn_registers::Register;
use sn_registers::{Register, RegisterCrdt, RegisterOp};

use super::*;

fn write_atop(
signed_reg: &mut SignedRegister,
crdt_reg: &mut RegisterCrdt,
entry: &[u8],
owner: &SecretKey,
) -> eyre::Result<()> {
let children: BTreeSet<_> = crdt_reg.read().iter().map(|(hash, _)| *hash).collect();

let (_hash, address, crdt_op) = crdt_reg.write(entry.to_vec(), &children)?;

let op = RegisterOp::new(address, crdt_op, owner);

signed_reg.add_op(op)?;

Ok(())
}

#[test]
fn test_merge_register_records() -> eyre::Result<()> {
let mut rng = rand::thread_rng();
Expand All @@ -1117,28 +1103,33 @@ mod tests {
let owner_pk = owner_sk.public_key();
let address = RegisterAddress::new(meta, owner_pk);

let base_register = Register::new(owner_pk, meta, Default::default());
let signature = owner_sk.sign(base_register.bytes()?);

// prepare registers
let mut register_root = Register::new(owner_pk, meta, Default::default());
let (root_hash, _) =
register_root.write(b"root_entry".to_vec(), &BTreeSet::default(), &owner_sk)?;
let root = BTreeSet::from_iter(vec![root_hash]);
let signed_root = register_root.clone().into_signed(&owner_sk)?;

let mut register1 = register_root.clone();
let (_hash, op1) = register1.write(b"entry1".to_vec(), &root, &owner_sk)?;
let mut signed_register1 = signed_root.clone();
signed_register1.add_op(op1)?;

let mut register2 = register_root.clone();
let (_hash, op2) = register2.write(b"entry2".to_vec(), &root, &owner_sk)?;
let mut signed_register2 = signed_root;
signed_register2.add_op(op2)?;

let mut register_bad = Register::new(owner_pk, meta, Default::default());
let (_hash, _op_bad) =
register_bad.write(b"bad_root".to_vec(), &BTreeSet::default(), &owner_sk)?;
let invalid_sig = register2.sign(&owner_sk)?; // steal sig from something else
let signed_register_bad = SignedRegister::new(register_bad, invalid_sig);
let mut register_root = SignedRegister::new(base_register, signature, BTreeSet::new());
let mut crdt_reg_root = RegisterCrdt::new(address);

write_atop(
&mut register_root,
&mut crdt_reg_root,
b"root_entry",
&owner_sk,
)?;

let mut signed_register1 = register_root.clone();
let mut crdt_reg1 = crdt_reg_root.clone();
write_atop(&mut signed_register1, &mut crdt_reg1, b"entry1", &owner_sk)?;

let mut signed_register2 = register_root.clone();
let mut crdt_reg2 = crdt_reg_root.clone();
write_atop(&mut signed_register2, &mut crdt_reg2, b"entry2", &owner_sk)?;

let base_register_bad = Register::new(owner_pk, meta, Default::default());
let bad_sk = SecretKey::random();
let signature_bad = bad_sk.sign(base_register_bad.bytes()?);
let signed_register_bad =
SignedRegister::new(base_register_bad, signature_bad, BTreeSet::new());

// prepare records
let record1 = Record {
Expand Down
Loading

0 comments on commit dd34375

Please sign in to comment.