Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
Merge pull request #2376 from madadam/additional-proof-chain-key
Browse files Browse the repository at this point in the history
feat!: support adding additional proof chain keys to user messages
  • Loading branch information
madadam authored Mar 29, 2021
2 parents 2e10be6 + 5316286 commit bb6a73a
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 89 deletions.
4 changes: 3 additions & 1 deletion examples/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
index, elders.prefix, elders.key, sibling_elders, elders.elders, self_status_change
);
}
Event::MessageReceived { content, src, dst } => info!(
Event::MessageReceived {
content, src, dst, ..
} => info!(
"Node #{} received message - src: {:?}, dst: {:?}, content: {}",
index,
src,
Expand Down
2 changes: 1 addition & 1 deletion examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ impl Network {
aggregation: Aggregation::None,
};

match node.send_message(itinerary, bytes).await {
match node.send_message(itinerary, bytes, None).await {
Ok(()) => Ok(true),
Err(RoutingError::InvalidSrcLocation) => Ok(false), // node name changed
Err(error) => {
Expand Down
4 changes: 2 additions & 2 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl DkgCommand {
dkg_key,
message,
} => {
let variant = Variant::DKGMessage { dkg_key, message };
let variant = Variant::DkgMessage { dkg_key, message };
let message = Message::single_src(node, DstLocation::Direct, variant, None, None)?;

Ok(Command::send_message_to_nodes(
Expand All @@ -616,7 +616,7 @@ impl DkgCommand {
dkg_key,
proof,
} => {
let variant = Variant::DKGFailureObservation { dkg_key, proof };
let variant = Variant::DkgFailureObservation { dkg_key, proof };
let message = Message::single_src(node, DstLocation::Direct, variant, None, None)?;

Ok(Command::send_message_to_nodes(
Expand Down
7 changes: 6 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::section::SectionChain;
use bytes::Bytes;
use ed25519_dalek::Keypair;
use hex_fmt::HexFmt;
Expand Down Expand Up @@ -78,6 +79,8 @@ pub enum Event {
src: SrcLocation,
/// The destination location that receives the message.
dst: DstLocation,
/// The proof chain for the message.
proof_chain: SectionChain,
},
/// A new peer joined our section.
MemberJoined {
Expand Down Expand Up @@ -136,7 +139,9 @@ impl Debug for Event {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
match self {
Self::Genesis => write!(formatter, "Genesis"),
Self::MessageReceived { content, src, dst } => write!(
Self::MessageReceived {
content, src, dst, ..
} => write!(
formatter,
"MessageReceived {{ content: \"{:<8}\", src: {:?}, dst: {:?} }}",
HexFmt(content),
Expand Down
24 changes: 12 additions & 12 deletions src/messages/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,27 +79,27 @@ pub(crate) enum Variant {
message: Bytes,
},
/// Sent to the new elder candidates to start the DKG process.
DKGStart {
DkgStart {
/// The identifier of the DKG session to start.
dkg_key: DkgKey,
/// The DKG particpants.
elders_info: EldersInfo,
},
/// Message exchanged for DKG process.
DKGMessage {
DkgMessage {
/// The identifier of the DKG session this message is for.
dkg_key: DkgKey,
/// The DKG message.
message: DkgMessage,
},
/// Broadcasted to the other DKG participants when a DKG failure is observed.
DKGFailureObservation {
DkgFailureObservation {
dkg_key: DkgKey,
proof: DkgFailureProof,
},
/// Sent to the current elders by the DKG participants when at least majority of them observe
/// a DKG failure.
DKGFailureAgreement(DkgFailureProofSet),
DkgFailureAgreement(DkgFailureProofSet),
/// Message containing a single `Vote` to be accumulated in the vote accumulator.
Vote {
content: Vote,
Expand Down Expand Up @@ -207,26 +207,26 @@ impl Debug for Variant {
.field("src_key", src_key)
.field("message_hash", &MessageHash::from_bytes(message))
.finish(),
Self::DKGStart {
Self::DkgStart {
dkg_key,
elders_info,
} => f
.debug_struct("DKGStart")
.debug_struct("DkgStart")
.field("dkg_key", dkg_key)
.field("elders_info", elders_info)
.finish(),
Self::DKGMessage { dkg_key, message } => f
.debug_struct("DKGMessage")
Self::DkgMessage { dkg_key, message } => f
.debug_struct("DkgMessage")
.field("dkg_key", &dkg_key)
.field("message", message)
.finish(),
Self::DKGFailureObservation { dkg_key, proof } => f
.debug_struct("DKGFailureObservation")
Self::DkgFailureObservation { dkg_key, proof } => f
.debug_struct("DkgFailureObservation")
.field("dkg_key", dkg_key)
.field("proof", proof)
.finish(),
Self::DKGFailureAgreement(proofs) => {
f.debug_tuple("DKGFailureAgreement").field(proofs).finish()
Self::DkgFailureAgreement(proofs) => {
f.debug_tuple("DkgFailureAgreement").field(proofs).finish()
}
Self::Vote {
content,
Expand Down
86 changes: 26 additions & 60 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ impl Approved {
}

pub fn handle_dkg_failure(&mut self, proofs: DkgFailureProofSet) -> Result<Command> {
let variant = Variant::DKGFailureAgreement(proofs);
let variant = Variant::DkgFailureAgreement(proofs);
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None, None)?;
Ok(self.send_message_to_our_elders(message.to_bytes()))
}
Expand Down Expand Up @@ -570,7 +570,7 @@ impl Approved {
return Ok(MessageStatus::Useless);
}
}
Variant::DKGStart { elders_info, .. } => {
Variant::DkgStart { elders_info, .. } => {
if !elders_info.elders.contains_key(&self.node.name()) {
return Ok(MessageStatus::Useless);
}
Expand Down Expand Up @@ -604,9 +604,9 @@ impl Approved {
Variant::Relocate(_)
| Variant::BouncedUntrustedMessage(_)
| Variant::BouncedUnknownMessage { .. }
| Variant::DKGMessage { .. }
| Variant::DKGFailureObservation { .. }
| Variant::DKGFailureAgreement { .. }
| Variant::DkgMessage { .. }
| Variant::DkgFailureObservation { .. }
| Variant::DkgFailureAgreement { .. }
| Variant::ResourceChallenge { .. } => {}
}

Expand Down Expand Up @@ -701,17 +701,17 @@ impl Approved {
src_key,
)
}
Variant::DKGStart {
Variant::DkgStart {
dkg_key,
elders_info,
} => self.handle_dkg_start(*dkg_key, elders_info.clone()),
Variant::DKGMessage { dkg_key, message } => {
Variant::DkgMessage { dkg_key, message } => {
self.handle_dkg_message(*dkg_key, message.clone(), msg.src().name())
}
Variant::DKGFailureObservation { dkg_key, proof } => {
Variant::DkgFailureObservation { dkg_key, proof } => {
self.handle_dkg_failure_observation(*dkg_key, *proof)
}
Variant::DKGFailureAgreement(proofs) => {
Variant::DkgFailureAgreement(proofs) => {
self.handle_dkg_failure_agreement(&msg.src().name(), proofs)
}
Variant::Vote {
Expand Down Expand Up @@ -1022,9 +1022,7 @@ impl Approved {
}

fn handle_user_message(&mut self, msg: &Message, content: Bytes) -> Result<Vec<Command>> {
let src = msg.src().clone();
let dst = *msg.dst();
if let DstLocation::EndUser(end_user) = &dst {
if let DstLocation::EndUser(end_user) = msg.dst() {
let recipients = match end_user {
EndUser::AllClients(public_key) => {
self.get_all_socket_addr(public_key).copied().collect()
Expand All @@ -1046,45 +1044,12 @@ impl Approved {
message: MessageType::ClientMessage(ClientMessage::from(content)?),
}]);
}
if let SrcAuthority::BlsShare {
proof_share,
src_name,
..
} = &src
{
let signed_bytes = bincode::serialize(&msg.signable_view())?;
match self
.message_accumulator
.add(&signed_bytes, proof_share.clone())
{
Ok(proof) => {
trace!("Successfully aggregated signatures for message: {:?}", msg);
let key = msg.proof_chain_last_key()?;
if key.verify(&proof.signature, signed_bytes) {
self.send_event(Event::MessageReceived {
content,
src: SrcLocation::Section(*src_name),
dst,
});
} else {
trace!(
"Aggregated signature is invalid. Handling message {:?} skipped",
msg
);
}
}
Err(AggregatorError::NotEnoughShares) => {}
Err(err) => {
trace!("Error accumulating message at destination: {:?}", err);
}
}
return Ok(vec![]);
}

self.send_event(Event::MessageReceived {
content,
src: src.src_location(),
dst,
src: msg.src().src_location(),
dst: *msg.dst(),
proof_chain: msg.proof_chain()?.clone(),
});
Ok(vec![])
}
Expand Down Expand Up @@ -1351,7 +1316,7 @@ impl Approved {
dkg_key: DkgKey,
new_elders_info: EldersInfo,
) -> Result<Vec<Command>> {
trace!("Received DKGStart for {}", new_elders_info);
trace!("Received DkgStart for {}", new_elders_info);
self.dkg_voter
.start(&self.node.keypair, dkg_key, new_elders_info)
.into_commands(&self.node)
Expand Down Expand Up @@ -1981,12 +1946,12 @@ impl Approved {
elders_info: EldersInfo,
recipients: &[Peer],
) -> Result<Vec<Command>> {
trace!("Send DKGStart for {} to {:?}", elders_info, recipients);
trace!("Send DkgStart for {} to {:?}", elders_info, recipients);

let src_prefix = elders_info.prefix;
let generation = self.section.chain().main_branch_len() as u64;
let dkg_key = DkgKey::new(&elders_info, generation);
let variant = Variant::DKGStart {
let variant = Variant::DkgStart {
dkg_key,
elders_info,
};
Expand Down Expand Up @@ -2070,6 +2035,7 @@ impl Approved {
&mut self,
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<&bls::PublicKey>,
) -> Result<Vec<Command>> {
let are_we_src = itinerary.src.equals(&self.node.name())
|| itinerary.src.equals(&self.section().prefix().name());
Expand All @@ -2092,6 +2058,7 @@ impl Approved {
}

let variant = Variant::UserMessage(content);
let proof_chain = self.create_proof_chain(&itinerary.dst, additional_proof_chain_key)?;

// If the msg is to be aggregated at dst, we don't vote among our peers, we simply send the
// msg as our vote to the dst.
Expand All @@ -2101,18 +2068,18 @@ impl Approved {
itinerary.src.name(),
itinerary.dst,
variant,
self.create_proof_chain(&itinerary.dst, None)?,
proof_chain,
None,
)?
} else if itinerary.aggregate_at_src() {
let vote = self.create_accumulate_at_src_vote(itinerary.dst, variant, None)?;
let vote = self.create_accumulate_at_src_vote(itinerary.dst, variant, proof_chain);
let recipients = delivery_group::signature_targets(
&itinerary.dst,
self.section.elders_info().peers().copied(),
);
return self.send_vote(&recipients, vote);
} else {
Message::single_src(&self.node, itinerary.dst, variant, None, None)?
Message::single_src(&self.node, itinerary.dst, variant, Some(proof_chain), None)?
};
let mut commands = vec![];

Expand All @@ -2137,10 +2104,10 @@ impl Approved {
src: XorName,
dst: DstLocation,
variant: Variant,
proof_chain_first_key: Option<&bls::PublicKey>,
additional_proof_chain_key: Option<&bls::PublicKey>,
recipients: &[Peer],
) -> Result<Vec<Command>> {
let proof_chain = self.create_proof_chain(&dst, proof_chain_first_key)?;
let proof_chain = self.create_proof_chain(&dst, additional_proof_chain_key)?;
let dst_key = if let Some(name) = dst.name() {
*self.section_key_by_name(&name)
} else {
Expand Down Expand Up @@ -2215,9 +2182,8 @@ impl Approved {
&self,
dst: DstLocation,
variant: Variant,
proof_chain_first_key: Option<&bls::PublicKey>,
) -> Result<Vote> {
let proof_chain = self.create_proof_chain(&dst, proof_chain_first_key)?;
proof_chain: SectionChain,
) -> Vote {
let dst_key = if let Some(name) = dst.name() {
*self.section_key_by_name(&name)
} else {
Expand All @@ -2241,7 +2207,7 @@ impl Approved {

trace!("Create {:?}", vote);

Ok(vote)
vote
}

fn create_proof_chain(
Expand Down
8 changes: 7 additions & 1 deletion src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub(crate) enum Command {
SendUserMessage {
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<bls::PublicKey>,
},
/// Schedule a timeout after the given duration. When the timeout expires, a `HandleTimeout`
/// command is raised. The token is used to identify the timeout.
Expand Down Expand Up @@ -150,10 +151,15 @@ impl Debug for Command {
.field("delivery_group_size", delivery_group_size)
.field("message", message)
.finish(),
Self::SendUserMessage { itinerary, content } => f
Self::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
} => f
.debug_struct("SendUserMessage")
.field("itinerary", itinerary)
.field("content", &format_args!("{:10}", HexFmt(content)))
.field("additional_proof_chain_key", additional_proof_chain_key)
.finish(),
Self::ScheduleTimeout { duration, token } => f
.debug_struct("ScheduleTimeout")
Expand Down
17 changes: 15 additions & 2 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,16 @@ impl Routing {
/// Send a message.
/// Messages sent here, either section to section or node to node are signed
/// and validated upon receipt by routing itself.
pub async fn send_message(&self, itinerary: Itinerary, content: Bytes) -> Result<()> {
///
/// `additional_proof_chain_key` is a key to be included in the proof chain attached to the
/// message. This is useful when the message contains some data that is signed with a different
/// key than the whole message is so that the recipient can verify such key.
pub async fn send_message(
&self,
itinerary: Itinerary,
content: Bytes,
additional_proof_chain_key: Option<bls::PublicKey>,
) -> Result<()> {
if let DstLocation::EndUser(EndUser::Client {
socket_id,
public_key,
Expand All @@ -330,7 +339,11 @@ impl Routing {
debug!("Sending user message instead.. (Command::SendUserMessage)");
}
}
let command = Command::SendUserMessage { itinerary, content };
let command = Command::SendUserMessage {
itinerary,
content,
additional_proof_chain_key,
};
self.stage.clone().handle_commands(command).await
}

Expand Down
Loading

0 comments on commit bb6a73a

Please sign in to comment.