Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
refactor(messaging): add expected aggregation scheme, and use an itin…
Browse files Browse the repository at this point in the history
…erary

BREAKING CHANGE: send_message api now requires an itinerary argument
  • Loading branch information
oetyng authored and maqi committed Mar 2, 2021
1 parent 02398bd commit a79d2d0
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 117 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ rand_chacha = "~0.2.2"
thiserror = "1.0.23"
xor_name = "1.1.0"
resource_proof = "0.8.0"
sn_messaging = "~6.0.0"
sn_messaging = "~7.0.0"
sn_data_types = "~0.15.0"

[dependencies.bls]
Expand Down
16 changes: 10 additions & 6 deletions examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ use rand::{
Rng,
};
use serde::{Deserialize, Serialize};
use sn_messaging::{DstLocation, SrcLocation};
use sn_messaging::{
location::{Aggregation, Itinerary},
DstLocation, SrcLocation,
};
use sn_routing::{
Config, Error as RoutingError, Event as RoutingEvent, NodeElderChange, Routing, TransportConfig,
};
Expand Down Expand Up @@ -353,7 +356,6 @@ impl Network {
let dst = match dst {
DstLocation::Section(name) => name,
DstLocation::Node(name) => name,
DstLocation::AccumulatingNode(name) => name,
DstLocation::Direct | DstLocation::EndUser(_) => {
return Err(format_err!("unexpected probe message dst: {:?}", dst))
}
Expand Down Expand Up @@ -455,11 +457,13 @@ impl Network {
},
};
let bytes = bincode::serialize(&message)?.into();
let itry = Itinerary {
src: SrcLocation::Node(src),
dst: DstLocation::Section(dst),
aggregation: Aggregation::None,
};

match node
.send_message(SrcLocation::Node(src), DstLocation::Section(dst), bytes)
.await
{
match node.send_message(itry, bytes).await {
Ok(()) => Ok(true),
Err(RoutingError::InvalidSrcLocation) => Ok(false), // node name changed
Err(error) => Err(error.into()),
Expand Down
3 changes: 1 addition & 2 deletions src/delivery_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub(crate) fn delivery_targets(
let target_name = user.name();
section_candidates(&target_name, our_name, section, network)?
}
DstLocation::Node(target_name) | DstLocation::AccumulatingNode(target_name) => {
DstLocation::Node(target_name) => {
if target_name == our_name {
return Ok((Vec::new(), 0));
}
Expand Down Expand Up @@ -167,7 +167,6 @@ where
{
let dst_name = match dst {
DstLocation::Node(name) => *name,
DstLocation::AccumulatingNode(name) => *name,
DstLocation::Section(name) => *name,
DstLocation::EndUser(_) | DstLocation::Direct => {
error!("Invalid destination for signature targets: {:?}", dst);
Expand Down
22 changes: 19 additions & 3 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
use bls_signature_aggregator::ProofShare;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use sn_messaging::DstLocation;
use sn_messaging::{Aggregation, DstLocation};
use std::fmt::{self, Debug, Formatter};
use thiserror::Error;
use xor_name::{Prefix, XorName};
Expand All @@ -40,6 +40,8 @@ pub(crate) struct Message {
src: SrcAuthority,
/// Destination location.
dst: DstLocation,
///
aggregation: Aggregation,
/// The body of the message.
variant: Variant,
/// Proof chain to verify the message trust. Does not need to be signed.
Expand Down Expand Up @@ -116,6 +118,7 @@ impl Message {
let mut msg = Message {
dst,
src,
aggregation: Aggregation::None,
proof_chain,
variant,
dst_key,
Expand All @@ -133,12 +136,12 @@ impl Message {
pub(crate) fn for_dst_accumulation(
node: &Node,
key_share: &SectionKeyShare,
dst_node_name: XorName,
dst: DstLocation,
user_msg: Bytes,
proof_chain: SectionProofChain,
dst_key: Option<bls::PublicKey>,
src_section: XorName,
) -> Result<Self, CreateError> {
let dst = DstLocation::AccumulatingNode(dst_node_name);
let variant = Variant::UserMessage(user_msg);
let serialized = bincode::serialize(&SignableView {
dst: &dst,
Expand All @@ -152,6 +155,7 @@ impl Message {
signature_share,
};
let src = SrcAuthority::BlsShare {
src_section,
proof_share,
public_key: node.keypair.public,
age: node.age,
Expand Down Expand Up @@ -306,6 +310,18 @@ impl Message {
&self.hash
}

/// Elders will aggregate a group sig before
/// they all all send one copy of it each to dst.
pub fn aggregate_at_src(&self) -> bool {
matches!(self.src, SrcAuthority::Section { .. })
}

/// Elders will send their signed message, which
/// recipients aggregate.
pub fn aggregate_at_dst(&self) -> bool {
matches!(self.src, SrcAuthority::BlsShare { .. })
}

/// Returns the attached proof chain, if any.
pub(crate) fn proof_chain(&self) -> Result<&SectionProofChain> {
self.proof_chain.as_ref().ok_or(Error::InvalidMessage)
Expand Down
4 changes: 3 additions & 1 deletion src/messages/src_authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub enum SrcAuthority {
},
/// Authority of a single peer that uses it's BLS Keyshare to sign the message.
BlsShare {
/// Section name at the time
src_section: XorName,
/// Public key of the source peer.
public_key: PublicKey,
/// Age of the source peer.
Expand All @@ -57,7 +59,7 @@ impl SrcAuthority {
match self {
Self::Node { public_key, .. } => SrcLocation::Node(name(public_key)),
Self::BlsShare { public_key, .. } => SrcLocation::Node(name(public_key)),
Self::Section { prefix, .. } => SrcLocation::Section(*prefix),
Self::Section { prefix, .. } => SrcLocation::Section(prefix.name()),
}
}

Expand Down
128 changes: 61 additions & 67 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use sn_messaging::{
section_info::{
Error as TargetSectionError, GetSectionResponse, Message as SectionInfoMsg, SectionInfo,
},
DstLocation, EndUser, MessageType, SrcLocation,
DstLocation, EndUser, Itinerary, MessageType, SrcLocation,
};
use std::{cmp, net::SocketAddr, slice};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -749,9 +749,7 @@ impl Approved {
// If elder, always handle UserMessage, otherwise handle it only if addressed directly to us
// as a node.
fn should_handle_user_message(&self, dst: &DstLocation) -> bool {
self.is_elder()
|| dst == &DstLocation::Node(self.node.name())
|| dst == &DstLocation::AccumulatingNode(self.node.name())
self.is_elder() || dst == &DstLocation::Node(self.node.name())
}

// Decide how to handle a `Vote` message.
Expand Down Expand Up @@ -815,15 +813,14 @@ impl Approved {
sender: Option<SocketAddr>,
msg: Message,
) -> Result<Command> {
let src = msg.src().src_location();
let src_name = match src {
SrcLocation::Node(name) => name,
SrcLocation::Section(prefix) => prefix.name(),
SrcLocation::EndUser(_) => return Err(Error::InvalidSrcLocation),
let src_name = match msg.src() {
SrcAuthority::Node { public_key, .. } => crypto::name(public_key),
SrcAuthority::BlsShare { public_key, .. } => crypto::name(public_key),
SrcAuthority::Section { prefix, .. } => prefix.name(),
};

let bounce_dst_key = *self.section_key_by_name(&src_name);
let bounce_dst = if src.is_section() {
let bounce_dst = if msg.aggregate_at_src() {
DstLocation::Section(src_name)
} else {
DstLocation::Node(src_name)
Expand Down Expand Up @@ -1005,8 +1002,16 @@ impl Approved {
message: MessageType::ClientMessage(ClientMessage::from(content)?),
}]);
}
if let DstLocation::AccumulatingNode(_name) = &dst {
if let SrcAuthority::BlsShare { proof_share, .. } = &src {
if msg.aggregate_at_dst() {
if !matches!(dst, DstLocation::Node(_)) {
return Err(Error::InvalidDstLocation);
}
if let SrcAuthority::BlsShare {
proof_share,
src_section,
..
} = &src
{
let signed_bytes = bincode::serialize(&msg.signable_view())?;
match self
.message_accumulator
Expand All @@ -1018,7 +1023,7 @@ impl Approved {
if key.verify(&proof.signature, signed_bytes) {
self.send_event(Event::MessageReceived {
content,
src: src.src_location(),
src: SrcLocation::Section(*src_section),
dst,
});
} else {
Expand Down Expand Up @@ -2031,73 +2036,62 @@ impl Approved {
Ok(commands)
}

pub fn send_user_message(
&mut self,
src: SrcLocation,
dst: DstLocation,
content: Bytes,
) -> Result<Vec<Command>> {
if !src.contains(&self.node.name()) {
pub fn send_user_message(&mut self, itry: Itinerary, content: Bytes) -> Result<Vec<Command>> {
let are_we_src =
matches!(itry.src, SrcLocation::Node(_)) && itry.src.name() == self.node.name();
if !are_we_src {
error!(
"Not sending user message {:?} -> {:?}: not part of the source location",
src, dst
"Not sending user message {:?} -> {:?}: we are not the source location",
itry.src, itry.dst
);
return Err(Error::InvalidSrcLocation);
}

if matches!(dst, DstLocation::Direct) {
if matches!(itry.src, SrcLocation::EndUser(_)) {
return Err(Error::InvalidSrcLocation);
}
if matches!(itry.dst, DstLocation::Direct) {
error!(
"Not sending user message {:?} -> {:?}: direct dst not supported",
src, dst
itry.src, itry.dst
);
return Err(Error::InvalidDstLocation);
}

if matches!(dst, DstLocation::AccumulatingNode(_)) && !matches!(src, SrcLocation::Node(_)) {
error!("Not sending user message {:?} -> {:?}: src should be a single node for dst accumulation", src, dst);
return Err(Error::InvalidSrcLocation);
}

match src {
SrcLocation::Node(_) => {
// If the source is a single node, we don't even need to vote, so let's cut this short.
let msg = if let DstLocation::AccumulatingNode(name) = dst {
Message::for_dst_accumulation(
&self.node,
self.section_keys_provider.key_share()?,
name,
content,
self.section().create_proof_chain_for_our_info(None),
None,
)?
} else {
let variant = Variant::UserMessage(content);
Message::single_src(&self.node, dst, variant, None, None)?
};
let mut commands = vec![];
// If the source is a single node, we don't even need to vote, so let's cut this short.
let msg = if itry.aggregate_at_dst() {
Message::for_dst_accumulation(
&self.node,
self.section_keys_provider.key_share()?,
itry.dst,
content,
self.section().create_proof_chain_for_our_info(None),
None,
self.section().prefix().name(),
)?
} else if itry.aggregate_at_src() {
let variant = Variant::UserMessage(content);
let vote = self.create_send_message_vote(itry.dst, variant, None)?;
let recipients = delivery_group::signature_targets(
&itry.dst,
self.section.elders_info().peers().copied(),
);
return self.send_vote(&recipients, vote);
} else {
let variant = Variant::UserMessage(content);
Message::single_src(&self.node, itry.dst, variant, None, None)?
};
let mut commands = vec![];

if dst.contains(&self.node.name(), self.section.prefix()) {
commands.push(Command::HandleMessage {
sender: Some(self.node.addr),
message: msg.clone(),
});
}
if itry.dst.contains(&self.node.name(), self.section.prefix()) {
commands.push(Command::HandleMessage {
sender: Some(self.node.addr),
message: msg.clone(),
});
}

commands.extend(self.relay_message(&msg)?);
commands.extend(self.relay_message(&msg)?);

Ok(commands)
}
SrcLocation::Section(_) => {
let variant = Variant::UserMessage(content);
let vote = self.create_send_message_vote(dst, variant, None)?;
let recipients = delivery_group::signature_targets(
&dst,
self.section.elders_info().peers().copied(),
);
self.send_vote(&recipients, vote)
}
SrcLocation::EndUser(_) => Err(Error::InvalidSrcLocation),
}
Ok(commands)
}

fn create_send_message_vote(
Expand Down
14 changes: 4 additions & 10 deletions src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use bls_signature_aggregator::Proof;
use bytes::Bytes;
use hex_fmt::HexFmt;
use sn_messaging::{
node::NodeMessage, section_info::Message as SectionInfoMsg, DstLocation, MessageType,
SrcLocation,
node::NodeMessage, section_info::Message as SectionInfoMsg, Itinerary, MessageType,
};
use std::{
fmt::{self, Debug, Formatter},
Expand Down Expand Up @@ -71,11 +70,7 @@ pub(crate) enum Command {
message: MessageType,
},
/// Send `UserMessage` with the given source and destination.
SendUserMessage {
src: SrcLocation,
dst: DstLocation,
content: Bytes,
},
SendUserMessage { itry: Itinerary, content: Bytes },
/// Schedule a timeout after the given duration. When the timeout expires, a `HandleTimeout`
/// command is raised. The token is used to identify the timeout.
ScheduleTimeout { duration: Duration, token: u64 },
Expand Down Expand Up @@ -167,10 +162,9 @@ impl Debug for Command {
.field("delivery_group_size", delivery_group_size)
.field("message", message)
.finish(),
Self::SendUserMessage { src, dst, content } => f
Self::SendUserMessage { itry, content } => f
.debug_struct("SendUserMessage")
.field("src", src)
.field("dst", dst)
.field("itry", itry)
.field("content", &format_args!("{:10}", HexFmt(content)))
.finish(),
Self::ScheduleTimeout { duration, token } => f
Expand Down
Loading

0 comments on commit a79d2d0

Please sign in to comment.