diff --git a/.github/workflows/format-code.yml b/.github/workflows/format-code.yml index 1c5251f..42693b3 100644 --- a/.github/workflows/format-code.yml +++ b/.github/workflows/format-code.yml @@ -8,7 +8,7 @@ on: jobs: format-code: runs-on: "ubuntu-latest" - container: rust:1.77 + container: rust:1.79 steps: - name: Checkout the code on merge diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 297fb62..9e0d1d5 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -9,7 +9,7 @@ on: jobs: lint: runs-on: "ubuntu-latest" - container: rust:1.77 + container: rust:1.79 steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 9ea09ba..a6364d1 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -11,9 +11,9 @@ jobs: build: strategy: matrix: - features: ["", "--no-default-features --features rustls-native"] + features: [""] runs-on: "ubuntu-latest" - container: rust:1.74 + container: rust:1.79 steps: - uses: actions/checkout@v2 @@ -24,9 +24,9 @@ jobs: test: strategy: matrix: - features: ["", "--no-default-features --features rustls-native"] + features: [""] runs-on: "ubuntu-latest" - container: rust:1.77 + container: rust:1.79 services: consul: image: consul:1.11.11 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 689b071..9786c24 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -6,7 +6,7 @@ on: jobs: test: runs-on: ubuntu-latest - container: rust:1.77 + container: rust:1.79 services: consul: image: consul:1.11.11 @@ -25,7 +25,7 @@ jobs: dry-run: runs-on: ubuntu-latest - container: rust:1.77 + container: rust:1.79 steps: - uses: actions/checkout@v2 @@ -36,7 +36,7 @@ jobs: publish: needs: [test, dry-run] runs-on: ubuntu-latest - container: rust:1.74 + container: rust:1.79 environment: crates-publish steps: diff --git a/CHANGELOG.md b/CHANGELOG.md index 8afc7bc..c7df596 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,19 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +## 0.7.0 - 2024-06-25 + +### Changed + +- `opentelemetry` updated to version `0.24` from `0.22`. +- `http` updated to version `1.0` from `0.2`. +- `hyper` updated to version `1.0` from `0.14`. +- `hyper-rustls` updated to version `0.27` from `0.24`. +- `get_service_nodes` now supports tags thanks to @gautamg795 +- `read_key` now also returns the index thanks to @badalex +- Allow configuring `Consul` with a custom http client thanks to @LeonHartley +- Removed `rustls-native-roots` feature and now defaults to `rustls-webpki-roots` (which has been removed). This addresses the bug that features were not additive. + ## 0.6.0 - 2024-04-01 ### Changed diff --git a/Cargo.toml b/Cargo.toml index c8a9841..678f5c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rs-consul" -version = "0.6.0" +version = "0.7.0" authors = ["Roblox"] edition = "2021" description = "This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/)" @@ -10,21 +10,20 @@ license-file = "LICENSE" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["rustls-native"] +default = [] metrics = ["prometheus", "lazy_static"] -rustls-native = ["hyper-rustls/rustls-native-certs"] -rustls-webpki = ["hyper-rustls/webpki-roots"] trace = ["dep:opentelemetry"] # keep this list sorted! [dependencies] base64 = "0.22" -futures = "0.3" -http = "0.2" -hyper = { version = "0.14", features = ["full"] } -hyper-rustls = { version = "0.24" } +http = "1" +http-body-util = "0.1" +hyper = { version = "1", features = ["full"] } +hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-roots", "ring", "http1"] } +hyper-util = { version = "0.1", features = ["client", "client-legacy", "tokio", "http2"] } lazy_static = { version = "1", optional = true } -opentelemetry = { version = "0.22", optional = true } +opentelemetry = { version = "0.24", optional = true } prometheus = { version = "0.13", optional = true } quick-error = "2" serde = { version = "1.0", features = ["derive"] } diff --git a/rust-toolchain b/rust-toolchain index f23daf4..c408301 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -1.77 \ No newline at end of file +1.79 \ No newline at end of file diff --git a/src/hyper_wrapper.rs b/src/hyper_wrapper.rs index b51425f..9688ede 100644 --- a/src/hyper_wrapper.rs +++ b/src/hyper_wrapper.rs @@ -21,7 +21,6 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ -#![cfg(feature = "trace")] use hyper::Version; use opentelemetry::{ global::{BoxedSpan, BoxedTracer}, diff --git a/src/lib.rs b/src/lib.rs index 510227e..c2af1a0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,13 +27,18 @@ SOFTWARE. //! This crate provides access to a set of strongly typed apis to interact with consul (https://www.consul.io/) #![deny(missing_docs)] +use http_body_util::BodyExt; use std::collections::HashMap; +use std::convert::Infallible; use std::time::{Duration, Instant}; use std::{env, str::Utf8Error}; use base64::Engine; -use hyper::{body::Buf, client::HttpConnector, Body, Method}; -#[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))] +use http_body_util::combinators::BoxBody; +use http_body_util::{Empty, Full}; +use hyper::body::Bytes; +use hyper::{body::Buf, Method}; +use hyper_util::client::legacy::{connect::HttpConnector, Builder, Client}; #[cfg(feature = "metrics")] use lazy_static::lazy_static; use quick_error::quick_error; @@ -66,7 +71,7 @@ quick_error! { /// The request was invalid and could not be converted into a proper http request. RequestError(err: http::Error) {} /// The consul server response could not be converted into a proper http response. - ResponseError(err: hyper::Error) {} + ResponseError(err: hyper_util::client::legacy::Error) {} /// The consul server response was invalid. InvalidResponse(err: hyper::Error) {} /// The consul server response could not be deserialized from json. @@ -151,7 +156,7 @@ const GET_SESSION_METHOD_NAME: &str = "get_session"; pub(crate) type Result = std::result::Result; /// The config necessary to create a new consul client. -#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Config { /// The address of the consul server. This must include the protocol to connect over eg. http or https. pub address: String, @@ -160,7 +165,26 @@ pub struct Config { /// The hyper builder for the internal http client. #[serde(skip)] - pub hyper_builder: hyper::client::Builder, + #[serde(default = "default_builder")] + pub hyper_builder: hyper_util::client::legacy::Builder, +} + +fn default_builder() -> Builder { + // https://github.com/hyperium/hyper/issues/2312 + Builder::new(hyper_util::rt::TokioExecutor::new()) + .pool_idle_timeout(std::time::Duration::from_millis(0)) + .pool_max_idle_per_host(0) + .to_owned() +} + +impl Default for Config { + fn default() -> Self { + Config { + address: String::default(), + token: None, + hyper_builder: default_builder(), + } + } } impl Config { @@ -176,7 +200,7 @@ impl Config { Config { address: addr, token: Some(token), - hyper_builder: Default::default(), + hyper_builder: default_builder(), } } } @@ -222,7 +246,8 @@ impl Drop for Lock<'_> { } /// Type alias for a Hyper client using a hyper_rusttls HttpsConnector -pub type HttpsClient = hyper::Client, Body>; +pub type HttpsClient = + Client, BoxBody>; #[derive(Debug)] /// This struct defines the consul client and allows access to the consul api via method syntax. @@ -234,16 +259,8 @@ pub struct Consul { } fn https_connector() -> hyper_rustls::HttpsConnector { - #[cfg(feature = "rustls-webpki")] - return hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_or_http() - .enable_http1() - .build(); - #[allow(unreachable_code)] - // Clippy doesn't realize if the feature is disabled, this code would execute. hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + .with_webpki_roots() .https_or_http() .enable_http1() .build() @@ -259,7 +276,10 @@ pub struct ConsulBuilder { impl ConsulBuilder { /// Creates a new instance of [`ConsulBuilder`](consul::ConsulBuilder) pub fn new(config: Config) -> Self { - Self { config, https_client: None } + Self { + config, + https_client: None, + } } /// Sets the HTTPS client to be used when building an instance of [`Consul`](consul::Consul). @@ -274,7 +294,9 @@ impl ConsulBuilder { pub fn build(self) -> Consul { let https_client = self.https_client.unwrap_or_else(|| { let https = https_connector(); - self.config.hyper_builder.build::<_, Body>(https) + self.config + .hyper_builder + .build::<_, BoxBody>(https) }); Consul::new_with_client(self.config, https_client) @@ -287,8 +309,7 @@ impl Consul { /// #Arguments: /// - [Config](consul::Config) pub fn new(config: Config) -> Self { - ConsulBuilder::new(config) - .build() + ConsulBuilder::new(config).build() } /// Creates a new instance of [`Consul`](consul::Consul) using the supplied HTTPS client. @@ -315,12 +336,16 @@ impl Consul { request: ReadKeyRequest<'_>, ) -> Result>> { let req = self.build_read_key_req(request); - let (mut response_body, index) = self - .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) + let (response_body, index) = self + .execute_request( + req, + BoxBody::new(http_body_util::Empty::::new()), + None, + READ_KEY_METHOD_NAME, + ) .await?; - let bytes = response_body.copy_to_bytes(response_body.remaining()); Ok(ResponseMeta { - response: serde_json::from_slice::>(&bytes) + response: serde_json::from_reader::<_, Vec>(response_body.reader()) .map_err(ConsulError::ResponseDeserializationFailed)? .into_iter() .map(|mut r| { @@ -356,17 +381,17 @@ impl Consul { ) -> Result<(bool, u64)> { let url = self.build_create_or_update_url(request); let req = hyper::Request::builder().method(Method::PUT).uri(url); - let (mut response_body, index) = self + let (response_body, index) = self .execute_request( req, - Body::from(value), + BoxBody::new(Full::::new(Bytes::from(value))), None, CREATE_OR_UPDATE_KEY_METHOD_NAME, ) .await?; - let bytes = response_body.copy_to_bytes(response_body.remaining()); Ok(( - serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed)?, + serde_json::from_reader(response_body.reader()) + .map_err(ConsulError::ResponseDeserializationFailed)?, index, )) } @@ -446,11 +471,16 @@ impl Consul { url = add_namespace_and_datacenter(url, request.namespace, request.datacenter); req = req.uri(url); - let (mut response_body, _index) = self - .execute_request(req, hyper::Body::empty(), None, DELETE_KEY_METHOD_NAME) + let (response_body, _index) = self + .execute_request( + req, + BoxBody::new(Empty::::new()), + None, + DELETE_KEY_METHOD_NAME, + ) .await?; - let bytes = response_body.copy_to_bytes(response_body.remaining()); - serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) + serde_json::from_reader(response_body.reader()) + .map_err(ConsulError::ResponseDeserializationFailed) } /// Obtains a lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. @@ -493,7 +523,7 @@ impl Consul { let (_watch, index) = self .execute_request( lock_index_req, - hyper::Body::empty(), + BoxBody::new(http_body_util::Empty::::new()), None, GET_LOCK_METHOD_NAME, ) @@ -535,7 +565,7 @@ impl Consul { let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; self.execute_request( request, - payload.into(), + BoxBody::new(Full::::new(Bytes::from(payload.into_bytes()))), Some(Duration::from_secs(5)), REGISTER_ENTITY_METHOD_NAME, ) @@ -555,7 +585,7 @@ impl Consul { let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; self.execute_request( request, - payload.into(), + BoxBody::new(Full::::new(Bytes::from(payload.into_bytes()))), Some(Duration::from_secs(5)), DEREGISTER_ENTITY_METHOD_NAME, ) @@ -580,17 +610,17 @@ impl Consul { let request = hyper::Request::builder() .method(Method::GET) .uri(uri.clone()); - let (mut response_body, index) = self + let (response_body, index) = self .execute_request( request, - hyper::Body::empty(), + BoxBody::new(Empty::::new()), query_opts.timeout, GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME, ) .await?; - let bytes = response_body.copy_to_bytes(response_body.remaining()); - let service_tags_by_name = serde_json::from_slice::>>(&bytes) - .map_err(ConsulError::ResponseDeserializationFailed)?; + let service_tags_by_name = + serde_json::from_reader::<_, HashMap>>(response_body.reader()) + .map_err(ConsulError::ResponseDeserializationFailed)?; Ok(ResponseMeta { response: service_tags_by_name.keys().cloned().collect(), @@ -612,17 +642,17 @@ impl Consul { ) -> Result> { let query_opts = query_opts.unwrap_or_default(); let req = self.build_get_service_nodes_req(request, &query_opts); - let (mut response_body, index) = self + let (response_body, index) = self .execute_request( req, - hyper::Body::empty(), + BoxBody::new(Empty::::new()), query_opts.timeout, GET_SERVICE_NODES_METHOD_NAME, ) .await?; - let bytes = response_body.copy_to_bytes(response_body.remaining()); - let response = serde_json::from_slice::(&bytes) - .map_err(ConsulError::ResponseDeserializationFailed)?; + let response = + serde_json::from_reader::<_, GetServiceNodesResponse>(response_body.reader()) + .map_err(ConsulError::ResponseDeserializationFailed)?; Ok(ResponseMeta { response, index }) } @@ -730,16 +760,18 @@ impl Consul { req = req.uri(url); let create_session_json = serde_json::to_string(&session_req).map_err(ConsulError::InvalidRequest)?; - let (mut response_body, _index) = self + let (response_body, _index) = self .execute_request( req, - hyper::Body::from(create_session_json), + BoxBody::new(Full::::new(Bytes::from( + create_session_json.into_bytes(), + ))), None, GET_SESSION_METHOD_NAME, ) .await?; - let bytes = response_body.copy_to_bytes(response_body.remaining()); - serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) + serde_json::from_reader(response_body.reader()) + .map_err(ConsulError::ResponseDeserializationFailed) } fn build_get_service_nodes_req( @@ -767,7 +799,7 @@ impl Consul { async fn execute_request<'a>( &self, req: http::request::Builder, - body: hyper::Body, + body: BoxBody, duration: Option, request_name: &str, ) -> Result<(Box, u64)> { @@ -813,9 +845,12 @@ impl Consul { if status != hyper::StatusCode::OK { record_failure_metric_if_enabled(&method, request_name); - let mut response_body = hyper::body::aggregate(response.into_body()) + let mut response_body = response + .into_body() + .collect() .await - .map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?; + .map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))? + .aggregate(); let bytes = response_body.copy_to_bytes(response_body.remaining()); let resp = std::str::from_utf8(&bytes) .map_err(|e| ConsulError::UnexpectedResponseCode(status, e.to_string()))?; @@ -829,7 +864,7 @@ impl Consul { None => 0, }; - match hyper::body::aggregate(response.into_body()).await { + match response.into_body().collect().await.map(|b| b.aggregate()) { Ok(body) => Ok((Box::new(body), index)), Err(e) => { record_failure_metric_if_enabled(&method, request_name); @@ -963,8 +998,21 @@ mod tests { let res = create_or_update_key_value(&consul, key, string_value).await; assert_expected_result_with_index(res); - let res = read_key(&consul, key).await; - verify_single_value_matches(res, string_value); + let res = read_key(&consul, key).await.unwrap(); + let index = res.index; + verify_single_value_matches(Ok(res), string_value); + + let res = read_key(&consul, key).await.unwrap(); + assert_eq!(res.index, index); + create_or_update_key_value(&consul, key, string_value) + .await + .unwrap(); + assert_eq!(res.index, index); + create_or_update_key_value(&consul, key, "This is a new test") + .await + .unwrap(); + let res = read_key(&consul, key).await.unwrap(); + assert!(res.index > index); } #[tokio::test(flavor = "multi_thread")] @@ -1023,7 +1071,7 @@ mod tests { .iter() .map(|sn| sn.service.address.clone()) .collect(); - let expected_addresses = vec![ + let expected_addresses = [ "1.1.1.1".to_string(), "2.2.2.2".to_string(), "3.3.3.3".to_string(), @@ -1034,10 +1082,9 @@ mod tests { let tags: Vec = response .iter() - .map(|sn| sn.service.tags.clone().into_iter()) - .flatten() + .flat_map(|sn| sn.service.tags.clone().into_iter()) .collect(); - let expected_tags = vec![ + let expected_tags = [ "first".to_string(), "second".to_string(), "third".to_string(),