-
-
Notifications
You must be signed in to change notification settings - Fork 389
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
bench(udp): run GSO, GRO and recvmmsg permutations
- Loading branch information
Showing
3 changed files
with
101 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,76 +1,124 @@ | ||
use criterion::{criterion_group, criterion_main, Criterion}; | ||
use quinn_udp::{RecvMeta, Transmit, UdpSocketState}; | ||
use std::cmp::min; | ||
use std::net::{Ipv4Addr, Ipv6Addr}; | ||
use std::{io::IoSliceMut, net::UdpSocket, slice}; | ||
use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE}; | ||
use std::{ | ||
cmp::min, | ||
io::{ErrorKind, IoSliceMut}, | ||
net::{Ipv4Addr, Ipv6Addr, UdpSocket}, | ||
}; | ||
use tokio::io::Interest; | ||
use tokio::runtime::Runtime; | ||
|
||
const MAX_IP_UDP_HEADER_SIZE: usize = 48; | ||
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE; | ||
|
||
pub fn criterion_benchmark(c: &mut Criterion) { | ||
const TOTAL_BYTES: usize = 10 * 1024 * 1024; | ||
// Maximum GSO buffer size is 64k. | ||
const MAX_BUFFER_SIZE: usize = u16::MAX as usize; | ||
const SEGMENT_SIZE: usize = 1280; | ||
|
||
let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) | ||
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) | ||
.unwrap(); | ||
let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) | ||
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) | ||
.unwrap(); | ||
let max_segments = min( | ||
UdpSocketState::new((&send).into()) | ||
.unwrap() | ||
.max_gso_segments(), | ||
MAX_BUFFER_SIZE / SEGMENT_SIZE, | ||
); | ||
let dst_addr = recv.local_addr().unwrap(); | ||
let send_state = UdpSocketState::new((&send).into()).unwrap(); | ||
let recv_state = UdpSocketState::new((&recv).into()).unwrap(); | ||
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy | ||
recv.set_nonblocking(false).unwrap(); | ||
let mut rt = Runtime::new().unwrap(); | ||
let (send_socket, send_state) = new_socket(&mut rt); | ||
let (recv_socket, recv_state) = new_socket(&mut rt); | ||
let dst_addr = recv_socket.local_addr().unwrap(); | ||
|
||
let mut receive_buffer = vec![0; MAX_BUFFER_SIZE]; | ||
let mut meta = RecvMeta::default(); | ||
let mut permutations = vec![]; | ||
for gso_enabled in [ | ||
false, | ||
#[cfg(any(target_os = "linux", target_os = "windows"))] | ||
true, | ||
] { | ||
for gro_enabled in [false, true] { | ||
#[cfg(target_os = "windows")] | ||
if gso_enabled && !gro_enabled { | ||
// Windows requires receive buffer to fit entire datagram on GRO | ||
// enabled socket. | ||
// | ||
// OS error: "A message sent on a datagram socket was larger | ||
// than the internal message buffer or some other network limit, | ||
// or the buffer used to receive a datagram into was smaller | ||
// than the datagram itself." | ||
continue; | ||
} | ||
|
||
for gso_enabled in [false, true] { | ||
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled)); | ||
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); | ||
for recvmmsg_enabled in [false, true] { | ||
permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled)); | ||
} | ||
} | ||
} | ||
|
||
let segments = if gso_enabled { max_segments } else { 1 }; | ||
let msg = vec![0xAB; SEGMENT_SIZE * segments]; | ||
for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations { | ||
let mut group = c.benchmark_group(format!( | ||
"gso_{}_gro_{}_recvmmsg_{}", | ||
gso_enabled, gro_enabled, recvmmsg_enabled | ||
)); | ||
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); | ||
|
||
let gso_segments = if gso_enabled { | ||
send_state.max_gso_segments() | ||
} else { | ||
1 | ||
}; | ||
let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)]; | ||
let transmit = Transmit { | ||
destination: dst_addr, | ||
ecn: None, | ||
contents: &msg, | ||
segment_size: gso_enabled.then_some(SEGMENT_SIZE), | ||
src_ip: None, | ||
}; | ||
let gro_segments = if gro_enabled { | ||
recv_state.gro_segments() | ||
} else { | ||
1 | ||
}; | ||
let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 }; | ||
|
||
group.bench_function("throughput", |b| { | ||
b.iter(|| { | ||
b.to_async(Runtime::new().unwrap()).iter(|| async { | ||
let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size]; | ||
let mut receive_slices = receive_buffers | ||
.iter_mut() | ||
.map(|buf| IoSliceMut::new(buf)) | ||
.collect::<Vec<_>>(); | ||
let mut meta = vec![RecvMeta::default(); batch_size]; | ||
|
||
let mut sent: usize = 0; | ||
let mut received: usize = 0; | ||
while sent < TOTAL_BYTES { | ||
send_state.send((&send).into(), &transmit).unwrap(); | ||
send_socket.writable().await.unwrap(); | ||
send_socket | ||
.try_io(Interest::WRITABLE, || { | ||
send_state.send((&send_socket).into(), &transmit) | ||
}) | ||
.unwrap(); | ||
sent += transmit.contents.len(); | ||
|
||
let mut received_segments = 0; | ||
while received_segments < segments { | ||
let n = recv_state | ||
.recv( | ||
(&recv).into(), | ||
&mut [IoSliceMut::new(&mut receive_buffer)], | ||
slice::from_mut(&mut meta), | ||
) | ||
.unwrap(); | ||
assert_eq!(n, 1); | ||
received_segments += meta.len / meta.stride; | ||
while received < sent { | ||
recv_socket.readable().await.unwrap(); | ||
let n = match recv_socket.try_io(Interest::READABLE, || { | ||
recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta) | ||
}) { | ||
Ok(n) => n, | ||
// recv.readable() can lead to false positives. Try again. | ||
Err(e) if e.kind() == ErrorKind::WouldBlock => continue, | ||
e => e.unwrap(), | ||
}; | ||
received += meta.iter().map(|m| m.len).take(n).sum::<usize>(); | ||
} | ||
assert_eq!(received_segments, segments); | ||
} | ||
}) | ||
}); | ||
} | ||
} | ||
|
||
fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) { | ||
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) | ||
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) | ||
.unwrap(); | ||
|
||
let state = UdpSocketState::new((&socket).into()).unwrap(); | ||
let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() }); | ||
(socket, state) | ||
} | ||
|
||
criterion_group!(benches, criterion_benchmark); | ||
criterion_main!(benches); |