Skip to content

Commit

Permalink
[Network] Force message streaming.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Nov 14, 2024
1 parent 3c782b8 commit 8522b5a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
8 changes: 4 additions & 4 deletions config/src/config/network_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ pub const CONNECTIVITY_CHECK_INTERVAL_MS: u64 = 5000;
pub const MAX_CONNECTION_DELAY_MS: u64 = 60_000; /* 1 minute */
pub const MAX_FULLNODE_OUTBOUND_CONNECTIONS: usize = 6;
pub const MAX_INBOUND_CONNECTIONS: usize = 100;
pub const MAX_MESSAGE_METADATA_SIZE: usize = 128 * 1024; /* 128 KiB: a buffer for metadata that might be added to messages by networking */
pub const MESSAGE_PADDING_SIZE: usize = 2 * 1024 * 1024; /* 2 MiB: a safety buffer to allow messages to get larger during serialization */
pub const MAX_MESSAGE_METADATA_SIZE: usize = 10 * 1024; /* 10 KB: a buffer for metadata that might be added to messages by networking */
pub const MESSAGE_PADDING_SIZE: usize = 10 * 1024; /* 10 KB: a safety buffer to allow messages to get larger during serialization */
pub const MAX_APPLICATION_MESSAGE_SIZE: usize =
(MAX_MESSAGE_SIZE - MAX_MESSAGE_METADATA_SIZE) - MESSAGE_PADDING_SIZE; /* The message size that applications should check against */
pub const MAX_FRAME_SIZE: usize = 4 * 1024 * 1024; /* 4 MiB large messages will be chunked into multiple frames and streamed */
pub const MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; /* 64 MiB */
pub const MAX_FRAME_SIZE: usize = 100 * 1024; /* 50 KB */
pub const MAX_MESSAGE_SIZE: usize = 200 * 1024; /* 200 KB */
pub const CONNECTION_BACKOFF_BASE: u64 = 2;
pub const IP_BYTE_BUCKET_RATE: usize = 102400 /* 100 KiB */;
pub const IP_BYTE_BUCKET_SIZE: usize = IP_BYTE_BUCKET_RATE;
Expand Down
1 change: 1 addition & 0 deletions network/framework/src/protocols/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ pub struct ReceivedMessage {
pub message: NetworkMessage,
pub sender: PeerNetworkId,

// TODO: clean this up!
// unix microseconds
pub receive_timestamp_micros: u64,

Expand Down
10 changes: 9 additions & 1 deletion network/framework/src/protocols/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::protocols::wire::messaging::v1::{
use anyhow::{bail, ensure};
use aptos_channels::Sender;
use aptos_id_generator::{IdGenerator, U32IdGenerator};
use aptos_logger::info;
use futures_util::SinkExt;
#[cfg(any(test, feature = "fuzzing"))]
use proptest_derive::Arbitrary;
Expand Down Expand Up @@ -182,7 +183,12 @@ impl OutboundStream {
/// Returns true iff the message should be streamed (i.e., broken into chunks)
pub fn should_stream(&self, message_with_metadata: &NetworkMessageWithMetadata) -> bool {
let message_length = message_with_metadata.network_message().data_len();
message_length > self.max_message_size
let result = message_length > self.max_message_size;
info!(
"Message length {} exceed size limit {}, should stream: {}",
message_length, self.max_message_size, result
);
result
}

pub async fn stream_message(
Expand Down Expand Up @@ -225,6 +231,8 @@ impl OutboundStream {
"Number of fragments overflowed"
);

info!("Stream message with {} fragments", num_chunks + 1);

// Create the stream header multiplex message
let header_multiplex_message =
MultiplexMessage::Stream(StreamMessage::Header(StreamHeader {
Expand Down
7 changes: 7 additions & 0 deletions network/framework/src/protocols/wire/messaging/v1/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ impl MessageMetadata {

/// Marks the message as being sent over the network wire by emitting latency metrics
pub fn mark_message_as_sending(&mut self) {
// If this message is a streamed message fragment, there's no need to emit
// any metrics (we only emit metrics for the head and tail of streamed messages).
if self.message_stream_type == MessageStreamType::StreamedMessageFragment {
return;
}

// Otherwise, emit the latency metrics
if let Some(application_send_time) = self.application_send_time {
// Calculate the application to wire send latency
let application_to_wire_latency = application_send_time
Expand Down

0 comments on commit 8522b5a

Please sign in to comment.