Skip to content

Commit

Permalink
Add support for min-timeout waits
Browse files Browse the repository at this point in the history
This adds io_uring_submit_and_wait_min_timeout(), which is like
io_uring_submit_and_wait_timeout() except it adds support for a minimum
batch timeout as well.

Also adds a test case.

Commit message to be expanded...

Signed-off-by: Jens Axboe <[email protected]>
  • Loading branch information
axboe committed Jan 9, 2024
1 parent 8043c63 commit 877acb9
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 6 deletions.
6 changes: 6 additions & 0 deletions src/include/liburing.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ int io_uring_submit_and_wait_timeout(struct io_uring *ring,
unsigned wait_nr,
struct __kernel_timespec *ts,
sigset_t *sigmask);
int io_uring_submit_and_wait_min_timeout(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr,
unsigned wait_nr,
struct __kernel_timespec *ts,
unsigned min_wait,
sigset_t *sigmask);

int io_uring_register_buffers(struct io_uring *ring, const struct iovec *iovecs,
unsigned nr_iovecs);
Expand Down
2 changes: 1 addition & 1 deletion src/include/liburing/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ enum {
struct io_uring_getevents_arg {
__u64 sigmask;
__u32 sigmask_sz;
__u32 pad;
__u32 min_wait_usec;
__u64 ts;
};

Expand Down
5 changes: 5 additions & 0 deletions src/liburing.map
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,8 @@ LIBURING_2.5 {
global:
io_uring_queue_init_mem;
} LIBURING_2.4;

LIBURING_2.6 {
global:
io_uring_submit_and_wait_min_timeout;
} LIBURING_2.5;
31 changes: 26 additions & 5 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,10 @@ int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}

int io_uring_submit_and_wait_timeout(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr,
unsigned wait_nr,
struct __kernel_timespec *ts,
sigset_t *sigmask)
static int __io_uring_submit_and_wait_timeout(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr, unsigned wait_nr,
struct __kernel_timespec *ts,
unsigned int min_wait, sigset_t *sigmask)
{
int to_submit;

Expand All @@ -327,6 +326,7 @@ int io_uring_submit_and_wait_timeout(struct io_uring *ring,
struct io_uring_getevents_arg arg = {
.sigmask = (unsigned long) sigmask,
.sigmask_sz = _NSIG / 8,
.min_wait_usec = min_wait,
.ts = (unsigned long) ts
};
struct get_data data = {
Expand All @@ -349,6 +349,27 @@ int io_uring_submit_and_wait_timeout(struct io_uring *ring,
return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}

int io_uring_submit_and_wait_min_timeout(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr,
unsigned wait_nr,
struct __kernel_timespec *ts,
unsigned min_wait,
sigset_t *sigmask)
{
return __io_uring_submit_and_wait_timeout(ring, cqe_ptr, wait_nr, ts,
min_wait, sigmask);
}

int io_uring_submit_and_wait_timeout(struct io_uring *ring,
struct io_uring_cqe **cqe_ptr,
unsigned wait_nr,
struct __kernel_timespec *ts,
sigset_t *sigmask)
{
return __io_uring_submit_and_wait_timeout(ring, cqe_ptr, wait_nr, ts, 0,
sigmask);
}

/*
* See io_uring_wait_cqes() - this function is the same, it just always uses
* '1' as the wait_nr.
Expand Down
1 change: 1 addition & 0 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ test_srcs := \
link_drain.c \
link-timeout.c \
madvise.c \
min-timeout.c \
mkdir.c \
msg-ring.c \
msg-ring-flags.c \
Expand Down
223 changes: 223 additions & 0 deletions test/min-timeout.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/* SPDX-License-Identifier: MIT */
/*
* Description: test min timeout handling
*
*/
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/resource.h>
#include <pthread.h>

#include "liburing.h"
#include "helpers.h"

#define NPIPES 8
#define NWRITES 6

#define WAIT_USEC (250000)

struct d {
int fd[NPIPES];
long delay;
};

static void *thread_fn(void *data)
{
struct d *d = data;
char buf[32];
int i;

memset(buf, 0x55, sizeof(buf));

for (i = 0; i < NWRITES; i++) {
int ret;

usleep(d->delay);
ret = write(d->fd[i], buf, sizeof(buf));
if (ret != sizeof(buf)) {
fprintf(stderr, "bad write %d\n", ret);
break;
}
}
return NULL;
}

/*
* Allow 25% tolerance
*/
static int within_range(unsigned int target, unsigned int msec)
{
unsigned int high, low;

low = (target * 3) / 4;
high = (target * 5) / 4;
return (msec >= low && msec <= high);
}

static unsigned long long mtime_since(const struct timeval *s,
const struct timeval *e)
{
long long sec, usec;

sec = e->tv_sec - s->tv_sec;
usec = (e->tv_usec - s->tv_usec);
if (sec > 0 && usec < 0) {
sec--;
usec += 1000000;
}

sec *= 1000;
usec /= 1000;
return sec + usec;
}

static unsigned long long mtime_since_now(struct timeval *tv)
{
struct timeval end;

gettimeofday(&end, NULL);
return mtime_since(tv, &end);
}

static int test(int flags, int expected_ctx, int min_wait, int write_delay,
int nr_cqes, int msec_target)
{
struct io_uring_cqe *cqe;
struct io_uring_sqe *sqe;
struct io_uring ring;
struct __kernel_timespec ts;
struct rusage s, e;
pthread_t thread;
struct d d;
int ret, fds[NPIPES][2], i;
struct timeval start_time;
char buf[32];
void *tret;
long ttime;

ret = io_uring_queue_init(NPIPES, &ring, flags);
if (ret == -EINVAL)
return T_EXIT_SKIP;

for (i = 0; i < NPIPES; i++) {
if (pipe(fds[i]) < 0) {
perror("pipe");
return 1;
}
d.fd[i] = fds[i][1];
}

d.delay = write_delay;
pthread_create(&thread, NULL, thread_fn, &d);

for (i = 0; i < NPIPES; i++) {
sqe = io_uring_get_sqe(&ring);
io_uring_prep_read(sqe, fds[i][0], buf, sizeof(buf), 0);
}

ts.tv_sec = 0;
ts.tv_nsec = WAIT_USEC * 1000LL;

gettimeofday(&start_time, NULL);
getrusage(RUSAGE_THREAD, &s);
ret = io_uring_submit_and_wait_min_timeout(&ring, &cqe, 8, &ts, min_wait, NULL);
if (ret != NPIPES)
fprintf(stderr, "submit_and_wait=%d\n", ret);

getrusage(RUSAGE_THREAD, &e);
e.ru_nvcsw -= s.ru_nvcsw;
ttime = mtime_since_now(&start_time);
if (!within_range(msec_target, ttime)) {
fprintf(stderr, "Expected %d msec, got %ld msec\n", msec_target,
ttime);
fprintf(stderr, "flags=%x, min_wait=%d, write_delay=%d\n",
flags, min_wait, write_delay);
}
/* will usually be accurate, but allow for offset of 1 */
if (e.ru_nvcsw != expected_ctx &&
(e.ru_nvcsw - expected_ctx > 1))
fprintf(stderr, "%ld ctx switches, expected %d\n", e.ru_nvcsw,
expected_ctx);

for (i = 0; i < NPIPES; i++) {
ret = io_uring_peek_cqe(&ring, &cqe);
if (ret)
break;
io_uring_cqe_seen(&ring, cqe);
}

if (i != nr_cqes)
fprintf(stderr, "Got %d CQEs, expected %d\n", i, nr_cqes);

pthread_join(thread, &tret);

for (i = 0; i < NPIPES; i++) {
close(fds[i][0]);
close(fds[i][1]);
}

return T_EXIT_PASS;
}

int main(int argc, char *argv[])
{
int ret;

if (argc > 1)
return T_EXIT_SKIP;

ret = test(0, NWRITES + 1, 0, 2000, NWRITES, WAIT_USEC / 1000);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

ret = test(0, NWRITES + 1, 50000, 2000, NWRITES, 50);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

ret = test(0, NWRITES + 1, 500000, 2000, NWRITES, 500);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

/* no writes within min timeout, but it's given. expect 1 cqe */
ret = test(0, 1, 10000, 20000, 1, 20);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

/* same as above, but no min timeout. should time out and we get 6 */
ret = test(0, NWRITES + 1, 0, 20000, NWRITES, WAIT_USEC / 1000);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

ret = test(IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN, 1,
0, 2000, NWRITES, WAIT_USEC / 1000);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

ret = test(IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN, 1,
50000, 2000, NWRITES, 50);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

ret = test(IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN, 1,
500000, 2000, NWRITES, 500);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

/* no writes within min timeout, but it's given. expect 1 cqe */
ret = test(IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN, 1,
10000, 20000, 1, 20);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

/* same as above, but no min timeout. should time out and we get 6 */
ret = test(IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN, 1,
0, 20000, NWRITES, WAIT_USEC / 1000);
if (ret == T_EXIT_FAIL)
return T_EXIT_FAIL;

return ret;
}

0 comments on commit 877acb9

Please sign in to comment.