From 983effe6d7c2ef8cdc22bb3bebc3116e0c8fd3bd Mon Sep 17 00:00:00 2001 From: Josh Lind Date: Sun, 10 Nov 2024 09:59:31 -0500 Subject: [PATCH] [Network] Add application send time to outbound RPC request. --- .../network/network_handler.rs | 15 +++-- .../observer/subscription_utils.rs | 3 +- consensus/src/network_tests.rs | 9 +-- mempool/src/tests/test_framework.rs | 3 +- network/framework/src/application/tests.rs | 15 +++-- network/framework/src/peer/mod.rs | 4 +- network/framework/src/peer/test.rs | 29 +++++----- network/framework/src/peer_manager/senders.rs | 7 +-- .../src/protocols/health_checker/test.rs | 5 +- network/framework/src/protocols/rpc/mod.rs | 55 +++++++++++++++---- network/framework/src/testutils/test_node.rs | 29 +++++----- .../client/src/tests/mock.rs | 8 +-- .../aptos-data-client/src/tests/mock.rs | 9 ++- 13 files changed, 111 insertions(+), 80 deletions(-) diff --git a/consensus/src/consensus_observer/network/network_handler.rs b/consensus/src/consensus_observer/network/network_handler.rs index 43acca35856ab..d713a74421e1a 100644 --- a/consensus/src/consensus_observer/network/network_handler.rs +++ b/consensus/src/consensus_observer/network/network_handler.rs @@ -781,25 +781,28 @@ mod test { Ok(peer_manager_request) => { let (protocol_id, peer_manager_notification) = match peer_manager_request { PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => { + // Unpack the request + let (protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts(); + // Verify the message is correct assert!(is_rpc_request); assert_eq!(peer_id, expected_peer_id); - assert_eq!(Some(outbound_rpc_request.protocol_id), expected_rpc_protocol); - assert_eq!(outbound_rpc_request.timeout, Duration::from_millis(RPC_REQUEST_TIMEOUT_MS)); + assert_eq!(Some(protocol_id), expected_rpc_protocol); + assert_eq!(timeout, Duration::from_millis(RPC_REQUEST_TIMEOUT_MS)); // Create and return the received message let received_message = ReceivedMessage { message: NetworkMessage::RpcRequest(RpcRequest{ - protocol_id: outbound_rpc_request.protocol_id, + protocol_id, request_id: 0, priority: 0, - raw_request: outbound_rpc_request.data.into(), + raw_request: data.into(), }), sender: PeerNetworkId::new(expected_network_id, peer_id), receive_timestamp_micros: 0, - rpc_replier: Some(Arc::new(outbound_rpc_request.res_tx)), + rpc_replier: Some(Arc::new(res_tx)), }; - (outbound_rpc_request.protocol_id, received_message) + (protocol_id, received_message) } PeerManagerRequest::SendDirectSend(peer_id, message) => { // Unpack the message diff --git a/consensus/src/consensus_observer/observer/subscription_utils.rs b/consensus/src/consensus_observer/observer/subscription_utils.rs index 13cf6a24dc372..d5262f387823f 100644 --- a/consensus/src/consensus_observer/observer/subscription_utils.rs +++ b/consensus/src/consensus_observer/observer/subscription_utils.rs @@ -1233,8 +1233,7 @@ mod tests { match peer_manager_request_receiver.next().await { Some(PeerManagerRequest::SendRpc(_, network_request)) => { // Parse the network request - let data = network_request.data; - let response_sender = network_request.res_tx; + let (_, data, response_sender, _) = network_request.into_parts(); let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap(); // Process the network message diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs index 5d2acf73d7334..7d8901515e22f 100644 --- a/consensus/src/network_tests.rs +++ b/consensus/src/network_tests.rs @@ -155,11 +155,12 @@ impl NetworkPlayground { None => continue, // drop rpc }; + let (protocol_id, data, res_tx, _) = outbound_req.into_parts(); if timeout_config .read() .is_message_timedout(&src_twin_id, dst_twin_id) { - outbound_req.res_tx.send(Err(RpcError::TimedOut)).unwrap(); + res_tx.send(Err(RpcError::TimedOut)).unwrap(); continue; } @@ -171,17 +172,17 @@ impl NetworkPlayground { (src_twin_id.author, ProtocolId::ConsensusRpcBcs), ReceivedMessage { message: NetworkMessage::RpcRequest(RpcRequest { - protocol_id: outbound_req.protocol_id, + protocol_id, request_id: 123, priority: 0, - raw_request: outbound_req.data.into(), + raw_request: data.into(), }), sender: PeerNetworkId::new( NetworkId::Validator, src_twin_id.author, ), receive_timestamp_micros: 0, - rpc_replier: Some(Arc::new(outbound_req.res_tx)), + rpc_replier: Some(Arc::new(res_tx)), }, ) .unwrap(); diff --git a/mempool/src/tests/test_framework.rs b/mempool/src/tests/test_framework.rs index c3e8fd6203526..ee8ee07f3d507 100644 --- a/mempool/src/tests/test_framework.rs +++ b/mempool/src/tests/test_framework.rs @@ -377,7 +377,8 @@ impl MempoolNode { let message = self.get_next_network_msg(network_id).await; let (peer_id, protocol_id, data, maybe_rpc_sender) = match message { PeerManagerRequest::SendRpc(peer_id, msg) => { - (peer_id, msg.protocol_id, msg.data, Some(msg.res_tx)) + let (protocol_id, data, res_tx, _) = msg.into_parts(); + (peer_id, protocol_id, data, Some(res_tx)) }, PeerManagerRequest::SendDirectSend(peer_id, msg) => { (peer_id, msg.protocol_id(), msg.data().clone(), None) diff --git a/network/framework/src/application/tests.rs b/network/framework/src/application/tests.rs index c9ae49d34c0ca..383ba45094a37 100644 --- a/network/framework/src/application/tests.rs +++ b/network/framework/src/application/tests.rs @@ -1203,25 +1203,28 @@ async fn wait_for_network_event( Ok(peer_manager_request) => { let (protocol_id, peer_manager_notification) = match peer_manager_request { PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => { + // Unpack the request + let (protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts(); + // Verify the request is correct assert!(is_rpc_request); assert_eq!(peer_id, expected_peer_id); - assert_eq!(Some(outbound_rpc_request.protocol_id), expected_rpc_protocol_id); - assert_eq!(outbound_rpc_request.timeout, message_wait_time); + assert_eq!(Some(protocol_id), expected_rpc_protocol_id); + assert_eq!(timeout, message_wait_time); // Create and return the peer manager notification let rmsg = ReceivedMessage { message: NetworkMessage::RpcRequest(RpcRequest{ - protocol_id: outbound_rpc_request.protocol_id, + protocol_id, request_id: 0, priority: 0, - raw_request: outbound_rpc_request.data.into(), + raw_request: data.into(), }), sender: PeerNetworkId::new(expected_network_id, peer_id), receive_timestamp_micros: 0, - rpc_replier: Some(Arc::new(outbound_rpc_request.res_tx)), + rpc_replier: Some(Arc::new(res_tx)), }; - (outbound_rpc_request.protocol_id, rmsg) + (protocol_id, rmsg) } PeerManagerRequest::SendDirectSend(peer_id, message) => { // Unpack the message diff --git a/network/framework/src/peer/mod.rs b/network/framework/src/peer/mod.rs index 757ba55a8a2b1..5168c416056b1 100644 --- a/network/framework/src/peer/mod.rs +++ b/network/framework/src/peer/mod.rs @@ -24,7 +24,7 @@ use crate::{ peer_manager::{PeerManagerError, TransportNotification}, protocols::{ direct_send::Message, - network::ReceivedMessage, + network::{ReceivedMessage, SerializedRequest}, rpc::{error::RpcError, InboundRpcs, OutboundRpcRequest, OutboundRpcs}, stream::{InboundStreamBuffer, OutboundStream, StreamMessage}, wire::messaging::v1::{ @@ -645,7 +645,7 @@ where } }, PeerRequest::SendRpc(request) => { - let protocol_id = request.protocol_id; + let protocol_id = request.protocol_id(); if let Err(e) = self .outbound_rpcs .handle_outbound_request(request, write_reqs_tx) diff --git a/network/framework/src/peer/test.rs b/network/framework/src/peer/test.rs index 9f8877c348f3d..ba761df00ed1a 100644 --- a/network/framework/src/peer/test.rs +++ b/network/framework/src/peer/test.rs @@ -188,12 +188,7 @@ impl PeerHandle { timeout: Duration, ) -> Result { let (res_tx, res_rx) = oneshot::channel(); - let request = OutboundRpcRequest { - protocol_id, - data, - res_tx, - timeout, - }; + let request = OutboundRpcRequest::new(protocol_id, data, res_tx, timeout); self.0.push(protocol_id, PeerRequest::SendRpc(request))?; let response_data = res_rx.await??; Ok(response_data) @@ -769,12 +764,13 @@ fn peer_send_rpc_cancel() { let test = async move { // Client sends rpc request. let (response_tx, mut response_rx) = oneshot::channel(); - let request = PeerRequest::SendRpc(OutboundRpcRequest { - protocol_id: PROTOCOL, - data: Bytes::from(&b"hello world"[..]), - res_tx: response_tx, + let outbound_rpc_request = OutboundRpcRequest::new( + PROTOCOL, + Bytes::from(&b"hello world"[..]), + response_tx, timeout, - }); + ); + let request = PeerRequest::SendRpc(outbound_rpc_request); peer_handle.0.push(PROTOCOL, request).unwrap(); // Server receives the rpc request from client. @@ -831,12 +827,13 @@ fn peer_send_rpc_timeout() { let test = async move { // Client sends rpc request. let (response_tx, mut response_rx) = oneshot::channel(); - let request = PeerRequest::SendRpc(OutboundRpcRequest { - protocol_id: PROTOCOL, - data: Bytes::from(&b"hello world"[..]), - res_tx: response_tx, + let outbound_rpc_request = OutboundRpcRequest::new( + PROTOCOL, + Bytes::from(&b"hello world"[..]), + response_tx, timeout, - }); + ); + let request = PeerRequest::SendRpc(outbound_rpc_request); peer_handle.0.push(PROTOCOL, request).unwrap(); // Server receives the rpc request from client. diff --git a/network/framework/src/peer_manager/senders.rs b/network/framework/src/peer_manager/senders.rs index b45ab3149ad55..d619bd810d870 100644 --- a/network/framework/src/peer_manager/senders.rs +++ b/network/framework/src/peer_manager/senders.rs @@ -96,12 +96,7 @@ impl PeerManagerRequestSender { timeout: Duration, ) -> Result { let (res_tx, res_rx) = oneshot::channel(); - let request = OutboundRpcRequest { - protocol_id, - data: req, - res_tx, - timeout, - }; + let request = OutboundRpcRequest::new(protocol_id, req, res_tx, timeout); self.inner.push( (peer_id, protocol_id), PeerManagerRequest::SendRpc(peer_id, request), diff --git a/network/framework/src/protocols/health_checker/test.rs b/network/framework/src/protocols/health_checker/test.rs index ed029c4dcc915..7a1bc26474283 100644 --- a/network/framework/src/protocols/health_checker/test.rs +++ b/network/framework/src/protocols/health_checker/test.rs @@ -105,10 +105,7 @@ impl TestHarness { _ => panic!("Unexpected PeerManagerRequest: {:?}", req), }; - let protocol_id = rpc_req.protocol_id; - let req_data = rpc_req.data; - let res_tx = rpc_req.res_tx; - + let (protocol_id, req_data, res_tx, _) = rpc_req.into_parts(); assert_eq!(protocol_id, ProtocolId::HealthCheckerRpc); match bcs::from_bytes(&req_data).unwrap() { diff --git a/network/framework/src/protocols/rpc/mod.rs b/network/framework/src/protocols/rpc/mod.rs index 2be2a22a5f667..b94512ab599b3 100644 --- a/network/framework/src/protocols/rpc/mod.rs +++ b/network/framework/src/protocols/rpc/mod.rs @@ -73,7 +73,13 @@ use futures::{ stream::{FuturesUnordered, StreamExt}, }; use serde::Serialize; -use std::{cmp::PartialEq, collections::HashMap, fmt::Debug, sync::Arc, time::Duration}; +use std::{ + cmp::PartialEq, + collections::HashMap, + fmt::Debug, + sync::Arc, + time::{Duration, SystemTime}, +}; pub mod error; @@ -118,28 +124,59 @@ impl SerializedRequest for InboundRpcRequest { /// A wrapper struct for an outbound rpc request and its associated context. #[derive(Debug, Serialize)] pub struct OutboundRpcRequest { + /// The time at which the request was sent by the application + application_send_time: SystemTime, /// The remote peer's application module that should handle our outbound rpc /// request. /// /// For example, if `protocol_id == ProtocolId::ConsensusRpcBcs`, then this /// outbound rpc request should be handled by the remote peer's consensus /// application module. - pub protocol_id: ProtocolId, + protocol_id: ProtocolId, /// The serialized request data to be sent to the receiver. At this layer in /// the stack, the request data is just an opaque blob. #[serde(skip)] - pub data: Bytes, + data: Bytes, /// Channel over which the rpc response is sent from the rpc layer to the /// upper client layer. /// /// If there is an error while performing the rpc protocol, e.g., the remote /// peer drops the connection, we will send an [`RpcError`] over the channel. #[serde(skip)] - pub res_tx: oneshot::Sender>, + res_tx: oneshot::Sender>, /// The timeout duration for the entire rpc call. If the timeout elapses, the /// rpc layer will send an [`RpcError::TimedOut`] error over the /// `res_tx` channel to the upper client layer. - pub timeout: Duration, + timeout: Duration, +} + +impl OutboundRpcRequest { + pub fn new( + protocol_id: ProtocolId, + data: Bytes, + res_tx: oneshot::Sender>, + timeout: Duration, + ) -> Self { + Self { + application_send_time: SystemTime::now(), + protocol_id, + data, + res_tx, + timeout, + } + } + + /// Consumes the request and returns the protocol id, data, channel, and timeout + pub fn into_parts( + self, + ) -> ( + ProtocolId, + Bytes, + oneshot::Sender>, + Duration, + ) { + (self.protocol_id, self.data, self.res_tx, self.timeout) + } } impl SerializedRequest for OutboundRpcRequest { @@ -440,12 +477,8 @@ impl OutboundRpcs { let peer_id = &self.remote_peer_id; // Unpack request. - let OutboundRpcRequest { - protocol_id, - data: request_data, - timeout, - res_tx: mut application_response_tx, - } = request; + let (protocol_id, request_data, mut application_response_tx, timeout) = + request.into_parts(); let req_len = request_data.len() as u64; // Drop the outbound request if the application layer has already canceled. diff --git a/network/framework/src/testutils/test_node.rs b/network/framework/src/testutils/test_node.rs index 10bf5b6caff06..9512cad70882d 100644 --- a/network/framework/src/testutils/test_node.rs +++ b/network/framework/src/testutils/test_node.rs @@ -7,7 +7,6 @@ use crate::{ peer_manager::PeerManagerRequest, protocols::{ network::{ReceivedMessage, SerializedRequest}, - rpc::OutboundRpcRequest, wire::messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest}, }, transport::ConnectionMetadata, @@ -284,17 +283,13 @@ pub trait TestNode: ApplicationNode + Sync { ) -> (PeerId, ProtocolId, bytes::Bytes) { let message = self.get_next_network_msg(network_id).await; match message { - PeerManagerRequest::SendRpc( - peer_id, - OutboundRpcRequest { - protocol_id, - res_tx, - data, - .. - }, - ) => { - // Forcefully close the oneshot channel, otherwise listening task will hang forever. + PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => { + // Unpack the request + let (protocol_id, data, res_tx, _) = outbound_rpc_request.into_parts(); + + // Forcefully close the oneshot channel, otherwise listening task will hang forever drop(res_tx); + (peer_id, protocol_id, data) }, PeerManagerRequest::SendDirectSend(peer_id, message) => { @@ -309,18 +304,22 @@ pub trait TestNode: ApplicationNode + Sync { let (remote_peer_id, protocol_id, rmsg) = match request { PeerManagerRequest::SendRpc(peer_id, msg) => { + // Unpack the request + let (protocol_id, data, res_tx, _) = msg.into_parts(); + + // Create the received message let rmsg = ReceivedMessage { message: NetworkMessage::RpcRequest(RpcRequest { - protocol_id: msg.protocol_id, + protocol_id, request_id: 0, priority: 0, - raw_request: msg.data().clone().into(), + raw_request: data.into(), }), sender: self.peer_network_id(network_id), receive_timestamp_micros: 0, - rpc_replier: Some(Arc::new(msg.res_tx)), + rpc_replier: Some(Arc::new(res_tx)), }; - (peer_id, msg.protocol_id, rmsg) + (peer_id, protocol_id, rmsg) }, PeerManagerRequest::SendDirectSend(peer_id, msg) => { let rmsg = ReceivedMessage { diff --git a/peer-monitoring-service/client/src/tests/mock.rs b/peer-monitoring-service/client/src/tests/mock.rs index f931db06fdd6c..a6872761e62b7 100644 --- a/peer-monitoring-service/client/src/tests/mock.rs +++ b/peer-monitoring-service/client/src/tests/mock.rs @@ -132,11 +132,11 @@ impl MockMonitoringServer { // Wait for the next request match peer_manager_request_receiver.next().await { Some(PeerManagerRequest::SendRpc(peer_id, network_request)) => { - // Deconstruct the network request + // Unpack the network request + let (protocol_id, request_data, response_sender, _) = network_request.into_parts(); + + // Identify the peer network ID let peer_network_id = PeerNetworkId::new(*network_id, peer_id); - let protocol_id = network_request.protocol_id; - let request_data = network_request.data; - let response_sender = network_request.res_tx; // Deserialize the network message let peer_monitoring_message: PeerMonitoringServiceMessage = diff --git a/state-sync/aptos-data-client/src/tests/mock.rs b/state-sync/aptos-data-client/src/tests/mock.rs index b9fde73ad55f9..51ae95ea98226 100644 --- a/state-sync/aptos-data-client/src/tests/mock.rs +++ b/state-sync/aptos-data-client/src/tests/mock.rs @@ -233,11 +233,13 @@ impl MockNetwork { let peer_mgr_reqs_rx = self.peer_mgr_reqs_rxs.get_mut(&network_id).unwrap(); match peer_mgr_reqs_rx.next().await { Some(PeerManagerRequest::SendRpc(peer_id, network_request)) => { + // Unpack the network request + let (protocol_id, data, res_tx, _) = network_request.into_parts(); + + // Create the peer network ID let peer_network_id = PeerNetworkId::new(network_id, peer_id); - let protocol_id = network_request.protocol_id; - let data = network_request.data; - let res_tx = network_request.res_tx; + // Deserialize the network message let message: StorageServiceMessage = bcs::from_bytes(data.as_ref()).unwrap(); let storage_service_request = match message { StorageServiceMessage::Request(request) => request, @@ -245,6 +247,7 @@ impl MockNetwork { }; let response_sender = ResponseSender::new(res_tx); + // Return the network request Some(NetworkRequest { peer_network_id, protocol_id,