Skip to content

Commit

Permalink
[Network] Add metrics for queuing time.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 13, 2024
1 parent 983effe commit 3c782b8
Show file tree
Hide file tree
Showing 19 changed files with 560 additions and 122 deletions.
4 changes: 2 additions & 2 deletions consensus/src/consensus_observer/network/network_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ mod test {
let (protocol_id, peer_manager_notification) = match peer_manager_request {
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();
let (_, protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();

// Verify the message is correct
assert!(is_rpc_request);
Expand All @@ -806,7 +806,7 @@ mod test {
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
let (protocol_id, data) = message.into_parts();
let (_, protocol_id, data) = message.into_parts();

// Verify the message is correct
assert!(!is_rpc_request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1233,7 +1233,7 @@ mod tests {
match peer_manager_request_receiver.next().await {
Some(PeerManagerRequest::SendRpc(_, network_request)) => {
// Parse the network request
let (_, data, response_sender, _) = network_request.into_parts();
let (_, _, data, response_sender, _) = network_request.into_parts();
let message: ConsensusObserverMessage = bcs::from_bytes(data.as_ref()).unwrap();

// Process the network message
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl NetworkPlayground {
None => continue, // drop rpc
};

let (protocol_id, data, res_tx, _) = outbound_req.into_parts();
let (_, protocol_id, data, res_tx, _) = outbound_req.into_parts();
if timeout_config
.read()
.is_message_timedout(&src_twin_id, dst_twin_id)
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl MempoolNode {
let message = self.get_next_network_msg(network_id).await;
let (peer_id, protocol_id, data, maybe_rpc_sender) = match message {
PeerManagerRequest::SendRpc(peer_id, msg) => {
let (protocol_id, data, res_tx, _) = msg.into_parts();
let (_, protocol_id, data, res_tx, _) = msg.into_parts();
(peer_id, protocol_id, data, Some(res_tx))
},
PeerManagerRequest::SendDirectSend(peer_id, msg) => {
Expand Down
1 change: 0 additions & 1 deletion network/framework/src/application/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub trait NetworkClientInterface<Message: NetworkMessageTrait>: Clone + Send + S

/// Requests that the network connection for the specified peer
/// is disconnected.
// TODO: support disconnect reasons.
async fn disconnect_from_peer(
&self,
_peer: PeerNetworkId,
Expand Down
4 changes: 2 additions & 2 deletions network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,7 @@ async fn wait_for_network_event(
let (protocol_id, peer_manager_notification) = match peer_manager_request {
PeerManagerRequest::SendRpc(peer_id, outbound_rpc_request) => {
// Unpack the request
let (protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();
let (_, protocol_id, data, res_tx, timeout) = outbound_rpc_request.into_parts();

// Verify the request is correct
assert!(is_rpc_request);
Expand All @@ -1228,7 +1228,7 @@ async fn wait_for_network_event(
}
PeerManagerRequest::SendDirectSend(peer_id, message) => {
// Unpack the message
let (protocol_id, data) = message.into_parts();
let (_, protocol_id, data) = message.into_parts();

// Verify the request is correct
assert!(!is_rpc_request);
Expand Down
43 changes: 41 additions & 2 deletions network/framework/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::protocols::wire::handshake::v1::ProtocolId;
use aptos_config::network_id::NetworkContext;
use crate::protocols::wire::{
handshake::v1::ProtocolId,
messaging::v1::metadata::{MessageSendType, MessageStreamType},
};
use aptos_config::network_id::{NetworkContext, NetworkId};
use aptos_metrics_core::{
exponential_buckets, register_histogram_vec, register_int_counter_vec, register_int_gauge,
register_int_gauge_vec, Histogram, HistogramTimer, HistogramVec, IntCounter, IntCounterVec,
Expand Down Expand Up @@ -197,6 +200,42 @@ pub static APTOS_NETWORK_DISCOVERY_NOTES: Lazy<IntGaugeVec> = Lazy::new(|| {
.unwrap()
});

/// Time it takes to for messages to be sent by the network layer (e.g.,
/// between when the application sends the message and when it is sent on the wire).
pub static APTOS_NETWORK_MESSAGE_SEND_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_network_message_send_latency",
"Time it takes for messages to be sent by the network layer",
&[
"network_id",
"protocol_id",
"message_type",
"message_stream_type",
],
exponential_buckets(/*start=*/ 1e-6, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});

/// Observes the value for the provided histogram and labels
pub fn observe_value_with_labels(
histogram: &Lazy<HistogramVec>,
network_id: &NetworkId,
protocol_id: &ProtocolId,
message_send_type: &MessageSendType,
message_stream_type: &MessageStreamType,
value: f64,
) {
histogram
.with_label_values(&[
network_id.as_str(),
protocol_id.as_str(),
message_send_type.get_label(),
message_stream_type.get_label(),
])
.observe(value)
}

pub static APTOS_NETWORK_RPC_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!("aptos_network_rpc_messages", "Number of RPC messages", &[
"role_type",
Expand Down
94 changes: 62 additions & 32 deletions network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ use crate::{
rpc::{error::RpcError, InboundRpcs, OutboundRpcRequest, OutboundRpcs},
stream::{InboundStreamBuffer, OutboundStream, StreamMessage},
wire::messaging::v1::{
DirectSendMsg, ErrorCode, MultiplexMessage, MultiplexMessageSink,
MultiplexMessageStream, NetworkMessage, Priority, ReadError, WriteError,
metadata::{
MessageMetadata, MessageSendType, MultiplexMessageWithMetadata,
NetworkMessageWithMetadata,
},
ErrorCode, MultiplexMessage, MultiplexMessageSink, MultiplexMessageStream,
NetworkMessage, ReadError, WriteError,
},
},
transport::{self, Connection, ConnectionMetadata},
Expand Down Expand Up @@ -274,7 +278,7 @@ where
maybe_response = self.inbound_rpcs.next_completed_response() => {
// Extract the relevant metadata from the message
let message_metadata = match &maybe_response {
Ok((response, protocol_id)) => Some((response.request_id, *protocol_id)),
Ok((response_with_metadata, protocol_id)) => Some((response_with_metadata.rpc_response().request_id, *protocol_id)),
_ => None,
};

Expand Down Expand Up @@ -334,19 +338,24 @@ where
max_frame_size: usize,
max_message_size: usize,
) -> (
aptos_channel::Sender<(), NetworkMessage>,
aptos_channel::Sender<(), NetworkMessageWithMetadata>,
oneshot::Sender<()>,
) {
let remote_peer_id = connection_metadata.remote_peer_id;
let (write_reqs_tx, mut write_reqs_rx): (aptos_channel::Sender<(), NetworkMessage>, _) =
aptos_channel::new(
QueueStyle::KLAST,
1024,
Some(&counters::PENDING_WIRE_MESSAGES),
);
let (write_reqs_tx, mut write_reqs_rx): (
aptos_channel::Sender<(), NetworkMessageWithMetadata>,
_,
) = aptos_channel::new(
QueueStyle::KLAST,
1024,
Some(&counters::PENDING_WIRE_MESSAGES),
);
let (close_tx, mut close_rx) = oneshot::channel();

let (mut msg_tx, msg_rx) = aptos_channels::new(1024, &counters::PENDING_MULTIPLEX_MESSAGE);
let (mut msg_tx, msg_rx) = aptos_channels::new::<MultiplexMessageWithMetadata>(
1024,
&counters::PENDING_MULTIPLEX_MESSAGE,
);
let (stream_msg_tx, stream_msg_rx) =
aptos_channels::new(1024, &counters::PENDING_MULTIPLEX_STREAM);

Expand All @@ -357,8 +366,15 @@ where
NetworkSchema::new(&network_context).connection_metadata(&connection_metadata);
loop {
futures::select! {
message = stream.select_next_some() => {
if let Err(err) = timeout(transport::TRANSPORT_TIMEOUT,writer.send(&message)).await {
message_with_metadata = stream.select_next_some() => {
// Extract the message and metadata
let (mut message_metadata, message) = message_with_metadata.into_parts();

// Mark the message as sending (on the metadata)
message_metadata.mark_message_as_sending();

// Send the message along the wire
if let Err(err) = timeout(transport::TRANSPORT_TIMEOUT, writer.send(&message)).await {
warn!(
log_context,
error = %err,
Expand Down Expand Up @@ -416,17 +432,23 @@ where
},
}
};

// the task ends when the write_reqs_tx is dropped
let multiplex_task = async move {
let mut outbound_stream =
OutboundStream::new(max_frame_size, max_message_size, stream_msg_tx);
while let Some(message) = write_reqs_rx.next().await {
while let Some(message_with_metadata) = write_reqs_rx.next().await {
// either channel full would block the other one
let result = if outbound_stream.should_stream(&message) {
outbound_stream.stream_message(message).await
let result = if outbound_stream.should_stream(&message_with_metadata) {
outbound_stream.stream_message(message_with_metadata).await
} else {
// Transform the message into a multiplex message
let multiplex_message_with_metadata =
message_with_metadata.into_multiplex_message();

// Send the message to the writer task
msg_tx
.send(MultiplexMessage::Message(message))
.send(multiplex_message_with_metadata)
.await
.map_err(|_| anyhow::anyhow!("Writer task ended"))
};
Expand All @@ -440,8 +462,10 @@ where
}
}
};

executor.spawn(writer_task);
executor.spawn(multiplex_task);

(write_reqs_tx, close_tx)
}

Expand Down Expand Up @@ -561,7 +585,7 @@ where
fn handle_inbound_message(
&mut self,
message: Result<MultiplexMessage, ReadError>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessageWithMetadata>,
) -> Result<(), PeerManagerError> {
trace!(
NetworkSchema::new(&self.network_context)
Expand All @@ -582,8 +606,17 @@ where
let protocol_id = frame_prefix.as_ref().get(1).unwrap_or(&0);
let error_code = ErrorCode::parsing_error(*message_type, *protocol_id);
let message = NetworkMessage::Error(error_code);
let message_with_metadata = NetworkMessageWithMetadata::new(
MessageMetadata::new(
self.network_context.network_id(),
ProtocolId::Unknown,
MessageSendType::DirectSend,
None,
),
message,
);

write_reqs_tx.push((), message)?;
write_reqs_tx.push((), message_with_metadata)?;
return Err(err.into());
},
ReadError::IoError(_) => {
Expand All @@ -603,7 +636,7 @@ where
fn handle_outbound_request(
&mut self,
request: PeerRequest,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessage>,
write_reqs_tx: &mut aptos_channel::Sender<(), NetworkMessageWithMetadata>,
) {
trace!(
"Peer {} PeerRequest::{:?}",
Expand All @@ -614,21 +647,18 @@ where
// To send an outbound DirectSendMsg, we just bump some counters and
// push it onto our outbound writer queue.
PeerRequest::SendDirectSend(message) => {
// Unpack the message
let (protocol_id, data) = message.into_parts();
// Get the data length and protocol id
let data_len = message.data().len();
let protocol_id = message.protocol_id();

// Create the direct send message
let message_len = data.len();
let message = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: Priority::default(),
raw_msg: Vec::from(data.as_ref()),
});
// Convert the message into a network message with metadata
let message_with_metadata =
message.into_network_message(self.network_context.network_id());

// Send the message to the outbound writer queue
match write_reqs_tx.push((), message) {
match write_reqs_tx.push((), message_with_metadata) {
Ok(_) => {
self.update_outbound_direct_send_metrics(protocol_id, message_len as u64);
self.update_outbound_direct_send_metrics(protocol_id, data_len as u64);
},
Err(e) => {
counters::direct_send_messages(&self.network_context, FAILED_LABEL).inc();
Expand Down Expand Up @@ -685,7 +715,7 @@ where

async fn do_shutdown(
mut self,
write_req_tx: aptos_channel::Sender<(), NetworkMessage>,
write_req_tx: aptos_channel::Sender<(), NetworkMessageWithMetadata>,
writer_close_tx: oneshot::Sender<()>,
reason: DisconnectReason,
) {
Expand Down
40 changes: 33 additions & 7 deletions network/framework/src/protocols/direct_send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,17 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{protocols::network::SerializedRequest, ProtocolId};
use crate::{
protocols::{
network::SerializedRequest,
wire::messaging::v1::{
metadata::{MessageMetadata, MessageSendType, NetworkMessageWithMetadata},
DirectSendMsg, NetworkMessage, Priority,
},
},
ProtocolId,
};
use aptos_config::network_id::NetworkId;
use bytes::Bytes;
use serde::Serialize;
use std::{fmt::Debug, time::SystemTime};
Expand Down Expand Up @@ -33,14 +43,30 @@ impl Message {
}
}

/// Returns the time at which the message was sent by the application
pub fn application_send_time(&self) -> SystemTime {
self.application_send_time
/// Transforms the message into a direct send network message with metadata
pub fn into_network_message(self, network_id: NetworkId) -> NetworkMessageWithMetadata {
// Create the direct send network message
let network_message = NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: self.protocol_id,
priority: Priority::default(),
raw_msg: Vec::from(self.data.as_ref()),
});

// Create and return the network message with metadata
let message_metadata = MessageMetadata::new(
network_id,
self.protocol_id,
MessageSendType::DirectSend,
Some(self.application_send_time),
);
NetworkMessageWithMetadata::new(message_metadata, network_message)
}

/// Consumes the message and returns the protocol id and data
pub fn into_parts(self) -> (ProtocolId, Bytes) {
(self.protocol_id, self.data)
/// Consumes the message and returns the individual parts.
/// Note: this is only for testing purposes (but, it cannot be marked
/// as `#[cfg(test)]` because of several non-wrapped test utils).
pub fn into_parts(self) -> (SystemTime, ProtocolId, Bytes) {
(self.application_send_time, self.protocol_id, self.data)
}
}

Expand Down
2 changes: 1 addition & 1 deletion network/framework/src/protocols/health_checker/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl TestHarness {
_ => panic!("Unexpected PeerManagerRequest: {:?}", req),
};

let (protocol_id, req_data, res_tx, _) = rpc_req.into_parts();
let (_, protocol_id, req_data, res_tx, _) = rpc_req.into_parts();
assert_eq!(protocol_id, ProtocolId::HealthCheckerRpc);

match bcs::from_bytes(&req_data).unwrap() {
Expand Down
Loading

0 comments on commit 3c782b8

Please sign in to comment.