From 5d831d9c64e0855e532c0f667e747e4bf7b90c5f Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Tue, 27 Aug 2024 09:05:29 -0500 Subject: [PATCH 1/5] Allow `req_perform_stream()` to perform non-blocking reads The default behaviour stays the same, but you can now specify `wait_for` in order to switch to a non-blocking request and call the callback as the data arrives. Fixes #519 --- NEWS.md | 3 +++ R/req-perform-stream.R | 25 +++++++++++++++++++++-- man/req_perform_stream.Rd | 6 ++++++ tests/testthat/test-req-perform-stream.R | 26 ++++++++++++++++++++++++ 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/NEWS.md b/NEWS.md index f53b2c60..53460561 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,8 @@ # httr2 (development version) +* `req_perform_stream()` can now stream data as it arrives rather than + waiting for the buffer to fill (#519). + # httr2 1.0.3 * `jwt_encode_hmac()` now calls correct underlying function diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 22bd2e76..a3ebf24b 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -15,6 +15,11 @@ #' worth of data to process. It must return `TRUE` to continue streaming. #' @param timeout_sec Number of seconds to process stream for. #' @param buffer_kb Buffer size, in kilobytes. +#' @param wait_for Number of seconds to wait for more data before calling +#' `callback`. If this is `Inf` (the default), `callback` will be called +#' whenever the buffer is full. If this is `0`, `callback` will be +#' called whenever there is any data. For other values, `callback` will be +#' called whenever the buffer is full or `wait_for` seconds have passed. #' @param round How should the raw vector sent to `callback` be rounded? 
#' Choose `"byte"`, `"line"`, or supply your own function that takes a #' raw vector of `bytes` and returns the locations of possible cut points @@ -36,6 +41,7 @@ req_perform_stream <- function(req, callback, timeout_sec = Inf, buffer_kb = 64, + wait_for = Inf, round = c("byte", "line")) { check_request(req) @@ -43,12 +49,14 @@ req_perform_stream <- function(req, check_function(callback) check_number_decimal(timeout_sec, min = 0) check_number_decimal(buffer_kb, min = 0) + check_number_decimal(wait_for, min = 0) cut_points <- as_round_function(round) stop_time <- Sys.time() + timeout_sec stream <- curl::curl(req$url, handle = handle) - open(stream, "rbf") + is_blocking <- is.infinite(wait_for) + open(stream, "rbf", blocking = is_blocking) withr::defer(close(stream)) res <- curl::handle_data(handle) @@ -74,7 +82,20 @@ req_perform_stream <- function(req, buf <- raw() while (continue && isIncomplete(stream) && Sys.time() < stop_time) { - buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024)) + if (is_blocking) { + buf <- c(buf, readBin(stream, raw(), buffer_kb * 1024)) + } else { + stop <- Sys.time() + wait_for + repeat({ + buf <- c(buf, readBin(stream, raw(), buffer_kb)) + if (length(buf) > buffer_kb * 1024) { + break + } + if (Sys.time() > stop) { + break + } + }) + } if (length(buf) > 0) { cut <- cut_points(buf) diff --git a/man/req_perform_stream.Rd b/man/req_perform_stream.Rd index 9f6b5619..c59b7be8 100644 --- a/man/req_perform_stream.Rd +++ b/man/req_perform_stream.Rd @@ -10,6 +10,7 @@ req_perform_stream( callback, timeout_sec = Inf, buffer_kb = 64, + wait_for = Inf, round = c("byte", "line") ) } @@ -24,6 +25,11 @@ worth of data to process. It must return \code{TRUE} to continue streaming.} \item{buffer_kb}{Buffer size, in kilobytes.} +\item{wait_for}{Number of seconds to wait for more data before calling +\code{callback}. If this is \code{Inf} (the default), \code{callback} will be called +whenever there is the buffer is full. 
If this is \code{0}, \code{callback} will be +called whenever there is any data.} + \item{round}{How should the raw vector sent to \code{callback} be rounded? Choose \code{"byte"}, \code{"line"}, or supply your own function that takes a raw vector of \code{bytes} and returns the locations of possible cut points diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 082961f4..86fcfa99 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -59,6 +59,32 @@ test_that("can buffer to lines", { expect_equal(valid_json, rep(TRUE, 10)) }) +test_that("can stream data as it arrives", { + bytes <- integer() + accumulate_bytes <- function(x) { + bytes <<- c(bytes, length(x)) + TRUE + } + + resp <- request_test("/stream-bytes/1024?chunk-size=64") |> + req_perform_stream(accumulate_bytes, wait_for = 0) + expect_equal(sum(bytes), 1024) + expect_equal(length(bytes), 1024 / 64) +}) + +test_that("can accumulate bytes up to a certain time", { + bytes <- integer() + accumulate_bytes <- function(x) { + bytes <<- c(bytes, length(x)) + TRUE + } + + resp <- request_test("/drip") |> + req_url_query(duration = 1, numbytes = 2) |> + req_perform_stream(accumulate_bytes, wait_for = 0.5) + expect_equal(bytes, c(1, 1)) +}) + test_that("can supply custom rounding", { out <- list() accumulate <- function(x) { From ae7d95b83f9b8c264040097561a9977e5af32a88 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Tue, 27 Aug 2024 09:26:30 -0500 Subject: [PATCH 2/5] Fix some mistakes --- R/req-perform-stream.R | 2 +- tests/testthat/test-req-perform-stream.R | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index a3ebf24b..846ee5a3 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -87,7 +87,7 @@ req_perform_stream <- function(req, } else { stop <- Sys.time() + wait_for repeat({ - buf <- c(buf, readBin(stream, raw(), 
buffer_kb)) + buf <- c(buf, readBin(stream, raw(), (buffer_kb * 1024) - length(buf))) if (length(buf) > buffer_kb * 1024) { break } diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 86fcfa99..28f9ff3b 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -66,10 +66,15 @@ test_that("can stream data as it arrives", { TRUE } - resp <- request_test("/stream-bytes/1024?chunk-size=64") |> + resp <- request_test("/stream-bytes/102400") |> + req_url_query(chunk_size = 1024) |> req_perform_stream(accumulate_bytes, wait_for = 0) - expect_equal(sum(bytes), 1024) - expect_equal(length(bytes), 1024 / 64) + expect_equal(sum(bytes), 102400) + # I'm not sure why this is so much smaller than 100, but I suspect it + # is because the endpoint is pouring out bytes as fast as it can, and since + # the code in the callback takes a non-trivial amount of time, more data + # ends up in the curl buffer + expect_gte(length(bytes), 5) }) test_that("can accumulate bytes up to a certain time", { From b75c1fc3ecdfb776db9d5db438f96d93dd035aa5 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Tue, 27 Aug 2024 09:28:32 -0500 Subject: [PATCH 3/5] Add rounding for SSE --- R/req-perform-stream.R | 12 ++++++++---- man/req_perform_stream.Rd | 11 ++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 846ee5a3..78599906 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -21,9 +21,9 @@ #' called whenever there is any data. For other values, `callback` will be #' called whenever the buffer is full or `wait_for` seconds have passed. #' @param round How should the raw vector sent to `callback` be rounded? -#' Choose `"byte"`, `"line"`, or supply your own function that takes a -#' raw vector of `bytes` and returns the locations of possible cut points -#' (or `integer()` if there are none). 
+#' Choose `"byte"`, `"line"`, `"sse"` (for server-sent events) or supply +#' your own function that takes a raw vector of `bytes` and returns the +#' locations of possible cut points (or `integer()` if there are none). #' @returns An HTTP [response]. The body will be empty if the request was #' successful (since the `callback` function will have handled it). The body #' will contain the HTTP response body if the request was unsuccessful. @@ -126,7 +126,11 @@ as_round_function <- function(round = c("byte", "line"), round <- arg_match(round, error_call = error_call) switch(round, byte = function(bytes) length(bytes), - line = function(bytes) which(bytes == charToRaw("\n")) + line = function(bytes) which(bytes == charToRaw("\n")), + sse = function(bytes) { + is_newline <- which(bytes == charToRaw("\n")) + intersect(is_newline, is_newline + 1) + } ) } else { cli::cli_abort( diff --git a/man/req_perform_stream.Rd b/man/req_perform_stream.Rd index c59b7be8..603e8532 100644 --- a/man/req_perform_stream.Rd +++ b/man/req_perform_stream.Rd @@ -27,13 +27,14 @@ worth of data to process. It must return \code{TRUE} to continue streaming.} \item{wait_for}{Number of seconds to wait for more data before calling \code{callback}. If this is \code{Inf} (the default), \code{callback} will be called -whenever there is the buffer is full. If this is \code{0}, \code{callback} will be -called whenever there is any data.} +whenever the buffer is full. If this is \code{0}, \code{callback} will be +called whenever there is any data. For other values, \code{callback} will be +called whenever the buffer is full or \code{wait_for} seconds have passed.} \item{round}{How should the raw vector sent to \code{callback} be rounded? 
-Choose \code{"byte"}, \code{"line"}, or supply your own function that takes a -raw vector of \code{bytes} and returns the locations of possible cut points -(or \code{integer()} if there are none).} +Choose \code{"byte"}, \code{"line"}, \code{"sse"} (for server-sent events) or supply +your own function that takes a raw vector of \code{bytes} and returns the +locations of possible cut points (or \code{integer()} if there are none).} } \value{ An HTTP \link{response}. The body will be empty if the request was From b4a1ef834b23a9d23e6d0d7c095347033461020b Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Tue, 27 Aug 2024 09:51:48 -0500 Subject: [PATCH 4/5] Use magrittr pipe; update docs --- R/req-perform-stream.R | 4 ++-- man/req_perform_stream.Rd | 2 +- tests/testthat/test-req-perform-stream.R | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 78599906..dca9ac4d 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -42,7 +42,7 @@ req_perform_stream <- function(req, timeout_sec = Inf, buffer_kb = 64, wait_for = Inf, - round = c("byte", "line")) { + round = c("byte", "line", "sse")) { check_request(req) handle <- req_handle(req) @@ -117,7 +117,7 @@ req_perform_stream <- function(req, resp } -as_round_function <- function(round = c("byte", "line"), +as_round_function <- function(round = c("byte", "line", "sse"), error_call = caller_env()) { if (is.function(round)) { check_function2(round, args = "bytes") diff --git a/man/req_perform_stream.Rd b/man/req_perform_stream.Rd index 603e8532..7d176659 100644 --- a/man/req_perform_stream.Rd +++ b/man/req_perform_stream.Rd @@ -11,7 +11,7 @@ req_perform_stream( timeout_sec = Inf, buffer_kb = 64, wait_for = Inf, - round = c("byte", "line") + round = c("byte", "line", "sse") ) } \arguments{ diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 28f9ff3b..a25746d8 100644 --- 
a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -66,8 +66,8 @@ test_that("can stream data as it arrives", { TRUE } - resp <- request_test("/stream-bytes/102400") |> - req_url_query(chunk_size = 1024) |> + resp <- request_test("/stream-bytes/102400") %>% + req_url_query(chunk_size = 1024) %>% req_perform_stream(accumulate_bytes, wait_for = 0) expect_equal(sum(bytes), 102400) # I'm not sure why this is so much smaller than 100, but I suspect it @@ -84,8 +84,8 @@ test_that("can accumulate bytes up to a certain time", { TRUE } - resp <- request_test("/drip") |> - req_url_query(duration = 1, numbytes = 2) |> + resp <- request_test("/drip") %>% + req_url_query(duration = 1, numbytes = 2) %>% req_perform_stream(accumulate_bytes, wait_for = 0.5) expect_equal(bytes, c(1, 1)) }) From 7ac614ea92ca6d0ab92d38c2a96eb7164bc26630 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Tue, 27 Aug 2024 09:55:58 -0500 Subject: [PATCH 5/5] Update snapshot test --- tests/testthat/_snaps/req-perform-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testthat/_snaps/req-perform-stream.md b/tests/testthat/_snaps/req-perform-stream.md index 4bd75193..d50d3b18 100644 --- a/tests/testthat/_snaps/req-perform-stream.md +++ b/tests/testthat/_snaps/req-perform-stream.md @@ -41,7 +41,7 @@ as_round_function("bytes") Condition Error: - ! `round` must be one of "byte" or "line", not "bytes". + ! `round` must be one of "byte", "line", or "sse", not "bytes". i Did you mean "byte"? Code as_round_function(function(x) 1)