From 4ed370988d947c086a1bae9e68e829454e2d943f Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Wed, 4 Sep 2024 11:21:55 +0100 Subject: [PATCH] Manage our own pushback state Fixes #532 --- R/req-perform-stream.R | 28 +++++++------------- man/req_perform_connection.Rd | 4 +-- tests/testthat/_snaps/req-perform-stream.md | 9 ------- tests/testthat/test-req-perform-stream.R | 29 ++++++++++++++++++--- 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/R/req-perform-stream.R b/R/req-perform-stream.R index cdc8ac44..d7b14190 100644 --- a/R/req-perform-stream.R +++ b/R/req-perform-stream.R @@ -98,7 +98,6 @@ req_perform_stream <- function(req, #' want to do other work in between handling inputs from the stream. #' #' @inheritParams req_perform_stream -#' @param mode The mode that should be used for opening the connection. #' @param blocking When retrieving data, should the connection block and wait #' for the desired information or immediately return what it has (possibly #' nothing)? @@ -115,13 +114,9 @@ req_perform_stream <- function(req, #' #' # Always close the response when you're done #' close(resp) -req_perform_connection <- function(req, - mode = c("binary", "text"), - blocking = TRUE) { +req_perform_connection <- function(req, blocking = TRUE) { check_request(req) check_bool(blocking) - mode <- arg_match(mode) - con_mode <- if (mode == "text") "rf" else "rbf" handle <- req_handle(req) the$last_request <- req @@ -137,7 +132,7 @@ req_perform_connection <- function(req, if (!is.null(resp)) { close(resp$body) } - resp <- req_perform_connection1(req, handle, con_mode, blocking = blocking) + resp <- req_perform_connection1(req, handle, blocking = blocking) if (retry_is_transient(req, resp)) { tries <- tries + 1 @@ -159,11 +154,11 @@ req_perform_connection <- function(req, resp } -req_perform_connection1 <- function(req, handle, con_mode = "rbf", blocking = TRUE) { +req_perform_connection1 <- function(req, handle, blocking = TRUE) { stream <- curl::curl(req$url, handle = handle) # Must open the stream in order to initiate the connection - open(stream, con_mode, blocking = blocking) + open(stream, "rbf", blocking = blocking) curl_data <- curl::handle_data(handle) new_response( @@ -219,17 +214,12 @@ resp_stream_lines <- function(resp, lines = 1) { # TODO: max_size resp_stream_sse <- function(resp) { check_streaming_response(resp) - conn <- resp$body - if (!identical(summary(conn)$text, "text")) { - cli::cli_abort(c( - "{.arg resp} must have a text mode connection.", - i = 'Use {.code mode = "text"} when calling {.fn req_perform_connection}.' - )) - } - lines <- character(0) + lines <- resp$cache$push_back %||% character() + resp$cache$push_back <- character() + while (TRUE) { - line <- readLines(conn, n = 1) + line <- readLines(resp$body, n = 1) if (length(line) == 0) { break } @@ -243,7 +233,7 @@ resp_stream_sse <- function(resp) { if (length(lines) > 0) { # We have a partial event, put it back while we wait # for more - pushBack(lines, conn) + resp$cache$push_back <- lines } return(NULL) diff --git a/man/req_perform_connection.Rd b/man/req_perform_connection.Rd index 541ab038..b7886223 100644 --- a/man/req_perform_connection.Rd +++ b/man/req_perform_connection.Rd @@ -4,13 +4,11 @@ \alias{req_perform_connection} \title{Perform a request and return a streaming connection} \usage{ -req_perform_connection(req, mode = c("binary", "text"), blocking = TRUE) +req_perform_connection(req, blocking = TRUE) } \arguments{ \item{req}{A httr2 \link{request} object.} -\item{mode}{The mode that should be used for opening the connection.} - \item{blocking}{When retrieving data, should the connection block and wait for the desired information or immediately return what it has (possibly nothing)?} diff --git a/tests/testthat/_snaps/req-perform-stream.md b/tests/testthat/_snaps/req-perform-stream.md index 2a816771..4273a5d3 100644 --- a/tests/testthat/_snaps/req-perform-stream.md +++ b/tests/testthat/_snaps/req-perform-stream.md @@ -15,15 +15,6 @@ Error in `resp_stream_raw()`: ! `resp` has already been closed. -# resp_stream_sse() requires a text connection - - Code - resp_stream_sse(resp) - Condition - Error in `resp_stream_sse()`: - ! `resp` must have a text mode connection. - i Use `mode = "text"` when calling `req_perform_connection()`. - # req_perform_stream checks its inputs Code diff --git a/tests/testthat/test-req-perform-stream.R b/tests/testthat/test-req-perform-stream.R index bdf6a28c..05de5766 100644 --- a/tests/testthat/test-req-perform-stream.R +++ b/tests/testthat/test-req-perform-stream.R @@ -55,7 +55,7 @@ test_that("can feed sse events one at a time", { server <- webfakes::local_app_process(app) req <- request(server$url("/events")) - resp <- req_perform_connection(req, mode = "text") + resp <- req_perform_connection(req) on.exit(close(resp)) expect_equal( @@ -71,11 +71,32 @@ test_that("can feed sse events one at a time", { expect_equal(resp_stream_sse(resp), NULL) }) -test_that("resp_stream_sse() requires a text connection", { - resp <- request_test("/stream-bytes/1024") %>% req_perform_connection() +test_that("can join sse events across multiple reads", { + skip_on_covr() + app <- webfakes::new_app() + + app$get("/events", function(req, res) { + i <- res$app$locals$i %||% 1 + res$app$locals$i <- i + 1 + + res$send_chunk("data: 1\n") + Sys.sleep(0.2) + res$send_chunk("\n\n") + }) + + server <- webfakes::local_app_process(app) + req <- request(server$url("/events")) + resp <- req_perform_connection(req, blocking = FALSE) on.exit(close(resp)) - expect_snapshot(resp_stream_sse(resp), error = TRUE) + out <- resp_stream_sse(resp) + expect_equal(out, NULL) + expect_equal(resp$cache$push_back, "data: 1") + + Sys.sleep(0.3) + out <- resp_stream_sse(resp) + expect_equal(out, list(type = "message", data = "1", id = character())) + expect_equal(resp$cache$push_back, character()) }) # req_perform_stream() --------------------------------------------------------