diff --git a/NEWS.md b/NEWS.md index f53b2c60..53460561 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,8 @@ # httr2 (development version) +* `req_perform_stream()` can now stream data as it arrives rather than + waiting for the buffer to fill (#519). + # httr2 1.0.3 * `jwt_encode_hmac()` now calls correct underlying function diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 22bd2e76..dca9ac4d 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -15,10 +15,15 @@ #' worth of data to process. It must return `TRUE` to continue streaming. #' @param timeout_sec Number of seconds to process stream for. #' @param buffer_kb Buffer size, in kilobytes. +#' @param wait_for Number of seconds to wait for more data before calling +#' `callback`. If this is `Inf` (the default), `callback` will be called +#' whenever the buffer is full. If this is `0`, `callback` will be +#' called whenever there is any data. For other values, `callback` will be +#' called whenever the buffer is full or `wait_for` seconds have passed. #' @param round How should the raw vector sent to `callback` be rounded? -#' Choose `"byte"`, `"line"`, or supply your own function that takes a -#' raw vector of `bytes` and returns the locations of possible cut points -#' (or `integer()` if there are none). +#' Choose `"byte"`, `"line"`, `"sse"` (for server-sent events) or supply +#' your own function that takes a raw vector of `bytes` and returns the +#' locations of possible cut points (or `integer()` if there are none). #' @returns An HTTP [response]. The body will be empty if the request was #' successful (since the `callback` function will have handled it). The body #' will contain the HTTP response body if the request was unsuccessful. 
@@ -36,19 +41,22 @@ req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64, - round = c("byte", "line")) { + wait_for = Inf, + round = c("byte", "line", "sse")) { check_request(req) handle <- req_handle(req) check_function(callback) check_number_decimal(timeout_sec, min = 0) check_number_decimal(buffer_kb, min = 0) + check_number_decimal(wait_for, min = 0) cut_points <- as_round_function(round) stop_time <- Sys.time() + timeout_sec stream <- curl::curl(req$url, handle = handle) - open(stream, "rbf") + is_blocking <- is.infinite(wait_for) + open(stream, "rbf", blocking = is_blocking) withr::defer(close(stream)) res <- curl::handle_data(handle) @@ -74,7 +82,21 @@ req_perform_stream <- function(req, buf <- raw() while (continue && isIncomplete(stream) && Sys.time() < stop_time) { - buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024)) + if (is_blocking) { + buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024)) + } else { + stop <- Sys.time() + wait_for + repeat({ + buf <- c(buf, readBin(stream, raw(), (buffer_kb * 1024) - length(buf))) + # readBin() is capped at the space left, so a full buffer is `>=`, never `>` + if (length(buf) >= buffer_kb * 1024) { + break + } + if (Sys.time() > stop) { + break + } + }) + } if (length(buf) > 0) { cut <- cut_points(buf) @@ -96,7 +118,7 @@ req_perform_stream <- function(req, resp } -as_round_function <- function(round = c("byte", "line"), +as_round_function <- function(round = c("byte", "line", "sse"), error_call = caller_env()) { if (is.function(round)) { check_function2(round, args = "bytes") @@ -105,7 +127,11 @@ as_round_function <- function(round = c("byte", "line"), round <- arg_match(round, error_call = error_call) switch(round, byte = function(bytes) length(bytes), - line = function(bytes) which(bytes == charToRaw("\n")) + line = function(bytes) which(bytes == charToRaw("\n")), + sse = function(bytes) { + is_newline <- which(bytes == charToRaw("\n")) + intersect(is_newline, is_newline + 1) + } ) } else { cli::cli_abort( diff --git a/man/req_perform_stream.Rd 
b/man/req_perform_stream.Rd index 9f6b5619..7d176659 100644 --- a/man/req_perform_stream.Rd +++ b/man/req_perform_stream.Rd @@ -10,7 +10,8 @@ req_perform_stream( callback, timeout_sec = Inf, buffer_kb = 64, - round = c("byte", "line") + wait_for = Inf, + round = c("byte", "line", "sse") ) } \arguments{ @@ -24,10 +25,16 @@ worth of data to process. It must return \code{TRUE} to continue streaming.} \item{buffer_kb}{Buffer size, in kilobytes.} +\item{wait_for}{Number of seconds to wait for more data before calling +\code{callback}. If this is \code{Inf} (the default), \code{callback} will be called +whenever the buffer is full. If this is \code{0}, \code{callback} will be +called whenever there is any data. For other values, \code{callback} will be +called whenever the buffer is full or \code{wait_for} seconds have passed.} + \item{round}{How should the raw vector sent to \code{callback} be rounded? -Choose \code{"byte"}, \code{"line"}, or supply your own function that takes a -raw vector of \code{bytes} and returns the locations of possible cut points -(or \code{integer()} if there are none).} +Choose \code{"byte"}, \code{"line"}, \code{"sse"} (for server-sent events) or supply +your own function that takes a raw vector of \code{bytes} and returns the +locations of possible cut points (or \code{integer()} if there are none).} } \value{ An HTTP \link{response}. The body will be empty if the request was diff --git a/tests/testthat/_snaps/req-perform-stream.md b/tests/testthat/_snaps/req-perform-stream.md index 4bd75193..d50d3b18 100644 --- a/tests/testthat/_snaps/req-perform-stream.md +++ b/tests/testthat/_snaps/req-perform-stream.md @@ -41,7 +41,7 @@ as_round_function("bytes") Condition Error: - ! `round` must be one of "byte" or "line", not "bytes". + ! `round` must be one of "byte", "line", or "sse", not "bytes". i Did you mean "byte"? 
Code as_round_function(function(x) 1) diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 082961f4..a25746d8 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -59,6 +59,37 @@ test_that("can buffer to lines", { expect_equal(valid_json, rep(TRUE, 10)) }) +test_that("can stream data as it arrives", { + bytes <- integer() + accumulate_bytes <- function(x) { + bytes <<- c(bytes, length(x)) + TRUE + } + + resp <- request_test("/stream-bytes/102400") %>% + req_url_query(chunk_size = 1024) %>% + req_perform_stream(accumulate_bytes, wait_for = 0) + expect_equal(sum(bytes), 102400) + # I'm not sure why this is so much smaller than 100, but I suspect it's + # because the endpoint is pouring out bytes as fast as it can, and since + # the code in the callback takes a non-trivial amount of time, more data + # ends up in the curl buffer + expect_gte(length(bytes), 5) +}) + +test_that("can accumulate bytes up to a certain time", { + bytes <- integer() + accumulate_bytes <- function(x) { + bytes <<- c(bytes, length(x)) + TRUE + } + + resp <- request_test("/drip") %>% + req_url_query(duration = 1, numbytes = 2) %>% + req_perform_stream(accumulate_bytes, wait_for = 0.5) + expect_equal(bytes, c(1, 1)) +}) + test_that("can supply custom rounding", { out <- list() accumulate <- function(x) {