Skip to content

Commit

Permalink
[Network] Add application send time to message.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 13, 2024
1 parent d7c3996 commit cdf7bfd
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 68 deletions.
11 changes: 7 additions & 4 deletions consensus/src/consensus_observer/network/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,23 +802,26 @@ mod test {
(outbound_rpc_request.protocol_id, received_message)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
let (protocol_id, data) = message.into_parts();

// Verify the message is correct
assert!(!is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(message.protocol_id), expected_direct_send_protocol);
assert_eq!(Some(protocol_id), expected_direct_send_protocol);

// Create and return the received message
let received_message = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
protocol_id: message.protocol_id,
protocol_id,
priority: 0,
raw_msg: message.mdata.into(),
raw_msg: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};
(message.protocol_id, received_message)
(protocol_id, received_message)
}
};

Expand Down
8 changes: 4 additions & 4 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,9 @@ impl NetworkPlayground {
if !self.is_message_dropped(&src_twin_id, dst_twin_id, consensus_msg) {
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.clone().into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
receive_timestamp_micros: 0,
Expand Down Expand Up @@ -419,9 +419,9 @@ impl NetworkPlayground {
for dst_twin_id in dst_twin_ids.iter() {
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.clone().into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
receive_timestamp_micros: 0,
Expand Down
14 changes: 7 additions & 7 deletions mempool/src/tests/multi_node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use aptos_network::{
peer_manager::PeerManagerRequest,
protocols::{
direct_send::Message,
network::ReceivedMessage,
network::{ReceivedMessage, SerializedRequest},
wire::messaging::v1::{DirectSendMsg, NetworkMessage},
},
ProtocolId,
Expand Down Expand Up @@ -344,9 +344,9 @@ impl TestHarness {
let receiver = self.mut_node(&receiver_id);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
Expand Down Expand Up @@ -400,7 +400,7 @@ impl TestHarness {
// Handle outgoing message
match network_req {
PeerManagerRequest::SendDirectSend(remote_peer_id, msg) => {
let mempool_message = common::decompress_and_deserialize(&msg.mdata.to_vec());
let mempool_message = common::decompress_and_deserialize(&msg.data().to_vec());
match mempool_message {
MempoolSyncMsg::BroadcastTransactionsRequest {
transactions,
Expand Down Expand Up @@ -456,7 +456,7 @@ impl TestHarness {

match network_req {
PeerManagerRequest::SendDirectSend(remote_peer_id, msg) => {
let mempool_message = common::decompress_and_deserialize(&msg.mdata.to_vec());
let mempool_message = common::decompress_and_deserialize(&msg.data().to_vec());
match mempool_message {
MempoolSyncMsg::BroadcastTransactionsResponse { .. } => {
// send it to peer
Expand All @@ -477,9 +477,9 @@ impl TestHarness {
let receiver = self.mut_node(&receiver_id);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
Expand Down
5 changes: 3 additions & 2 deletions mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use aptos_network::{
protocols::{
network::{
NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender, ReceivedMessage,
SerializedRequest,
},
wire::{
handshake::v1::ProtocolId::MempoolDirectSend,
Expand Down Expand Up @@ -314,7 +315,7 @@ impl MempoolNode {
match self.get_outbound_handle(network_id).next().await.unwrap() {
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
assert_eq!(peer_id, remote_peer_id);
msg.protocol_id.from_bytes(&msg.mdata).unwrap()
msg.protocol_id().from_bytes(msg.data()).unwrap()
},
_ => panic!("Should not be getting an RPC response"),
}
Expand Down Expand Up @@ -379,7 +380,7 @@ impl MempoolNode {
(peer_id, msg.protocol_id, msg.data, Some(msg.res_tx))
},
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
(peer_id, msg.protocol_id, msg.mdata, None)
(peer_id, msg.protocol_id(), msg.data().clone(), None)
},
};
assert_eq!(peer_id, expected_peer_id);
Expand Down
11 changes: 7 additions & 4 deletions network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1224,23 +1224,26 @@ async fn wait_for_network_event(
(outbound_rpc_request.protocol_id, rmsg)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
let (protocol_id, data) = message.into_parts();

// Verify the request is correct
assert!(!is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(message.protocol_id), expected_direct_send_protocol_id);
assert_eq!(Some(protocol_id), expected_direct_send_protocol_id);

// Create and return the peer manager notification
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
protocol_id: message.protocol_id,
protocol_id,
priority: 0,
raw_msg: message.mdata.into(),
raw_msg: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};
(message.protocol_id, rmsg)
(protocol_id, rmsg)
}
};

Expand Down
9 changes: 6 additions & 3 deletions network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,15 +614,18 @@ where
// To send an outbound DirectSendMsg, we just bump some counters and
// push it onto our outbound writer queue.
PeerRequest::SendDirectSend(message) => {
// Unpack the message
let (protocol_id, data) = message.into_parts();

// Create the direct send message
let message_len = message.mdata.len();
let protocol_id = message.protocol_id;
let message_len = data.len();
let message = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: Priority::default(),
raw_msg: Vec::from(message.mdata.as_ref()),
raw_msg: Vec::from(data.as_ref()),
});

// Send the message to the outbound writer queue
match write_reqs_tx.push((), message) {
Ok(_) => {
self.update_outbound_direct_send_metrics(protocol_id, message_len as u64);
Expand Down
43 changes: 17 additions & 26 deletions network/framework/src/peer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
peer_manager::TransportNotification,
protocols::{
direct_send::Message,
network::ReceivedMessage,
network::{ReceivedMessage, SerializedRequest},
rpc::{error::RpcError, OutboundRpcRequest},
wire::{
handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
Expand Down Expand Up @@ -177,7 +177,7 @@ struct PeerHandle(aptos_channel::Sender<ProtocolId, PeerRequest>);
impl PeerHandle {
fn send_direct_send(&mut self, message: Message) {
self.0
.push(message.protocol_id, PeerRequest::SendDirectSend(message))
.push(message.protocol_id(), PeerRequest::SendDirectSend(message))
.unwrap()
}

Expand Down Expand Up @@ -214,10 +214,7 @@ fn peer_send_message() {
);
let (mut client_sink, mut client_stream) = build_network_sink_stream(&mut connection);

let send_msg = Message {
protocol_id: PROTOCOL,
mdata: Bytes::from("hello world"),
};
let send_msg = Message::new(PROTOCOL, Bytes::from("hello world"));
let recv_msg = MultiplexMessage::Message(NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
Expand Down Expand Up @@ -328,14 +325,8 @@ fn peers_send_message_concurrent() {
let remote_peer_id_b = peer_b.remote_peer_id();

let test = async move {
let msg_a = Message {
protocol_id: PROTOCOL,
mdata: Bytes::from("hello world"),
};
let msg_b = Message {
protocol_id: PROTOCOL,
mdata: Bytes::from("namaste"),
};
let msg_a = Message::new(PROTOCOL, Bytes::from("hello world"));
let msg_b = Message::new(PROTOCOL, Bytes::from("namaste"));

// Peer A -> msg_a -> Peer B
peer_handle_a.send_direct_send(msg_a.clone());
Expand All @@ -350,15 +341,15 @@ fn peers_send_message_concurrent() {
NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: msg_b.mdata.into(),
raw_msg: msg_b.data().clone().into(),
})
);
assert_eq!(
notif_b.unwrap().message,
NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: msg_a.mdata.into(),
raw_msg: msg_a.data().clone().into(),
})
);

Expand Down Expand Up @@ -979,14 +970,14 @@ fn peers_send_multiplex() {
let remote_peer_id_b = peer_b.remote_peer_id();

let test = async move {
let msg_a = Message {
protocol_id: PROTOCOL,
mdata: Bytes::from(vec![0; MAX_MESSAGE_SIZE]), // stream message
};
let msg_b = Message {
protocol_id: PROTOCOL,
mdata: Bytes::from(vec![1; 1024]), // normal message
};
let msg_a = Message::new(
PROTOCOL,
Bytes::from(vec![0; MAX_MESSAGE_SIZE]), // stream message
);
let msg_b = Message::new(
PROTOCOL,
Bytes::from(vec![1; 1024]), // normal message
);

// Peer A -> msg_a -> Peer B
peer_handle_a.send_direct_send(msg_a.clone());
Expand All @@ -1001,15 +992,15 @@ fn peers_send_multiplex() {
NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: msg_b.mdata.into(),
raw_msg: msg_b.data().clone().into(),
})
);
assert_eq!(
notif_b.unwrap().message,
NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: PROTOCOL,
priority: 0,
raw_msg: msg_a.mdata.into(),
raw_msg: msg_a.data().clone().into(),
})
);

Expand Down
7 changes: 4 additions & 3 deletions network/framework/src/peer_manager/senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ impl PeerManagerRequestSender {
protocol_id: ProtocolId,
mdata: Bytes,
) -> Result<(), PeerManagerError> {
let message = Message::new(protocol_id, mdata);
self.inner.push(
(peer_id, protocol_id),
PeerManagerRequest::SendDirectSend(peer_id, Message { protocol_id, mdata }),
PeerManagerRequest::SendDirectSend(peer_id, message),
)?;
Ok(())
}
Expand All @@ -72,15 +73,15 @@ impl PeerManagerRequestSender {
protocol_id: ProtocolId,
mdata: Bytes,
) -> Result<(), PeerManagerError> {
let msg = Message { protocol_id, mdata };
let message = Message::new(protocol_id, mdata);
for recipient in recipients {
// We return `Err` early here if the send fails. Since sending will
// only fail if the queue is unexpectedly shutdown (i.e., receiver
// dropped early), we know that we can't make further progress if
// this send fails.
self.inner.push(
(recipient, protocol_id),
PeerManagerRequest::SendDirectSend(recipient, msg.clone()),
PeerManagerRequest::SendDirectSend(recipient, message.clone()),
)?;
}
Ok(())
Expand Down
41 changes: 32 additions & 9 deletions network/framework/src/protocols/direct_send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,57 @@
use crate::{protocols::network::SerializedRequest, ProtocolId};
use bytes::Bytes;
use serde::Serialize;
use std::fmt::Debug;
use std::{fmt::Debug, time::SystemTime};

#[derive(Clone, Eq, PartialEq, Serialize)]
pub struct Message {
/// The time at which the message was sent by the application
application_send_time: SystemTime,
/// The [`ProtocolId`] for which of our upstream application modules should
/// handle (i.e., deserialize and then respond to) this inbound rpc request.
///
/// For example, if `protocol_id == ProtocolId::ConsensusRpcBcs`, then this
/// inbound rpc request will be dispatched to consensus for handling.
pub protocol_id: ProtocolId,
protocol_id: ProtocolId,
/// The serialized request data received from the sender. At this layer in
/// the stack, the request data is just an opaque blob and will only be fully
/// deserialized later in the handling application module.
#[serde(skip)]
pub mdata: Bytes,
data: Bytes,
}

impl Message {
pub fn new(protocol_id: ProtocolId, data: Bytes) -> Self {
Self {
application_send_time: SystemTime::now(),
protocol_id,
data,
}
}

/// Returns the time at which the message was sent by the application
pub fn application_send_time(&self) -> SystemTime {
self.application_send_time
}

/// Consumes the message and returns the protocol id and data
pub fn into_parts(self) -> (ProtocolId, Bytes) {
(self.protocol_id, self.data)
}
}

impl Debug for Message {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mdata_str = if self.mdata.len() <= 10 {
format!("{:?}", self.mdata)
let mdata_str = if self.data().len() <= 10 {
format!("{:?}", self.data())
} else {
format!("{:?}...", self.mdata.slice(..10))
format!("{:?}...", self.data().slice(..10))
};
write!(
f,
"Message {{ protocol: {:?}, mdata: {} }}",
self.protocol_id, mdata_str
"Message {{ protocol: {:?}, data: {} }}",
self.protocol_id(),
mdata_str
)
}
}
Expand All @@ -43,6 +66,6 @@ impl SerializedRequest for Message {
}

fn data(&self) -> &Bytes {
&self.mdata
&self.data
}
}
Loading

0 comments on commit cdf7bfd

Please sign in to comment.