Skip to content

Commit

Permalink
feat(transport): accept borrowed instead of owned input datagram
Browse files Browse the repository at this point in the history
Previously `process_input` (and the like) took a `Datagram<Vec<u8>>` as input.
In other words, it required allocating each UDP datagram payload in a dedicated
`Vec<u8>` before passing it to `process_input`.

With this patch, `process_input` accepts a `Datagram<&[u8]>`. In other words, it
accepts a `Datagram` with a borrowed view into an existing buffer (`&[u8]`),
e.g. a long lived receive buffer.
  • Loading branch information
mxinden committed Oct 20, 2024
1 parent c2520e2 commit 9330563
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 24 deletions.
2 changes: 1 addition & 1 deletion neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl super::Client for Connection {

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I: IntoIterator<Item = Datagram<&'a [u8]>>,
{
self.process_multiple_input(dgrams, now);
}
Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl super::Client for Http3Client {

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I: IntoIterator<Item = Datagram<&'a [u8]>>,
{
self.process_multiple_input(dgrams, now);
}
Expand Down
4 changes: 2 additions & 2 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ trait Client {
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>;
I: IntoIterator<Item = Datagram<&'a [u8]>>;
fn has_events(&self) -> bool;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
Expand Down Expand Up @@ -462,7 +462,7 @@ impl<'a, H: Handler> Runner<'a, H> {
break;
}
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
.process_multiple_input(dgrams.iter().map(Datagram::borrow), Instant::now());
self.process_output().await?;
}

Expand Down
6 changes: 3 additions & 3 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -873,13 +873,13 @@ impl Http3Client {
/// packets need to be sent or if a timer needs to be updated.
///
/// [1]: ../neqo_transport/enum.ConnectionEvent.html
pub fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_multiple_input(iter::once(dgram), now);
pub fn process_input<'a>(&mut self, dgram: impl Into<Datagram<&'a [u8]>>, now: Instant) {
self.process_multiple_input(iter::once(dgram.into()), now);
}

pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I: IntoIterator<Item = Datagram<&'a [u8]>>,
{
let mut dgrams = dgrams.into_iter().peekable();
qtrace!([self], "Process multiple datagrams");
Expand Down
44 changes: 29 additions & 15 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,14 +1022,14 @@ impl Connection {
}

/// Process new input datagrams on the connection.
pub fn process_input(&mut self, d: &Datagram, now: Instant) {
self.process_multiple_input(iter::once(d), now);
pub fn process_input<'a>(&mut self, d: impl Into<Datagram<&'a [u8]>>, now: Instant) {
self.process_multiple_input(iter::once(d.into()), now);
}

/// Process new input datagrams on the connection.
pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I: IntoIterator<Item = Datagram<&'a [u8]>>,
{
let mut dgrams = dgrams.into_iter().peekable();
if dgrams.peek().is_none() {
Expand Down Expand Up @@ -1241,7 +1241,7 @@ impl Connection {
}
}

fn is_stateless_reset(&self, path: &PathRef, d: &Datagram) -> bool {
fn is_stateless_reset(&self, path: &PathRef, d: &Datagram<&[u8]>) -> bool {
// If the datagram is too small, don't try.
// If the connection is connected, then the reset token will be invalid.
if d.len() < 16 || !self.state.connected() {
Expand All @@ -1254,7 +1254,7 @@ impl Connection {
fn check_stateless_reset(
&mut self,
path: &PathRef,
d: &Datagram,
d: &Datagram<&[u8]>,
first: bool,
now: Instant,
) -> Res<()> {
Expand All @@ -1280,23 +1280,29 @@ impl Connection {
debug_assert!(self.crypto.states.rx_hp(self.version, cspace).is_some());
for saved in self.saved_datagrams.take_saved() {
qtrace!([self], "input saved @{:?}: {:?}", saved.t, saved.d);
self.input(&saved.d, saved.t, now);
self.input(saved.d.borrow(), saved.t, now);
}
}
}

/// In case a datagram arrives that we can only partially process, save any
/// part that we don't have keys for.
fn save_datagram(&mut self, cspace: CryptoSpace, d: &Datagram, remaining: usize, now: Instant) {
fn save_datagram(
&mut self,
cspace: CryptoSpace,
d: &Datagram<&[u8]>,
remaining: usize,
now: Instant,
) {
let d = if remaining < d.len() {
Datagram::new(
d.source(),
d.destination(),
d.tos(),
&d[d.len() - remaining..],
d[d.len() - remaining..].to_vec(),
)
} else {
d.clone()
d.to_owned()
};
self.saved_datagrams.save(cspace, d, now);
self.stats.borrow_mut().saved_datagrams += 1;
Expand Down Expand Up @@ -1498,7 +1504,7 @@ impl Connection {
fn postprocess_packet(
&mut self,
path: &PathRef,
d: &Datagram,
d: &Datagram<&[u8]>,
packet: &PublicPacket,
migrate: bool,
now: Instant,
Expand Down Expand Up @@ -1530,7 +1536,9 @@ impl Connection {

/// Take a datagram as input. This reports an error if the packet was bad.
/// This takes two times: when the datagram was received, and the current time.
fn input(&mut self, d: &Datagram, received: Instant, now: Instant) {
fn input<'a>(&mut self, d: impl Into<Datagram<&'a [u8]>>, received: Instant, now: Instant) {
let d = d.into();

// First determine the path.
let path = self.paths.find_path_with_rebinding(
d.destination(),
Expand All @@ -1540,15 +1548,15 @@ impl Connection {
now,
);
path.borrow_mut().add_received(d.len());
let res = self.input_path(&path, d, received);
let res = self.input_path(&path, &d, received);
self.capture_error(Some(path), now, 0, res).ok();
}

fn input_path(&mut self, path: &PathRef, d: &Datagram, now: Instant) -> Res<()> {
fn input_path(&mut self, path: &PathRef, d: &Datagram<&[u8]>, now: Instant) -> Res<()> {
let mut slc = &d[..];
let mut dcid = None;

qtrace!([self], "{} input {}", path.borrow(), hex(&**d));
qtrace!([self], "{} input {}", path.borrow(), hex(d));
let pto = path.borrow().rtt().pto(self.confirmed());

// Handle each packet in the datagram.
Expand Down Expand Up @@ -1958,7 +1966,13 @@ impl Connection {
Ok(())
}

fn handle_migration(&mut self, path: &PathRef, d: &Datagram, migrate: bool, now: Instant) {
fn handle_migration(
&mut self,
path: &PathRef,
d: &Datagram<&[u8]>,
migrate: bool,
now: Instant,
) {
if !migrate {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion neqo-transport/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ impl Path {
// with the current value.
let tos = self.tos();
self.ecn_info.on_packet_sent(stats);
Datagram::new(self.local, self.remote, tos, payload)
Datagram::new(self.local, self.remote, tos, payload.into())
}

/// Get local address as `SocketAddr`
Expand Down
2 changes: 1 addition & 1 deletion neqo-transport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl Server {
let mut callback = None;

for connection in &mut self.connections {
match connection.borrow_mut().process(None, now) {
match connection.borrow_mut().process_output(now) {
Output::None => {}
d @ Output::Datagram(_) => return d,
Output::Callback(next) => match callback {
Expand Down

0 comments on commit 9330563

Please sign in to comment.