Skip to content

Commit

Permalink
Manage our own pushback state
Browse files Browse the repository at this point in the history
Fixes #532
  • Loading branch information
hadley committed Sep 4, 2024
1 parent 06e9133 commit 4ed3709
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 35 deletions.
28 changes: 9 additions & 19 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ req_perform_stream <- function(req,
#' want to do other work in between handling inputs from the stream.
#'
#' @inheritParams req_perform_stream
#' @param mode The mode that should be used for opening the connection.
#' @param blocking When retrieving data, should the connection block and wait
#' for the desired information or immediately return what it has (possibly
#' nothing)?
Expand All @@ -115,13 +114,9 @@ req_perform_stream <- function(req,
#'
#' # Always close the response when you're done
#' close(resp)
req_perform_connection <- function(req,
mode = c("binary", "text"),
blocking = TRUE) {
req_perform_connection <- function(req, blocking = TRUE) {
check_request(req)
check_bool(blocking)
mode <- arg_match(mode)
con_mode <- if (mode == "text") "rf" else "rbf"

handle <- req_handle(req)
the$last_request <- req
Expand All @@ -137,7 +132,7 @@ req_perform_connection <- function(req,
if (!is.null(resp)) {
close(resp$body)
}
resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking)
resp <- req_perform_connection1(req, handle, blocking = blocking)

Check warning on line 135 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L135

Added line #L135 was not covered by tests

if (retry_is_transient(req, resp)) {
tries <- tries + 1
Expand All @@ -159,11 +154,11 @@ req_perform_connection <- function(req,
resp
}

req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) {
req_perform_connection1 <- function(req, handle, blocking = TRUE) {
stream <- curl::curl(req$url, handle = handle)

# Must open the stream in order to initiate the connection
open(stream, con_mode, blocking = blocking)
open(stream, "rbf", blocking = blocking)

Check warning on line 161 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L161

Added line #L161 was not covered by tests
curl_data <- curl::handle_data(handle)

new_response(
Expand Down Expand Up @@ -219,17 +214,12 @@ resp_stream_lines <- function(resp, lines = 1) {
# TODO: max_size
resp_stream_sse <- function(resp) {
check_streaming_response(resp)
conn <- resp$body
if (!identical(summary(conn)$text, "text")) {
cli::cli_abort(c(
"{.arg resp} must have a text mode connection.",
i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.'
))
}

lines <- character(0)
lines <- resp$cache$push_back %||% character()
resp$cache$push_back <- character()

Check warning on line 219 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L218-L219

Added lines #L218 - L219 were not covered by tests

while (TRUE) {
line <- readLines(conn, n = 1)
line <- readLines(resp$body, n = 1)

Check warning on line 222 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L222

Added line #L222 was not covered by tests
if (length(line) == 0) {
break
}
Expand All @@ -243,7 +233,7 @@ resp_stream_sse <- function(resp) {
if (length(lines) > 0) {
# We have a partial event, put it back while we wait
# for more
pushBack(lines, conn)
resp$cache$push_back <- lines

Check warning on line 236 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L236

Added line #L236 was not covered by tests
}

return(NULL)
Expand Down
4 changes: 1 addition & 3 deletions man/req_perform_connection.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 0 additions & 9 deletions tests/testthat/_snaps/req-perform-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@
Error in `resp_stream_raw()`:
! `resp` has already been closed.

# resp_stream_sse() requires a text connection

Code
resp_stream_sse(resp)
Condition
Error in `resp_stream_sse()`:
! `resp` must have a text mode connection.
i Use `mode = "text"` when calling `req_perform_connection()`.

# req_perform_stream checks its inputs

Code
Expand Down
29 changes: 25 additions & 4 deletions tests/testthat/test-req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ test_that("can feed sse events one at a time", {

server <- webfakes::local_app_process(app)
req <- request(server$url("/events"))
resp <- req_perform_connection(req, mode = "text")
resp <- req_perform_connection(req)
on.exit(close(resp))

expect_equal(
Expand All @@ -71,11 +71,32 @@ test_that("can feed sse events one at a time", {
expect_equal(resp_stream_sse(resp), NULL)
})

test_that("resp_stream_sse() requires a text connection", {
resp <- request_test("/stream-bytes/1024") %>% req_perform_connection()
test_that("can join sse events across multiple reads", {
skip_on_covr()
app <- webfakes::new_app()

app$get("/events", function(req, res) {
i <- res$app$locals$i %||% 1
res$app$locals$i <- i + 1

res$send_chunk("data: 1\n")
Sys.sleep(0.2)
res$send_chunk("\n\n")
})

server <- webfakes::local_app_process(app)
req <- request(server$url("/events"))
resp <- req_perform_connection(req, blocking = FALSE)
on.exit(close(resp))

expect_snapshot(resp_stream_sse(resp), error = TRUE)
out <- resp_stream_sse(resp)
expect_equal(out, NULL)
expect_equal(resp$cache$push_back, "data: 1")

Sys.sleep(0.3)
out <- resp_stream_sse(resp)
expect_equal(out, list(type = "message", data = "1", id = character()))
expect_equal(resp$cache$push_back, character())
})

# req_perform_stream() --------------------------------------------------------
Expand Down

0 comments on commit 4ed3709

Please sign in to comment.