Skip to content

Commit

Permalink
Add req_perform_connect() (#521)
Browse files Browse the repository at this point in the history
Alternative (better) interface for `req_perform_stream()`. Makes the connection object the body of the response, and then provides `resp_stream_*()` to pull down various chunks of data.

Fixes #520
  • Loading branch information
jcheng5 authored Aug 28, 2024
1 parent f140308 commit 1c17dda
Show file tree
Hide file tree
Showing 11 changed files with 451 additions and 36 deletions.
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
S3method("$",httr2_headers)
S3method("[",httr2_headers)
S3method("[[",httr2_headers)
S3method(close,httr2_response)
S3method(print,httr2_cmd)
S3method(print,httr2_headers)
S3method(print,httr2_oauth_client)
Expand Down Expand Up @@ -72,6 +73,7 @@ export(req_oauth_password)
export(req_oauth_refresh)
export(req_options)
export(req_perform)
export(req_perform_connection)
export(req_perform_iterative)
export(req_perform_parallel)
export(req_perform_promise)
Expand Down Expand Up @@ -111,6 +113,9 @@ export(resp_raw)
export(resp_retry_after)
export(resp_status)
export(resp_status_desc)
export(resp_stream_lines)
export(resp_stream_raw)
export(resp_stream_sse)
export(resp_url)
export(resp_url_path)
export(resp_url_queries)
Expand Down
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# httr2 (development version)

* New `req_perform_connection()` for working with streaming data. Unlike `req_perform_stream()` which uses callbacks, `req_perform_connection()` returns a regular response object with a connection as the body. It's paired with `resp_stream_bytes()`, `resp_stream_lines()`, and `resp_stream_sse()` that allows you to stream chunks as you want them. Unlike `req_perform_stream()` it supports `req_retry()` (with @jcheng5, #519).

# httr2 1.0.3

* `jwt_encode_hmac()` now calls correct underlying function
Expand Down
17 changes: 11 additions & 6 deletions R/req-cache.R
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ cache_get <- function(req) {
}

cache_set <- function(req, resp) {
if (is_path(resp$body)) {
if (resp_body_type(resp) == "disk") {
body_path <- req_cache_path(req, ".body")
file.copy(resp$body, body_path, overwrite = TRUE)
resp$body <- new_path(body_path)
Expand Down Expand Up @@ -246,11 +246,12 @@ cache_body <- function(cached_resp, path = NULL) {
return(body)
}

if (is_path(body)) {
file.copy(body, path, overwrite = TRUE)
} else {
writeBin(body, path)
}
switch(resp_body_type(cached_resp),
disk = file.copy(body, path, overwrite = TRUE),
memory = writeBin(body, path),
stream = cli::cli_abort("Invalid body type", .internal = TRUE)
)

new_path(path)
}

Expand All @@ -271,6 +272,10 @@ resp_is_cacheable <- function(resp, control = NULL) {
return(FALSE)
}

if (resp_body_type(resp) == "stream") {
return(FALSE)
}

control <- control %||% resp_cache_control(resp)
if ("no-store" %in% control$flags) {
return(FALSE)
Expand Down
253 changes: 233 additions & 20 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,10 @@ req_perform_stream <- function(req,

stop_time <- Sys.time() + timeout_sec

stream <- curl::curl(req$url, handle = handle)
open(stream, "rbf")
resp <- req_perform_connection(req)
stream <- resp$body
withr::defer(close(stream))

res <- curl::handle_data(handle)
the$last_request <- req

# Return early if there's a problem
resp <- new_response(
method = req_method_get(req),
url = res$url,
status_code = res$status_code,
headers = as_headers(res$headers),
body = NULL,
request = req
)
if (error_is_error(req, resp)) {
resp$body <- read_con(stream)
the$last_response <- resp
handle_resp(req, resp)
}

continue <- TRUE
incomplete <- TRUE
buf <- raw()
Expand All @@ -92,10 +74,241 @@ req_perform_stream <- function(req,
callback(buf)
}

# We're done streaming so convert to bodiless response
resp$body <- raw()
the$last_response <- resp
resp
}

#' Perform a request and return a streaming connection
#'
#' @description
#' Use `req_perform_connection()` to perform a request that includes a
#' connection as the body of the response, then `resp_stream_raw()`,
#' `resp_stream_lines()`, or `resp_stream_sse()` to retrieve data a chunk at a
#' timen, and finish by closing the connection with `close()`.
#'
#' This is an alternative interface to [req_perform_stream()] that returns a
#' connection that you can pull from data, rather than callbacks that are called
#' as the data streams in. This is useful if you want to do other work in
#' between streaming inputs.
#'
#' # `resp_stream_sse()`
#'
#' `resp_stream_sse()` helps work with APIs that uses the
#' [server-sent events](https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events)
#' protocol. Each call will return one event, as a list with components
#' `type`, `data`, and `id`.
#'
#' It only works with text mode connections so when calling
#' `req_perform_connection()` you must use `mode = "text"`.
#'
#' @inheritParams req_perform_stream
#' @param resp,con A httr2 [response].
#' @param mode The mode that should be used for opening the connection.
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has?
#' @export
#' @examples
#' req <- request(example_url()) |>
#' req_url_path("/stream-bytes/32768")
#' resp <- req_perform_connection(req)
#'
#' length(resp_stream_raw(resp, kb = 16))
#' length(resp_stream_raw(resp, kb = 16))
#' # When the stream has no more data, you'll get an empty result:
#' length(resp_stream_raw(resp, kb = 16))
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req,
mode = c("binary", "text"),
blocking = TRUE) {
check_request(req)
check_bool(blocking)
mode <- arg_match(mode)
con_mode <- if (mode == "text") "rf" else "rbf"

handle <- req_handle(req)
the$last_request <- req

tries <- 0
delay <- 0
max_tries <- retry_max_tries(req)
deadline <- Sys.time() + retry_max_seconds(req)
resp <- NULL
while (tries <= max_tries && Sys.time() < deadline) {
sys_sleep(delay, "for retry backoff")

if (!is.null(resp)) {
close(resp$body)
}
resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking)

if (retry_is_transient(req, resp)) {
tries <- tries + 1
delay <- retry_after(req, resp, tries)
} else {
break
}
}

if (error_is_error(req, resp)) {
# Read full body if there's an error
conn <- resp$body
resp$body <- read_con(conn)
close(conn)
}
the$last_response <- resp
handle_resp(req, resp)

resp
}

req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) {
stream <- curl::curl(req$url, handle = handle)

# Must open the stream in order to initiate the connection
open(stream, con_mode, blocking = blocking)
curl_data <- curl::handle_data(handle)

new_response(
method = req_method_get(req),
url = curl_data$url,
status_code = curl_data$status_code,
headers = as_headers(curl_data$headers),
body = stream,
request = req
)
}

#' @export
#' @rdname req_perform_connection
#' @param kb How many kilobytes (1024 bytes) of data to read.
resp_stream_raw <- function(resp, kb = 32) {
check_streaming_response(resp)
conn <- resp$body

readBin(conn, raw(), kb * 1024)
}

#' @export
#' @rdname req_perform_connection
#' @param lines How many lines to read
resp_stream_lines <- function(resp, lines = 1) {
check_streaming_response(resp)
conn <- resp$body

readLines(conn, n = lines)
}

#' @export
#' @rdname req_perform_connection
# TODO: max_size
resp_stream_sse <- function(resp) {
check_streaming_response(resp)
conn <- resp$body
if (!identical(summary(conn)$text, "text")) {
cli::cli_abort(c(
"{.arg resp} must have a text mode connection.",
i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.'
))
}

lines <- character(0)
while (TRUE) {
line <- readLines(conn, n = 1)
if (length(line) == 0) {
break
}
if (line == "") {
# \n\n detected, end of event
return(parse_event(lines))
}
lines <- c(lines, line)
}

if (length(lines) > 0) {
# We have a partial event, put it back while we wait
# for more
pushBack(lines, conn)
}

return(NULL)
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname req_perform_connection
close.httr2_response <- function(con, ...) {
check_response(con)

if (inherits(con$body, "connection") && isValid(con$body)) {
close(con$body)
}

invisible()
}

# Helpers ----------------------------------------------------------------------

check_streaming_response <- function(resp,
arg = caller_arg(resp),
call = caller_env()) {

check_response(resp, arg = arg, call = call)

if (resp_body_type(resp) != "stream") {
stop_input_type(
resp,
"a streaming HTTP response object",
allow_null = FALSE,
arg = arg,
call = call
)
}

if (!isValid(resp$body)) {
cli::cli_abort("{.arg {arg}} has already been closed.", call = call)
}
}

# isOpen doesn't work for two reasons:
# 1. It errors if con has been closed, rather than returning FALSE
# 2. If returns TRUE if con has been closed and a new connection opened
#
# So instead we retrieve the connection from its number and compare to the
# original connection. This works because connections have an undocumented
# external pointer.
isValid <- function(con) {
tryCatch(
identical(getConnection(con), con),
error = function(cnd) FALSE
)
}


parse_event <- function(lines) {
m <- regexec("([^:]*)(: ?)?(.*)", lines)
matches <- regmatches(lines, m)
keys <- c("event", vapply(matches, function(x) x[2], character(1)))
values <- c("message", vapply(matches, function(x) x[4], character(1)))

remove_dupes <- duplicated(keys, fromLast = TRUE) & keys != "data"
keys <- keys[!remove_dupes]
values <- values[!remove_dupes]

event_type <- values[keys == "event"]
data <- values[keys == "data"]
id <- values[keys == "id"]

list(
type = event_type,
data = data,
id = id
)
}

as_round_function <- function(round = c("byte", "line"),
error_call = caller_env()) {
if (is.function(round)) {
Expand Down
28 changes: 22 additions & 6 deletions R/resp-body.R
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,38 @@ resp_body_raw <- function(resp) {

if (!resp_has_body(resp)) {
cli::cli_abort("Can't retrieve empty body.")
} else if (is_path(resp$body)) {
readBin(resp$body, "raw", file.size(resp$body))
} else {
resp$body
}

switch(resp_body_type(resp),
disk = readBin(resp$body, "raw", file.size(resp$body)),
memory = resp$body,
stream = {
out <- read_con(resp$body)
close(resp)
out
}
)
}

#' @rdname resp_body_raw
#' @export
resp_has_body <- function(resp) {
check_response(resp)

switch(resp_body_type(resp),
disk = file.size(resp$body) > 0,
memory = length(resp$body) > 0,
stream = isValid(resp$body)
)
}

resp_body_type <- function(resp) {
if (is_path(resp$body)) {
file.size(resp$body) > 0
"disk"
} else if (inherits(resp$body, "connection")) {
"stream"
} else {
length(resp$body) > 0
"memory"
}
}

Expand Down
Loading

0 comments on commit 1c17dda

Please sign in to comment.