Add req_perform_open, which makes resp$body the underlying stream #521

Merged (27 commits) on Aug 28, 2024

Conversation

@jcheng5 (Member) commented Aug 27, 2024

Alternative, lower-level approach to #520. SSE support could be built on top; I'll take a stab at that too.

Example usage:

library(httr2)

# Define the API endpoint and your API key
api_endpoint <- "https://api.openai.com/v1/chat/completions"
api_key <- Sys.getenv("OPENAI_API_KEY")

# Build the request
response <- request(api_endpoint) %>%
  req_headers(
    "Content-Type" = "application/json",
    "Authorization" = paste("Bearer", api_key)
  ) %>%
  req_body_json(list(
    model = "gpt-4o",
    temperature = 0.7,
    stream = TRUE,
    messages = list(
      list(
        role = "system",
        content = "You're an incredibly verbose assistant."
      ),
      list(
        role = "user",
        content = "When did the modern Olympics start?"
      )
    )
  )) %>%
  req_perform_open()

while (isIncomplete(response$body)) {
  line <- readLines(response$body, n = 1)
  if (length(line) > 0) {
    message(line)
  } else {
    # If no data was available, wait a bit
    message("SLEEPING...\n")
    Sys.sleep(0.25)
  }
}

close(response$body)

blocking = TRUE greatly simplifies cases where you don't have better
things to do while you're waiting (i.e. most R use cases that are
not Shiny or plumber?)
(Review comment on R/req-perform-stream.R; outdated, resolved.)
@jcheng5 (Member, Author) commented Aug 28, 2024

SSE equivalent usage:

library(httr2)

# Define the API endpoint and your API key
api_endpoint <- "https://api.openai.com/v1/chat/completions"
api_key <- Sys.getenv("OPENAI_API_KEY")

# Build the request
response <- request(api_endpoint) %>%
  req_headers(
    "Content-Type" = "application/json",
    "Authorization" = paste("Bearer", api_key)
  ) %>%
  req_body_json(list(
    model = "gpt-4o",
    temperature = 0.7,
    stream = TRUE,
    messages = list(
      list(
        role = "system",
        content = "You're an incredibly verbose assistant."
      ),
      list(
        role = "user",
        content = "When did the modern Olympics start?"
      )
    )
  )) %>%
  req_perform_open(blocking = FALSE)

while (isIncomplete(response$body)) {
  msg <- read_sse(response$body)
  if (!is.null(msg)) {
    cat(msg$data)
    cat("\n")
  } else {
    message("SLEEPING...\n")
    Sys.sleep(0.25)
  }
}

close(response$body)

@jcheng5 (Member, Author) commented Aug 28, 2024

BTW, req_perform_connection(blocking=TRUE) and blocking=FALSE both make sense for both the readLines and read_sse scenarios. Use the former when you don't have anything better to do but wait, and you'll get the answer back with the least possible delay and maximum efficiency (no "busy wait"). Use the latter when you want to do other things while you wait.

Ideally we'd eventually have a true callback-based async version that is both efficient and nonblocking.
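For contrast, the blocking variant of the earlier consumption loop could be sketched like this (assuming `req` is the request built above; with `blocking = TRUE` the `Sys.sleep()` busy-wait branch is unnecessary, since `readLines()` simply waits until data arrives):

```r
# Sketch: blocking consumption. readLines() waits until a line (or EOF)
# is available, so there is no need to poll and sleep.
response <- req_perform_open(req, blocking = TRUE)

while (isIncomplete(response$body)) {
  line <- readLines(response$body, n = 1)
  if (length(line) > 0) {
    message(line)
  }
}

close(response$body)
```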

#' @export
#' @rdname req_perform_connection
close.httr2_response <- function(con, ...) {
check_streaming_response(con)
@jcheng5 (Member, Author) commented on the diff above:
Does this mean if you call close(resp) on a non-streaming response, you'll get an error? If so, I think a no-op would be better; as a general rule I try not to throw in cleanup code since it's so often invoked in places where errors are not expected or hard to deal with (like in a finally or a gc finalizer).

(Member) replied:

Yeah makes sense.
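A no-op along those lines might be sketched as follows (illustrative only, not the merged implementation; the `inherits()`/`isOpen()` check is an assumption about how a streaming body is represented):

```r
#' @export
close.httr2_response <- function(con, ...) {
  # Sketch: close the body only if it is an open connection; otherwise do
  # nothing, so close() is safe in cleanup paths (on.exit(), finalizers).
  if (inherits(con$body, "connection") && isOpen(con$body)) {
    close(con$body)
  }
  invisible(NULL)
}
```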

@jcheng5 (Member, Author) commented Aug 28, 2024

One issue we must address before merging is that resp_stream_sse only works if response$body was opened in text mode. This is because it uses pushBack, which only works on text connections. We could avoid this restriction by implementing our own pushBack. What I did locally instead was to add a mode argument to req_perform_connection (and, per the specification, resp_stream_sse is always UTF-8) and make sure to use "rt" when you know you're going to call resp_stream_sse.
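To illustrate the "implement our own pushBack" alternative, a minimal byte-level push-back buffer might look like this (a hypothetical sketch; `make_pushback()` and its method names are invented here, and the PR instead went with a `mode` argument):

```r
# Hypothetical byte-level push-back buffer, which would work on binary
# ("rb") connections where base R's pushBack() does not.
make_pushback <- function(con) {
  buf <- raw()
  list(
    read = function(n) {
      if (length(buf) > 0) {
        take <- buf[seq_len(min(n, length(buf)))]
        buf <<- buf[-seq_along(take)]
        take
      } else {
        readBin(con, what = "raw", n = n)
      }
    },
    unread = function(bytes) {
      buf <<- c(as.raw(bytes), buf)
    }
  )
}
```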

@hadley (Member) commented Aug 28, 2024

And at a more meta level, we need to add some tests for resp_stream_sse() too. I think I should have time this afternoon.

@jcheng5 (Member, Author) commented Aug 28, 2024

I'm not saying we need to implement this or, even if we want to, that it needs to be part of this PR, but I just wanted to point out two things about the SSE spec:

  1. There is a standard JavaScript API for consuming SSE, a transliteration into R would look something like:
es <- EventSource$new("http://localhost:5000")
es$addEventListener("message", \(e) { message("Message received: ", e$data) })
...
es$close()

I wonder if a high-level SSE wrapper like this (or a more idiomatic version) is something httr2 users would expect to see and/or find useful.

  2. The SSE spec has a mechanism for (client-initiated) reconnecting of dropped connections, including tracking the last event that was successfully received and reporting that during reconnection. With this PR as-is, you could do it yourself, but it'd be a bit of an exercise.
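For concreteness, a client-initiated reconnection loop of that sort might be sketched as follows (hypothetical: it assumes `read_sse()` as used above, that parsed events carry an `id` field, and a naive always-retry policy):

```r
library(httr2)

last_event_id <- NULL
done <- FALSE

while (!done) {
  req <- request("http://localhost:5000")
  if (!is.null(last_event_id)) {
    # Per the SSE spec, report the last event seen so the server can resume.
    req <- req_headers(req, `Last-Event-ID` = last_event_id)
  }
  resp <- req_perform_open(req, blocking = FALSE)

  done <- tryCatch({
    while (isIncomplete(resp$body)) {
      msg <- read_sse(resp$body)
      if (is.null(msg)) {
        Sys.sleep(0.25)
        next
      }
      if (!is.null(msg$id)) last_event_id <- msg$id
      cat(msg$data, "\n")
    }
    TRUE  # stream ended cleanly
  }, error = function(e) {
    message("Connection dropped; reconnecting...")
    FALSE
  })

  try(close(resp$body), silent = TRUE)
}
```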

jcheng5 added a commit to jcheng5/r-sidebot that referenced this pull request on Aug 28, 2024:

* Requires r-lib/httr2#521
* Explain plot feature is currently broken

hadley marked this pull request as ready for review on August 28, 2024 at 20:46.
@hadley (Member) commented Aug 28, 2024

No one else has asked for SSE support, so I don't think we need to do prospective work, but I'm certainly happy to continue to build on this API as we discover more about what we need.

I'm planning on merging this PR today.

hadley merged commit 1c17dda into main on Aug 28, 2024; 13 checks passed. hadley deleted the expose-curl-stream branch on August 28, 2024 at 21:29.
@jcheng5 (Member, Author) commented Aug 28, 2024

Looks great, thanks!
