Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow req_perform_stream() to perform non-blocking reads #520

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# httr2 (development version)

* `req_perform_stream()` can now stream data as it arrives rather than
waiting for the buffer to fill (#519).

# httr2 1.0.3

* `jwt_encode_hmac()` now calls correct underlying function
Expand Down
41 changes: 33 additions & 8 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
#' worth of data to process. It must return `TRUE` to continue streaming.
#' @param timeout_sec Number of seconds to process stream for.
#' @param buffer_kb Buffer size, in kilobytes.
#' @param wait_for Number of seconds to wait for more data before calling
#' `callback`. If this is `Inf` (the default), `callback` will be called
#' whenever the buffer is full. If this is `0`, `callback` will be
#' called whenever there is any data. For other values, `callback` will be
#' called whenever the buffer is full or `wait_for` seconds have passed.
#' @param round How should the raw vector sent to `callback` be rounded?
#' Choose `"byte"`, `"line"`, or supply your own function that takes a
#' raw vector of `bytes` and returns the locations of possible cut points
#' (or `integer()` if there are none).
#' Choose `"byte"`, `"line"`, `"sse"` (for server-sent events) or supply
#' your own function that takes a raw vector of `bytes` and returns the
#' locations of possible cut points (or `integer()` if there are none).
#' @returns An HTTP [response]. The body will be empty if the request was
#' successful (since the `callback` function will have handled it). The body
#' will contain the HTTP response body if the request was unsuccessful.
Expand All @@ -36,19 +41,22 @@
callback,
timeout_sec = Inf,
buffer_kb = 64,
round = c("byte", "line")) {
wait_for = Inf,
round = c("byte", "line", "sse")) {
check_request(req)

handle <- req_handle(req)
check_function(callback)
check_number_decimal(timeout_sec, min = 0)
check_number_decimal(buffer_kb, min = 0)
check_number_decimal(wait_for, min = 0)

Check warning on line 52 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L52

Added line #L52 was not covered by tests
cut_points <- as_round_function(round)

stop_time <- Sys.time() + timeout_sec

stream <- curl::curl(req$url, handle = handle)
open(stream, "rbf")
is_blocking <- is.infinite(wait_for)
open(stream, "rbf", blocking = is_blocking)

Check warning on line 59 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L58-L59

Added lines #L58 - L59 were not covered by tests
withr::defer(close(stream))

res <- curl::handle_data(handle)
Expand All @@ -74,7 +82,20 @@
buf <- raw()

while (continue && isIncomplete(stream) && Sys.time() < stop_time) {
buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024))
if (is_blocking) {
buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024))
} else {
stop <- Sys.time() + wait_for
repeat({
buf <- c(buf, readBin(stream, raw(), (buffer_kb * 1024) - length(buf)))
if (length(buf) > buffer_kb * 1024) {
break
}
if (Sys.time() > stop) {
break
}
})
}

Check warning on line 98 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L85-L98

Added lines #L85 - L98 were not covered by tests

if (length(buf) > 0) {
cut <- cut_points(buf)
Expand All @@ -96,7 +117,7 @@
resp
}

as_round_function <- function(round = c("byte", "line"),
as_round_function <- function(round = c("byte", "line", "sse"),
error_call = caller_env()) {
if (is.function(round)) {
check_function2(round, args = "bytes")
Expand All @@ -105,7 +126,11 @@
round <- arg_match(round, error_call = error_call)
switch(round,
byte = function(bytes) length(bytes),
line = function(bytes) which(bytes == charToRaw("\n"))
line = function(bytes) which(bytes == charToRaw("\n")),
sse = function(bytes) {
is_newline <- which(bytes == charToRaw("\n"))
intersect(is_newline, is_newline + 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's clever

}

Check warning on line 133 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L129-L133

Added lines #L129 - L133 were not covered by tests
)
} else {
cli::cli_abort(
Expand Down
15 changes: 11 additions & 4 deletions man/req_perform_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tests/testthat/_snaps/req-perform-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
as_round_function("bytes")
Condition
Error:
! `round` must be one of "byte" or "line", not "bytes".
! `round` must be one of "byte", "line", or "sse", not "bytes".
i Did you mean "byte"?
Code
as_round_function(function(x) 1)
Expand Down
31 changes: 31 additions & 0 deletions tests/testthat/test-req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,37 @@ test_that("can buffer to lines", {
expect_equal(valid_json, rep(TRUE, 10))
})

test_that("can stream data as it arrives", {
bytes <- integer()
accumulate_bytes <- function(x) {
bytes <<- c(bytes, length(x))
TRUE
}

resp <- request_test("/stream-bytes/102400") %>%
req_url_query(chunk_size = 1024) %>%
req_perform_stream(accumulate_bytes, wait_for = 0)
expect_equal(sum(bytes), 102400)
# I'm not sure why this is so much smaller than 100, but I suspect it
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jcheng5 does this explanation make sense to you

# because the endpoint is pouring out bytes as fast as it can, and since
# the code in the callback takes a non-trivial amount of time, more data
# ends up in the curl buffer
expect_gte(length(bytes), 5)
})

test_that("can accumulate bytes up to a certain time", {
bytes <- integer()
accumulate_bytes <- function(x) {
bytes <<- c(bytes, length(x))
TRUE
}

resp <- request_test("/drip") %>%
req_url_query(duration = 1, numbytes = 2) %>%
req_perform_stream(accumulate_bytes, wait_for = 0.5)
expect_equal(bytes, c(1, 1))
})

test_that("can supply custom rounding", {
out <- list()
accumulate <- function(x) {
Expand Down
Loading