Skip to content

Commit

Permalink
Add a done callback (#542)
Browse files Browse the repository at this point in the history
So we can ensure that open connections are always closed. Fixes #534.
  • Loading branch information
hadley authored Sep 5, 2024
1 parent fa27e9e commit 88f0932
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 29 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# httr2 (development version)

* `req_body_file()` no longer leaks a connection if the response doesn't complete succesfully (#534).
* `req_perform()` no longer displays a progress bar when sleeping during tests. You can override this behaviour by setting the option `httr2_progress`.
* `req_cache()` now re-caches the response if the body is hasn't been modified but the headers have changed (#442).
* `req_cache()` works better when `req_perform()` sets a path (#442).
Expand Down
4 changes: 4 additions & 0 deletions R/multi-req.R
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ pool_run <- function(pool, perfs, on_error = "continue") {
# Wrap up all components of request -> response in a single object
Performance <- R6Class("Performance", public = list(
req = NULL,
req_prep = NULL,
path = NULL,

handle = NULL,
Expand All @@ -166,6 +167,7 @@ Performance <- R6Class("Performance", public = list(
if (is_response(req)) {
self$resp <- req
} else {
self$req_prep <- req_prepare(req)
self$handle <- req_handle(req)
curl::handle_setopt(self$handle, url = req$url)
}
Expand All @@ -190,6 +192,7 @@ Performance <- R6Class("Performance", public = list(

succeed = function(res) {
self$progress$update()
req_completed(self$req_prep)

if (is.null(self$path)) {
body <- res$content
Expand Down Expand Up @@ -220,6 +223,7 @@ Performance <- R6Class("Performance", public = list(

fail = function(msg) {
self$progress$update()
req_completed(self$req_prep)

self$resp <- error_cnd(
"httr2_failure",
Expand Down
30 changes: 6 additions & 24 deletions R/req-body.R
Original file line number Diff line number Diff line change
Expand Up @@ -229,35 +229,17 @@ req_body_apply <- function(req) {

if (type == "raw-file") {
size <- file.info(data)$size
done <- FALSE
# Only open connection if needed
delayedAssign("con", file(data, "rb"))

# Leaks connection if request doesn't complete
readfunction <- function(nbytes, ...) {
if (done) {
return(raw())
}
out <- readBin(con, "raw", nbytes)
if (length(out) < nbytes) {
close(con)
done <<- TRUE
con <<- NULL
}
out
}
seekfunction <- function(offset, ...) {
if (done) {
con <<- file(data, "rb")
done <<- FALSE
}
seek(con, where = offset)
}

req <- req_policies(
req,
done = function() close(con)
)
req <- req_options(req,
post = TRUE,
readfunction = readfunction,
seekfunction = seekfunction,
readfunction = function(nbytes, ...) readBin(con, "raw", nbytes),
seekfunction = function(offset, ...) seek(con, where = offset),
postfieldsize_large = size
)
} else if (type == "raw") {
Expand Down
6 changes: 4 additions & 2 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ req_perform_stream <- function(req,
round = c("byte", "line")) {
check_request(req)

handle <- req_handle(req)
check_function(callback)
check_number_decimal(timeout_sec, min = 0)
check_number_decimal(buffer_kb, min = 0)
Expand Down Expand Up @@ -123,7 +122,8 @@ req_perform_connection <- function(req,
mode <- arg_match(mode)
con_mode <- if (mode == "text") "rf" else "rbf"

handle <- req_handle(req)
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
the$last_request <- req

tries <- 0
Expand All @@ -147,6 +147,8 @@ req_perform_connection <- function(req,
}
}

req_completed(req_prep)

if (error_is_error(req, resp)) {
# Read full body if there's an error
conn <- resp$body
Expand Down
18 changes: 15 additions & 3 deletions R/req-perform.R
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ req_perform <- function(
return(req)
}

handle <- req_handle(req)
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
max_tries <- retry_max_tries(req)
deadline <- Sys.time() + retry_max_seconds(req)

Expand Down Expand Up @@ -122,14 +123,16 @@ req_perform <- function(
)
}
)
req_completed(req_prep)

if (is_error(resp)) {
tries <- tries + 1
delay <- retry_backoff(req, tries)
} else if (!reauth && resp_is_invalid_oauth_token(req, resp)) {
reauth <- TRUE
req <- auth_oauth_sign(req, TRUE)
handle <- req_handle(req)
req_prep <- req_prepare(req)
handle <- req_handle(req_prep)
delay <- 0
} else if (retry_is_transient(req, resp)) {
tries <- tries + 1
Expand Down Expand Up @@ -258,6 +261,7 @@ req_dry_run <- function(req, quiet = FALSE, redact_headers = TRUE) {
req <- req_options(req, debugfunction = debug, verbose = TRUE)
}

req <- req_prepare(req)
handle <- req_handle(req)
curl::handle_setopt(handle, url = req$url)
resp <- curl::curl_echo(handle, progress = FALSE)
Expand All @@ -269,14 +273,19 @@ req_dry_run <- function(req, quiet = FALSE, redact_headers = TRUE) {
))
}

req_handle <- function(req) {
# Must call req_prepare(), then req_handle(), then after the request has been
# performed, req_completed()
req_prepare <- function(req) {
req <- req_method_apply(req)
req <- req_body_apply(req)

if (!has_name(req$options, "useragent")) {
req <- req_user_agent(req)
}

req
}
req_handle <- function(req) {
handle <- curl::new_handle()
curl::handle_setheaders(handle, .list = headers_flatten(req$headers))
curl::handle_setopt(handle, .list = req$options)
Expand All @@ -286,6 +295,9 @@ req_handle <- function(req) {

handle
}
req_completed <- function(req) {
req_policy_call(req, "done", list(), NULL)
}

new_path <- function(x) structure(x, class = "httr2_path")
is_path <- function(x) inherits(x, "httr2_path")
3 changes: 3 additions & 0 deletions tests/testthat/helper.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
testthat::set_state_inspector(function() {
getAllConnections()
})

0 comments on commit 88f0932

Please sign in to comment.