Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add req_perform_open, which makes resp$body the underlying stream #521

Merged
merged 27 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
S3method("$",httr2_headers)
S3method("[",httr2_headers)
S3method("[[",httr2_headers)
S3method(close,httr2_response)
S3method(print,httr2_cmd)
S3method(print,httr2_headers)
S3method(print,httr2_oauth_client)
Expand Down Expand Up @@ -72,6 +73,7 @@ export(req_oauth_password)
export(req_oauth_refresh)
export(req_options)
export(req_perform)
export(req_perform_connection)
export(req_perform_iterative)
export(req_perform_parallel)
export(req_perform_promise)
Expand Down Expand Up @@ -111,6 +113,9 @@ export(resp_raw)
export(resp_retry_after)
export(resp_status)
export(resp_status_desc)
export(resp_stream_lines)
export(resp_stream_raw)
export(resp_stream_sse)
export(resp_url)
export(resp_url_path)
export(resp_url_queries)
Expand Down
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# httr2 (development version)

* New `req_perform_connection()` for working with streaming data. Unlike `req_perform_stream()`, which uses callbacks, `req_perform_connection()` returns a regular response object with a connection as the body. It's paired with `resp_stream_raw()`, `resp_stream_lines()`, and `resp_stream_sse()`, which allow you to stream chunks as you want them. Unlike `req_perform_stream()`, it supports `req_retry()` (with @jcheng5, #519).

# httr2 1.0.3

* `jwt_encode_hmac()` now calls correct underlying function
Expand Down
17 changes: 11 additions & 6 deletions R/req-cache.R
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@
}

cache_set <- function(req, resp) {
if (is_path(resp$body)) {
if (resp_body_type(resp) == "disk") {
body_path <- req_cache_path(req, ".body")
file.copy(resp$body, body_path, overwrite = TRUE)
resp$body <- new_path(body_path)
Expand Down Expand Up @@ -246,11 +246,12 @@
return(body)
}

if (is_path(body)) {
file.copy(body, path, overwrite = TRUE)
} else {
writeBin(body, path)
}
switch(resp_body_type(cached_resp),
disk = file.copy(body, path, overwrite = TRUE),
memory = writeBin(body, path),
stream = cli::cli_abort("Invalid body type", .internal = TRUE)

Check warning on line 252 in R/req-cache.R

View check run for this annotation

Codecov / codecov/patch

R/req-cache.R#L252

Added line #L252 was not covered by tests
)

new_path(path)
}

Expand All @@ -271,6 +272,10 @@
return(FALSE)
}

if (resp_body_type(resp) == "stream") {
return(FALSE)

Check warning on line 276 in R/req-cache.R

View check run for this annotation

Codecov / codecov/patch

R/req-cache.R#L276

Added line #L276 was not covered by tests
}

control <- control %||% resp_cache_control(resp)
if ("no-store" %in% control$flags) {
return(FALSE)
Expand Down
253 changes: 233 additions & 20 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,10 @@

stop_time <- Sys.time() + timeout_sec

stream <- curl::curl(req$url, handle = handle)
open(stream, "rbf")
resp <- req_perform_connection(req)
stream <- resp$body

Check warning on line 51 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L50-L51

Added lines #L50 - L51 were not covered by tests
withr::defer(close(stream))

res <- curl::handle_data(handle)
the$last_request <- req

# Return early if there's a problem
resp <- new_response(
method = req_method_get(req),
url = res$url,
status_code = res$status_code,
headers = as_headers(res$headers),
body = NULL,
request = req
)
if (error_is_error(req, resp)) {
resp$body <- read_con(stream)
the$last_response <- resp
handle_resp(req, resp)
}

continue <- TRUE
incomplete <- TRUE
buf <- raw()
Expand All @@ -92,10 +74,241 @@
callback(buf)
}

# We're done streaming so convert to bodiless response
resp$body <- raw()

Check warning on line 78 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L78

Added line #L78 was not covered by tests
the$last_response <- resp
resp
}

#' Perform a request and return a streaming connection
#'
#' @description
#' Use `req_perform_connection()` to perform a request that includes a
#' connection as the body of the response, then `resp_stream_raw()`,
#' `resp_stream_lines()`, or `resp_stream_sse()` to retrieve data a chunk at a
#' time, and finish by closing the connection with `close()`.
#'
#' This is an alternative interface to [req_perform_stream()] that returns a
#' connection that you can pull data from, rather than callbacks that are called
#' as the data streams in. This is useful if you want to do other work in
#' between streaming inputs.
#'
#' # `resp_stream_sse()`
#'
#' `resp_stream_sse()` helps work with APIs that use the
#' [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
#' protocol. Each call will return one event, as a list with components
#' `type`, `data`, and `id`.
#'
#' It only works with text mode connections so when calling
#' `req_perform_connection()` you must use `mode = "text"`.
#'
#' @inheritParams req_perform_stream
#' @param resp,con A httr2 [response].
#' @param mode The mode that should be used for opening the connection.
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has?
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/stream-bytes/32768")
#' resp <- req_perform_connection(req)
#'
#' length(resp_stream_raw(resp, kb = 16))
#' length(resp_stream_raw(resp, kb = 16))
#' # When the stream has no more data, you'll get an empty result:
#' length(resp_stream_raw(resp, kb = 16))
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req,
                                   mode = c("binary", "text"),
                                   blocking = TRUE) {
  # Validate inputs and translate `mode` into the open mode passed to `open()`.
  check_request(req)
  check_bool(blocking)
  mode <- arg_match(mode)
  con_mode <- if (mode == "text") "rf" else "rbf"

  handle <- req_handle(req)
  the$last_request <- req

  tries <- 0
  delay <- 0
  max_tries <- retry_max_tries(req)
  deadline <- Sys.time() + retry_max_seconds(req)
  resp <- NULL
  # `tries` counts the transient failures seen so far, i.e. the attempts
  # already used up. Looping while `tries < max_tries` caps the total number
  # of attempts at `max_tries`; `<=` would allow one attempt too many.
  while (tries < max_tries && Sys.time() < deadline) {
    sys_sleep(delay, "for retry backoff")

    # Release the connection from the previous (failed) attempt before
    # opening a new one.
    if (!is.null(resp)) {
      close(resp$body)
    }
    resp <- req_perform_connection1(
      req,
      handle,
      con_mode,
      blocking = blocking
    )

    if (retry_is_transient(req, resp)) {
      tries <- tries + 1
      delay <- retry_after(req, resp, tries)
    } else {
      break
    }
  }

  if (error_is_error(req, resp)) {
    # Read full body if there's an error, so it is available on the response.
    # `finally` guarantees the connection is closed even if reading fails.
    conn <- resp$body
    resp$body <- tryCatch(read_con(conn), finally = close(conn))
  }
  the$last_response <- resp
  handle_resp(req, resp)

  resp
}

req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) {
  # Performs a single attempt: creates a curl connection for the request URL,
  # opens it, and wraps it in a response whose body is the open connection.
  con <- curl::curl(req$url, handle = handle)

  # The connection has to be opened for the request to be initiated
  open(con, con_mode, blocking = blocking)

  # URL, status code, and headers are taken from the handle after opening
  meta <- curl::handle_data(handle)

  new_response(
    method = req_method_get(req),
    url = meta$url,
    status_code = meta$status_code,
    headers = as_headers(meta$headers),
    body = con,
    request = req
  )
}

#' @export
#' @rdname req_perform_connection
#' @param kb How many kilobytes (1024 bytes) of data to read.
resp_stream_raw <- function(resp, kb = 32) {
  check_streaming_response(resp)

  # Convert kilobytes to the byte count requested from the connection
  n_bytes <- kb * 1024
  readBin(resp$body, raw(), n_bytes)
}

#' @export
#' @rdname req_perform_connection
#' @param lines How many lines to read
resp_stream_lines <- function(resp, lines = 1) {
  check_streaming_response(resp)

  # Pull up to `lines` lines from the response's connection
  readLines(resp$body, n = lines)
}

#' @export
#' @rdname req_perform_connection
# TODO: max_size
resp_stream_sse <- function(resp) {
  check_streaming_response(resp)
  conn <- resp$body

  # Line-based parsing below requires a text mode connection
  is_text <- identical(summary(conn)$text, "text")
  if (!is_text) {
    cli::cli_abort(c(
      "{.arg resp} must have a text mode connection.",
      i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.'
    ))
  }

  # Accumulate lines one at a time until a blank line terminates the event
  pending <- character(0)
  repeat {
    line <- readLines(conn, n = 1)
    if (length(line) == 0) {
      # No line could be read; stop and deal with whatever was collected
      break
    }
    if (line == "") {
      # \n\n detected, end of event
      return(parse_event(pending))
    }
    pending <- c(pending, line)
  }

  # A partial event was read: push its lines back onto the connection so a
  # later call can read them again together with the rest of the event
  if (length(pending) > 0) {
    pushBack(pending, conn)
  }

  return(NULL)
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname req_perform_connection
close.httr2_response <- function(con, ...) {
  check_response(con)

  # Only a body that is a connection, and is still valid, gets closed;
  # any other kind of body is left untouched
  body <- con$body
  is_live_stream <- inherits(body, "connection") && isValid(body)
  if (is_live_stream) {
    close(body)
  }

  invisible()
}

# Helpers ----------------------------------------------------------------------

check_streaming_response <- function(resp,
                                     arg = caller_arg(resp),
                                     call = caller_env()) {
  # Errors unless `resp` is a response whose body is a connection that is
  # still valid (i.e. has not been closed).
  check_response(resp, arg = arg, call = call)

  is_stream <- resp_body_type(resp) == "stream"
  if (!is_stream) {
    stop_input_type(
      resp,
      "a streaming HTTP response object",
      allow_null = FALSE,
      arg = arg,
      call = call
    )
  }

  still_open <- isValid(resp$body)
  if (!still_open) {
    cli::cli_abort("{.arg {arg}} has already been closed.", call = call)
  }
}

# isOpen() is not suitable here for two reasons:
# 1. It errors if con has been closed, rather than returning FALSE
# 2. It returns TRUE if con has been closed and a new connection opened
#
# Instead, look the connection up again by its number and compare the result
# with the original object. This works because connections have an
# undocumented external pointer.
isValid <- function(con) {
  tryCatch(
    {
      current <- getConnection(con)
      identical(current, con)
    },
    # A failed lookup is treated as "not valid"
    error = function(cnd) FALSE
  )
}


parse_event <- function(lines) {
  # Split each line into a field name (text before the first ":") and a value
  # (text after the ":" and one optional space).
  pieces <- regmatches(lines, regexec("([^:]*)(: ?)?(.*)", lines))
  field_names <- vapply(pieces, `[`, character(1), 2)
  field_values <- vapply(pieces, `[`, character(1), 4)

  # Prepend a default event type of "message"; it is dropped below if the
  # lines supply their own "event" field.
  field_names <- c("event", field_names)
  field_values <- c("message", field_values)

  # Keep every "data" field, but only the last occurrence of any other field
  keep <- field_names == "data" | !duplicated(field_names, fromLast = TRUE)
  field_names <- field_names[keep]
  field_values <- field_values[keep]

  pick <- function(field) field_values[field_names == field]

  list(
    type = pick("event"),
    data = pick("data"),
    id = pick("id")
  )
}

as_round_function <- function(round = c("byte", "line"),
error_call = caller_env()) {
if (is.function(round)) {
Expand Down
28 changes: 22 additions & 6 deletions R/resp-body.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,38 @@

if (!resp_has_body(resp)) {
cli::cli_abort("Can't retrieve empty body.")
} else if (is_path(resp$body)) {
readBin(resp$body, "raw", file.size(resp$body))
} else {
resp$body
}

switch(resp_body_type(resp),
disk = readBin(resp$body, "raw", file.size(resp$body)),

Check warning on line 42 in R/resp-body.R

View check run for this annotation

Codecov / codecov/patch

R/resp-body.R#L42

Added line #L42 was not covered by tests
memory = resp$body,
stream = {
out <- read_con(resp$body)
close(resp)
out

Check warning on line 47 in R/resp-body.R

View check run for this annotation

Codecov / codecov/patch

R/resp-body.R#L45-L47

Added lines #L45 - L47 were not covered by tests
}
)
}

#' @rdname resp_body_raw
#' @export
resp_has_body <- function(resp) {
  check_response(resp)

  body <- resp$body
  type <- resp_body_type(resp)

  if (type == "disk") {
    # On-disk body: non-empty file
    file.size(body) > 0
  } else if (type == "memory") {
    # In-memory body: non-empty vector
    length(body) > 0
  } else if (type == "stream") {
    # Streaming body: connection is still valid
    isValid(body)
  }
}

# Classify a response body by how it is stored:
# * "disk": `resp$body` satisfies `is_path()`
# * "stream": `resp$body` inherits from "connection"
# * "memory": anything else
#
# The previous body also evaluated `file.size(resp$body) > 0` and
# `length(resp$body) > 0` and discarded the results; those leftover
# expressions had no effect on the return value and have been removed.
resp_body_type <- function(resp) {
  if (is_path(resp$body)) {
    "disk"
  } else if (inherits(resp$body, "connection")) {
    "stream"
  } else {
    "memory"
  }
}

Expand Down
Loading
Loading