From 2a4c969479472e645b4922651da683e57ca28b04 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 11:29:21 -0700 Subject: [PATCH 01/27] Add req_perform_open, which makes resp$body the underlying stream --- NAMESPACE | 1 + R/req-perform-stream.R | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/NAMESPACE b/NAMESPACE index 4531d624..b7fbef62 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -73,6 +73,7 @@ export(req_oauth_refresh) export(req_options) export(req_perform) export(req_perform_iterative) +export(req_perform_open) export(req_perform_parallel) export(req_perform_promise) export(req_perform_sequential) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 22bd2e76..f965ca94 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -96,6 +96,41 @@ req_perform_stream <- function(req, resp } + +#' @export +req_perform_open <- function(req, + buffer_kb = 64) { + + check_request(req) + + handle <- req_handle(req) + check_number_decimal(buffer_kb, min = 0) + + stream <- curl::curl(req$url, handle = handle) + open(stream, "rbf", blocking = FALSE) + + res <- curl::handle_data(handle) + the$last_request <- req + + # Return early if there's a problem + resp <- new_response( + method = req_method_get(req), + url = res$url, + status_code = res$status_code, + headers = as_headers(res$headers), + body = NULL, + request = req + ) + the$last_repsonse <- resp + if (error_is_error(req, resp)) { + resp$body <- read_con(stream) + handle_resp(req, resp) + } else { + resp$body <- stream + resp + } +} + as_round_function <- function(round = c("byte", "line"), error_call = caller_env()) { if (is.function(round)) { From 6c82cef787af3d6e1c26a3cf365ab0e288c513d4 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 11:39:59 -0700 Subject: [PATCH 02/27] Remove unneeded argument --- R/req-perform-stream.R | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index f965ca94..ba6bc71a 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -98,13 +98,11 @@ req_perform_stream <- function(req, #' @export -req_perform_open <- function(req, - buffer_kb = 64) { +req_perform_open <- function(req) { check_request(req) handle <- req_handle(req) - check_number_decimal(buffer_kb, min = 0) stream <- curl::curl(req$url, handle = handle) open(stream, "rbf", blocking = FALSE) From 5755169b3b5dc8e3a55b5dd631c94e4c7ce71db0 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 11:42:57 -0700 Subject: [PATCH 03/27] You might want blocking blocking = TRUE greatly simplifies cases where you don't have better things to do while you're waiting (i.e. most R use cases that are not Shiny or plumber?) --- R/req-perform-stream.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index ba6bc71a..07626fd5 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -98,14 +98,14 @@ req_perform_stream <- function(req, #' @export -req_perform_open <- function(req) { +req_perform_open <- function(req, blocking = TRUE) { check_request(req) handle <- req_handle(req) stream <- curl::curl(req$url, handle = handle) - open(stream, "rbf", blocking = FALSE) + open(stream, "rbf", blocking = blocking) res <- curl::handle_data(handle) the$last_request <- req From bfc60f83e26615fb162da2ad5b20cd3416e8c4e3 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 15:02:13 -0700 Subject: [PATCH 04/27] Rename function Co-authored-by: Hadley Wickham --- R/req-perform-stream.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 07626fd5..6ca9c5a3 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -98,7 +98,7 @@ req_perform_stream <- function(req, #' @export -req_perform_open <- function(req, blocking = TRUE) { +req_perform_connection <- function(req, blocking = TRUE) { check_request(req) From 2b4fa0c2b53acf8b23df83fbd65388f8fcf5b923 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 16:08:55 -0700 Subject: [PATCH 05/27] Roxygenize --- NAMESPACE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NAMESPACE b/NAMESPACE index b7fbef62..33da7956 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -72,8 +72,8 @@ export(req_oauth_password) export(req_oauth_refresh) export(req_options) export(req_perform) +export(req_perform_connection) export(req_perform_iterative) -export(req_perform_open) export(req_perform_parallel) export(req_perform_promise) export(req_perform_sequential) From 5de36ac6f2540f182a16668acbb284590941d4ba Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 17:33:17 -0700 Subject: [PATCH 06/27] Add SSE parsing logic --- R/req-perform-stream.R | 45 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 6ca9c5a3..5882d0b3 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -129,6 +129,51 @@ req_perform_connection <- function(req, blocking = TRUE) { } } +# TODO: max_size +read_sse <- function(conn) { + lines <- character(0) + while (TRUE) { + line <- readLines(conn, n = 1) + if (length(line) == 0) { + break + } + if (line == "") { + # \n\n detected, end of event + return(parse_event(lines)) + } + lines <- c(lines, line) + } + + if (length(lines) > 0) { + # We have a partial event, put it back while we wait + # for more + pushBack(lines, conn) + } + + return(NULL) +} + +parse_event <- function(lines) { + m <- regexec("([^:]*)(: ?)?(.*)", lines) + matches <- regmatches(lines, m) + keys <- c("event", vapply(matches, \(x) x[2], character(1))) + values <- c("message", vapply(matches, \(x) x[4], character(1))) + + remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data" + keys <- keys[!remove_dupes] + values <- values[!remove_dupes] + + event_type <- values[keys == "event"] + data <- values[keys == "data"] + id <- values[keys == "id"] + + list( + type = event_type, + data = data, + id = id + ) +} + as_round_function <- function(round = c("byte", "line"), error_call = caller_env()) { if (is.function(round)) { From 2d3b7819b920860cc9e9a4c964425d171e6b3efe Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Tue, 27 Aug 2024 20:50:55 -0700 Subject: [PATCH 07/27] Export read_sse --- NAMESPACE | 1 + R/req-perform-stream.R | 1 + 2 files changed, 2 insertions(+) diff --git a/NAMESPACE b/NAMESPACE index 33da7956..24536403 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -49,6 +49,7 @@ export(oauth_token) export(oauth_token_cached) export(obfuscate) export(obfuscated) +export(read_sse) export(req_auth_basic) export(req_auth_bearer_token) export(req_body_file) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 5882d0b3..f87d1b9c 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -130,6 +130,7 @@ req_perform_connection <- function(req, blocking = TRUE) { } # TODO: max_size +#' @export read_sse <- function(conn) { lines <- character(0) while (TRUE) { From 3f87ea2b084fc1a673fc19eb2b6538a06173d2d6 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 08:27:42 -0500 Subject: [PATCH 08/27] Docs; fill in missing body type pieces --- NAMESPACE | 2 +- R/req-cache.R | 17 +++++++----- R/req-perform-stream.R | 50 ++++++++++++++++++++++++++++++----- R/resp-body.R | 24 ++++++++++++----- R/resp.R | 8 +++--- man/req_perform_connection.Rd | 27 +++++++++++++++++++ 6 files changed, 106 insertions(+), 22 deletions(-) create mode 100644 man/req_perform_connection.Rd diff --git a/NAMESPACE b/NAMESPACE index 24536403..69348b24 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -49,7 +49,6 @@ export(oauth_token) export(oauth_token_cached) export(obfuscate) export(obfuscated) -export(read_sse) export(req_auth_basic) export(req_auth_bearer_token) export(req_body_file) @@ -113,6 +112,7 @@ export(resp_raw) export(resp_retry_after) export(resp_status) export(resp_status_desc) +export(resp_stream_sse) export(resp_url) export(resp_url_path) export(resp_url_queries) diff --git a/R/req-cache.R b/R/req-cache.R index 904b14ee..93015c59 100644 --- a/R/req-cache.R +++ b/R/req-cache.R @@ -114,7 +114,7 @@ cache_get <- function(req) { } cache_set <- function(req, resp) { - if (is_path(resp$body)) { + if (resp_body_type(resp) == "disk") { body_path <- req_cache_path(req, ".body") file.copy(resp$body, body_path, overwrite = TRUE) resp$body <- new_path(body_path) @@ -246,11 +246,12 @@ cache_body <- function(cached_resp, path = NULL) { return(body) } - if (is_path(body)) { - file.copy(body, path, overwrite = TRUE) - } else { - writeBin(body, path) - } + switch(resp_body_type(cached_resp), + disk = file.copy(body, path, overwrite = TRUE), + memory = writeBin(body, path), + stream = cli::cli_abort("Invalid body type", .internal = TRUE) + ) + new_path(path) } @@ -271,6 +272,10 @@ resp_is_cacheable <- function(resp, control = NULL) { return(FALSE) } + if (resp_body_type(resp) == "stream") { + return(FALSE) + } + control <- control %||% resp_cache_control(resp) if ("no-store" %in% control$flags) { return(FALSE) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index f87d1b9c..84e9b9b4 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -96,10 +96,23 @@ req_perform_stream <- function(req, resp } - +#' Perform a request and return a streaming connection +#' +#' @description +#' Use `req_perform_connection()` to perform a request that includes a +#' connection as the body of the response, then use `resp_steam_sse()` and +#' friends to retrieve data a chunk at a time.. +#' +#' This is an alternative interface to [req_perform_stream()] that returns a +#' connection that you can pull from data, rather than callbacks that are called +#' as the data streams in. This is useful if you want to do other work in +#' between streaming inputs. +#' +#' @inheritParams req_perform_stream +#' @param blocking When retrieving data, should the connection block and wait +#' for the desired information or immediately return what it has? #' @export req_perform_connection <- function(req, blocking = TRUE) { - check_request(req) handle <- req_handle(req) @@ -110,7 +123,6 @@ req_perform_connection <- function(req, blocking = TRUE) { res <- curl::handle_data(handle) the$last_request <- req - # Return early if there's a problem resp <- new_response( method = req_method_get(req), url = res$url, @@ -120,18 +132,26 @@ req_perform_connection <- function(req, blocking = TRUE) { request = req ) the$last_repsonse <- resp + if (error_is_error(req, resp)) { + # Read full body if there's an error resp$body <- read_con(stream) - handle_resp(req, resp) } else { resp$body <- stream - resp } + handle_resp(req, resp) + + resp } # TODO: max_size + #' @export -read_sse <- function(conn) { +#' @rdname req_perform_connection +resp_stream_sse <- function(resp) { + check_streaming_response(resp) + conn <- resp$body + lines <- character(0) while (TRUE) { line <- readLines(conn, n = 1) @@ -154,6 +174,24 @@ read_sse <- function(conn) { return(NULL) } +check_streaming_response <- function(resp, + arg = caller_arg(resp), + call = caller_env()) { + + check_response(resp, arg = arg, call = call) + + if (!inherits(resp$body)) { + stop_input_type( + resp, + "a streaming HTTP response object", + allow_null = FALSE, + arg = arg, + call = call + ) + } +} + + parse_event <- function(lines) { m <- regexec("([^:]*)(: ?)?(.*)", lines) matches <- regmatches(lines, m) diff --git a/R/resp-body.R b/R/resp-body.R index 9ece8d4e..fd1b7eef 100644 --- a/R/resp-body.R +++ b/R/resp-body.R @@ -36,11 +36,13 @@ resp_body_raw <- function(resp) { if (!resp_has_body(resp)) { cli::cli_abort("Can't retrieve empty body.") - } else if (is_path(resp$body)) { - readBin(resp$body, "raw", file.size(resp$body)) - } else { - resp$body } + + switch(resp_body_type(resp), + disk = readBin(resp$body, "raw", file.size(resp$body)), + memory = resp$body, + stream = read_con(resp$body, "raw") + ) } #' @rdname resp_body_raw @@ -48,10 +50,20 @@ resp_body_raw <- function(resp) { resp_has_body <- function(resp) { check_response(resp) + switch(resp_body_type(resp), + disk = file.size(resp$body) > 0, + memory = length(resp$body) > 0, + stream = TRUE + ) +} + +resp_body_type <- function(resp) { if (is_path(resp$body)) { - file.size(resp$body) > 0 + "disk" + } else if (inherits(resp$body, "connection")) { + "stream" } else { - length(resp$body) > 0 + "memory" } } diff --git a/R/resp.R b/R/resp.R index dc191cfb..1f3a766b 100644 --- a/R/resp.R +++ b/R/resp.R @@ -113,10 +113,12 @@ print.httr2_response <- function(x, ...) { body <- x$body if (!resp_has_body(x)) { cli::cli_text("{.field Body}: None") - } else if (is_path(body)) { - cli::cli_text("{.field Body}: On disk {.path {body}} ({file.size(body)} bytes)") } else { - cli::cli_text("{.field Body}: In memory ({length(body)} bytes)") + switch(resp_body_type(x), + disk = cli::cli_text("{.field Body}: On disk {.path {body}} ({file.size(body)} bytes)"), + memory = cli::cli_text("{.field Body}: In memory ({length(body)} bytes)"), + stream = cli::cli_text("{.field Body}: Streaming connection") + ) } invisible(x) diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd new file mode 100644 index 00000000..cb19b9ab --- /dev/null +++ b/man/req_perform_connection.Rd @@ -0,0 +1,27 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/req-perform-stream.R +\name{req_perform_connection} +\alias{req_perform_connection} +\alias{resp_stream_sse} +\title{Perform a request and return a streaming connection} +\usage{ +req_perform_connection(req, blocking = TRUE) + +resp_stream_sse(resp) +} +\arguments{ +\item{req}{A \link{request}.} + +\item{blocking}{When retrieving data, should the connection block and wait +for the desired information or immediately return what it has?} +} +\description{ +Use \code{req_perform_connection()} to perform a request that includes a +connection as the body of the response, then use \code{resp_steam_sse()} and +friends to retrieve data a chunk at a time.. + +This is an alternative interface to \code{\link[=req_perform_stream]{req_perform_stream()}} that returns a +connection that you can pull from data, rather than callbacks that are called +as the data streams in. This is useful if you want to do other work in +between streaming inputs. +} From db4b951a82fc0d017fbbcc62672ae7394b5fa044 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 08:43:34 -0500 Subject: [PATCH 09/27] Use req_perform_connection in req_perform_stream --- R/req-perform-stream.R | 28 +++++------------------- tests/testthat/test-req-perform-stream.R | 2 +- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 84e9b9b4..01923f79 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -47,27 +47,9 @@ req_perform_stream <- function(req, stop_time <- Sys.time() + timeout_sec - stream <- curl::curl(req$url, handle = handle) - open(stream, "rbf") - withr::defer(close(stream)) - - res <- curl::handle_data(handle) - the$last_request <- req - - # Return early if there's a problem - resp <- new_response( - method = req_method_get(req), - url = res$url, - status_code = res$status_code, - headers = as_headers(res$headers), - body = NULL, - request = req - ) - if (error_is_error(req, resp)) { - resp$body <- read_con(stream) - the$last_response <- resp - handle_resp(req, resp) - } + resp <- req_perform_connection(req) + stream <- resp$body + on.exit(close(stream)) continue <- TRUE incomplete <- TRUE @@ -92,6 +74,7 @@ req_perform_stream <- function(req, callback(buf) } + resp$body <- raw() the$last_response <- resp resp } @@ -131,14 +114,15 @@ req_perform_connection <- function(req, blocking = TRUE) { body = NULL, request = req ) - the$last_repsonse <- resp if (error_is_error(req, resp)) { # Read full body if there's an error resp$body <- read_con(stream) + close(stream) } else { resp$body <- stream } + the$last_response <- resp handle_resp(req, resp) resp diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 082961f4..9d2fcf8c 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -5,7 +5,7 @@ test_that("req_stream() is deprecated", { ) }) -test_that("returns empty body; sets last request & response", { +test_that("returns stream body; sets last request & response", { req <- request_test("/stream-bytes/1024") resp <- req_perform_stream(req, function(x) NULL) expect_s3_class(resp, "httr2_response") From 04fb722aa1d9370a76a35fad3d09bb556e28b77a Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 08:46:43 -0500 Subject: [PATCH 10/27] Fix docs --- R/req-perform-stream.R | 1 + man/req_perform_connection.Rd | 2 ++ 2 files changed, 3 insertions(+) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 01923f79..86f003b4 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -92,6 +92,7 @@ req_perform_stream <- function(req, #' between streaming inputs. #' #' @inheritParams req_perform_stream +#' @inheritParams resp_body_raw #' @param blocking When retrieving data, should the connection block and wait #' for the desired information or immediately return what it has? #' @export diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index cb19b9ab..c54a7d91 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -14,6 +14,8 @@ resp_stream_sse(resp) \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has?} + +\item{resp}{A response object.} } \description{ Use \code{req_perform_connection()} to perform a request that includes a From 7a6a5d36079db8c53183038de467b70f8e60b36b Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 09:01:27 -0500 Subject: [PATCH 11/27] Plumbing in more pieces --- NAMESPACE | 3 + R/req-perform-stream.R | 61 +++++++++++++++++++-- R/resp-body.R | 2 +- man/req_perform_connection.Rd | 17 +++++- tests/testthat/_snaps/req-perform-stream.md | 8 +++ tests/testthat/test-req-perform-stream.R | 23 ++++++++ 6 files changed, 107 insertions(+), 7 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index 69348b24..e2f4ff2a 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -3,6 +3,7 @@ S3method("$",httr2_headers) S3method("[",httr2_headers) S3method("[[",httr2_headers) +S3method(close,httr2_response) S3method(print,httr2_cmd) S3method(print,httr2_headers) S3method(print,httr2_oauth_client) @@ -112,6 +113,8 @@ export(resp_raw) export(resp_retry_after) export(resp_status) export(resp_status_desc) +export(resp_stream_lines) +export(resp_stream_raw) export(resp_stream_sse) export(resp_url) export(resp_url_path) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 86f003b4..8cd85b0b 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -83,8 +83,9 @@ req_perform_stream <- function(req, #' #' @description #' Use `req_perform_connection()` to perform a request that includes a -#' connection as the body of the response, then use `resp_steam_sse()` and -#' friends to retrieve data a chunk at a time.. +#' connection as the body of the response, then `resp_stream_bytes()`, +#' `resp_stream_lines()`, or `resp_stream_sse()` to retrieve data a chunk at a +#' timen, and finish by closing the connection with `close()`. #' #' This is an alternative interface to [req_perform_stream()] that returns a #' connection that you can pull from data, rather than callbacks that are called @@ -129,10 +130,44 @@ req_perform_connection <- function(req, blocking = TRUE) { resp } -# TODO: max_size +#' @export +#' @rdname req_perform_connection +close.httr2_response <- function(con, ...) { + check_streaming_response(con) + + close(con$body) + invisible() +} #' @export #' @rdname req_perform_connection +resp_stream_raw <- function(resp, kb = 32) { + check_streaming_response(resp) + conn <- resp$body + + if (isIncomplete(conn)) { + readBin(conn, raw(), kb * 1024) + } else { + raw() + } +} + +#' @export +#' @rdname req_perform_connection +resp_stream_lines <- function(resp, lines = 1) { + check_streaming_response(resp) + conn <- resp$body + + if (isIncomplete(conn)) { + readLines(conn, n = lines) + } else { + character() + } +} + +#' @export +#' @rdname req_perform_connection +# TODO: max_size resp_stream_sse <- function(resp) { check_streaming_response(resp) conn <- resp$body @@ -165,7 +200,7 @@ check_streaming_response <- function(resp, check_response(resp, arg = arg, call = call) - if (!inherits(resp$body)) { + if (resp_body_type(resp) != "stream") { stop_input_type( resp, "a streaming HTTP response object", @@ -174,6 +209,24 @@ check_streaming_response <- function(resp, call = call ) } + + if (!isValid(resp$body)) { + cli::cli_abort("{.arg {arg}} has already been closed.", call = call) + } +} + +# isOpen doesn't work for two reasons: +# 1. It errors if con has been closed, rather than returning FALSE +# 2. If returns TRUE if con has been closed and a new connection opened +# +# So instead we retrieve the connection from its number and compare to the +# original connection. This works because connections have an undocumented +# external pointer. +isValid <- function(con) { + tryCatch( + identical(getConnection(con), con), + error = function(cnd) FALSE + ) } diff --git a/R/resp-body.R b/R/resp-body.R index fd1b7eef..8a7a8a7e 100644 --- a/R/resp-body.R +++ b/R/resp-body.R @@ -53,7 +53,7 @@ resp_has_body <- function(resp) { switch(resp_body_type(resp), disk = file.size(resp$body) > 0, memory = length(resp$body) > 0, - stream = TRUE + stream = isValid(resp$body) ) } diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index c54a7d91..e6673690 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -2,11 +2,20 @@ % Please edit documentation in R/req-perform-stream.R \name{req_perform_connection} \alias{req_perform_connection} +\alias{close.httr2_response} +\alias{resp_stream_raw} +\alias{resp_stream_lines} \alias{resp_stream_sse} \title{Perform a request and return a streaming connection} \usage{ req_perform_connection(req, blocking = TRUE) +\method{close}{httr2_response}(con, ...) + +resp_stream_raw(resp, kb = 32) + +resp_stream_lines(resp, lines = 1) + resp_stream_sse(resp) } \arguments{ @@ -15,12 +24,16 @@ resp_stream_sse(resp) \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has?} +\item{...}{Other arguments passed on to \code{\link[jsonlite:fromJSON]{jsonlite::fromJSON()}} and +\code{\link[xml2:read_xml]{xml2::read_xml()}} respectively.} + \item{resp}{A response object.} } \description{ Use \code{req_perform_connection()} to perform a request that includes a -connection as the body of the response, then use \code{resp_steam_sse()} and -friends to retrieve data a chunk at a time.. +connection as the body of the response, then \code{resp_stream_bytes()}, +\code{resp_stream_lines()}, or \code{resp_stream_sse()} to retrieve data a chunk at a +timen, and finish by closing the connection with \code{close()}. This is an alternative interface to \code{\link[=req_perform_stream]{req_perform_stream()}} that returns a connection that you can pull from data, rather than callbacks that are called diff --git a/tests/testthat/_snaps/req-perform-stream.md b/tests/testthat/_snaps/req-perform-stream.md index 4bd75193..4273a5d3 100644 --- a/tests/testthat/_snaps/req-perform-stream.md +++ b/tests/testthat/_snaps/req-perform-stream.md @@ -7,6 +7,14 @@ `req_stream()` was deprecated in httr2 1.0.0. i Please use `req_perform_stream()` instead. +# can't read from a closed connection + + Code + resp_stream_raw(resp, 1) + Condition + Error in `resp_stream_raw()`: + ! `resp` has already been closed. + # req_perform_stream checks its inputs Code diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 9d2fcf8c..83aab683 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -5,6 +5,29 @@ test_that("req_stream() is deprecated", { ) }) +# req_perform_connection() ---------------------------------------------------- + +test_that("can stream bytes from a connection", { + resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() + on.exit(close(resp)) + + expect_s3_class(resp, "httr2_response") + expect_true(resp_has_body(resp)) + + out <- resp_stream_raw(resp, 1) + expect_length(out, 1024) +}) + +test_that("can't read from a closed connection", { + resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() + close(resp) + + expect_false(resp_has_body(resp)) + expect_snapshot(resp_stream_raw(resp, 1), error = TRUE) +}) + +# req_perform_stream() -------------------------------------------------------- + test_that("returns stream body; sets last request & response", { req <- request_test("/stream-bytes/1024") resp <- req_perform_stream(req, function(x) NULL) From ad0ec7d5e83966529f1a8848237ea9a8f5864600 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 09:06:41 -0500 Subject: [PATCH 12/27] Improve docs --- R/req-perform-stream.R | 25 +++++++++++++++---------- man/req_perform_connection.Rd | 15 +++++++++------ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 8cd85b0b..21caa392 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -93,7 +93,7 @@ req_perform_stream <- function(req, #' between streaming inputs. #' #' @inheritParams req_perform_stream -#' @inheritParams resp_body_raw +#' @param resp,con A httr2 [response]. #' @param blocking When retrieving data, should the connection block and wait #' for the desired information or immediately return what it has? #' @export @@ -132,15 +132,7 @@ req_perform_connection <- function(req, blocking = TRUE) { #' @export #' @rdname req_perform_connection -close.httr2_response <- function(con, ...) { - check_streaming_response(con) - - close(con$body) - invisible() -} - -#' @export -#' @rdname req_perform_connection +#' @param kb How many kilobytes (1024 bytes) of data to read. resp_stream_raw <- function(resp, kb = 32) { check_streaming_response(resp) conn <- resp$body @@ -154,6 +146,7 @@ resp_stream_raw <- function(resp, kb = 32) { #' @export #' @rdname req_perform_connection +#' @param lines How many lines to read resp_stream_lines <- function(resp, lines = 1) { check_streaming_response(resp) conn <- resp$body @@ -194,6 +187,18 @@ resp_stream_sse <- function(resp) { return(NULL) } +#' @export +#' @param ... Not used; included for compatibility with generic. +#' @rdname req_perform_connection +close.httr2_response <- function(con, ...) { + check_streaming_response(con) + + close(con$body) + invisible() +} + +# Helpers ---------------------------------------------------------------------- + check_streaming_response <- function(resp, arg = caller_arg(resp), call = caller_env()) { diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index e6673690..41cabb14 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -2,21 +2,21 @@ % Please edit documentation in R/req-perform-stream.R \name{req_perform_connection} \alias{req_perform_connection} -\alias{close.httr2_response} \alias{resp_stream_raw} \alias{resp_stream_lines} \alias{resp_stream_sse} +\alias{close.httr2_response} \title{Perform a request and return a streaming connection} \usage{ req_perform_connection(req, blocking = TRUE) -\method{close}{httr2_response}(con, ...) - resp_stream_raw(resp, kb = 32) resp_stream_lines(resp, lines = 1) resp_stream_sse(resp) + +\method{close}{httr2_response}(con, ...) } \arguments{ \item{req}{A \link{request}.} @@ -24,10 +24,13 @@ resp_stream_sse(resp) \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has?} -\item{...}{Other arguments passed on to \code{\link[jsonlite:fromJSON]{jsonlite::fromJSON()}} and -\code{\link[xml2:read_xml]{xml2::read_xml()}} respectively.} +\item{resp, con}{An httr2 \link{response}.} + +\item{kb}{How many kilobytes (1024 bytes) of data to read.} + +\item{lines}{How many lines to read} -\item{resp}{A response object.} +\item{...}{Not used; included for compatibility with generic.} } \description{ Use \code{req_perform_connection()} to perform a request that includes a From cf05cfadc8de3a3d3abbc35b955a9964fc273619 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 09:10:38 -0500 Subject: [PATCH 13/27] More testing --- R/req-perform-stream.R | 12 ++---------- R/resp-body.R | 6 +++++- tests/testthat/test-req-perform-stream.R | 16 +++++++++++++++- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 21caa392..8d68d339 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -137,11 +137,7 @@ resp_stream_raw <- function(resp, kb = 32) { check_streaming_response(resp) conn <- resp$body - if (isIncomplete(conn)) { - readBin(conn, raw(), kb * 1024) - } else { - raw() - } + readBin(conn, raw(), kb * 1024) } #' @export @@ -151,11 +147,7 @@ resp_stream_lines <- function(resp, lines = 1) { check_streaming_response(resp) conn <- resp$body - if (isIncomplete(conn)) { - readLines(conn, n = lines) - } else { - character() - } + readLines(conn, n = lines) } #' @export diff --git a/R/resp-body.R b/R/resp-body.R index 8a7a8a7e..364a0896 100644 --- a/R/resp-body.R +++ b/R/resp-body.R @@ -41,7 +41,11 @@ resp_body_raw <- function(resp) { switch(resp_body_type(resp), disk = readBin(resp$body, "raw", file.size(resp$body)), memory = resp$body, - stream = read_con(resp$body, "raw") + stream = { + out <- read_con(resp$body) + close(resp) + out + } ) } diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 83aab683..fcc0f6ef 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -8,7 +8,7 @@ test_that("req_stream() is deprecated", { # req_perform_connection() ---------------------------------------------------- test_that("can stream bytes from a connection", { - resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() + resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() on.exit(close(resp)) expect_s3_class(resp, "httr2_response") @@ -16,6 +16,20 @@ test_that("can stream bytes from a connection", { out <- resp_stream_raw(resp, 1) expect_length(out, 1024) + + out <- resp_stream_raw(resp, 1) + expect_length(out, 1024) + + out <- resp_stream_raw(resp, 1) + expect_length(out, 0) +}) + +test_that("can read all data from a connection", { + resp <- request_test("/stream-bytes/2048") %>% req_perform_connection() + + out <- resp_body_raw(resp) + expect_length(out, 2048) + expect_false(resp_has_body(resp)) }) test_that("can't read from a closed connection", { From 5ef09753280692d84d88c528e52cf63edfc6ce32 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 09:12:23 -0500 Subject: [PATCH 14/27] Test print method --- tests/testthat/_snaps/resp.md | 7 +++++++ tests/testthat/test-resp.R | 3 +++ 2 files changed, 10 insertions(+) diff --git a/tests/testthat/_snaps/resp.md b/tests/testthat/_snaps/resp.md index 2d0162fb..a8e851fe 100644 --- a/tests/testthat/_snaps/resp.md +++ b/tests/testthat/_snaps/resp.md @@ -36,6 +36,13 @@ GET https://example.com Status: 200 OK Body: On disk 'path-content' (15 bytes) + Code + response(200, body = con) + Message + + GET https://example.com + Status: 200 OK + Body: Streaming connection # check_response produces helpful error diff --git a/tests/testthat/test-resp.R b/tests/testthat/test-resp.R index beae3be2..e699de3f 100644 --- a/tests/testthat/test-resp.R +++ b/tests/testthat/test-resp.R @@ -4,6 +4,8 @@ test_that("response has basic print method", { writeBin("sample content", "path-content") withr::defer(unlink(c("path-empty", "path-content"))) + con <- file() + withr::defer(close(con)) expect_snapshot({ response(200) @@ -11,6 +13,7 @@ test_that("response has basic print method", { response(200, body = charToRaw("abcdef")) response(200, body = new_path("path-empty")) response(200, body = new_path("path-content")) + response(200, body = con) }) }) From a27c5647f10ff522879642e65da06e9ff8807060 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 09:20:36 -0500 Subject: [PATCH 15/27] Minor polishing --- R/req-perform-stream.R | 4 +++- man/req_perform_connection.Rd | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 8d68d339..7558734d 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -49,7 +49,7 @@ req_perform_stream <- function(req, resp <- req_perform_connection(req) stream <- resp$body - on.exit(close(stream)) + withr::defer(close(stream)) continue <- TRUE incomplete <- TRUE @@ -74,6 +74,7 @@ req_perform_stream <- function(req, callback(buf) } + # We're done streaming so convert to bodiless response resp$body <- raw() the$last_response <- resp resp @@ -99,6 +100,7 @@ req_perform_stream <- function(req, #' @export req_perform_connection <- function(req, blocking = TRUE) { check_request(req) + check_bool(blocking) handle <- req_handle(req) diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index 41cabb14..23446ec3 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -24,7 +24,7 @@ resp_stream_sse(resp) \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has?} -\item{resp, con}{An httr2 \link{response}.} +\item{resp, con}{A httr2 \link{response}.} \item{kb}{How many kilobytes (1024 bytes) of data to read.} From 7661abe8e06be131abacbf257eebe86355c7deeb Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 09:53:23 -0500 Subject: [PATCH 16/27] Can't use annonymous function shorthand --- R/req-perform-stream.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 7558734d..cbdc2b9f 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -232,8 +232,8 @@ isValid <- function(con) { parse_event <- function(lines) { m <- regexec("([^:]*)(: ?)?(.*)", lines) matches <- regmatches(lines, m) - keys <- c("event", vapply(matches, \(x) x[2], character(1))) - values <- c("message", vapply(matches, \(x) x[4], character(1))) + keys <- c("event", vapply(matches, function(x) x[2], character(1))) + values <- c("message", vapply(matches, function(x) x[4], character(1))) remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data" keys <- keys[!remove_dupes] From 4b06b82562f7abe248d73d8bbc44950ae0efbd3d Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 11:34:32 -0500 Subject: [PATCH 17/27] Add retries --- R/req-perform-stream.R | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index cbdc2b9f..344168a7 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -110,14 +110,29 @@ req_perform_connection <- function(req, blocking = TRUE) { res <- curl::handle_data(handle) the$last_request <- req - resp <- new_response( - method = req_method_get(req), - url = res$url, - status_code = res$status_code, - headers = as_headers(res$headers), - body = NULL, - request = req - ) + tries <- 0 + delay <- 0 + max_tries <- retry_max_tries(req) + deadline <- Sys.time() + retry_max_seconds(req) + while (tries < max_tries && Sys.time() < deadline) { + sys_sleep(delay, "for retry backoff") + + resp <- new_response( + method = req_method_get(req), + url = res$url, + status_code = res$status_code, + headers = as_headers(res$headers), + body = NULL, + request = req + ) + + if (retry_is_transient(req, resp)) { + tries <- tries + 1 + delay <- retry_after(req, resp, tries) + } else { + break + } + } if (error_is_error(req, resp)) { # Read full body if there's an error From 13259c72c4c83f56996714ad7c4f21a8d62a8e7d Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 12:19:01 -0500 Subject: [PATCH 18/27] Actually implement retries with backoff --- R/req-perform-stream.R | 46 +++++++++++++++++++++++++----------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 344168a7..6858609e 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -103,28 +103,20 @@ req_perform_connection <- function(req, blocking = TRUE) { check_bool(blocking) handle <- req_handle(req) - - stream <- curl::curl(req$url, handle = handle) - open(stream, "rbf", blocking = blocking) - - res <- curl::handle_data(handle) the$last_request <- req tries <- 0 delay <- 0 max_tries <- retry_max_tries(req) deadline <- Sys.time() + retry_max_seconds(req) - while (tries < max_tries && Sys.time() < deadline) { + resp <- NULL + while (tries <= max_tries && Sys.time() < deadline) { sys_sleep(delay, "for retry backoff") - - resp <- new_response( - method = req_method_get(req), - url = res$url, - status_code = res$status_code, - headers = as_headers(res$headers), - body = NULL, - request = req - ) + + if (!is.null(resp)) { + close(resp$body) + } + resp <- req_perform_connection1(req, handle, blocking = blocking) if (retry_is_transient(req, resp)) { tries <- tries + 1 @@ -136,10 +128,9 @@ req_perform_connection <- function(req, blocking = TRUE) { if (error_is_error(req, resp)) { # Read full body if there's an error - resp$body <- read_con(stream) - close(stream) - } else { - resp$body <- stream + conn <- resp$body + resp$body <- read_con(conn) + close(conn) } the$last_response <- resp handle_resp(req, resp) @@ -147,6 +138,23 @@ req_perform_connection <- function(req, blocking = TRUE) { resp } +req_perform_connection1 <- function(req, handle, blocking = TRUE) { + stream <- curl::curl(req$url, handle = handle) + + # Must open the stream in order to initiate the connection + open(stream, "rbf", blocking = blocking) + curl_data <- curl::handle_data(handle) + + new_response( + method = req_method_get(req), + url = curl_data$url, + status_code = curl_data$status_code, + headers = as_headers(curl_data$headers), + body = stream, + request = req + ) +} + #' @export #' @rdname req_perform_connection #' @param kb How many kilobytes (1024 bytes) of data to read. From 1fb062a35214bbf59b2b5e6b9d6e2833451b954d Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Wed, 28 Aug 2024 11:24:56 -0700 Subject: [PATCH 19/27] Add `mode` argument to req_perform_connection --- R/req-perform-stream.R | 13 +++++++++---- man/req_perform_connection.Rd | 8 +++++++- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 6858609e..31330e28 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -93,12 +93,17 @@ req_perform_stream <- function(req, #' as the data streams in. This is useful if you want to do other work in #' between streaming inputs. #' +#' When using `resp_stream_sse()`, you must call `req_perform_connection` with +#' `mode = "r"` or you will get intermittent errors. +#' #' @inheritParams req_perform_stream #' @param resp,con A httr2 [response]. +#' @param mode The mode that should be used for opening the connection. Use +#' `"rb"` (the default) for binary, `"r"` for text. #' @param blocking When retrieving data, should the connection block and wait #' for the desired information or immediately return what it has? #' @export -req_perform_connection <- function(req, blocking = TRUE) { +req_perform_connection <- function(req, mode = "rb", blocking = TRUE) { check_request(req) check_bool(blocking) @@ -116,7 +121,7 @@ req_perform_connection <- function(req, blocking = TRUE) { if (!is.null(resp)) { close(resp$body) } - resp <- req_perform_connection1(req, handle, blocking = blocking) + resp <- req_perform_connection1(req, handle, mode, blocking = blocking) if (retry_is_transient(req, resp)) { tries <- tries + 1 @@ -138,11 +143,11 @@ req_perform_connection <- function(req, blocking = TRUE) { resp } -req_perform_connection1 <- function(req, handle, blocking = TRUE) { +req_perform_connection1 <- function(req, handle, mode, blocking = TRUE) { stream <- curl::curl(req$url, handle = handle) # Must open the stream in order to initiate the connection - open(stream, "rbf", blocking = blocking) + open(stream, mode, blocking = blocking) curl_data <- curl::handle_data(handle) new_response( diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index 23446ec3..197d87e5 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -8,7 +8,7 @@ \alias{close.httr2_response} \title{Perform a request and return a streaming connection} \usage{ -req_perform_connection(req, blocking = TRUE) +req_perform_connection(req, mode = "rb", blocking = TRUE) resp_stream_raw(resp, kb = 32) @@ -21,6 +21,9 @@ resp_stream_sse(resp) \arguments{ \item{req}{A \link{request}.} +\item{mode}{The mode that should be used for opening the connection. Use +\code{"rb"} (the default) for binary, \code{"r"} for text.} + \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has?} @@ -42,4 +45,7 @@ This is an alternative interface to \code{\link[=req_perform_stream]{req_perform connection that you can pull from data, rather than callbacks that are called as the data streams in. This is useful if you want to do other work in between streaming inputs. + +When using \code{resp_stream_sse()}, you must call \code{req_perform_connection} with +\code{mode = "r"} or you will get intermittent errors. } From bd94cc3bd8bebd18a11439b15c5aebc4141b6c79 Mon Sep 17 00:00:00 2001 From: Joe Cheng Date: Wed, 28 Aug 2024 11:36:53 -0700 Subject: [PATCH 20/27] Add explicit check for sse connection text mode --- R/req-perform-stream.R | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 31330e28..2d766066 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -94,7 +94,7 @@ req_perform_stream <- function(req, #' between streaming inputs. #' #' When using `resp_stream_sse()`, you must call `req_perform_connection` with -#' `mode = "r"` or you will get intermittent errors. +#' `mode = "r"`, not the default of `mode = "rb"`. #' #' @inheritParams req_perform_stream #' @param resp,con A httr2 [response]. @@ -186,6 +186,9 @@ resp_stream_lines <- function(resp, lines = 1) { resp_stream_sse <- function(resp) { check_streaming_response(resp) conn <- resp$body + if (!identical(summary(conn)$text, "text")) { + abort("`resp_stream_sse` requires a `resp` that was created with req_perform_connection(mode=\"r\")") + } lines <- character(0) while (TRUE) { From 6f36858799452cd73709979228a41f2eec827f9e Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 12:45:34 -0500 Subject: [PATCH 21/27] Make close idempotent --- R/req-perform-stream.R | 7 +++++-- tests/testthat/test-req-perform-stream.R | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 2d766066..4ee7b7ec 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -216,9 +216,12 @@ resp_stream_sse <- function(resp) { #' @param ... Not used; included for compatibility with generic. #' @rdname req_perform_connection close.httr2_response <- function(con, ...) { - check_streaming_response(con) + check_response(con) + + if (inherits(con$body, "connection") && isValid(con$body)) { + close(con$body) + } - close(con$body) invisible() } diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index fcc0f6ef..2467591c 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -38,6 +38,9 @@ test_that("can't read from a closed connection", { expect_false(resp_has_body(resp)) expect_snapshot(resp_stream_raw(resp, 1), error = TRUE) + + # and no error if we try to close it again + expect_no_error(close(resp)) }) # req_perform_stream() -------------------------------------------------------- From 66b4d431b5bf2c1023a49768f2a99f16093ea317 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 13:42:41 -0500 Subject: [PATCH 22/27] Add tests for streaming sse --- tests/testthat/test-req-perform-stream.R | 27 ++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 2467591c..77412416 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -43,6 +43,33 @@ test_that("can't read from a closed connection", { expect_no_error(close(resp)) }) +test_that("can feed sse events one at a time", { + app <- webfakes::new_app() + + app$get("/events", function(req, res) { + for(i in 1:3) { + res$send_chunk(sprintf("data: %s\n\n", i)) + } + }) + + server <- webfakes::local_app_process(app) + req <- request(server$url("/events")) + resp <- req_perform_connection(req, mode = "r") + on.exit(close(resp)) + + expect_equal( + resp_stream_sse(resp), + list(type = "message", data = "1", id = character()) + ) + expect_equal( + resp_stream_sse(resp), + list(type = "message", data = "2", id = character()) + ) + resp_stream_sse(resp) + + expect_equal(resp_stream_sse(resp), NULL) +}) + # req_perform_stream() -------------------------------------------------------- test_that("returns stream body; sets last request & response", { From 70f06b7347bc9c8d664b78b7a2ebd4ecba6765c3 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 14:43:18 -0500 Subject: [PATCH 23/27] Tweak text mode setup --- R/req-perform-stream.R | 20 +++++++++++++------- tests/testthat/_snaps/req-perform-stream.md | 9 +++++++++ tests/testthat/test-req-perform-stream.R | 9 ++++++++- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 4ee7b7ec..aac2216b 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -98,14 +98,17 @@ req_perform_stream <- function(req, #' #' @inheritParams req_perform_stream #' @param resp,con A httr2 [response]. -#' @param mode The mode that should be used for opening the connection. Use -#' `"rb"` (the default) for binary, `"r"` for text. +#' @param mode The mode that should be used for opening the connection. #' @param blocking When retrieving data, should the connection block and wait #' for the desired information or immediately return what it has? #' @export -req_perform_connection <- function(req, mode = "rb", blocking = TRUE) { +req_perform_connection <- function(req, + mode = c("binary", "text"), + blocking = TRUE) { check_request(req) check_bool(blocking) + mode <- arg_match(mode) + con_mode <- if (mode == "text") "rf" else "rbf" handle <- req_handle(req) the$last_request <- req @@ -121,7 +124,7 @@ req_perform_connection <- function(req, mode = "rb", blocking = TRUE) { if (!is.null(resp)) { close(resp$body) } - resp <- req_perform_connection1(req, handle, mode, blocking = blocking) + resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking) if (retry_is_transient(req, resp)) { tries <- tries + 1 @@ -143,11 +146,11 @@ req_perform_connection <- function(req, mode = "rb", blocking = TRUE) { resp } -req_perform_connection1 <- function(req, handle, mode, blocking = TRUE) { +req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) { stream <- curl::curl(req$url, handle = handle) # Must open the stream in order to initiate the connection - open(stream, mode, blocking = blocking) + open(stream, con_mode, blocking = blocking) curl_data <- curl::handle_data(handle) new_response( @@ -187,7 +190,10 @@ resp_stream_sse <- function(resp) { check_streaming_response(resp) conn <- resp$body if (!identical(summary(conn)$text, "text")) { - abort("`resp_stream_sse` requires a `resp` that was created with req_perform_connection(mode=\"r\")") + cli::cli_abort(c( + "{.arg resp} must have a text mode connection.", + i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.' + )) } lines <- character(0) diff --git a/tests/testthat/_snaps/req-perform-stream.md b/tests/testthat/_snaps/req-perform-stream.md index 4273a5d3..2a816771 100644 --- a/tests/testthat/_snaps/req-perform-stream.md +++ b/tests/testthat/_snaps/req-perform-stream.md @@ -15,6 +15,15 @@ Error in `resp_stream_raw()`: ! `resp` has already been closed. +# resp_stream_sse() requires a text connection + + Code + resp_stream_sse(resp) + Condition + Error in `resp_stream_sse()`: + ! `resp` must have a text mode connection. + i Use `mode = "text"` when calling `req_perform_connection()`. + # req_perform_stream checks its inputs Code diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 77412416..97ee1d2e 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -54,7 +54,7 @@ test_that("can feed sse events one at a time", { server <- webfakes::local_app_process(app) req <- request(server$url("/events")) - resp <- req_perform_connection(req, mode = "r") + resp <- req_perform_connection(req, mode = "text") on.exit(close(resp)) expect_equal( @@ -70,6 +70,13 @@ test_that("can feed sse events one at a time", { expect_equal(resp_stream_sse(resp), NULL) }) +test_that("resp_stream_sse() requires a text connection", { + resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() + on.exit(close(resp)) + + expect_snapshot(resp_stream_sse(resp), error = TRUE) +}) + # req_perform_stream() -------------------------------------------------------- test_that("returns stream body; sets last request & response", { From 099657d15d48db669916e5cd082654831cba0041 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 14:45:44 -0500 Subject: [PATCH 24/27] Add news bullet --- NEWS.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/NEWS.md b/NEWS.md index f53b2c60..7c1df04d 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,7 @@ # httr2 (development version) +* New `req_perform_connection()` for working with streaming data. Unlike `req_perform_stream()` which uses callbacks, `req_perform_connection()` returns a regular response object with a connection as the body. It's paired with `resp_stream_bytes()`, `resp_stream_lines()`, and `resp_stream_sse()` that allows you to stream chunks as you want them. Unlike `req_perform_stream()` it supports `req_retry()` (with @jcheng5, #519). + # httr2 1.0.3 * `jwt_encode_hmac()` now calls correct underlying function From 8c1297864e1bde9f3438c2d3035db02223c94dbf Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 14:47:45 -0500 Subject: [PATCH 25/27] Update docs --- R/req-perform-stream.R | 11 +++++++++-- man/req_perform_connection.Rd | 18 ++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index aac2216b..4843a413 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -93,8 +93,15 @@ req_perform_stream <- function(req, #' as the data streams in. This is useful if you want to do other work in #' between streaming inputs. #' -#' When using `resp_stream_sse()`, you must call `req_perform_connection` with -#' `mode = "r"`, not the default of `mode = "rb"`. +#' # `resp_stream_sse()` +#' +#' `resp_stream_sse()` helps work with APIs that uses the +#' [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events) +#' protocol. Each call will return one event, as a list with components +#' `type`, `data`, and `id`. +#' +#' It only works with text mode connections so when calling +#' `req_perform_connection()` you must use `mode = "text"`. #' #' @inheritParams req_perform_stream #' @param resp,con A httr2 [response]. diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index 197d87e5..83fac51a 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -8,7 +8,7 @@ \alias{close.httr2_response} \title{Perform a request and return a streaming connection} \usage{ -req_perform_connection(req, mode = "rb", blocking = TRUE) +req_perform_connection(req, mode = c("binary", "text"), blocking = TRUE) resp_stream_raw(resp, kb = 32) @@ -21,8 +21,7 @@ resp_stream_sse(resp) \arguments{ \item{req}{A \link{request}.} -\item{mode}{The mode that should be used for opening the connection. Use -\code{"rb"} (the default) for binary, \code{"r"} for text.} +\item{mode}{The mode that should be used for opening the connection.} \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has?} @@ -45,7 +44,14 @@ This is an alternative interface to \code{\link[=req_perform_stream]{req_perform connection that you can pull from data, rather than callbacks that are called as the data streams in. This is useful if you want to do other work in between streaming inputs. - -When using \code{resp_stream_sse()}, you must call \code{req_perform_connection} with -\code{mode = "r"} or you will get intermittent errors. } +\section{\code{resp_stream_sse()}}{ +\code{resp_stream_sse()} helps work with APIs that uses the +\href{https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events}{server-sent events} +protocol. Each call will return one event, as a list with components +\code{type}, \code{data}, and \code{id}. + +It only works with text mode connections so when calling +\code{req_perform_connection()} you must use \code{mode = "text"}. +} + From 6e23b14770180d88913ede78f75f73f2e783df87 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 15:45:33 -0500 Subject: [PATCH 26/27] Add an example --- R/req-perform-stream.R | 14 +++++++++++++- man/req_perform_connection.Rd | 15 ++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index 4843a413..0f8e6455 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -84,7 +84,7 @@ req_perform_stream <- function(req, #' #' @description #' Use `req_perform_connection()` to perform a request that includes a -#' connection as the body of the response, then `resp_stream_bytes()`, +#' connection as the body of the response, then `resp_stream_raw()`, #' `resp_stream_lines()`, or `resp_stream_sse()` to retrieve data a chunk at a #' timen, and finish by closing the connection with `close()`. #' @@ -109,6 +109,18 @@ req_perform_stream <- function(req, #' @param blocking When retrieving data, should the connection block and wait #' for the desired information or immediately return what it has? #' @export +#' @examples +#' req <- request(example_url()) |> +#' req_url_path("/stream-bytes/32768") +#' resp <- req_perform_connection(req) +#' +#' length(resp_stream_raw(resp, kb = 16)) +#' length(resp_stream_raw(resp, kb = 16)) +#' # When the stream has no more data, you'll get an empty result: +#' length(resp_stream_raw(resp, kb = 16)) +#' +#' # Always close the response when you're done +#' close(resp) req_perform_connection <- function(req, mode = c("binary", "text"), blocking = TRUE) { diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index 83fac51a..c96f5ada 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -36,7 +36,7 @@ for the desired information or immediately return what it has?} } \description{ Use \code{req_perform_connection()} to perform a request that includes a -connection as the body of the response, then \code{resp_stream_bytes()}, +connection as the body of the response, then \code{resp_stream_raw()}, \code{resp_stream_lines()}, or \code{resp_stream_sse()} to retrieve data a chunk at a timen, and finish by closing the connection with \code{close()}. @@ -55,3 +55,16 @@ It only works with text mode connections so when calling \code{req_perform_connection()} you must use \code{mode = "text"}. } +\examples{ +req <- request(example_url()) |> + req_url_path("/stream-bytes/32768") +resp <- req_perform_connection(req) + +length(resp_stream_raw(resp, kb = 16)) +length(resp_stream_raw(resp, kb = 16)) +# When the stream has no more data, you'll get an empty result: +length(resp_stream_raw(resp, kb = 16)) + +# Always close the response when you're done +close(resp) +} From c5be16732844c7a62ab4a18a32906f88c828c3d6 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 28 Aug 2024 15:57:00 -0500 Subject: [PATCH 27/27] Need to skip webfakes on covr --- tests/testthat/test-req-perform-stream.R | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index 97ee1d2e..bdf6a28c 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -44,6 +44,7 @@ test_that("can't read from a closed connection", { }) test_that("can feed sse events one at a time", { + skip_on_covr() app <- webfakes::new_app() app$get("/events", function(req, res) {