Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Network] Add simple queuing metrics for message sends. #15268

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub const HANDSHAKE_VERSION: u8 = 0;
pub const NETWORK_CHANNEL_SIZE: usize = 1024;
pub const PING_INTERVAL_MS: u64 = 10_000;
pub const PING_TIMEOUT_MS: u64 = 20_000;
pub const PING_FAILURES_TOLERATED: u64 = 3;
pub const PING_FAILURES_TOLERATED: u64 = 10;
pub const CONNECTIVITY_CHECK_INTERVAL_MS: u64 = 5000;
pub const MAX_CONNECTION_DELAY_MS: u64 = 60_000; /* 1 minute */
pub const MAX_FULLNODE_OUTBOUND_CONNECTIONS: usize = 6;
Expand All @@ -47,7 +47,7 @@ pub const MAX_MESSAGE_METADATA_SIZE: usize = 128 * 1024; /* 128 KiB: a buffer fo
pub const MESSAGE_PADDING_SIZE: usize = 2 * 1024 * 1024; /* 2 MiB: a safety buffer to allow messages to get larger during serialization */
pub const MAX_APPLICATION_MESSAGE_SIZE: usize =
(MAX_MESSAGE_SIZE - MAX_MESSAGE_METADATA_SIZE) - MESSAGE_PADDING_SIZE; /* The message size that applications should check against */
pub const MAX_FRAME_SIZE: usize = 4 * 1024 * 1024; /* 4 MiB large messages will be chunked into multiple frames and streamed */
pub const MAX_FRAME_SIZE: usize = 100 * 1024; /* 100 KiB: large messages will be chunked into multiple frames and streamed */
pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */
pub const CONNECTION_BACKOFF_BASE: u64 = 2;
pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */;
Expand Down
26 changes: 16 additions & 10 deletions consensus/src/consensus_observer/network/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,44 +781,50 @@ mod test {
Ok(peer_manager_request) => {
let (protocol_id, peer_manager_notification) = match peer_manager_request {
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (_, protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();

// Verify the message is correct
assert!(is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(outbound_rpc_request.protocol_id), expected_rpc_protocol);
assert_eq!(outbound_rpc_request.timeout, Duration::from_millis(RPC_REQUEST_TIMEOUT_MS));
assert_eq!(Some(protocol_id), expected_rpc_protocol);
assert_eq!(timeout, Duration::from_millis(RPC_REQUEST_TIMEOUT_MS));

// Create and return the received message
let received_message = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
protocol_id: outbound_rpc_request.protocol_id,
protocol_id,
request_id: 0,
priority: 0,
raw_request: outbound_rpc_request.data.into(),
raw_request: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_rpc_request.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
};
(outbound_rpc_request.protocol_id, received_message)
(protocol_id, received_message)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
let (_, protocol_id, data) = message.into_parts();

// Verify the message is correct
assert!(!is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(message.protocol_id), expected_direct_send_protocol);
assert_eq!(Some(protocol_id), expected_direct_send_protocol);

// Create and return the received message
let received_message = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
protocol_id: message.protocol_id,
protocol_id,
priority: 0,
raw_msg: message.mdata.into(),
raw_msg: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};
(message.protocol_id, received_message)
(protocol_id, received_message)
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1233,8 +1233,7 @@ mod tests {
match peer_manager_request_receiver.next().await {
Some(PeerManagerRequest::SendRpc(_, network_request)) => {
// Parse the network request
let data = network_request.data;
let response_sender = network_request.res_tx;
let (_, _, data, response_sender, _) = network_request.into_parts();
let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap();

// Process the network message
Expand Down
17 changes: 9 additions & 8 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ impl NetworkPlayground {
None => continue, // drop rpc
};

let (_, protocol_id, data, res_tx, _) = outbound_req.into_parts();
if timeout_config
.read()
.is_message_timedout(&src_twin_id, dst_twin_id)
{
outbound_req.res_tx.send(Err(RpcError::TimedOut)).unwrap();
res_tx.send(Err(RpcError::TimedOut)).unwrap();
continue;
}

Expand All @@ -171,17 +172,17 @@ impl NetworkPlayground {
(src_twin_id.author, ProtocolId::ConsensusRpcBcs),
ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id: outbound_req.protocol_id,
protocol_id,
request_id: 123,
priority: 0,
raw_request: outbound_req.data.into(),
raw_request: data.into(),
}),
sender: PeerNetworkId::new(
NetworkId::Validator,
src_twin_id.author,
),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_req.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
},
)
.unwrap();
Expand Down Expand Up @@ -297,9 +298,9 @@ impl NetworkPlayground {
if !self.is_message_dropped(&src_twin_id, dst_twin_id, consensus_msg) {
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.clone().into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
receive_timestamp_micros: 0,
Expand Down Expand Up @@ -419,9 +420,9 @@ impl NetworkPlayground {
for dst_twin_id in dst_twin_ids.iter() {
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.clone().into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
receive_timestamp_micros: 0,
Expand Down
14 changes: 7 additions & 7 deletions mempool/src/tests/multi_node_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use aptos_network::{
peer_manager::PeerManagerRequest,
protocols::{
direct_send::Message,
network::ReceivedMessage,
network::{ReceivedMessage, SerializedRequest},
wire::messaging::v1::{DirectSendMsg, NetworkMessage},
},
ProtocolId,
Expand Down Expand Up @@ -344,9 +344,9 @@ impl TestHarness {
let receiver = self.mut_node(&receiver_id);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
Expand Down Expand Up @@ -400,7 +400,7 @@ impl TestHarness {
// Handle outgoing message
match network_req {
PeerManagerRequest::SendDirectSend(remote_peer_id, msg) => {
let mempool_message = common::decompress_and_deserialize(&msg.mdata.to_vec());
let mempool_message = common::decompress_and_deserialize(&msg.data().to_vec());
match mempool_message {
MempoolSyncMsg::BroadcastTransactionsRequest {
transactions,
Expand Down Expand Up @@ -456,7 +456,7 @@ impl TestHarness {

match network_req {
PeerManagerRequest::SendDirectSend(remote_peer_id, msg) => {
let mempool_message = common::decompress_and_deserialize(&msg.mdata.to_vec());
let mempool_message = common::decompress_and_deserialize(&msg.data().to_vec());
match mempool_message {
MempoolSyncMsg::BroadcastTransactionsResponse { .. } => {
// send it to peer
Expand All @@ -477,9 +477,9 @@ impl TestHarness {
let receiver = self.mut_node(&receiver_id);
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
protocol_id: msg.protocol_id(),
priority: 0,
raw_msg: msg.mdata.into(),
raw_msg: msg.data().clone().into(),
}),
sender: PeerNetworkId::new(network_id, sender_peer_id),
receive_timestamp_micros: 0,
Expand Down
8 changes: 5 additions & 3 deletions mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use aptos_network::{
protocols::{
network::{
NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender, ReceivedMessage,
SerializedRequest,
},
wire::{
handshake::v1::ProtocolId::MempoolDirectSend,
Expand Down Expand Up @@ -314,7 +315,7 @@ impl MempoolNode {
match self.get_outbound_handle(network_id).next().await.unwrap() {
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
assert_eq!(peer_id, remote_peer_id);
msg.protocol_id.from_bytes(&msg.mdata).unwrap()
msg.protocol_id().from_bytes(msg.data()).unwrap()
},
_ => panic!("Should not be getting an RPC response"),
}
Expand Down Expand Up @@ -376,10 +377,11 @@ impl MempoolNode {
let message = self.get_next_network_msg(network_id).await;
let (peer_id, protocol_id, data, maybe_rpc_sender) = match message {
PeerManagerRequest::SendRpc(peer_id, msg) => {
(peer_id, msg.protocol_id, msg.data, Some(msg.res_tx))
let (_, protocol_id, data, res_tx, _) = msg.into_parts();
(peer_id, protocol_id, data, Some(res_tx))
},
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
(peer_id, msg.protocol_id, msg.mdata, None)
(peer_id, msg.protocol_id(), msg.data().clone(), None)
},
};
assert_eq!(peer_id, expected_peer_id);
Expand Down
1 change: 0 additions & 1 deletion network/framework/src/application/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub trait NetworkClientInterface<Message: NetworkMessageTrait>: Clone + Send + S

/// Requests that the network connection for the specified peer
/// is disconnected.
// TODO: support disconnect reasons.
async fn disconnect_from_peer(
&self,
_peer: PeerNetworkId,
Expand Down
26 changes: 16 additions & 10 deletions network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1203,44 +1203,50 @@ async fn wait_for_network_event(
Ok(peer_manager_request) => {
let (protocol_id, peer_manager_notification) = match peer_manager_request {
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (_, protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();

// Verify the request is correct
assert!(is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(outbound_rpc_request.protocol_id), expected_rpc_protocol_id);
assert_eq!(outbound_rpc_request.timeout, message_wait_time);
assert_eq!(Some(protocol_id), expected_rpc_protocol_id);
assert_eq!(timeout, message_wait_time);

// Create and return the peer manager notification
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
protocol_id: outbound_rpc_request.protocol_id,
protocol_id,
request_id: 0,
priority: 0,
raw_request: outbound_rpc_request.data.into(),
raw_request: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_rpc_request.res_tx)),
rpc_replier: Some(Arc::new(res_tx)),
};
(outbound_rpc_request.protocol_id, rmsg)
(protocol_id, rmsg)
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
let (_, protocol_id, data) = message.into_parts();

// Verify the request is correct
assert!(!is_rpc_request);
assert_eq!(peer_id, expected_peer_id);
assert_eq!(Some(message.protocol_id), expected_direct_send_protocol_id);
assert_eq!(Some(protocol_id), expected_direct_send_protocol_id);

// Create and return the peer manager notification
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
protocol_id: message.protocol_id,
protocol_id,
priority: 0,
raw_msg: message.mdata.into(),
raw_msg: data.into(),
}),
sender: PeerNetworkId::new(expected_network_id, peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};
(message.protocol_id, rmsg)
(protocol_id, rmsg)
}
};

Expand Down
43 changes: 41 additions & 2 deletions network/framework/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::protocols::wire::handshake::v1::ProtocolId;
use aptos_config::network_id::NetworkContext;
use crate::protocols::wire::{
handshake::v1::ProtocolId,
messaging::v1::metadata::{MessageSendType, MessageStreamType},
};
use aptos_config::network_id::{NetworkContext, NetworkId};
use aptos_metrics_core::{
exponential_buckets, register_histogram_vec, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec, Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
Expand Down Expand Up @@ -197,6 +200,42 @@ pub static APTOS_NETWORK_DISCOVERY_NOTES: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

/// Time it takes to for messages to be sent by the network layer (e.g.,
/// between when the application sends the message and when it is sent on the wire).
pub static APTOS_NETWORK_MESSAGE_SEND_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_network_message_send_latency",
"Time it takes for messages to be sent by the network layer",
&[
"network_id",
"protocol_id",
"message_type",
"message_stream_type",
],
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});

/// Records `value` in `histogram` under the label set built from the given
/// network ID, protocol ID, message send type and message stream type.
pub fn observe_value_with_labels(
    histogram: &Lazy<HistogramVec>,
    network_id: &NetworkId,
    protocol_id: &ProtocolId,
    message_send_type: &MessageSendType,
    message_stream_type: &MessageStreamType,
    value: f64,
) {
    // Label values are passed positionally, in this order:
    // network, protocol, send type, stream type.
    let label_values = [
        network_id.as_str(),
        protocol_id.as_str(),
        message_send_type.get_label(),
        message_stream_type.get_label(),
    ];

    let labeled_histogram = histogram.with_label_values(&label_values);
    labeled_histogram.observe(value);
}

pub static APTOS_NETWORK_RPC_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!("aptos_network_rpc_messages", "Number of RPC messages", &[
"role_type",
Expand Down
Loading
Loading