Skip to content

Commit

Permalink
[Network] Add application send time to outbound RPC request.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 13, 2024
1 parent cdf7bfd commit 983effe
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 80 deletions.
15 changes: 9 additions & 6 deletions consensus/src/consensus_observer/network/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,25 +781,28 @@ mod test {
Ok(peer_manager_request) => {
let (protocol_id, peer_manager_notification) = match peer_manager_request {
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();

// Verify the message is correct
assert!(is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(outbound_rpc_request.protocol_id), expected_rpc_protocol);
assert_eq!(outbound_rpc_request.timeout, Duration::from_millis(RPC_REQUEST_TIMEOUT_MS));
assert_eq!(Some(protocol_id), expected_rpc_protocol);
assert_eq!(timeout, Duration::from_millis(RPC_REQUEST_TIMEOUT_MS));

// Create and return the received message
let received_message = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
protocol_id: outbound_rpc_request.protocol_id,
protocol_id,
request_id: 0,
priority: 0,
raw_request: outbound_rpc_request.data.into(),
raw_request: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_rpc_request.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
};
(outbound_rpc_request.protocol_id, received_message)
(protocol_id, received_message)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1233,8 +1233,7 @@ mod tests {
match peer_manager_request_receiver.next().await {
Some(PeerManagerRequest::SendRpc(_, network_request)) => {
// Parse the network request
let data = network_request.data;
let response_sender = network_request.res_tx;
let (_, data, response_sender, _) = network_request.into_parts();
let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap();

// Process the network message
Expand Down
9 changes: 5 additions & 4 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ impl NetworkPlayground {
None => continue, // drop rpc
};

let (protocol_id, data, res_tx, _) = outbound_req.into_parts();
if timeout_config
.read()
.is_message_timedout(&src_twin_id, dst_twin_id)
{
outbound_req.res_tx.send(Err(RpcError::TimedOut)).unwrap();
res_tx.send(Err(RpcError::TimedOut)).unwrap();
continue;
}

Expand All @@ -171,17 +172,17 @@ impl NetworkPlayground {
(src_twin_id.author, ProtocolId::ConsensusRpcBcs),
ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id: outbound_req.protocol_id,
protocol_id,
request_id: 123,
priority: 0,
raw_request: outbound_req.data.into(),
raw_request: data.into(),
}),
sender: PeerNetworkId::new(
NetworkId::Validator,
src_twin_id.author,
),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_req.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
},
)
.unwrap();
Expand Down
3 changes: 2 additions & 1 deletion mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ impl MempoolNode {
let message = self.get_next_network_msg(network_id).await;
let (peer_id, protocol_id, data, maybe_rpc_sender) = match message {
PeerManagerRequest::SendRpc(peer_id, msg) => {
(peer_id, msg.protocol_id, msg.data, Some(msg.res_tx))
let (protocol_id, data, res_tx, _) = msg.into_parts();
(peer_id, protocol_id, data, Some(res_tx))
},
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
(peer_id, msg.protocol_id(), msg.data().clone(), None)
Expand Down
15 changes: 9 additions & 6 deletions network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,25 +1203,28 @@ async fn wait_for_network_event(
Ok(peer_manager_request) => {
let (protocol_id, peer_manager_notification) = match peer_manager_request {
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();

// Verify the request is correct
assert!(is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(outbound_rpc_request.protocol_id), expected_rpc_protocol_id);
assert_eq!(outbound_rpc_request.timeout, message_wait_time);
assert_eq!(Some(protocol_id), expected_rpc_protocol_id);
assert_eq!(timeout, message_wait_time);

// Create and return the peer manager notification
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
protocol_id: outbound_rpc_request.protocol_id,
protocol_id,
request_id: 0,
priority: 0,
raw_request: outbound_rpc_request.data.into(),
raw_request: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_rpc_request.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
};
(outbound_rpc_request.protocol_id, rmsg)
(protocol_id, rmsg)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
Expand Down
4 changes: 2 additions & 2 deletions network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
peer_manager::{PeerManagerError, TransportNotification},
protocols::{
direct_send::Message,
network::ReceivedMessage,
network::{ReceivedMessage, SerializedRequest},
rpc::{error::RpcError, InboundRpcs, OutboundRpcRequest, OutboundRpcs},
stream::{InboundStreamBuffer, OutboundStream, StreamMessage},
wire::messaging::v1::{
Expand Down Expand Up @@ -645,7 +645,7 @@ where
}
},
PeerRequest::SendRpc(request) => {
let protocol_id = request.protocol_id;
let protocol_id = request.protocol_id();
if let Err(e) = self
.outbound_rpcs
.handle_outbound_request(request, write_reqs_tx)
Expand Down
29 changes: 13 additions & 16 deletions network/framework/src/peer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,7 @@ impl PeerHandle {
timeout: Duration,
) -> Result<Bytes, RpcError> {
let (res_tx, res_rx) = oneshot::channel();
let request = OutboundRpcRequest {
protocol_id,
data,
res_tx,
timeout,
};
let request = OutboundRpcRequest::new(protocol_id, data, res_tx, timeout);
self.0.push(protocol_id, PeerRequest::SendRpc(request))?;
let response_data = res_rx.await??;
Ok(response_data)
Expand Down Expand Up @@ -769,12 +764,13 @@ fn peer_send_rpc_cancel() {
let test = async move {
// Client sends rpc request.
let (response_tx, mut response_rx) = oneshot::channel();
let request = PeerRequest::SendRpc(OutboundRpcRequest {
protocol_id: PROTOCOL,
data: Bytes::from(&b"hello world"[..]),
res_tx: response_tx,
let outbound_rpc_request = OutboundRpcRequest::new(
PROTOCOL,
Bytes::from(&b"hello world"[..]),
response_tx,
timeout,
});
);
let request = PeerRequest::SendRpc(outbound_rpc_request);
peer_handle.0.push(PROTOCOL, request).unwrap();

// Server receives the rpc request from client.
Expand Down Expand Up @@ -831,12 +827,13 @@ fn peer_send_rpc_timeout() {
let test = async move {
// Client sends rpc request.
let (response_tx, mut response_rx) = oneshot::channel();
let request = PeerRequest::SendRpc(OutboundRpcRequest {
protocol_id: PROTOCOL,
data: Bytes::from(&b"hello world"[..]),
res_tx: response_tx,
let outbound_rpc_request = OutboundRpcRequest::new(
PROTOCOL,
Bytes::from(&b"hello world"[..]),
response_tx,
timeout,
});
);
let request = PeerRequest::SendRpc(outbound_rpc_request);
peer_handle.0.push(PROTOCOL, request).unwrap();

// Server receives the rpc request from client.
Expand Down
7 changes: 1 addition & 6 deletions network/framework/src/peer_manager/senders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,7 @@ impl PeerManagerRequestSender {
timeout: Duration,
) -> Result<Bytes, RpcError> {
let (res_tx, res_rx) = oneshot::channel();
let request = OutboundRpcRequest {
protocol_id,
data: req,
res_tx,
timeout,
};
let request = OutboundRpcRequest::new(protocol_id, req, res_tx, timeout);
self.inner.push(
(peer_id, protocol_id),
PeerManagerRequest::SendRpc(peer_id, request),
Expand Down
5 changes: 1 addition & 4 deletions network/framework/src/protocols/health_checker/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ impl TestHarness {
_ => panic!("Unexpected PeerManagerRequest: {:?}", req),
};

let protocol_id = rpc_req.protocol_id;
let req_data = rpc_req.data;
let res_tx = rpc_req.res_tx;

let (protocol_id, req_data, res_tx, _) = rpc_req.into_parts();
assert_eq!(protocol_id, ProtocolId::HealthCheckerRpc);

match bcs::from_bytes(&req_data).unwrap() {
Expand Down
55 changes: 44 additions & 11 deletions network/framework/src/protocols/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ use futures::{
stream::{FuturesUnordered, StreamExt},
};
use serde::Serialize;
use std::{cmp::PartialEq, collections::HashMap, fmt::Debug, sync::Arc, time::Duration};
use std::{
cmp::PartialEq,
collections::HashMap,
fmt::Debug,
sync::Arc,
time::{Duration, SystemTime},
};

pub mod error;

Expand Down Expand Up @@ -118,28 +124,59 @@ impl SerializedRequest for InboundRpcRequest {
/// A wrapper struct for an outbound rpc request and its associated context.
#[derive(Debug, Serialize)]
pub struct OutboundRpcRequest {
/// The time at which the request was sent by the application
application_send_time: SystemTime,
/// The remote peer's application module that should handle our outbound rpc
/// request.
///
/// For example, if `protocol_id == ProtocolId::ConsensusRpcBcs`, then this
/// outbound rpc request should be handled by the remote peer's consensus
/// application module.
pub protocol_id: ProtocolId,
protocol_id: ProtocolId,
/// The serialized request data to be sent to the receiver. At this layer in
/// the stack, the request data is just an opaque blob.
#[serde(skip)]
pub data: Bytes,
data: Bytes,
/// Channel over which the rpc response is sent from the rpc layer to the
/// upper client layer.
///
/// If there is an error while performing the rpc protocol, e.g., the remote
/// peer drops the connection, we will send an [`RpcError`] over the channel.
#[serde(skip)]
pub res_tx: oneshot::Sender<Result<Bytes, RpcError>>,
res_tx: oneshot::Sender<Result<Bytes, RpcError>>,
/// The timeout duration for the entire rpc call. If the timeout elapses, the
/// rpc layer will send an [`RpcError::TimedOut`] error over the
/// `res_tx` channel to the upper client layer.
pub timeout: Duration,
timeout: Duration,
}

impl OutboundRpcRequest {
pub fn new(
protocol_id: ProtocolId,
data: Bytes,
res_tx: oneshot::Sender<Result<Bytes, RpcError>>,
timeout: Duration,
) -> Self {
Self {
application_send_time: SystemTime::now(),
protocol_id,
data,
res_tx,
timeout,
}
}

/// Consumes the request and returns the protocol id, data, channel, and timeout
pub fn into_parts(
self,
) -> (
ProtocolId,
Bytes,
oneshot::Sender<Result<Bytes, RpcError>>,
Duration,
) {
(self.protocol_id, self.data, self.res_tx, self.timeout)
}
}

impl SerializedRequest for OutboundRpcRequest {
Expand Down Expand Up @@ -440,12 +477,8 @@ impl OutboundRpcs {
let peer_id = &self.remote_peer_id;

// Unpack request.
let OutboundRpcRequest {
protocol_id,
data: request_data,
timeout,
res_tx: mut application_response_tx,
} = request;
let (protocol_id, request_data, mut application_response_tx, timeout) =
request.into_parts();
let req_len = request_data.len() as u64;

// Drop the outbound request if the application layer has already canceled.
Expand Down
29 changes: 14 additions & 15 deletions network/framework/src/testutils/test_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
peer_manager::PeerManagerRequest,
protocols::{
network::{ReceivedMessage, SerializedRequest},
rpc::OutboundRpcRequest,
wire::messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest},
},
transport::ConnectionMetadata,
Expand Down Expand Up @@ -284,17 +283,13 @@ pub trait TestNode: ApplicationNode + Sync {
) -> (PeerId, ProtocolId, bytes::Bytes) {
let message = self.get_next_network_msg(network_id).await;
match message {
PeerManagerRequest::SendRpc(
peer_id,
OutboundRpcRequest {
protocol_id,
res_tx,
data,
..
},
) => {
// Forcefully close the oneshot channel, otherwise listening task will hang forever.
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (protocol_id, data, res_tx, _) = outbound_rpc_request.into_parts();

// Forcefully close the oneshot channel, otherwise listening task will hang forever
drop(res_tx);

(peer_id, protocol_id, data)
},
PeerManagerRequest::SendDirectSend(peer_id, message) => {
Expand All @@ -309,18 +304,22 @@ pub trait TestNode: ApplicationNode + Sync {

let (remote_peer_id, protocol_id, rmsg) = match request {
PeerManagerRequest::SendRpc(peer_id, msg) => {
// Unpack the request
let (protocol_id, data, res_tx, _) = msg.into_parts();

// Create the received message
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id: msg.protocol_id,
protocol_id,
request_id: 0,
priority: 0,
raw_request: msg.data().clone().into(),
raw_request: data.into(),
}),
sender: self.peer_network_id(network_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(msg.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
};
(peer_id, msg.protocol_id, rmsg)
(peer_id, protocol_id, rmsg)
},
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
let rmsg = ReceivedMessage {
Expand Down
8 changes: 4 additions & 4 deletions peer-monitoring-service/client/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,11 @@ impl MockMonitoringServer {
// Wait for the next request
match peer_manager_request_receiver.next().await {
Some(PeerManagerRequest::SendRpc(peer_id, network_request)) => {
// Deconstruct the network request
// Unpack the network request
let (protocol_id, request_data, response_sender, _) = network_request.into_parts();

// Identify the peer network ID
let peer_network_id = PeerNetworkId::new(*network_id, peer_id);
let protocol_id = network_request.protocol_id;
let request_data = network_request.data;
let response_sender = network_request.res_tx;

// Deserialize the network message
let peer_monitoring_message: PeerMonitoringServiceMessage =
Expand Down
Loading

0 comments on commit 983effe

Please sign in to comment.