From 27cc841dfb408f9aaffa7175731bd5d22210c465 Mon Sep 17 00:00:00 2001 From: Edward Rudd Date: Sat, 20 Jan 2024 16:43:09 -0500 Subject: [PATCH 1/2] deps: prepare to upgrade to hyper 1.0 - enable backport and deprecated features on hyper - update code from deprecated recommendations --- Cargo.toml | 2 +- src/filters/body.rs | 6 ++++-- src/lib.rs | 2 +- src/reject.rs | 3 ++- src/test.rs | 5 ++++- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 6ad44e4be..68622af6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ futures-util = { version = "0.3", default-features = false, features = ["sink"] futures-channel = { version = "0.3.17", features = ["sink"]} headers = "0.3.5" http = "0.2" -hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client"] } +hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client", "backports", "deprecated", "runtime"] } log = "0.4" mime = "0.3" mime_guess = "2.0.0" diff --git a/src/filters/body.rs b/src/filters/body.rs index 85dabbfea..3928d3940 100644 --- a/src/filters/body.rs +++ b/src/filters/body.rs @@ -107,7 +107,8 @@ pub fn stream( /// ``` pub fn bytes() -> impl Filter + Copy { body().and_then(|body: hyper::Body| { - hyper::body::to_bytes(body).map_err(|err| { + use hyper::body::HttpBody; + body.collect().map_ok(|b| b.to_bytes()).map_err(|err| { tracing::debug!("to_bytes error: {}", err); reject::known(BodyReadError(err)) }) @@ -145,7 +146,8 @@ pub fn bytes() -> impl Filter + Copy { /// ``` pub fn aggregate() -> impl Filter + Copy { body().and_then(|body: ::hyper::Body| { - hyper::body::aggregate(body).map_err(|err| { + use hyper::body::HttpBody; + body.collect().map_ok(|b| b.aggregate()).map_err(|err| { tracing::debug!("aggregate error: {}", err); reject::known(BodyReadError(err)) }) diff --git a/src/lib.rs b/src/lib.rs index 35ed0dcd2..216c74128 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ #![deny(missing_docs)] #![deny(missing_debug_implementations)] #![deny(rust_2018_idioms)] -#![cfg_attr(test, deny(warnings))] +// #![cfg_attr(test, deny(warnings))] //! # warp //! diff --git a/src/reject.rs b/src/reject.rs index 6b19b4e6e..ee18073ad 100644 --- a/src/reject.rs +++ b/src/reject.rs @@ -800,8 +800,9 @@ mod tests { } async fn response_body_string(resp: crate::reply::Response) -> String { + use hyper::body::HttpBody; let (_, body) = resp.into_parts(); - let body_bytes = hyper::body::to_bytes(body).await.expect("failed concat"); + let body_bytes = body.collect().await.expect("failed concat").to_bytes(); String::from_utf8_lossy(&body_bytes).to_string() } diff --git a/src/test.rs b/src/test.rs index ca2710fae..bacc8a113 100644 --- a/src/test.rs +++ b/src/test.rs @@ -379,6 +379,8 @@ impl RequestBuilder { let route = Route::new(self.req, self.remote_addr); let mut fut = Box::pin( route::set(&route, move || f.filter(crate::filter::Internal)).then(|result| { + use hyper::body::HttpBody; + let res = match result { Ok(rep) => rep.into_response(), Err(rej) => { @@ -387,7 +389,8 @@ impl RequestBuilder { } }; let (parts, body) = res.into_parts(); - hyper::body::to_bytes(body).map_ok(|chunk| Response::from_parts(parts, chunk)) + HttpBody::collect(body) + .map_ok(|chunk| Response::from_parts(parts, chunk.to_bytes())) }), ); From ac9774d3208fc3fd0be94133a95ed364445d3f05 Mon Sep 17 00:00:00 2001 From: Edward Rudd Date: Wed, 31 Jan 2024 10:35:15 -0500 Subject: [PATCH 2/2] Upgrade to hyper 1.1 --- Cargo.toml | 11 +++++++---- src/filter/service.rs | 6 +----- src/filters/body.rs | 18 +++++++++--------- src/filters/compression.rs | 24 ++++++++++++------------ src/filters/fs.rs | 10 +++++----- src/filters/multipart.rs | 4 ++-- src/filters/sse.rs | 4 ++-- src/lib.rs | 2 +- src/reject.rs | 8 ++++---- src/reply.rs | 24 ++++++++++++------------ src/route.rs | 6 +++--- src/test.rs | 5 ++--- 12 files changed, 60 insertions(+), 62 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 68622af6d..f0f558ba9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,13 +21,16 @@ async-compression = { version = "0.4.5", features = ["tokio"], optional = true } bytes = "1.0" futures-util = { version = "0.3", default-features = false, features = ["sink"] } futures-channel = { version = "0.3.17", features = ["sink"]} -headers = "0.3.5" -http = "0.2" -hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client", "backports", "deprecated", "runtime"] } +headers = "0.4.0" +http = "1" +hyper = { version = "1", features = ["server", "http1", "http2", "client"] } +hyper-util = "0.1.2" +http-body = "1" +http-body-util = "0.1.0" log = "0.4" mime = "0.3" mime_guess = "2.0.0" -multer = { version = "2.1.0", optional = true } +multer = { version = "3.0.0", optional = true } scoped-tls = "1.0" serde = "1.0" serde_json = "1.0" diff --git a/src/filter/service.rs b/src/filter/service.rs index 3de12a02e..310ff24d5 100644 --- a/src/filter/service.rs +++ b/src/filter/service.rs @@ -93,12 +93,8 @@ where type Error = Infallible; type Future = FilteredFuture; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - #[inline] - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { self.call_with_addr(req, None) } } diff --git a/src/filters/body.rs b/src/filters/body.rs index 3928d3940..c95c55d0f 100644 --- a/src/filters/body.rs +++ b/src/filters/body.rs @@ -11,7 +11,7 @@ use bytes::{Buf, Bytes}; use futures_util::{future, ready, Stream, TryFutureExt}; use headers::ContentLength; use http::header::CONTENT_TYPE; -use hyper::Body; +use hyper::body::Incoming; use mime; use serde::de::DeserializeOwned; use serde_json; @@ -22,10 +22,10 @@ use crate::reject::{self, Rejection}; type BoxError = Box; -// Extracts the `Body` Stream from the route. +// Extracts the `Incoming` Stream from the route. // // Does not consume any of it. -pub(crate) fn body() -> impl Filter + Copy { +pub(crate) fn body() -> impl Filter + Copy { filter_fn_one(|route| { future::ready(route.take_body().ok_or_else(|| { tracing::error!("request body already taken in previous filter"); @@ -79,7 +79,7 @@ pub fn content_length_limit(limit: u64) -> impl Filter impl Filter>,), Error = Rejection> + Copy { - body().map(|body: Body| BodyStream { body }) + body().map(|body: Incoming| BodyStream { body }) } /// Returns a `Filter` that matches any request and extracts a `Future` of a @@ -106,8 +106,8 @@ pub fn stream( /// }); /// ``` pub fn bytes() -> impl Filter + Copy { - body().and_then(|body: hyper::Body| { - use hyper::body::HttpBody; + body().and_then(|body: Incoming| { + use http_body_util::BodyExt; body.collect().map_ok(|b| b.to_bytes()).map_err(|err| { tracing::debug!("to_bytes error: {}", err); reject::known(BodyReadError(err)) @@ -145,8 +145,8 @@ pub fn bytes() -> impl Filter + Copy { /// .map(full_body); /// ``` pub fn aggregate() -> impl Filter + Copy { - body().and_then(|body: ::hyper::Body| { - use hyper::body::HttpBody; + body().and_then(|body: Incoming| { + use http_body_util::BodyExt; body.collect().map_ok(|b| b.aggregate()).map_err(|err| { tracing::debug!("aggregate error: {}", err); reject::known(BodyReadError(err)) @@ -293,7 +293,7 @@ fn is_content_type() -> impl Filter // ===== BodyStream ===== struct BodyStream { - body: Body, + body: Incoming, } impl Stream for BodyStream { diff --git a/src/filters/compression.rs b/src/filters/compression.rs index 244e76835..e0b85afe4 100644 --- a/src/filters/compression.rs +++ b/src/filters/compression.rs @@ -10,8 +10,8 @@ use async_compression::tokio::bufread::{DeflateEncoder, GzipEncoder}; use http::header::HeaderValue; use hyper::{ + body::Incoming, header::{CONTENT_ENCODING, CONTENT_LENGTH}, - Body, }; use tokio_util::io::{ReaderStream, StreamReader}; @@ -69,7 +69,7 @@ pub struct Compression { #[cfg(feature = "compression-gzip")] pub fn gzip() -> Compression Response + Copy> { let func = move |mut props: CompressionProps| { - let body = Body::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new( + let body = Incoming::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new( props.body, )))); props @@ -98,9 +98,9 @@ pub fn gzip() -> Compression Response + Copy> { #[cfg(feature = "compression-gzip")] pub fn deflate() -> Compression Response + Copy> { let func = move |mut props: CompressionProps| { - let body = Body::wrap_stream(ReaderStream::new(DeflateEncoder::new(StreamReader::new( - props.body, - )))); + let body = Incoming::wrap_stream(ReaderStream::new(DeflateEncoder::new( + StreamReader::new(props.body), + ))); props .head .headers @@ -127,7 +127,7 @@ pub fn deflate() -> Compression Response + Copy> { #[cfg(feature = "compression-brotli")] pub fn brotli() -> Compression Response + Copy> { let func = move |mut props: CompressionProps| { - let body = Body::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new( + let body = Incoming::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new( props.body, )))); props @@ -164,7 +164,7 @@ mod internal { use bytes::Bytes; use futures_util::{ready, Stream, TryFuture}; - use hyper::Body; + use hyper::body::Incoming; use pin_project::pin_project; use crate::filter::{Filter, FilterBase, Internal}; @@ -201,8 +201,8 @@ mod internal { } } - impl From for CompressableBody { - fn from(body: Body) -> Self { + impl From for CompressableBody { + fn from(body: Incoming) -> Self { CompressableBody { body } } } @@ -210,12 +210,12 @@ mod internal { /// Compression Props #[derive(Debug)] pub struct CompressionProps { - pub(super) body: CompressableBody, + pub(super) body: CompressableBody, pub(super) head: http::response::Parts, } - impl From> for CompressionProps { - fn from(resp: http::Response) -> Self { + impl From> for CompressionProps { + fn from(resp: http::Response) -> Self { let (head, body) = resp.into_parts(); CompressionProps { body: body.into(), diff --git a/src/filters/fs.rs b/src/filters/fs.rs index fdfa70968..635edb9cf 100644 --- a/src/filters/fs.rs +++ b/src/filters/fs.rs @@ -18,7 +18,7 @@ use headers::{ IfUnmodifiedSince, LastModified, Range, }; use http::StatusCode; -use hyper::Body; +use hyper::body::Incoming; use mime_guess; use percent_encoding::percent_decode_str; use tokio::fs::File as TkFile; @@ -162,7 +162,7 @@ impl Conditionals { precondition ); if !precondition { - let mut res = Response::new(Body::empty()); + let mut res = Response::new(Incoming::empty()); *res.status_mut() = StatusCode::PRECONDITION_FAILED; return Cond::NoBody(res); } @@ -179,7 +179,7 @@ impl Conditionals { // no last_modified means its always modified .unwrap_or(false); if unmodified { - let mut res = Response::new(Body::empty()); + let mut res = Response::new(Incoming::empty()); *res.status_mut() = StatusCode::NOT_MODIFIED; return Cond::NoBody(res); } @@ -318,7 +318,7 @@ fn file_conditional( let sub_len = end - start; let buf_size = optimal_buf_size(&meta); let stream = file_stream(file, buf_size, (start, end)); - let body = Body::wrap_stream(stream); + let body = Incoming::wrap_stream(stream); let mut resp = Response::new(body); @@ -345,7 +345,7 @@ fn file_conditional( }) .unwrap_or_else(|BadRange| { // bad byte range - let mut resp = Response::new(Body::empty()); + let mut resp = Response::new(Incoming::empty()); *resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE; resp.headers_mut() .typed_insert(ContentRange::unsatisfied_bytes(len)); diff --git a/src/filters/multipart.rs b/src/filters/multipart.rs index f8a6d36b7..3b131d8ed 100644 --- a/src/filters/multipart.rs +++ b/src/filters/multipart.rs @@ -12,7 +12,7 @@ use std::{fmt, io}; use bytes::{Buf, Bytes}; use futures_util::{future, Stream}; use headers::ContentType; -use hyper::Body; +use hyper::body::Incoming; use mime::Mime; use multer::{Field as PartInner, Multipart as FormDataInner}; @@ -200,7 +200,7 @@ impl Stream for PartStream { } } -struct BodyIoError(Body); +struct BodyIoError(Incoming); impl Stream for BodyIoError { type Item = io::Result; diff --git a/src/filters/sse.rs b/src/filters/sse.rs index 413456aab..25a8a7e40 100644 --- a/src/filters/sse.rs +++ b/src/filters/sse.rs @@ -53,7 +53,7 @@ use std::time::Duration; use futures_util::{future, Stream, TryStream, TryStreamExt}; use http::header::{HeaderValue, CACHE_CONTROL, CONTENT_TYPE}; -use hyper::Body; +use hyper::body::Incoming; use pin_project::pin_project; use serde_json::{self, Error}; use tokio::time::{self, Sleep}; @@ -340,7 +340,7 @@ where .into_stream() .and_then(|event| future::ready(Ok(event.to_string()))); - let mut res = Response::new(Body::wrap_stream(body_stream)); + let mut res = Response::new(Incoming::wrap_stream(body_stream)); // Set appropriate content type res.headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream")); diff --git a/src/lib.rs b/src/lib.rs index 216c74128..36dfbe5c1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -176,4 +176,4 @@ pub use bytes::Buf; pub use futures_util::{Future, Sink, Stream}; #[doc(hidden)] -pub(crate) type Request = http::Request; +pub(crate) type Request = http::Request; diff --git a/src/reject.rs b/src/reject.rs index ee18073ad..414f0253d 100644 --- a/src/reject.rs +++ b/src/reject.rs @@ -66,7 +66,7 @@ use http::{ header::{HeaderValue, CONTENT_TYPE}, StatusCode, }; -use hyper::Body; +use hyper::body::Incoming; pub(crate) use self::sealed::{CombineRejection, IsReject}; @@ -443,7 +443,7 @@ impl Rejections { fn into_response(&self) -> crate::reply::Response { match *self { Rejections::Known(ref e) => { - let mut res = http::Response::new(Body::from(e.to_string())); + let mut res = http::Response::new(Incoming::from(e.to_string())); *res.status_mut() = self.status(); res.headers_mut().insert( CONTENT_TYPE, @@ -457,7 +457,7 @@ impl Rejections { e ); let body = format!("Unhandled rejection: {:?}", e); - let mut res = http::Response::new(Body::from(body)); + let mut res = http::Response::new(Incoming::from(body)); *res.status_mut() = self.status(); res.headers_mut().insert( CONTENT_TYPE, @@ -800,7 +800,7 @@ mod tests { } async fn response_body_string(resp: crate::reply::Response) -> String { - use hyper::body::HttpBody; + use http_body_util::BodyExt; let (_, body) = resp.into_parts(); let body_bytes = body.collect().await.expect("failed concat").to_bytes(); String::from_utf8_lossy(&body_bytes).to_string() diff --git a/src/reply.rs b/src/reply.rs index 79e6dfd0e..8dfe4dc41 100644 --- a/src/reply.rs +++ b/src/reply.rs @@ -8,7 +8,7 @@ //! Besides them, you can return a type that implements [`Reply`](./trait.Reply.html). This //! could be any of the following: //! -//! - [`http::Response>`](https://docs.rs/http) +//! - [`http::Response>`](https://docs.rs/http) //! - `String` //! - `&'static str` //! - `http::StatusCode` @@ -41,7 +41,7 @@ use std::fmt; use crate::generic::{Either, One}; use http::header::{HeaderName, HeaderValue, CONTENT_TYPE}; use http::StatusCode; -use hyper::Body; +use hyper::body::Incoming; use serde::Serialize; use serde_json; @@ -52,7 +52,7 @@ use self::sealed::{BoxedReply, Internal}; pub use crate::filters::reply as with; /// Response type into which types implementing the `Reply` trait are convertable. -pub type Response = ::http::Response; +pub type Response = ::http::Response; /// Returns an empty `Reply` with status code `200 OK`. /// @@ -167,7 +167,7 @@ impl StdError for ReplyJsonError {} /// ``` pub fn html(body: T) -> Html where - Body: From, + Incoming: From, T: Send, { Html { body } @@ -181,12 +181,12 @@ pub struct Html { impl Reply for Html where - Body: From, + Incoming: From, T: Send, { #[inline] fn into_response(self) -> Response { - let mut res = Response::new(Body::from(self.body)); + let mut res = Response::new(Incoming::from(self.body)); res.headers_mut().insert( CONTENT_TYPE, HeaderValue::from_static("text/html; charset=utf-8"), @@ -200,7 +200,7 @@ where /// This trait is implemented for the following: /// /// - `http::StatusCode` -/// - `http::Response>` +/// - `http::Response>` /// - `String` /// - `&'static str` /// @@ -394,11 +394,11 @@ impl Reply for WithHeader { impl Reply for ::http::Response where - Body: From, + Incoming: From, { #[inline] fn into_response(self) -> Response { - self.map(Body::from) + self.map(Incoming::from) } } @@ -433,7 +433,7 @@ where } } -fn text_plain>(body: T) -> Response { +fn text_plain>(body: T) -> Response { let mut response = ::http::Response::new(body.into()); response.headers_mut().insert( CONTENT_TYPE, @@ -457,7 +457,7 @@ impl Reply for Vec { CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"), ) - .body(Body::from(self)) + .body(Incoming::from(self)) .unwrap() } } @@ -487,7 +487,7 @@ impl Reply for &'static [u8] { CONTENT_TYPE, HeaderValue::from_static("application/octet-stream"), ) - .body(Body::from(self)) + .body(Incoming::from(self)) .unwrap() } } diff --git a/src/route.rs b/src/route.rs index afbac4d8b..e6b5e9d39 100644 --- a/src/route.rs +++ b/src/route.rs @@ -3,7 +3,7 @@ use std::cell::RefCell; use std::mem; use std::net::SocketAddr; -use hyper::Body; +use hyper::body::Incoming; use crate::Request; @@ -127,10 +127,10 @@ impl Route { self.remote_addr } - pub(crate) fn take_body(&mut self) -> Option { + pub(crate) fn take_body(&mut self) -> Option { match self.body { BodyState::Ready => { - let body = mem::replace(self.req.body_mut(), Body::empty()); + let body = mem::replace(self.req.body_mut(), Incoming::empty()); self.body = BodyState::Taken; Some(body) } diff --git a/src/test.rs b/src/test.rs index bacc8a113..3d902aaa3 100644 --- a/src/test.rs +++ b/src/test.rs @@ -379,7 +379,7 @@ impl RequestBuilder { let route = Route::new(self.req, self.remote_addr); let mut fut = Box::pin( route::set(&route, move || f.filter(crate::filter::Internal)).then(|result| { - use hyper::body::HttpBody; + use http_body_util::BodyExt; let res = match result { Ok(rep) => rep.into_response(), @@ -389,8 +389,7 @@ impl RequestBuilder { } }; let (parts, body) = res.into_parts(); - HttpBody::collect(body) - .map_ok(|chunk| Response::from_parts(parts, chunk.to_bytes())) + BodyExt::collect(body).map_ok(|chunk| Response::from_parts(parts, chunk.to_bytes())) }), );