Skip to content

Commit

Permalink
[Network] Force message streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 14, 2024
1 parent 3c782b8 commit 3910c19
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 5 deletions.
8 changes: 4 additions & 4 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub const CONNECTIVITY_CHECK_INTERVAL_MS: u64 = 5000;
pub const MAX_CONNECTION_DELAY_MS: u64 = 60_000; /* 1 minute */
pub const MAX_FULLNODE_OUTBOUND_CONNECTIONS: usize = 6;
pub const MAX_INBOUND_CONNECTIONS: usize = 100;
pub const MAX_MESSAGE_METADATA_SIZE: usize = 128 * 1024; /* 128 KiB: a buffer for metadata that might be added to messages by networking */
pub const MESSAGE_PADDING_SIZE: usize = 2 * 1024 * 1024; /* 2 MiB: a safety buffer to allow messages to get larger during serialization */
pub const MAX_MESSAGE_METADATA_SIZE: usize = 10 * 1024; /* 10 KB: a buffer for metadata that might be added to messages by networking */
pub const MESSAGE_PADDING_SIZE: usize = 10 * 1024; /* 10 KB: a safety buffer to allow messages to get larger during serialization */
pub const MAX_APPLICATION_MESSAGE_SIZE: usize =
(MAX_MESSAGE_SIZE - MAX_MESSAGE_METADATA_SIZE) - MESSAGE_PADDING_SIZE; /* The message size that applications should check against */
pub const MAX_FRAME_SIZE: usize = 4 * 1024 * 1024; /* 4 MiB large messages will be chunked into multiple frames and streamed */
pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */
pub const MAX_FRAME_SIZE: usize = 100 * 1024; /* 50 KB */
pub const MAX_MESSAGE_SIZE: usize = 200 * 1024; /* 200 KB */
pub const CONNECTION_BACKOFF_BASE: u64 = 2;
pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */;
pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE;
Expand Down
1 change: 1 addition & 0 deletions network/framework/src/protocols/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub struct ReceivedMessage {
pub message: NetworkMessage,
pub sender: PeerNetworkId,

// TODO: clean this up!
// unix microseconds
pub receive_timestamp_micros: u64,

Expand Down
12 changes: 11 additions & 1 deletion network/framework/src/protocols/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures_util::SinkExt;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use aptos_logger::info;

#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[cfg_attr(any(test, feature = "fuzzing"), derive(Arbitrary))]
Expand Down Expand Up @@ -182,7 +183,14 @@ impl OutboundStream {
/// Returns true iff the message should be streamed (i.e., broken into chunks)
pub fn should_stream(&self, message_with_metadata: &NetworkMessageWithMetadata) -> bool {
let message_length = message_with_metadata.network_message().data_len();
message_length > self.max_message_size
let result = message_length > self.max_message_size;
info!(
"Message length {} exceed size limit {}, should stream: {}",
message_length,
self.max_message_size,
result
);
result
}

pub async fn stream_message(
Expand Down Expand Up @@ -225,6 +233,8 @@ impl OutboundStream {
"Number of fragments overflowed"
);

info!("Stream message with {} fragments", num_chunks + 1);

// Create the stream header multiplex message
let header_multiplex_message =
MultiplexMessage::Stream(StreamMessage::Header(StreamHeader {
Expand Down
7 changes: 7 additions & 0 deletions network/framework/src/protocols/wire/messaging/v1/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ impl MessageMetadata {

/// Marks the message as being sent over the network wire by emitting latency metrics
pub fn mark_message_as_sending(&mut self) {
// If this message is a streamed message fragment, there's no need to emit
// any metrics (we only emit metrics for the head and tail of streamed messages).
if self.message_stream_type == MessageStreamType::StreamedMessageFragment {
return;
}

// Otherwise, emit the latency metrics
if let Some(application_send_time) = self.application_send_time {
// Calculate the application to wire send latency
let application_to_wire_latency = application_send_time
Expand Down

0 comments on commit 3910c19

Please sign in to comment.