From 534db5f0b673048573a2e9c0e3b1da1a682974b2 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 17 Oct 2024 12:15:45 +0200 Subject: [PATCH 1/2] feat(udp): return borrowed Datagram on receive Previously `recv_inner` would return `Datagram>`. In other words, it would allocate a new `Vec` for each UDP datagram payload. Now `recv_inner` reads into a provided buffer and returns `Datagram<&[u8]>`, i.e. it returns a view into the provided buffer without allocating. --- neqo-bin/src/udp.rs | 5 +- neqo-udp/src/lib.rs | 158 +++++++++++++++++++++++++------------------- 2 files changed, 95 insertions(+), 68 deletions(-) diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index 148ff43175..c418f5ee3c 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -7,6 +7,7 @@ use std::{io, net::SocketAddr}; use neqo_common::Datagram; +use neqo_transport::RECV_BUFFER_SIZE; /// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox. /// @@ -56,10 +57,12 @@ impl Socket { /// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with /// the provided local address. pub fn recv(&self, local_address: &SocketAddr) -> Result, io::Error> { + let mut recv_buf = vec![0; RECV_BUFFER_SIZE]; self.inner .try_io(tokio::io::Interest::READABLE, || { - neqo_udp::recv_inner(local_address, &self.state, &self.inner) + neqo_udp::recv_inner(local_address, &self.state, &self.inner, &mut recv_buf) }) + .map(|dgrams| dgrams.map(|d| d.to_owned()).collect()) .or_else(|e| { if e.kind() == io::ErrorKind::WouldBlock { Ok(vec![]) diff --git a/neqo-udp/src/lib.rs b/neqo-udp/src/lib.rs index 5f1fb3dbe6..e6ae78d9ae 100644 --- a/neqo-udp/src/lib.rs +++ b/neqo-udp/src/lib.rs @@ -7,10 +7,9 @@ #![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp. use std::{ - cell::RefCell, io::{self, IoSliceMut}, net::SocketAddr, - slice, + slice::{self, Chunks}, }; use neqo_common::{qdebug, qtrace, Datagram, IpTos}; @@ -21,11 +20,7 @@ use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState}; /// Allows reading multiple datagrams in a single [`Socket::recv`] call. // // TODO: Experiment with different values across platforms. -const RECV_BUF_SIZE: usize = u16::MAX as usize; - -std::thread_local! { - static RECV_BUF: RefCell> = RefCell::new(vec![0; RECV_BUF_SIZE]); -} +pub const RECV_BUF_SIZE: usize = u16::MAX as usize; pub fn send_inner( state: &UdpSocketState, @@ -57,63 +52,89 @@ use std::os::fd::AsFd as SocketRef; #[cfg(windows)] use std::os::windows::io::AsSocket as SocketRef; -pub fn recv_inner( +pub fn recv_inner<'a>( local_address: &SocketAddr, state: &UdpSocketState, socket: impl SocketRef, -) -> Result, io::Error> { - let dgrams = RECV_BUF.with_borrow_mut(|recv_buf| -> Result, io::Error> { - let mut meta; - - loop { - meta = RecvMeta::default(); - - state.recv( - (&socket).into(), - &mut [IoSliceMut::new(recv_buf)], - slice::from_mut(&mut meta), - )?; - - if meta.len == 0 || meta.stride == 0 { - qdebug!( - "ignoring datagram from {} to {} len {} stride {}", - meta.addr, - local_address, - meta.len, - meta.stride - ); - continue; - } - - break; + recv_buf: &'a mut [u8], +) -> Result, io::Error> { + let mut meta; + + let data = loop { + meta = RecvMeta::default(); + + state.recv( + (&socket).into(), + &mut [IoSliceMut::new(recv_buf)], + slice::from_mut(&mut meta), + )?; + + if meta.len == 0 || meta.stride == 0 { + qdebug!( + "ignoring datagram from {} to {} len {} stride {}", + meta.addr, + local_address, + meta.len, + meta.stride + ); + continue; } - Ok(recv_buf[0..meta.len] - .chunks(meta.stride) - .map(|d| { - qtrace!( - "received {} bytes from {} to {}", - d.len(), - meta.addr, - local_address, - ); - Datagram::new( - meta.addr, - *local_address, - meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), - d, - ) - }) - .collect()) - })?; + break &recv_buf[..meta.len]; + }; qtrace!( - "received {} datagrams ({:?})", - dgrams.len(), - dgrams.iter().map(|d| d.len()).collect::>(), + "received {} bytes from {} to {} in {} segments", + data.len(), + meta.addr, + local_address, + data.len().div_ceil(meta.stride), ); - Ok(dgrams) + Ok(DatagramIter { + meta, + datagrams: data.chunks(meta.stride), + local_address: *local_address, + }) +} + +pub struct DatagramIter<'a> { + meta: RecvMeta, + datagrams: Chunks<'a, u8>, + local_address: SocketAddr, +} + +impl<'a> std::fmt::Debug for DatagramIter<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Datagrams") + .field("meta", &self.meta) + .field("local_address", &self.local_address) + .finish() + } +} + +impl<'a> Iterator for DatagramIter<'a> { + type Item = Datagram<&'a [u8]>; + + fn next(&mut self) -> Option { + self.datagrams.next().map(|d| { + Datagram::from_slice( + self.meta.addr, + self.local_address, + self.meta + .ecn + .map(|n| IpTos::from(n as u8)) + .unwrap_or_default(), + d, + ) + }) + } +} + +impl<'a> ExactSizeIterator for DatagramIter<'a> { + fn len(&self) -> usize { + self.datagrams.len() + } } /// A wrapper around a UDP socket, sending and receiving [`Datagram`]s. @@ -138,8 +159,12 @@ impl Socket { /// Receive a batch of [`Datagram`]s on the given [`Socket`], each /// set with the provided local address. - pub fn recv(&self, local_address: &SocketAddr) -> Result, io::Error> { - recv_inner(local_address, &self.state, &self.inner) + pub fn recv<'a>( + &self, + local_address: &SocketAddr, + recv_buf: &'a mut [u8], + ) -> Result, io::Error> { + recv_inner(local_address, &self.state, &self.inner, recv_buf) } } @@ -170,7 +195,8 @@ mod tests { ); sender.send(&datagram)?; - let res = receiver.recv(&receiver_addr); + let mut recv_buf = vec![0; RECV_BUF_SIZE]; + let res = receiver.recv(&receiver_addr, &mut recv_buf); assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::WouldBlock); Ok(()) @@ -191,17 +217,15 @@ mod tests { sender.send(&datagram)?; - let received_datagram = receiver - .recv(&receiver_addr) - .expect("receive to succeed") - .into_iter() - .next() - .expect("receive to yield datagram"); + let mut recv_buf = vec![0; RECV_BUF_SIZE]; + let mut received_datagrams = receiver + .recv(&receiver_addr, &mut recv_buf) + .expect("receive to succeed"); // Assert that the ECN is correct. assert_eq!( IpTosEcn::from(datagram.tos()), - IpTosEcn::from(received_datagram.tos()) + IpTosEcn::from(received_datagrams.next().unwrap().tos()) ); Ok(()) @@ -236,11 +260,11 @@ mod tests { // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg. let mut num_received = 0; + let mut recv_buf = vec![0; RECV_BUF_SIZE]; while num_received < max_gso_segments { receiver - .recv(&receiver_addr) + .recv(&receiver_addr, &mut recv_buf) .expect("receive to succeed") - .into_iter() .for_each(|d| { assert_eq!( SEGMENT_SIZE, From 1030d1b81990f8e69ff891e42e575896b1a12b34 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 18 Oct 2024 11:08:04 +0200 Subject: [PATCH 2/2] feat(bin): don't allocate in UDP recv path (#2189) Pass a long lived receive buffer to `neqo_udp::recv_inner`, receiving an iterator of `Datagram<&[u8]>`s pointing into that buffer, thus no longer allocating in UDP receive path. --- neqo-bin/src/client/http09.rs | 9 ++--- neqo-bin/src/client/http3.rs | 9 ++--- neqo-bin/src/client/mod.rs | 62 ++++++++++++++++++++--------------- neqo-bin/src/server/mod.rs | 9 +++-- neqo-bin/src/udp.rs | 15 +++++---- 5 files changed, 61 insertions(+), 43 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index d4a1829892..728088a3f8 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -181,10 +181,11 @@ impl super::Client for Connection { self.process_output(now) } - fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) - where - I: IntoIterator>, - { + fn process_multiple_input<'a>( + &mut self, + dgrams: impl IntoIterator>, + now: Instant, + ) { self.process_multiple_input(dgrams, now); } diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index b8745a1fd6..e667355d9b 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -132,10 +132,11 @@ impl super::Client for Http3Client { self.process_output(now) } - fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) - where - I: IntoIterator>, - { + fn process_multiple_input<'a>( + &mut self, + dgrams: impl IntoIterator>, + now: Instant, + ) { self.process_multiple_input(dgrams, now); } diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index beac31dda3..9b06195ec7 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -374,9 +374,11 @@ enum CloseState { /// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`]. trait Client { fn process_output(&mut self, now: Instant) -> Output; - fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) - where - I: IntoIterator>; + fn process_multiple_input<'a>( + &mut self, + dgrams: impl IntoIterator>, + now: Instant, + ); fn has_events(&self) -> bool; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where @@ -392,9 +394,28 @@ struct Runner<'a, H: Handler> { handler: H, timeout: Option>>, args: &'a Args, + recv_buf: Vec, } impl<'a, H: Handler> Runner<'a, H> { + fn new( + local_addr: SocketAddr, + socket: &'a mut crate::udp::Socket, + client: H::Client, + handler: H, + args: &'a Args, + ) -> Self { + Self { + local_addr, + socket, + client, + handler, + args, + timeout: None, + recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE], + } + } + async fn run(mut self) -> Res> { loop { let handler_done = self.handler.handle(&mut self.client)?; @@ -457,12 +478,13 @@ impl<'a, H: Handler> Runner<'a, H> { async fn process_multiple_input(&mut self) -> Res<()> { loop { - let dgrams = self.socket.recv(&self.local_addr)?; - if dgrams.is_empty() { + let Some(dgrams) = self.socket.recv(&self.local_addr, &mut self.recv_buf)? else { + break; + }; + if dgrams.len() == 0 { break; } - self.client - .process_multiple_input(dgrams.iter().map(Datagram::borrow), Instant::now()); + self.client.process_multiple_input(dgrams, Instant::now()); self.process_output().await?; } @@ -573,32 +595,18 @@ pub async fn client(mut args: Args) -> Res<()> { let handler = http09::Handler::new(to_request, &args); - Runner { - args: &args, - client, - handler, - local_addr: real_local, - socket: &mut socket, - timeout: None, - } - .run() - .await? + Runner::new(real_local, &mut socket, client, handler, &args) + .run() + .await? } else { let client = http3::create_client(&args, real_local, remote_addr, &hostname, token) .expect("failed to create client"); let handler = http3::Handler::new(to_request, &args); - Runner { - args: &args, - client, - handler, - local_addr: real_local, - socket: &mut socket, - timeout: None, - } - .run() - .await? + Runner::new(real_local, &mut socket, client, handler, &args) + .run() + .await? }; } } diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index abf614f1f8..2383450565 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -205,6 +205,7 @@ pub struct ServerRunner { server: Box, timeout: Option>>, sockets: Vec<(SocketAddr, crate::udp::Socket)>, + recv_buf: Vec, } impl ServerRunner { @@ -219,6 +220,7 @@ impl ServerRunner { server, timeout: None, sockets, + recv_buf: vec![0; neqo_udp::RECV_BUF_SIZE], } } @@ -289,10 +291,13 @@ impl ServerRunner { match self.ready().await? { Ready::Socket(inx) => loop { let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let dgrams = socket.recv(host)?; - if dgrams.is_empty() { + let Some(dgrams) = socket.recv(host, &mut self.recv_buf)? else { + break; + }; + if dgrams.len() == 0 { break; } + let dgrams: Vec = dgrams.map(|d| d.to_owned()).collect(); for dgram in dgrams { self.process(Some(&dgram)).await?; } diff --git a/neqo-bin/src/udp.rs b/neqo-bin/src/udp.rs index c418f5ee3c..0ea6edc449 100644 --- a/neqo-bin/src/udp.rs +++ b/neqo-bin/src/udp.rs @@ -7,7 +7,7 @@ use std::{io, net::SocketAddr}; use neqo_common::Datagram; -use neqo_transport::RECV_BUFFER_SIZE; +use neqo_udp::DatagramIter; /// Ideally this would live in [`neqo-udp`]. [`neqo-udp`] is used in Firefox. /// @@ -56,16 +56,19 @@ impl Socket { /// Receive a batch of [`Datagram`]s on the given [`Socket`], each set with /// the provided local address. - pub fn recv(&self, local_address: &SocketAddr) -> Result, io::Error> { - let mut recv_buf = vec![0; RECV_BUFFER_SIZE]; + pub fn recv<'a>( + &self, + local_address: &SocketAddr, + recv_buf: &'a mut [u8], + ) -> Result>, io::Error> { self.inner .try_io(tokio::io::Interest::READABLE, || { - neqo_udp::recv_inner(local_address, &self.state, &self.inner, &mut recv_buf) + neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf) }) - .map(|dgrams| dgrams.map(|d| d.to_owned()).collect()) + .map(Some) .or_else(|e| { if e.kind() == io::ErrorKind::WouldBlock { - Ok(vec![]) + Ok(None) } else { Err(e) }