From 241ae1e31cc83651b224f99ce4d677a27f992f0a Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Wed, 26 Oct 2022 16:09:06 -0700 Subject: [PATCH 01/13] added additional modes for retrieving values, added a builder api for ReadKeyRequest --- src/lib.rs | 83 +++++++++++++++++++++++++++++++++++++++++++++------- src/types.rs | 31 ++++++++++++++++++++ 2 files changed, 103 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 46259ec..97cceb6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,7 +37,7 @@ use hyper::{body::Buf, client::HttpConnector, Body, Method}; #[cfg(feature = "metrics")] use lazy_static::lazy_static; use quick_error::quick_error; -use serde::{Deserialize, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use slog_scope::{error, info}; use tokio::time::timeout; @@ -259,12 +259,13 @@ impl Consul { } } - /// Reads a key from Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. + /// Reads keys from Consul's KV store and returns them as `String`s + /// See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. /// # Arguments: /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result> { + pub async fn read_string(&self, request: ReadKeyRequest<'_>) -> Result> { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) @@ -289,6 +290,66 @@ impl Consul { .collect() } + /// Reads keys from Consul's KV store and returns them as `Vec`s + /// See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. + /// # Arguments: + /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn read_key( + &self, + request: ReadKeyRequest<'_>, + ) -> Result, ReadKeyResponse)>> { + let req = self.build_read_key_req(request); + let (mut response_body, _index) = self + .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let items = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)? + .into_iter() + .map(|r| match r.value { + Some(ref val) => Ok(Some((base64::decode(val)?, r))), + None => Ok(None), + }) + .collect::, ReadKeyResponse)>>>>(); + items.map(|v| v.into_iter().flatten().collect()) + } + + /// Reads keys from Consul's KV store and attempts to deserialize them into + /// type T from JSON. + /// See the [consul docs](https://www.consul.io/api-docs/kv#read-key) for more information. + /// # Arguments: + /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn read_obj( + &self, + request: ReadKeyRequest<'_>, + ) -> Result> + where + T: DeserializeOwned, + { + let req = self.build_read_key_req(request); + let (mut response_body, _index) = self + .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let items = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)? + .into_iter() + .map(|r| match r.value { + Some(ref val) => Ok(Some(( + serde_json::from_slice(&base64::decode(val)?) + .map_err(ConsulError::ResponseDeserializationFailed)?, + r, + ))), + None => Ok(None), + }) + .collect::>>>(); + items.map(|v| v.into_iter().flatten().collect()) + } + /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. /// # Arguments: /// - request - the [CreateOrUpdateKeyRequest](consul::types::CreateOrUpdateKeyRequest) @@ -468,7 +529,7 @@ impl Consul { consistency: request.consistency, ..Default::default() }; - self.read_key(req).await + self.read_string(req).await } /// Registers or updates entries in consul's global catalog. @@ -884,14 +945,14 @@ mod tests { use super::*; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn create_and_read_key() { + async fn create_and_read_string() { let consul = get_client(); let key = "test/consul/read"; let string_value = "This is a test"; let res = create_or_update_key_value(&consul, key, string_value).await; assert_expected_result_with_index(res); - let res = read_key(&consul, key).await; + let res = read_string(&consul, key).await; verify_single_value_matches(res, string_value); } @@ -996,7 +1057,7 @@ mod tests { let res = delete_key(&consul, key).await; assert_expected_result(res); - let res = read_key(&consul, key).await.unwrap_err(); + let res = read_string(&consul, key).await.unwrap_err(); match res { ConsulError::UnexpectedResponseCode(code, _body) => { assert_eq!(code, hyper::http::StatusCode::NOT_FOUND) @@ -1042,8 +1103,8 @@ mod tests { } sleep(Duration::from_secs(2)).await; - let key_resp = read_key(&consul, key).await; - verify_single_value_matches(key_resp, new_string_value); + let key_resp = read_string(&consul, key).await; + verify_single_value_matches(key_resp, &new_string_value); let req = LockRequest { key, @@ -1229,12 +1290,12 @@ mod tests { .await } - async fn read_key(consul: &Consul, key: &str) -> Result> { + async fn read_string(consul: &Consul, key: &str) -> Result> { let req = ReadKeyRequest { key, ..Default::default() }; - consul.read_key(req).await + consul.read_string(req).await } async fn delete_key(consul: &Consul, key: &str) -> Result { diff --git a/src/types.rs b/src/types.rs index b78e0c4..64df690 100644 --- a/src/types.rs +++ b/src/types.rs @@ -117,6 +117,37 @@ pub struct ReadKeyRequest<'a> { pub wait: Duration, } +macro_rules! builder_fun { + ($nm:ident, $fun:ident, $parm:ty) => { + /// Builder-style method to set $nm on the object and return `self` + pub fn $fun(self, $nm: $parm) -> Self { + ReadKeyRequest { $nm, ..self } + } + }; +} +impl<'a> ReadKeyRequest<'a> { + /// Construct a default ReadKeyRequest to be used with the builder API + /// e.g. + /// ```rust + /// let req = ReadKeyRequest::new() + /// .set_key("bar") + /// .set_namespace("foo") + /// .recurse(true); + /// ``` + pub fn new() -> Self { + Default::default() + } + + builder_fun!(key, set_key, &'a str); + builder_fun!(namespace, set_namespace, &'a str); + builder_fun!(datacenter, set_datacenter, &'a str); + builder_fun!(recurse, set_recurse, bool); + builder_fun!(separator, set_separator, &'a str); + builder_fun!(consistency, set_consistency, ConsistencyMode); + builder_fun!(index, set_index, Option); + builder_fun!(wait, set_wait, Duration); +} + /// Represents a request to read a key from Consul's Key Value store. #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] pub struct LockWatchRequest<'a> { From cd3d8d963872812ba8324d72128d9fea328aa53c Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Thu, 27 Oct 2022 13:34:02 -0700 Subject: [PATCH 02/13] made ReadKeyResponse generic over the returned value --- src/lib.rs | 75 +++++++++++++++++++++++++++------------------------- src/types.rs | 6 ++--- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 97cceb6..207290f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -265,13 +265,16 @@ impl Consul { /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_string(&self, request: ReadKeyRequest<'_>) -> Result> { + pub async fn read_string( + &self, + request: ReadKeyRequest<'_>, + ) -> Result>> { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); - serde_json::from_slice::>(&bytes) + serde_json::from_slice::>>(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)? .into_iter() .map(|mut r| { @@ -296,24 +299,24 @@ impl Consul { /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_key( - &self, - request: ReadKeyRequest<'_>, - ) -> Result, ReadKeyResponse)>> { + pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result> { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); - let items = serde_json::from_slice::>(&bytes) + serde_json::from_slice::>(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)? .into_iter() - .map(|r| match r.value { - Some(ref val) => Ok(Some((base64::decode(val)?, r))), - None => Ok(None), + .map(|mut r| { + let v = match r.value { + Some(ref val) => Some(base64::decode(val)?), + None => None, + }; + r.value = v; + Ok(r) }) - .collect::, ReadKeyResponse)>>>>(); - items.map(|v| v.into_iter().flatten().collect()) + .collect::>>() } /// Reads keys from Consul's KV store and attempts to deserialize them into @@ -323,31 +326,37 @@ impl Consul { /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_obj( - &self, - request: ReadKeyRequest<'_>, - ) -> Result> + pub async fn read_obj(&self, request: ReadKeyRequest<'_>) -> Result>> where - T: DeserializeOwned, + T: DeserializeOwned + Default, { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); - let items = serde_json::from_slice::>(&bytes) + serde_json::from_slice::>(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)? .into_iter() - .map(|r| match r.value { - Some(ref val) => Ok(Some(( - serde_json::from_slice(&base64::decode(val)?) - .map_err(ConsulError::ResponseDeserializationFailed)?, - r, - ))), - None => Ok(None), + .map(|r| { + let v = match r.value { + Some(ref val) => Some( + serde_json::from_slice(&base64::decode(val)?) + .map_err(ConsulError::ResponseDeserializationFailed)?, + ), + None => None, + }; + Ok(ReadKeyResponse { + value: v, + create_index: r.create_index, + modify_index: r.modify_index, + lock_index: r.lock_index, + key: r.key, + flags: r.flags, + session: r.session, + }) }) - .collect::>>>(); - items.map(|v| v.into_iter().flatten().collect()) + .collect::>>>() } /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. @@ -529,7 +538,7 @@ impl Consul { consistency: request.consistency, ..Default::default() }; - self.read_string(req).await + self.read_key(req).await } /// Registers or updates entries in consul's global catalog. @@ -1290,7 +1299,7 @@ mod tests { .await } - async fn read_string(consul: &Consul, key: &str) -> Result> { + async fn read_string(consul: &Consul, key: &str) -> Result>> { let req = ReadKeyRequest { key, ..Default::default() @@ -1317,13 +1326,7 @@ mod tests { assert!(res.unwrap()); } - async fn get_single_key_value_with_index(consul: &Consul, key: &str) -> (Option, i64) { - let res = read_key(consul, key).await.expect("failed to read key"); - let r = res.into_iter().next().unwrap(); - (r.value, r.modify_index) - } - - fn verify_single_value_matches(res: Result>, value: &str) { + fn verify_single_value_matches(res: Result>>, value: &str) { assert!(res.is_ok()); assert_eq!( res.unwrap().into_iter().next().unwrap().value.unwrap(), diff --git a/src/types.rs b/src/types.rs index 64df690..e81bf8b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -204,10 +204,10 @@ pub struct CreateOrUpdateKeyRequest<'a> { pub release: &'a str, } -/// Represents a request to read a key from Consul Key Value store. +/// Represents a response from reading a key from Consul Key Value store. #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "PascalCase")] -pub struct ReadKeyResponse { +pub struct ReadKeyResponse> { /// CreateIndex is the internal index value that represents when the entry was created. pub create_index: i64, /// ModifyIndex is the last index that modified this key. @@ -223,7 +223,7 @@ pub struct ReadKeyResponse { /// Clients can choose to use this however makes sense for their application. pub flags: u64, /// Value is a base64-encoded blob of data. - pub value: Option, + pub value: Option, /// If a lock is held, the Session key provides the session that owns the lock. pub session: Option, } From 94d9095d0427f8c2fa8c8e3665a4425eaf6ffd01 Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Mon, 31 Oct 2022 13:53:20 -0700 Subject: [PATCH 03/13] added transaction support, added a vec base64 de/serializer --- Cargo.toml | 3 +- src/lib.rs | 198 +++++++++++++++++++++++++++++++++++++++------------ src/types.rs | 149 ++++++++++++++++++++++++++++++++------ 3 files changed, 282 insertions(+), 68 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d3eb588..a6d5bb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,9 +12,9 @@ license-file = "LICENSE" [features] default = ["rustls-native"] metrics = ["prometheus", "lazy_static"] +default-tls = ["hyper-tls"] rustls-native = ["hyper-rustls/rustls-native-certs"] rustls-webpki = ["hyper-rustls/webpki-roots"] -trace = ["dep:opentelemetry"] # keep this list sorted! [dependencies] @@ -23,6 +23,7 @@ futures = "0.3" http = "0.2" hyper = { version = "0.14", features = ["full"] } hyper-rustls = { version = "0.24" } +hyper-tls = { version = "0.5.0", optional = true, no-default-features = true } lazy_static = { version = "1", optional = true } opentelemetry = { version = "0.19", features = ["rt-tokio"], optional = true } prometheus = { version = "0.13", optional = true } diff --git a/src/lib.rs b/src/lib.rs index 207290f..d74408e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,9 @@ use std::{env, str::Utf8Error}; use base64::Engine; use hyper::{body::Buf, client::HttpConnector, Body, Method}; #[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))] -#[cfg(feature = "metrics")] +use hyper_rustls::HttpsConnector; +#[cfg(feature = "default-tls")] +use hyper_tls::HttpsConnector; use lazy_static::lazy_static; use quick_error::quick_error; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -72,6 +74,8 @@ quick_error! { /// The consul server response could not be deserialized from json. ResponseDeserializationFailed(err: serde_json::error::Error) {} /// The consul server response could not be deserialized from bytes. + RequestSerializationFailed(err: serde_json::error::Error) {} + /// The consul server response could not be deserialized from bytes. ResponseStringDeserializationFailed(err: std::str::Utf8Error) {} /// The consul server response was something other than 200. The status code and the body of the response are included. UnexpectedResponseCode(status_code: hyper::http::StatusCode, body: String) {} @@ -137,15 +141,21 @@ lazy_static! { .unwrap(); } +lazy_static! { + static ref DEFAULT_QUERY_OPTS: QueryOptions = Default::default(); +} + const READ_KEY_METHOD_NAME: &str = "read_key"; const CREATE_OR_UPDATE_KEY_METHOD_NAME: &str = "create_or_update_key"; +const CREATE_OR_UPDATE_ALL_METHOD_NAME: &str = "create_or_update_all"; const CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME: &str = "create_or_update_key_sync"; const DELETE_KEY_METHOD_NAME: &str = "delete_key"; const GET_LOCK_METHOD_NAME: &str = "get_lock"; const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity"; const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names"; const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes"; -const GET_SESSION_METHOD_NAME: &str = "get_session"; +const CREATE_SESSION_METHOD_NAME: &str = "create_session"; +const GET_DATACENTERS: &str = "get_datacenters"; pub(crate) type Result = std::result::Result; @@ -184,7 +194,7 @@ impl Config { /// The lifetime of this object defines the validity of the lock against consul. /// When the object is dropped, the lock is attempted to be released for the next consumer. #[derive(Clone, Debug)] -pub struct Lock<'a> { +pub struct LockGuard<'a> { /// The session ID of the lock. pub session_id: String, /// The key for the lock. @@ -201,7 +211,7 @@ pub struct Lock<'a> { pub consul: &'a Consul, } -impl Drop for Lock<'_> { +impl Drop for LockGuard<'_> { fn drop(&mut self) { let req = CreateOrUpdateKeyRequest { key: &self.key, @@ -243,6 +253,16 @@ fn https_connector() -> hyper_rustls::HttpsConnector { .build() } +impl Clone for Consul { + fn clone(&self) -> Self { + Consul { + https_client: self.https_client.clone(), + config: self.config.clone(), + tracer: global::tracer("consul"), + } + } +} + impl Consul { /// Creates a new instance of [`Consul`](consul::Consul). /// This is the entry point for this crate. @@ -306,17 +326,7 @@ impl Consul { .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); serde_json::from_slice::>(&bytes) - .map_err(ConsulError::ResponseDeserializationFailed)? - .into_iter() - .map(|mut r| { - let v = match r.value { - Some(ref val) => Some(base64::decode(val)?), - None => None, - }; - r.value = v; - Ok(r) - }) - .collect::>>() + .map_err(ConsulError::ResponseDeserializationFailed) } /// Reads keys from Consul's KV store and attempts to deserialize them into @@ -341,7 +351,7 @@ impl Consul { .map(|r| { let v = match r.value { Some(ref val) => Some( - serde_json::from_slice(&base64::decode(val)?) + serde_json::from_slice(&val.0) .map_err(ConsulError::ResponseDeserializationFailed)?, ), None => None, @@ -389,6 +399,36 @@ impl Consul { )) } + /// Executes a transaction in Consul's KV store. + /// This takes a vector of operations, and only succeeds if all operations within the Vec of operations + /// See https://developer.hashicorp.com/consul/api-docs/txn for more information + pub async fn create_or_update_all( + &self, + request: Vec>, + datacenter: Option<&str>, + ) -> Result> { + let url = self.build_create_txn_url(datacenter); + let req = hyper::Request::builder().method(Method::PUT).uri(url); + let txn_request: Vec> = request + .into_iter() + .map(|r| HashMap::from([("KV", r)])) + .collect(); + let data = + serde_json::to_vec(&txn_request).map_err(ConsulError::RequestSerializationFailed)?; + let (mut response_body, _index) = self + .execute_request( + req, + Body::from(data), + None, + CREATE_OR_UPDATE_ALL_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let resp = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + Ok(resp) + } + /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. /// This is the synchronous version of create_or_update_key /// # Arguments: @@ -471,33 +511,55 @@ impl Consul { serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) } - /// Obtains a lock against a specific key in consul. See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. + /// Attempts to acquire a lock for a given key. + /// This creates a new session that will own the lock, + /// if successful acquiring the lock, it will then return a `LockGuard` which will attempt to + /// release the lock when it goes out of scope. + /// If it fails to acquire the lock, it will return a `ConsulError::LockAcquisitionError` + /// See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. + /// # Arguments: + /// - request - the [LockRequest](consul::types::LockRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_lock( + &self, + mut request: LockRequest<'_>, + value: &[u8], + ) -> Result> { + let session = self.create_session(&request).await?; + request.session_id = session.id.as_str(); + let _lock_res = self.get_lock_inner(&request, value).await?; + // No need to check lock_res, if it didn't return Err, then the lock was successful + Ok(LockGuard { + timeout: request.timeout, + key: request.key.to_string(), + session_id: request.session_id.to_owned(), + consul: self, + datacenter: request.datacenter.to_string(), + namespace: request.namespace.to_string(), + value: Some(value.to_owned()), + }) + } + + /// Lower-level lock function wich acquires the lock in consul and returns true if the lock + /// succeeded, otherwise it returns a LockAcquisitionFailure error + /// See the [consul docs](https://learn.hashicorp.com/tutorials/consul/application-leader-elections?in=consul/developer-configuration) for more information. /// # Arguments: /// - request - the [LockRequest](consul::types::LockRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn get_lock(&self, request: LockRequest<'_>, value: &[u8]) -> Result> { - let session = self.get_session(request).await?; + pub async fn get_lock_inner(&self, request: &LockRequest<'_>, value: &[u8]) -> Result { let req = CreateOrUpdateKeyRequest { key: request.key, namespace: request.namespace, datacenter: request.datacenter, - acquire: &session.id, + acquire: request.session_id, ..Default::default() }; let value_copy = value.to_vec(); let (lock_acquisition_result, _index) = self.create_or_update_key(req, value_copy).await?; if lock_acquisition_result { - let value_copy = value.to_vec(); - Ok(Lock { - timeout: request.timeout, - key: request.key.to_string(), - session_id: session.id, - consul: self, - datacenter: request.datacenter.to_string(), - namespace: request.namespace.to_string(), - value: Some(value_copy), - }) + Ok(lock_acquisition_result) } else { let watch_req = ReadKeyRequest { key: request.key, @@ -569,20 +631,15 @@ impl Consul { /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. pub async fn get_all_registered_service_names( &self, - query_opts: Option, + query_opts: Option<&QueryOptions>, ) -> Result>> { - let mut uri = format!("{}/v1/catalog/services", self.config.address); - let query_opts = query_opts.unwrap_or_default(); - add_query_option_params(&mut uri, &query_opts, '?'); - - let request = hyper::Request::builder() - .method(Method::GET) - .uri(uri.clone()); + let opts = query_opts.unwrap_or(&(*DEFAULT_QUERY_OPTS)); + let request = self.create_get_catalog_request("services", opts); let (mut response_body, index) = self .execute_request( request, hyper::Body::empty(), - query_opts.timeout, + opts.timeout, GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME, ) .await?; @@ -596,6 +653,43 @@ impl Consul { }) } + /// Returns all datacenters currently registered in consul + /// See https://developer.hashicorp.com/consul/api-docs/catalog#list-datacenters + /// # Arguments: + /// - query_opts: The [`QueryOptions`](QueryOptions) to apply for this endpoint. + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_datacenters( + &self, + query_opts: Option<&QueryOptions>, + ) -> Result>> { + let opts = query_opts.unwrap_or(&(*DEFAULT_QUERY_OPTS)); + let request = self.create_get_catalog_request("datacenters", opts); + let (mut response_body, index) = self + .execute_request(request, hyper::Body::empty(), opts.timeout, GET_DATACENTERS) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let service_tags_by_name = serde_json::from_slice::>>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + + Ok(ResponseMeta { + response: service_tags_by_name.keys().cloned().collect(), + index, + }) + } + + fn create_get_catalog_request( + &self, + name: &str, + query_opts: &QueryOptions, + ) -> http::request::Builder { + let mut uri = format!("{}/v1/catalog/{}", self.config.address, name); + add_query_option_params(&mut uri, query_opts, '?'); + hyper::Request::builder() + .method(Method::GET) + .uri(uri.clone()) + } + /// returns the nodes providing the service indicated on the path. /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. @@ -713,10 +807,11 @@ impl Consul { req.uri(url) } - async fn get_session(&self, request: LockRequest<'_>) -> Result { + /// Create a new session + pub async fn create_session(&self, request: &LockRequest<'_>) -> Result { let session_req = CreateSessionRequest { lock_delay: request.lock_delay, - behavior: request.behavior, + behavior: request.behavior.clone(), ttl: request.timeout, ..Default::default() }; @@ -733,7 +828,7 @@ impl Consul { req, hyper::Body::from(create_session_json), None, - GET_SESSION_METHOD_NAME, + CREATE_SESSION_METHOD_NAME, ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); @@ -751,7 +846,9 @@ impl Consul { "{}/v1/health/service/{}", self.config.address, request.service )); - url.push_str(&format!("?passing={}", request.passing)); + if request.passing { + url.push_str(&format!("?passing={}", request.passing)); + } if let Some(near) = request.near { url.push_str(&format!("&near={}", near)); } @@ -839,9 +936,16 @@ impl Consul { } } + fn build_create_txn_url(&self, datacenter: Option<&str>) -> String { + let mut url = format!("{}/v1/txn", self.config.address); + if let Some(dc) = datacenter { + url.push_str(&format!("?datacenter={}", dc)); + } + url + } + fn build_create_or_update_url(&self, request: CreateOrUpdateKeyRequest<'_>) -> String { - let mut url = String::new(); - url.push_str(&format!("{}/v1/kv/{}", self.config.address, request.key)); + let mut url = format!("{}/v1/kv/{}", self.config.address, request.key); let mut added_query_param = false; if request.flags != 0 { url = add_query_param_separator(url, added_query_param); @@ -1176,6 +1280,8 @@ mod tests { node: "node".to_string(), address: "1.1.1.1".to_string(), datacenter: "datacenter".to_string(), + tagged_addresses: HashMap::new(), + meta: HashMap::new(), }; let service = Service { @@ -1304,7 +1410,7 @@ mod tests { key, ..Default::default() }; - consul.read_string(req).await + consul.read_obj::(req).await } async fn delete_key(consul: &Consul, key: &str) -> Result { diff --git a/src/types.rs b/src/types.rs index e81bf8b..e4d8b9b 100644 --- a/src/types.rs +++ b/src/types.rs @@ -25,7 +25,7 @@ SOFTWARE. use std::collections::HashMap; use std::time::Duration; -use serde::{self, Deserialize, Serialize, Serializer}; +use serde::{self, de::Deserializer, de::Error as SerdeError, Deserialize, Serialize, Serializer}; use smart_default::SmartDefault; // TODO retrofit other get APIs to use this struct @@ -71,7 +71,7 @@ pub struct ResponseMeta { } /// Represents a request to delete a key or all keys sharing a prefix from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] pub struct DeleteKeyRequest<'a> { /// Specifies the path of the key to delete. pub key: &'a str, @@ -91,7 +91,7 @@ pub struct DeleteKeyRequest<'a> { } /// Represents a request to read a key from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] pub struct ReadKeyRequest<'a> { /// Specifies the path of the key to read. pub key: &'a str, @@ -149,7 +149,7 @@ impl<'a> ReadKeyRequest<'a> { } /// Represents a request to read a key from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] pub struct LockWatchRequest<'a> { /// Specifies the path of the key to read. pub key: &'a str, @@ -171,7 +171,7 @@ pub struct LockWatchRequest<'a> { } /// Represents a request to read a key from Consul Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] pub struct CreateOrUpdateKeyRequest<'a> { /// Specifies the path of the key. pub key: &'a str, @@ -204,19 +204,55 @@ pub struct CreateOrUpdateKeyRequest<'a> { pub release: &'a str, } +/// An operation to be executed within a transaction +/// See https://developer.hashicorp.com/consul/api-docs/txn for more info +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub struct TransactionOp<'a> { + /// The type of operation to execute + pub verb: TransactionOpVerb, + /// The key on which to operate + pub key: &'a str, + /// The value to set (if applicable) + pub value: Base64Vec, + #[serde(rename = "Index")] + /// The modify_index if it is a cas operation + pub check_and_set: u64, + /// Optional flags to associate with the key + pub flags: u64, + /// Namespace on which to operate + pub namespace: &'a str, +} + +/// Response from Consul for a txn request +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +pub struct TransactionResponse { + /// The key on which the operation was executed + pub key: String, + /// The resulting value from the key (if applicable) + pub value: Option>, + /// The index at which the key was created + pub create_index: u64, + /// The index at which the key was locked + pub lock_index: u64, + /// The index at which the key was modified + pub modify_index: u64, +} + /// Represents a response from reading a key from Consul Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] -pub struct ReadKeyResponse> { +pub struct ReadKeyResponse { /// CreateIndex is the internal index value that represents when the entry was created. - pub create_index: i64, + pub create_index: u64, /// ModifyIndex is the last index that modified this key. /// It can be used to establish blocking queries by setting the ?index query parameter. /// You can even perform blocking queries against entire subtrees of the KV store: if ?recurse is provided, the returned X-Consul-Index corresponds to the latest ModifyIndex within the prefix, and a blocking query using that ?index will wait until any key within that prefix is updated. - pub modify_index: i64, + pub modify_index: u64, /// LockIndex is the number of times this key has successfully been acquired in a lock. /// If the lock is held, the Session key provides the session that owns the lock. - pub lock_index: i64, + pub lock_index: u64, /// Key is simply the full path of the entry. pub key: String, /// Flags is an opaque unsigned integer that can be attached to each entry. @@ -229,7 +265,7 @@ pub struct ReadKeyResponse> { } /// Represents a request to create a lock . -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Copy)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct LockRequest<'a> { /// The key to use for locking. @@ -260,7 +296,7 @@ pub struct LockRequest<'a> { } /// Controls the behavior of locks when a session is invalidated. See [consul docs](https://www.consul.io/api-docs/session#behavior) for more information. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Copy)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum LockExpirationBehavior { #[default] @@ -272,7 +308,7 @@ pub enum LockExpirationBehavior { /// Most of the read query endpoints support multiple levels of consistency. /// Since no policy will suit all clients' needs, these consistency modes allow the user to have the ultimate say in how to balance the trade-offs inherent in a distributed system. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub enum ConsistencyMode { /// If not specified, the default is strongly consistent in almost all cases. @@ -294,13 +330,15 @@ pub enum ConsistencyMode { Stale, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] -pub(crate) struct SessionResponse { +/// Response from the session-creation step +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +pub struct SessionResponse { #[serde(rename = "ID")] - pub(crate) id: String, + /// The Id of the created session + pub id: String, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub(crate) struct CreateSessionRequest { #[default(_code = "Duration::from_secs(0)")] @@ -407,7 +445,7 @@ pub struct RegisterEntityCheck { } /// Request for the nodes providing a specified service registered in Consul. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] pub struct GetServiceNodesRequest<'a> { /// Specifies the service to list services for. This is provided as part of the URL. pub service: &'a str, @@ -425,7 +463,7 @@ pub struct GetServiceNodesRequest<'a> { pub(crate) type GetServiceNodesResponse = Vec; -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// An instance of a node providing a Consul service. pub struct ServiceNode { @@ -435,7 +473,7 @@ pub struct ServiceNode { pub service: Service, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The node information of an instance providing a Consul service. pub struct Node { @@ -448,9 +486,13 @@ pub struct Node { pub address: String, /// The datacenter where this node is running on. pub datacenter: String, + /// List of explicit WAN and LAN addresses for the node + pub tagged_addresses: HashMap, + /// Map of metadata options + pub meta: HashMap, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The service information of an instance providing a Consul service. pub struct Service { @@ -482,3 +524,68 @@ pub(crate) fn duration_as_string(duration: &Duration) -> String { res.push('s'); res } + +/// Operation types for all available verbs within a Consul Transaction +/// See https://developer.hashicorp.com/consul/api-docs/txn#tables-of-operations for more +/// information +/// NOTE: Presently only the KV-based operations are supported by this client +#[derive(Clone, SmartDefault, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "kebab-case")] +pub enum TransactionOpVerb { + #[default] + /// Sets the Key to the given Value + Set, + /// Sets, but with CAS semantics + Cas, + /// Lock with the given session + Lock, + /// Unlock with the given session + Unlock, + /// Get the key, fails if the key doesn't exist + Get, + /// Get all keys using the 'key' field as a prefix + GetTree, + /// Fail if modify_index != index + CheckIndex, + /// Fail if not locked by the supplied session + CheckSession, + /// Fail if key exists + CheckNotExists, + /// Delete the key (and value at the key) + Delete, + /// Delete all keys/vals starting with prefix + DeleteTree, + /// Delete, but with CAS semantics + DeleteCas, +} + +/// A helper type which serializes a `Vec` from/to a bas64 encoded String +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct Base64Vec(pub Vec); + +impl Serialize for Base64Vec { + fn serialize(&self, serializer: S) -> Result { + serializer.collect_str(&base64::display::Base64Display::with_config( + &self.0, + base64::STANDARD, + )) + } +} + +impl<'de> Deserialize<'de> for Base64Vec { + fn deserialize>(deserializer: D) -> Result { + struct Vis; + impl serde::de::Visitor<'_> for Vis { + type Value = Base64Vec; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a base64 string") + } + + fn visit_str(self, v: &str) -> Result { + base64::decode(v).map(Base64Vec).map_err(SerdeError::custom) + } + } + deserializer.deserialize_str(Vis) + } +} From 8ce1441cce64dea4cf9dd47d5b43e651c4c263ad Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Tue, 1 Nov 2022 10:50:13 -0700 Subject: [PATCH 04/13] made all kv operations generic over value T, updated and cleaned up tests to make them idempotent --- src/lib.rs | 241 ++++++++++++++++++++++++++++++++++++--------------- src/types.rs | 131 ++++++++++++++++++++-------- 2 files changed, 262 insertions(+), 110 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d74408e..ea35f6b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -152,6 +152,7 @@ const CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME: &str = "create_or_update_key_sync"; const DELETE_KEY_METHOD_NAME: &str = "delete_key"; const GET_LOCK_METHOD_NAME: &str = "get_lock"; const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity"; +const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity"; const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names"; const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes"; const CREATE_SESSION_METHOD_NAME: &str = "create_session"; @@ -194,7 +195,10 @@ impl Config { /// The lifetime of this object defines the validity of the lock against consul. /// When the object is dropped, the lock is attempted to be released for the next consumer. #[derive(Clone, Debug)] -pub struct LockGuard<'a> { +pub struct LockGuard<'a, T> +where + T: Default + std::fmt::Debug + Serialize + Clone, +{ /// The session ID of the lock. pub session_id: String, /// The key for the lock. @@ -206,12 +210,15 @@ pub struct LockGuard<'a> { /// The datacenter of this lock. pub datacenter: String, /// The data in this lock's key - pub value: Option>, + pub value: Option, /// The consul client this lock was acquired using. pub consul: &'a Consul, } -impl Drop for LockGuard<'_> { +impl Drop for LockGuard<'_, T> +where + T: Default + std::fmt::Debug + Serialize + Clone, +{ fn drop(&mut self) { let req = CreateOrUpdateKeyRequest { key: &self.key, @@ -338,20 +345,20 @@ impl Consul { /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. pub async fn read_obj(&self, request: ReadKeyRequest<'_>) -> Result>> where - T: DeserializeOwned + Default, + T: DeserializeOwned + Default + std::fmt::Debug, { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); - serde_json::from_slice::>(&bytes) + serde_json::from_slice::>>(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)? .into_iter() .map(|r| { let v = match r.value { Some(ref val) => Some( - serde_json::from_slice(&val.0) + serde_json::from_slice::(&val.0) .map_err(ConsulError::ResponseDeserializationFailed)?, ), None => None, @@ -372,22 +379,27 @@ impl Consul { /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. /// # Arguments: /// - request - the [CreateOrUpdateKeyRequest](consul::types::CreateOrUpdateKeyRequest) - /// - value - the data to store as [Bytes](bytes::Bytes) + /// - value - the data to store, must implement `Serialize` and `Default` /// # Returns: /// A tuple of a boolean and a 64 bit unsigned integer representing whether the operation was successful and the index for a subsequent blocking query. /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn create_or_update_key( + pub async fn create_or_update_key( &self, request: CreateOrUpdateKeyRequest<'_>, - value: Vec, - ) -> Result<(bool, u64)> { + value: &T, + ) -> Result<(bool, u64)> + where + T: Default + Serialize + std::fmt::Debug, + { let url = self.build_create_or_update_url(request); let req = hyper::Request::builder().method(Method::PUT).uri(url); + let req_bytes = + serde_json::to_vec(value).map_err(ConsulError::RequestSerializationFailed)?; let (mut response_body, index) = self .execute_request( req, - Body::from(value), + Body::from(req_bytes), None, CREATE_OR_UPDATE_KEY_METHOD_NAME, ) @@ -438,23 +450,28 @@ impl Consul { /// A tuple of a boolean and a 64 bit unsigned integer representing whether the operation was successful and the index for a subsequent blocking query. /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub fn create_or_update_key_sync( + pub fn create_or_update_key_sync( &self, request: CreateOrUpdateKeyRequest<'_>, - value: Vec, - ) -> Result { + value: T, + ) -> Result + where + T: Default + Serialize + std::fmt::Debug, + { // TODO: Emit OpenTelemetry span for this request let url = self.build_create_or_update_url(request); record_request_metric_if_enabled(&Method::PUT, CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME); + let req_bytes = + serde_json::to_vec(&value).map_err(ConsulError::RequestSerializationFailed)?; let step_start_instant = Instant::now(); let result = ureq::put(&url) .set( "X-Consul-Token", &self.config.token.clone().unwrap_or_default(), ) - .send_bytes(&value); + .send_bytes(&req_bytes); record_duration_metric_if_enabled( &Method::PUT, @@ -521,14 +538,17 @@ impl Consul { /// - request - the [LockRequest](consul::types::LockRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn get_lock( + pub async fn get_lock( &self, mut request: LockRequest<'_>, - value: &[u8], - ) -> Result> { + value: T, + ) -> Result> + where + T: Default + Serialize + std::fmt::Debug + Clone, + { let session = self.create_session(&request).await?; request.session_id = session.id.as_str(); - let _lock_res = self.get_lock_inner(&request, value).await?; + let _lock_res = self.get_lock_inner(&request, &value).await?; // No need to check lock_res, if it didn't return Err, then the lock was successful Ok(LockGuard { timeout: request.timeout, @@ -537,7 +557,7 @@ impl Consul { consul: self, datacenter: request.datacenter.to_string(), namespace: request.namespace.to_string(), - value: Some(value.to_owned()), + value: Some(value), }) } @@ -548,7 +568,10 @@ impl Consul { /// - request - the [LockRequest](consul::types::LockRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn get_lock_inner(&self, request: &LockRequest<'_>, value: &[u8]) -> Result { + pub async fn get_lock_inner(&self, request: &LockRequest<'_>, value: &T) -> Result + where + T: Default + Serialize + std::fmt::Debug, + { let req = CreateOrUpdateKeyRequest { key: request.key, namespace: request.namespace, @@ -556,8 +579,7 @@ impl Consul { acquire: request.session_id, ..Default::default() }; - let value_copy = value.to_vec(); - let (lock_acquisition_result, _index) = self.create_or_update_key(req, value_copy).await?; + let (lock_acquisition_result, _index) = self.create_or_update_key(req, value).await?; if lock_acquisition_result { Ok(lock_acquisition_result) } else { @@ -609,7 +631,7 @@ impl Consul { /// - payload: The [`RegisterEntityPayload`](RegisterEntityPayload) to provide the register entity API. /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn register_entity(&self, payload: &RegisterEntityPayload) -> Result<()> { + pub async fn register_entity(&self, payload: &RegisterEntityRequest<'_>) -> Result<()> { let uri = format!("{}/v1/catalog/register", self.config.address); let request = hyper::Request::builder().method(Method::PUT).uri(uri); let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; @@ -623,6 +645,26 @@ impl Consul { Ok(()) } + /// Un-Registers an entity from the consul Catalog + /// See https://www.consul.io/api-docs/catalog#deregister-entity for more information. + /// # Arguments: + /// - payload: The [`DeRegisterEntityPayload`](DeRegisterEntityPayload) to provide the register entity API. + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn deregister_entity(&self, payload: &DeregisterEntityRequest<'_>) -> Result<()> { + let uri = format!("{}/v1/catalog/deregister", self.config.address); + let request = hyper::Request::builder().method(Method::PUT).uri(uri); + let payload = serde_json::to_string(payload).map_err(ConsulError::InvalidRequest)?; + self.execute_request( + request, + payload.into(), + Some(Duration::from_secs(5)), + DEREGISTER_ENTITY_METHOD_NAME, + ) + .await?; + Ok(()) + } + /// Returns all services currently registered with consul. /// See https://www.consul.io/api-docs/catalog#list-services for more information. /// # Arguments: @@ -1058,22 +1100,76 @@ mod tests { use super::*; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] - async fn create_and_read_string() { + async fn clear_and_create_and_read_string() { let consul = get_client(); let key = "test/consul/read"; - let string_value = "This is a test"; - let res = create_or_update_key_value(&consul, key, string_value).await; - assert_expected_result_with_index(res); + consul + .delete_key(DeleteKeyRequest { + key, + ..Default::default() + }) + .await + .unwrap(); + let string_value = "This is a test".to_owned(); + let res = create_or_update_key_value(&consul, key, &string_value) + .await + .unwrap(); + assert!(res.0); + let res = read_string(&consul, key).await.unwrap(); + assert_eq!(string_value, res.into_iter().next().unwrap().value.unwrap()); + } + + #[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] + struct ComplexStruct { + stuff: HashMap, + wat: String, + num: u64, + } - let res = read_string(&consul, key).await; - verify_single_value_matches(res, string_value); + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] + async fn clear_and_create_and_read_complex() { + let consul = get_client(); + let key = "test/consul/complex"; + consul + .delete_key(DeleteKeyRequest { + key, + ..Default::default() + }) + .await + .unwrap(); + let value = ComplexStruct { + stuff: HashMap::from([("hmmm".to_owned(), 1234234), ("lalala".to_owned(), 42)]), + wat: "this is wat".into(), + num: 424242424242424242, + }; + let res = create_or_update_key_value(&consul, key, &value) + .await + .unwrap(); + assert!(res.0); + let req = ReadKeyRequest::new().set_key(key); + let res = consul.read_obj::(req).await.unwrap(); + assert_eq!(value, res.into_iter().next().unwrap().value.unwrap()); } #[tokio::test(flavor = "multi_thread")] async fn test_register_and_retrieve_services() { let consul = get_client(); - let new_service_name = "test-service-44".to_string(); + let new_service_name = "test-service-44"; + let list_request = GetServiceNodesRequest { + service: new_service_name, + ..Default::default() + }; + let list_response = consul.get_service_nodes(list_request, None).await.unwrap(); + + for sn in list_response.response.iter() { + let dereg_request = DeregisterEntityRequest { + node: "local".into(), + service_id: Some(sn.service.id.as_str()), + ..Default::default() + }; + consul.deregister_entity(&dereg_request).await.unwrap(); + } // verify a service by this name is currently not registered let ResponseMeta { @@ -1083,27 +1179,27 @@ mod tests { .get_all_registered_service_names(None) .await .expect("expected get_registered_service_names request to succeed"); - assert!(!service_names_before_register.contains(&new_service_name)); + assert!(!service_names_before_register.contains(&new_service_name.to_owned())); // register a new service - let payload = RegisterEntityPayload { - ID: None, - Node: "local".to_string(), - Address: "127.0.0.1".to_string(), - Datacenter: None, - TaggedAddresses: Default::default(), - NodeMeta: Default::default(), - Service: Some(RegisterEntityService { - ID: None, - Service: new_service_name.clone(), - Tags: vec![], - TaggedAddresses: Default::default(), - Meta: Default::default(), - Port: Some(42424), - Namespace: None, + let payload = RegisterEntityRequest { + id: None, + node: "local", + address: "127.0.0.1", + datacenter: None, + tagged_addresses: Default::default(), + node_meta: Default::default(), + service: Some(RegisterEntityService { + id: None, + service: new_service_name, + tags: vec![], + tagged_addresses: Default::default(), + meta: Default::default(), + port: Some(42424), + namespace: None, }), - Check: None, - SkipNodeUpdate: None, + check: None, + skip_node_update: None, }; consul .register_entity(&payload) @@ -1118,7 +1214,7 @@ mod tests { .get_all_registered_service_names(None) .await .expect("expected get_registered_service_names request to succeed"); - assert!(service_names_after_register.contains(&new_service_name)); + assert!(service_names_after_register.contains(&new_service_name.to_owned())); } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -1163,8 +1259,8 @@ mod tests { async fn create_and_delete_key() { let consul = get_client(); let key = "test/consul/again"; - let string_value = "This is a new test"; - let res = create_or_update_key_value(&consul, key, string_value).await; + let string_value = "This is a new test".to_owned(); + let res = create_or_update_key_value(&consul, key, &string_value).await; assert_expected_result_with_index(res); let res = delete_key(&consul, key).await; @@ -1196,10 +1292,10 @@ mod tests { }; let session_id: String; { - let res = consul.get_lock(req, string_value.as_bytes()).await; + let res = consul.get_lock(req.clone(), string_value).await; assert!(res.is_ok()); let mut lock = res.unwrap(); - let res2 = consul.get_lock(req, string_value.as_bytes()).await; + let res2 = consul.get_lock(req, string_value).await; assert!(res2.is_err()); let err = res2.unwrap_err(); match err { @@ -1211,13 +1307,13 @@ mod tests { } session_id = lock.session_id.to_string(); // Lets change the value before dropping the lock to ensure the change is persisted when the lock is dropped. - lock.value = Some(new_string_value.as_bytes().to_vec()) + lock.value = Some(new_string_value) // lock gets dropped here. } sleep(Duration::from_secs(2)).await; let key_resp = read_string(&consul, key).await; - verify_single_value_matches(key_resp, &new_string_value); + verify_single_value_matches(key_resp, &new_string_value.to_owned()); let req = LockRequest { key, @@ -1226,7 +1322,7 @@ mod tests { session_id: &session_id, ..Default::default() }; - let res = consul.get_lock(req, string_value.as_bytes()).await; + let res = consul.get_lock(req, string_value).await; assert!(res.is_ok()); } @@ -1242,10 +1338,10 @@ mod tests { ..Default::default() }; let start_index: u64; - let res = consul.get_lock(req, string_value.as_bytes()).await; + let res = consul.get_lock(req.clone(), string_value).await; assert!(res.is_ok()); let lock = res.unwrap(); - let res2 = consul.get_lock(req, string_value.as_bytes()).await; + let res2 = consul.get_lock(req.clone(), string_value).await; assert!(res2.is_err()); let err = res2.unwrap_err(); match err { @@ -1269,7 +1365,7 @@ mod tests { assert!(res.is_ok()); std::mem::drop(lock); // This ensures the lock is not dropped until after the request to watch it completes. - let res = consul.get_lock(req, string_value.as_bytes()).await; + let res = consul.get_lock(req, string_value).await; assert!(res.is_ok()); } @@ -1391,25 +1487,23 @@ mod tests { Consul::new(conf) } - async fn create_or_update_key_value( + async fn create_or_update_key_value( consul: &Consul, key: &str, - value: &str, - ) -> Result<(bool, u64)> { + value: &T, + ) -> Result<(bool, u64)> + where + T: Serialize + std::fmt::Debug + ?Sized + Default, + { let req = CreateOrUpdateKeyRequest { key, ..Default::default() }; - consul - .create_or_update_key(req, value.as_bytes().to_vec()) - .await + Ok(consul.create_or_update_key(req, value).await?) } async fn read_string(consul: &Consul, key: &str) -> Result>> { - let req = ReadKeyRequest { - key, - ..Default::default() - }; + let req = ReadKeyRequest::new().set_key(key); consul.read_obj::(req).await } @@ -1432,10 +1526,13 @@ mod tests { assert!(res.unwrap()); } - fn verify_single_value_matches(res: Result>>, value: &str) { + fn verify_single_value_matches<'a, T>(res: Result>>, value: &'a T) + where + T: PartialEq + std::fmt::Debug + Default, + { assert!(res.is_ok()); assert_eq!( - res.unwrap().into_iter().next().unwrap().value.unwrap(), + &res.unwrap().into_iter().next().unwrap().value.unwrap(), value ) } diff --git a/src/types.rs b/src/types.rs index e4d8b9b..1378ffc 100644 --- a/src/types.rs +++ b/src/types.rs @@ -71,7 +71,7 @@ pub struct ResponseMeta { } /// Represents a request to delete a key or all keys sharing a prefix from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct DeleteKeyRequest<'a> { /// Specifies the path of the key to delete. pub key: &'a str, @@ -91,7 +91,7 @@ pub struct DeleteKeyRequest<'a> { } /// Represents a request to read a key from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct ReadKeyRequest<'a> { /// Specifies the path of the key to read. pub key: &'a str, @@ -129,10 +129,11 @@ impl<'a> ReadKeyRequest<'a> { /// Construct a default ReadKeyRequest to be used with the builder API /// e.g. /// ```rust + /// use rs_consul::ReadKeyRequest; /// let req = ReadKeyRequest::new() /// .set_key("bar") /// .set_namespace("foo") - /// .recurse(true); + /// .set_recurse(true); /// ``` pub fn new() -> Self { Default::default() @@ -149,7 +150,7 @@ impl<'a> ReadKeyRequest<'a> { } /// Represents a request to read a key from Consul's Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct LockWatchRequest<'a> { /// Specifies the path of the key to read. pub key: &'a str, @@ -171,7 +172,7 @@ pub struct LockWatchRequest<'a> { } /// Represents a request to read a key from Consul Key Value store. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct CreateOrUpdateKeyRequest<'a> { /// Specifies the path of the key. pub key: &'a str, @@ -265,7 +266,7 @@ pub struct ReadKeyResponse { } /// Represents a request to create a lock . -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub struct LockRequest<'a> { /// The key to use for locking. @@ -338,7 +339,7 @@ pub struct SessionResponse { pub id: String, } -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] pub(crate) struct CreateSessionRequest { #[default(_code = "Duration::from_secs(0)")] @@ -359,93 +360,122 @@ pub(crate) struct CreateSessionRequest { /// Payload struct to register or update entries in consul's catalog. /// See https://www.consul.io/api-docs/catalog#register-entity for more information. -#[allow(non_snake_case)] -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct RegisterEntityPayload { +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct RegisterEntityRequest<'a> { /// Optional UUID to assign to the node. This string is required to be 36-characters and UUID formatted. #[serde(skip_serializing_if = "Option::is_none")] - pub ID: Option, + #[serde(rename = "ID")] + pub id: Option<&'a str>, /// Node ID to register. - pub Node: String, + pub node: &'a str, /// The address to register. - pub Address: String, + pub address: &'a str, /// The datacenter to register in, defaults to the agent's datacenter. #[serde(skip_serializing_if = "Option::is_none")] - pub Datacenter: Option, + pub datacenter: Option<&'a str>, /// Tagged addressed to register with. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub TaggedAddresses: HashMap, + pub tagged_addresses: HashMap<&'a str, &'a str>, /// KV metadata paris to register with. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub NodeMeta: HashMap, + pub node_meta: HashMap<&'a str, &'a str>, /// Optional service to register. #[serde(skip_serializing_if = "Option::is_none")] - pub Service: Option, + pub service: Option>, /// Optional check to register #[serde(skip_serializing_if = "Option::is_none")] - pub Check: Option, + pub check: Option, /// Whether to skip updating the nodes information in the registration. #[serde(skip_serializing_if = "Option::is_none")] - pub SkipNodeUpdate: Option, + pub skip_node_update: Option, } /// The service to register with consul's global catalog. /// See https://www.consul.io/api/agent/service for more information. -#[allow(non_snake_case)] -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct RegisterEntityService { +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "PascalCase")] +pub struct RegisterEntityService<'a> { /// ID to register service will, defaults to Service.Service property. #[serde(skip_serializing_if = "Option::is_none")] - pub ID: Option, + #[serde(rename = "ID")] + pub id: Option<&'a str>, /// The name of the service. - pub Service: String, + pub service: &'a str, /// Optional tags associated with the service. #[serde(skip_serializing_if = "Vec::is_empty")] - pub Tags: Vec, + pub tags: Vec<&'a str>, /// Optional map of explicit LAN and WAN addresses for the service. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub TaggedAddresses: HashMap, + pub tagged_addresses: HashMap<&'a str, &'a str>, /// Optional key value meta associated with the service. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub Meta: HashMap, + pub meta: HashMap<&'a str, &'a str>, /// The port of the service #[serde(skip_serializing_if = "Option::is_none")] - pub Port: Option, + pub port: Option, /// The consul namespace to register the service in. #[serde(skip_serializing_if = "Option::is_none")] - pub Namespace: Option, + pub namespace: Option<&'a str>, } /// Information related to registering a check. /// See https://www.consul.io/docs/discovery/checks for more information. -#[allow(non_snake_case)] #[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] pub struct RegisterEntityCheck { /// The node to execute the check on. #[serde(skip_serializing_if = "Option::is_none")] - pub Node: Option, + pub node: Option, /// Optional check id, defaults to the name of the check. #[serde(skip_serializing_if = "Option::is_none")] - pub CheckID: Option, + #[serde(rename = "CheckID")] + pub check_id: Option, /// The name associated with the check - pub Name: String, + pub name: String, /// Opaque field encapsulating human-readable text. #[serde(skip_serializing_if = "Option::is_none")] - pub Notes: Option, + pub notes: Option, /// The status of the check. Must be one of 'passing', 'warning', or 'critical'. #[serde(skip_serializing_if = "Option::is_none")] - pub Status: Option, + pub status: Option, /// ID of the service this check is for. If no ID of a service running on the node is provided, /// the check is treated as a node level check #[serde(skip_serializing_if = "Option::is_none")] - pub ServiceID: Option, + #[serde(rename = "ServiceID")] + pub service_id: Option, /// Details for a TCP or HTTP health check. #[serde(skip_serializing_if = "HashMap::is_empty")] - pub Definition: HashMap, + pub definition: HashMap, +} + +/// Request body for de-registering a check or service from the Catalog +/// See https://developer.hashicorp.com/consul/api-docs/catalog#deregister-entity for more +/// information +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct DeregisterEntityRequest<'a> { + /// The node on which to execute the registration + pub node: &'a str, + #[serde(skip_serializing_if = "Option::is_none")] + /// Optional string to specify which datacenter to find the node. If not supplied, defaults to + /// the DC of the agent to which this client is connected + pub datacenter: Option<&'a str>, + /// Specifies the ID of the Check to remove + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "CheckID")] + pub check_id: Option<&'a str>, + /// Specifies the ID of the Service to remove + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(rename = "ServiceID")] + pub service_id: Option<&'a str>, + /// The consul namespace to register the service in. + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace: Option<&'a str>, } /// Request for the nodes providing a specified service registered in Consul. -#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Clone, Debug, SmartDefault, Serialize, PartialEq, Eq)] pub struct GetServiceNodesRequest<'a> { /// Specifies the service to list services for. This is provided as part of the URL. pub service: &'a str, @@ -487,8 +517,10 @@ pub struct Node { /// The datacenter where this node is running on. pub datacenter: String, /// List of explicit WAN and LAN addresses for the node + #[serde(deserialize_with = "null_to_default")] pub tagged_addresses: HashMap, /// Map of metadata options + #[serde(deserialize_with = "null_to_default")] pub meta: HashMap, } @@ -589,3 +621,26 @@ impl<'de> Deserialize<'de> for Base64Vec { deserializer.deserialize_str(Vis) } } + +impl From> for Base64Vec { + fn from(a: Vec) -> Base64Vec { + Base64Vec(a) + } +} + +impl From for Vec { + fn from(a: Base64Vec) -> Vec { + a.0 + } +} + +fn null_to_default<'de, D, T>(d: D) -> Result +where + D: Deserializer<'de>, + T: Default + Deserialize<'de>, +{ + let opt = Option::deserialize(d)?; + let val = opt.unwrap_or_else(T::default); + Ok(val) +} + From c96c7353f87fef8f88882e6a34d53b52178b5f8e Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Tue, 1 Nov 2022 11:24:22 -0700 Subject: [PATCH 05/13] swapped the read_* names --- src/lib.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ea35f6b..73c3e4c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -146,6 +146,7 @@ lazy_static! { } const READ_KEY_METHOD_NAME: &str = "read_key"; +const READ_OBJ_METHOD_NAME: &str = "read_obj"; const CREATE_OR_UPDATE_KEY_METHOD_NAME: &str = "create_or_update_key"; const CREATE_OR_UPDATE_ALL_METHOD_NAME: &str = "create_or_update_all"; const CREATE_OR_UPDATE_KEY_SYNC_METHOD_NAME: &str = "create_or_update_key_sync"; @@ -326,7 +327,7 @@ impl Consul { /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result> { + pub async fn read_vec(&self, request: ReadKeyRequest<'_>) -> Result> { let req = self.build_read_key_req(request); let (mut response_body, _index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) @@ -343,13 +344,13 @@ impl Consul { /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_obj(&self, request: ReadKeyRequest<'_>) -> Result>> + pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result>> where T: DeserializeOwned + Default + std::fmt::Debug, { let req = self.build_read_key_req(request); let (mut response_body, _index) = self - .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) + .execute_request(req, hyper::Body::empty(), None, READ_OBJ_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); serde_json::from_slice::>>(&bytes) @@ -1147,7 +1148,7 @@ mod tests { .unwrap(); assert!(res.0); let req = ReadKeyRequest::new().set_key(key); - let res = consul.read_obj::(req).await.unwrap(); + let res = consul.read_key::(req).await.unwrap(); assert_eq!(value, res.into_iter().next().unwrap().value.unwrap()); } @@ -1504,7 +1505,7 @@ mod tests { async fn read_string(consul: &Consul, key: &str) -> Result>> { let req = ReadKeyRequest::new().set_key(key); - consul.read_obj::(req).await + consul.read_key::(req).await } async fn delete_key(consul: &Consul, key: &str) -> Result { From df34c2ac79b6f308d1069fd5616988c3964501e1 Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Tue, 1 Nov 2022 12:33:58 -0700 Subject: [PATCH 06/13] fixing types for lock_and_watch test and reduced timeout --- src/lib.rs | 72 ++++++++++++++++++++++++++++++++-------------------- src/types.rs | 4 +-- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 73c3e4c..23393c9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -516,8 +516,8 @@ impl Consul { "{}/v1/kv/{}?recurse={}", self.config.address, request.key, request.recurse )); - if request.check_and_set != 0 { - url.push_str(&format!("&cas={}", request.check_and_set)); + if let Some(cas) = request.check_and_set { + url.push_str(&format!("&cas={}", cas)); } url = add_namespace_and_datacenter(url, request.namespace, request.datacenter); @@ -610,10 +610,13 @@ impl Consul { /// - request - the [LockWatchRequest](consul::types::LockWatchRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn watch_lock<'a>( + pub async fn watch_lock<'a, T>( &self, request: LockWatchRequest<'_>, - ) -> Result> { + ) -> Result>> + where + T: Default + DeserializeOwned + std::fmt::Debug, + { let req = ReadKeyRequest { key: request.key, namespace: request.namespace, @@ -1331,7 +1334,14 @@ mod tests { async fn create_and_watch_lock() { let consul = get_client(); let key = "test/consul/watchedlock"; - let string_value = "This is a lock test"; + let _res = consul + .delete_key(DeleteKeyRequest { + key, + ..Default::default() + }) + .await + .unwrap(); + let string_value = "This is a lock test".to_owned(); let req = LockRequest { key, behavior: LockExpirationBehavior::Release, @@ -1339,12 +1349,14 @@ mod tests { ..Default::default() }; let start_index: u64; - let res = consul.get_lock(req.clone(), string_value).await; - assert!(res.is_ok()); - let lock = res.unwrap(); - let res2 = consul.get_lock(req.clone(), string_value).await; - assert!(res2.is_err()); - let err = res2.unwrap_err(); + let lock = consul + .get_lock(req.clone(), string_value.clone()) + .await + .unwrap(); + let err = consul + .get_lock(req.clone(), string_value.clone()) + .await + .unwrap_err(); match err { ConsulError::LockAcquisitionFailure(index) => start_index = index, _ => panic!( @@ -1358,16 +1370,14 @@ mod tests { key, consistency: ConsistencyMode::Consistent, index: Some(start_index), - wait: Duration::from_secs(60), + wait: Duration::from_secs(5), ..Default::default() }; // The lock will timeout and this this will return. - let res = consul.watch_lock(watch_req).await; - assert!(res.is_ok()); + let _res = consul.watch_lock::(watch_req).await.unwrap(); std::mem::drop(lock); // This ensures the lock is not dropped until after the request to watch it completes. - let res = consul.get_lock(req, string_value).await; - assert!(res.is_ok()); + let _res = consul.get_lock(req, string_value).await.unwrap(); } #[test] @@ -1418,7 +1428,7 @@ mod tests { async fn properly_handle_check_and_set() { let consul = get_client(); let key = "test/consul/proper_cas_handling"; - let string_value1 = "This is CAS test"; + let string_value1 = "This is CAS test".to_owned(); let req = CreateOrUpdateKeyRequest { key, check_and_set: Some(0), @@ -1428,24 +1438,24 @@ mod tests { // Key does not exist, with CAS set and modify index set to 0 // it should be created. let (set, _) = consul - .create_or_update_key(req.clone(), string_value1.as_bytes().to_vec()) + .create_or_update_key(req.clone(), &string_value1) .await .expect("failed to create key initially"); assert!(set); let (value, mod_idx1) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value1, &value.unwrap()); + assert_eq!(&string_value1, &value.unwrap()); // Subsequent request with CAS set to 0 should not override the // value. - let string_value2 = "This is CAS test - not valid"; + let string_value2 = "This is CAS test - not valid".to_owned(); let (set, _) = consul - .create_or_update_key(req, string_value2.as_bytes().to_vec()) + .create_or_update_key(req, &string_value2) .await .expect("failed to run subsequent create_or_update_key"); assert!(!set); // Value and modify index should not have changed because set failed. let (value, mod_idx2) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value1, &value.unwrap()); + assert_eq!(&string_value1, &value.unwrap()); assert_eq!(mod_idx1, mod_idx2); // Successfully set value with proper CAS value. @@ -1454,15 +1464,15 @@ mod tests { check_and_set: Some(mod_idx1), ..Default::default() }; - let string_value3 = "This is correct CAS updated"; + let string_value3 = "This is correct CAS updated".to_owned(); let (set, _) = consul - .create_or_update_key(req, string_value3.as_bytes().to_vec()) + .create_or_update_key(req, &string_value3) .await .expect("failed to run create_or_update_key with proper CAS value"); assert!(set); // Verify that value was updated and the index changed. let (value, mod_idx3) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value3, &value.unwrap()); + assert_eq!(&string_value3, &value.unwrap()); assert_ne!(mod_idx1, mod_idx3); // Successfully set value without CAS. @@ -1471,18 +1481,24 @@ mod tests { check_and_set: None, ..Default::default() }; - let string_value4 = "This is non CAS update"; + let string_value4 = "This is non CAS update".to_owned(); let (set, _) = consul - .create_or_update_key(req, string_value4.as_bytes().to_vec()) + .create_or_update_key(req, &string_value4) .await .expect("failed to run create_or_update_key without CAS"); assert!(set); // Verify that value was updated and the index changed. let (value, mod_idx4) = get_single_key_value_with_index(&consul, key).await; - assert_eq!(string_value4, &value.unwrap()); + assert_eq!(&string_value4, &value.unwrap()); assert_ne!(mod_idx3, mod_idx4); } + async fn get_single_key_value_with_index(consul: &Consul, key: &str) -> (Option, u64) { + let res = read_string(consul, key).await.expect("failed to read key"); + let r = res.into_iter().next().unwrap(); + (r.value, r.modify_index) + } + fn get_client() -> Consul { let conf: Config = Config::from_env(); Consul::new(conf) diff --git a/src/types.rs b/src/types.rs index 1378ffc..227caf4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -84,7 +84,7 @@ pub struct DeleteKeyRequest<'a> { /// This is very useful as a building block for more complex synchronization primitives. /// The index must be greater than 0 for Consul to take any action: a 0 index will not delete the key. /// If the index is non-zero, the key is only deleted if the index matches the ModifyIndex of that key. - pub check_and_set: u32, + pub check_and_set: Option, /// Specifies the namespace to query. /// If not provided, the namespace will be inferred from the request's ACL token, or will default to the default namespace. pub namespace: &'a str, @@ -190,7 +190,7 @@ pub struct CreateOrUpdateKeyRequest<'a> { /// This is very useful as a building block for more complex synchronization primitives. /// If the index is 0, Consul will only put the key if it does not already exist. /// If the index is non-zero, the key is only set if the index matches the ModifyIndex of that key. - pub check_and_set: Option, + pub check_and_set: Option, /// Supply a session ID to use in a lock acquisition operation. /// This is useful as it allows leader election to be built on top of Consul. /// If the lock is not held and the session is valid, this increments the LockIndex and sets the Session value of the key in addition to updating the key contents. From 2032bfbd4d7d78a8c5d5c9978fd9f8c12de176bb Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Tue, 1 Nov 2022 12:43:50 -0700 Subject: [PATCH 07/13] clippy --- src/types.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/types.rs b/src/types.rs index 227caf4..f6076f7 100644 --- a/src/types.rs +++ b/src/types.rs @@ -465,7 +465,7 @@ pub struct DeregisterEntityRequest<'a> { #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "CheckID")] pub check_id: Option<&'a str>, - /// Specifies the ID of the Service to remove + /// Specifies the ID of the Service to remove #[serde(skip_serializing_if = "Option::is_none")] #[serde(rename = "ServiceID")] pub service_id: Option<&'a str>, @@ -640,7 +640,6 @@ where T: Default + Deserialize<'de>, { let opt = Option::deserialize(d)?; - let val = opt.unwrap_or_else(T::default); + let val = opt.unwrap_or_default(); Ok(val) } - From d5e3b9b9a33aa167fd6b9637ea726fcd1e0067de Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Tue, 1 Nov 2022 13:17:13 -0700 Subject: [PATCH 08/13] make the TransactionOp conform to the new cas scheme --- src/types.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/types.rs b/src/types.rs index f6076f7..662d668 100644 --- a/src/types.rs +++ b/src/types.rs @@ -218,7 +218,8 @@ pub struct TransactionOp<'a> { pub value: Base64Vec, #[serde(rename = "Index")] /// The modify_index if it is a cas operation - pub check_and_set: u64, + #[serde(skip_serializing_if = "Option::is_none")] + pub check_and_set: Option, /// Optional flags to associate with the key pub flags: u64, /// Namespace on which to operate From 2704704fd9c5df6d88a59831fc9e01441cd030e2 Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Tue, 28 Mar 2023 15:06:48 -0700 Subject: [PATCH 09/13] changing get_service_nodes to get_service_nodes_health adding the catalog/service nodes list as the default, also adding get single NodeFull by service and name --- src/lib.rs | 121 ++++++++++++++++++++++++++++++++++++++++++++------- src/types.rs | 25 +++++++++++ 2 files changed, 131 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 23393c9..c8e8edb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -155,6 +155,7 @@ const GET_LOCK_METHOD_NAME: &str = "get_lock"; const REGISTER_ENTITY_METHOD_NAME: &str = "register_entity"; const DEREGISTER_ENTITY_METHOD_NAME: &str = "deregister_entity"; const GET_ALL_REGISTERED_SERVICE_NAMES_METHOD_NAME: &str = "get_all_registered_service_names"; +const GET_SERVICE_NODES_HEALTH_METHOD_NAME: &str = "get_service_nodes_health"; const GET_SERVICE_NODES_METHOD_NAME: &str = "get_service_nodes"; const CREATE_SESSION_METHOD_NAME: &str = "create_session"; const GET_DATACENTERS: &str = "get_datacenters"; @@ -736,9 +737,51 @@ impl Consul { .uri(uri.clone()) } + /// returns the Node by a given service and name + /// + /// If the node is not present, returns none. + /// + /// This uses the catalog#list-nodes function in the Consul API + /// then supplies a filter by the Node name + pub async fn get_node_by_name_and_service( + &self, + service: &str, + node_name: &str, + query_opts: Option, + ) -> Result>> { + let query_opts = query_opts.unwrap_or_default(); + let filter = format!("Node == {node_name}"); + let request = GetServiceNodesRequest { + service, + near: None, + passing: false, + filter: Some(filter.as_str()), + }; + let req = self.build_get_service_nodes_catalog_req(request, &query_opts); + let (mut response_body, index) = self + .execute_request( + req, + hyper::Body::empty(), + query_opts.timeout, + GET_SERVICE_NODES_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); + let mut response = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + if let Some(node) = response.pop() { + Ok(Some(ResponseMeta { + response: node, + index, + })) + } else { + Ok(None) + } + } + /// returns the nodes providing the service indicated on the path. /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. - /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. + /// See the [consul docs](https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service) for more information. /// # Arguments: /// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest) /// # Errors: @@ -747,9 +790,9 @@ impl Consul { &self, request: GetServiceNodesRequest<'_>, query_opts: Option, - ) -> Result> { + ) -> Result>> { let query_opts = query_opts.unwrap_or_default(); - let req = self.build_get_service_nodes_req(request, &query_opts); + let req = self.build_get_service_nodes_catalog_req(request, &query_opts); let (mut response_body, index) = self .execute_request( req, @@ -759,6 +802,34 @@ impl Consul { ) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); + let response = serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)?; + Ok(ResponseMeta { response, index }) + } + + /// returns the nodes providing the service indicated on the path. + /// Users can also build in support for dynamic load balancing and other features by incorporating the use of health checks. + /// See the [consul docs](https://www.consul.io/api-docs/health#list-nodes-for-service) for more information. + /// # Arguments: + /// - request - the [GetServiceNodesRequest](consul::types::GetServiceNodesRequest) + /// # Errors: + /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. + pub async fn get_service_nodes_health( + &self, + request: GetServiceNodesRequest<'_>, + query_opts: Option, + ) -> Result> { + let query_opts = query_opts.unwrap_or_default(); + let req = self.build_get_service_nodes_health_req(request, &query_opts); + let (mut response_body, index) = self + .execute_request( + req, + hyper::Body::empty(), + query_opts.timeout, + GET_SERVICE_NODES_HEALTH_METHOD_NAME, + ) + .await?; + let bytes = response_body.copy_to_bytes(response_body.remaining()); let response = serde_json::from_slice::(&bytes) .map_err(ConsulError::ResponseDeserializationFailed)?; Ok(ResponseMeta { response, index }) @@ -775,10 +846,9 @@ impl Consul { passing: true, ..Default::default() }; - let services = self.get_service_nodes(request, query_opts).await.map_err(|e| { + let services = self.get_service_nodes_health(request, query_opts).await.map_err(|e| { let err = format!( - "Unable to query consul to resolve service '{}' to a list of addresses and ports: {:?}", - service_name, e + "Unable to query consul to resolve service '{service_name}' to a list of addresses and ports: {e:?}" ); error!("{}", err); ConsulError::ServiceInstanceResolutionFailed(service_name.to_string()) @@ -881,7 +951,28 @@ impl Consul { serde_json::from_slice(&bytes).map_err(ConsulError::ResponseDeserializationFailed) } - fn build_get_service_nodes_req( + fn build_get_service_nodes_catalog_req( + &self, + request: GetServiceNodesRequest<'_>, + query_opts: &QueryOptions, + ) -> http::request::Builder { + let req = hyper::Request::builder().method(Method::GET); + let mut url = String::new(); + url.push_str(&format!( + "{}/v1/catalog/service/{}", + self.config.address, request.service + )); + if request.passing { + url.push_str(&format!("?passing={}", request.passing)); + } + if let Some(filter) = request.filter { + url.push_str(&format!("&filter={filter}")); + } + add_query_option_params(&mut url, query_opts, '&'); + req.uri(url) + } + + fn build_get_service_nodes_health_req( &self, request: GetServiceNodesRequest<'_>, query_opts: &QueryOptions, @@ -896,10 +987,10 @@ impl Consul { url.push_str(&format!("?passing={}", request.passing)); } if let Some(near) = request.near { - url.push_str(&format!("&near={}", near)); + url.push_str(&format!("&near={near}")); } if let Some(filter) = request.filter { - url.push_str(&format!("&filter={}", filter)); + url.push_str(&format!("&filter={filter}")); } add_query_option_params(&mut url, query_opts, '&'); req.uri(url) @@ -985,7 +1076,7 @@ impl Consul { fn build_create_txn_url(&self, datacenter: Option<&str>) -> String { let mut url = format!("{}/v1/txn", self.config.address); if let Some(dc) = datacenter { - url.push_str(&format!("?datacenter={}", dc)); + url.push_str(&format!("?datacenter={dc}")); } url } @@ -1010,7 +1101,7 @@ impl Consul { } if let Some(cas_idx) = request.check_and_set { url = add_query_param_separator(url, added_query_param); - url.push_str(&format!("cas={}", cas_idx)); + url.push_str(&format!("cas={cas_idx}")); } add_namespace_and_datacenter(url, request.namespace, request.datacenter) @@ -1020,18 +1111,18 @@ impl Consul { fn add_query_option_params(uri: &mut String, query_opts: &QueryOptions, mut separator: char) { if let Some(ns) = &query_opts.namespace { if !ns.is_empty() { - uri.push_str(&format!("{}ns={}", separator, ns)); + uri.push_str(&format!("{separator}ns={ns}")); separator = '&'; } } if let Some(dc) = &query_opts.datacenter { if !dc.is_empty() { - uri.push_str(&format!("{}dc={}", separator, dc)); + uri.push_str(&format!("{separator}dc={dc}")); separator = '&'; } } if let Some(idx) = query_opts.index { - uri.push_str(&format!("{}index={}", separator, idx)); + uri.push_str(&format!("{separator}index={idx}")); separator = '&'; if let Some(wait) = query_opts.wait { uri.push_str(&format!( @@ -1168,7 +1259,7 @@ mod tests { for sn in list_response.response.iter() { let dereg_request = DeregisterEntityRequest { - node: "local".into(), + node: "local", service_id: Some(sn.service.id.as_str()), ..Default::default() }; diff --git a/src/types.rs b/src/types.rs index 662d668..70c6966 100644 --- a/src/types.rs +++ b/src/types.rs @@ -507,6 +507,7 @@ pub struct ServiceNode { #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The node information of an instance providing a Consul service. +/// provided by the Consul Health API pub struct Node { /// The ID of the service node. #[serde(rename = "ID")] @@ -525,6 +526,30 @@ pub struct Node { pub meta: HashMap, } +#[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "PascalCase")] +/// The node information as returned by the Consul Catalog API +pub struct NodeFull { + id: String, + node: String, + address: String, + datacenter: String, + tagged_addresses: HashMap, + node_meta: HashMap, + create_index: u64, + modify_index: u64, + service_address: Option, + service_enable_tag_override: Option, + #[serde(rename = "Service_ID")] + service_id: Option, + service_name: Option, + service_port: Option, + service_meta: HashMap, + service_tagged_addresses: HashMap, + service_tags: Vec, + namespace: Option, +} + #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] /// The service information of an instance providing a Consul service. From 1cb9860af5c79d1123660785952f217cc849799a Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Mon, 26 Jun 2023 10:00:47 -0700 Subject: [PATCH 10/13] cleanup after merging in #main --- Cargo.toml | 5 +++-- src/lib.rs | 34 +++++++++++++++++++++++++--------- src/types.rs | 11 ++++++----- 3 files changed, 34 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a6d5bb4..c5274ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,10 +11,11 @@ license-file = "LICENSE" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] default = ["rustls-native"] -metrics = ["prometheus", "lazy_static"] +metrics = ["prometheus"] default-tls = ["hyper-tls"] rustls-native = ["hyper-rustls/rustls-native-certs"] rustls-webpki = ["hyper-rustls/webpki-roots"] +trace = ["opentelemetry"] # keep this list sorted! [dependencies] @@ -24,7 +25,7 @@ http = "0.2" hyper = { version = "0.14", features = ["full"] } hyper-rustls = { version = "0.24" } hyper-tls = { version = "0.5.0", optional = true, no-default-features = true } -lazy_static = { version = "1", optional = true } +lazy_static = { version = "1" } opentelemetry = { version = "0.19", features = ["rt-tokio"], optional = true } prometheus = { version = "0.13", optional = true } quick-error = "2" diff --git a/src/lib.rs b/src/lib.rs index c8e8edb..fb0b5c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ use std::{env, str::Utf8Error}; use base64::Engine; use hyper::{body::Buf, client::HttpConnector, Body, Method}; #[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))] -use hyper_rustls::HttpsConnector; +use hyper_rustls::{ HttpsConnector, HttpsConnectorBuilder }; #[cfg(feature = "default-tls")] use hyper_tls::HttpsConnector; use lazy_static::lazy_static; @@ -242,27 +242,36 @@ where #[derive(Debug)] /// This struct defines the consul client and allows access to the consul api via method syntax. pub struct Consul { - https_client: hyper::Client, Body>, + + https_client: hyper::Client, Body>, config: Config, #[cfg(feature = "trace")] tracer: BoxedTracer, } -fn https_connector() -> hyper_rustls::HttpsConnector { - #[cfg(feature = "rustls-webpki")] - return hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() +fn https_connector() -> HttpsConnector { + #[cfg(feature = "rustls-native")] + return HttpsConnectorBuilder::new() + .with_native_roots() .https_or_http() .enable_http1() .build(); - hyper_rustls::HttpsConnectorBuilder::new() - .with_native_roots() + #[cfg(feature = "rustls-webpki")] + return HttpsConnectorBuilder::new() + .with_webpki_roots() .https_or_http() .enable_http1() - .build() + .build(); + #[cfg(feature = "default-tls")] + { + let mut conn = HttpsConnector::new(); + conn.https_only(false); + return conn; + } } impl Clone for Consul { + #[cfg(feature = "trace")] fn clone(&self) -> Self { Consul { https_client: self.https_client.clone(), @@ -270,6 +279,13 @@ impl Clone for Consul { tracer: global::tracer("consul"), } } + #[cfg(not(feature = "trace"))] + fn clone(&self) -> Self { + Consul { + https_client: self.https_client.clone(), + config: self.config.clone(), + } + } } impl Consul { diff --git a/src/types.rs b/src/types.rs index 70c6966..b45f522 100644 --- a/src/types.rs +++ b/src/types.rs @@ -27,6 +27,10 @@ use std::time::Duration; use serde::{self, de::Deserializer, de::Error as SerdeError, Deserialize, Serialize, Serializer}; use smart_default::SmartDefault; +use base64::{ + Engine, + engine::general_purpose::STANDARD as B64, +}; // TODO retrofit other get APIs to use this struct /// Query options for Consul endpoints. @@ -623,10 +627,7 @@ pub struct Base64Vec(pub Vec); impl Serialize for Base64Vec { fn serialize(&self, serializer: S) -> Result { - serializer.collect_str(&base64::display::Base64Display::with_config( - &self.0, - base64::STANDARD, - )) + serializer.collect_str(&B64.encode(&self.0)) } } @@ -641,7 +642,7 @@ impl<'de> Deserialize<'de> for Base64Vec { } fn visit_str(self, v: &str) -> Result { - base64::decode(v).map(Base64Vec).map_err(SerdeError::custom) + B64.decode(v).map(Base64Vec).map_err(SerdeError::custom) } } deserializer.deserialize_str(Vis) From 76dcf5bb067e1ae94d8d068497dae78dc9e59b99 Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Mon, 26 Jun 2023 10:47:16 -0700 Subject: [PATCH 11/13] fixing tests --- Cargo.toml | 2 +- src/lib.rs | 15 ++++++++++++--- src/types.rs | 51 ++++++++++++++++++++++++++++++++++----------------- 3 files changed, 47 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c5274ea..9b5985b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ futures = "0.3" http = "0.2" hyper = { version = "0.14", features = ["full"] } hyper-rustls = { version = "0.24" } -hyper-tls = { version = "0.5.0", optional = true, no-default-features = true } +hyper-tls = { version = "0.5.0", optional = true } lazy_static = { version = "1" } opentelemetry = { version = "0.19", features = ["rt-tokio"], optional = true } prometheus = { version = "0.13", optional = true } diff --git a/src/lib.rs b/src/lib.rs index fb0b5c3..a30c0b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -191,6 +191,15 @@ impl Config { hyper_builder: Default::default(), } } + + /// Create a new config from an address and token + pub fn new(address: String, token: Option) -> Self { + Config { + address, + token, + hyper_builder: Default::default(), + } + } } /// Represents a lock against Consul. @@ -1276,7 +1285,7 @@ mod tests { for sn in list_response.response.iter() { let dereg_request = DeregisterEntityRequest { node: "local", - service_id: Some(sn.service.id.as_str()), + service_id: Some(sn.service_id.as_ref().unwrap().as_str()), ..Default::default() }; consul.deregister_entity(&dereg_request).await.unwrap(); @@ -1349,7 +1358,7 @@ mod tests { let addresses: Vec = response .iter() - .map(|sn| sn.service.address.clone()) + .map(|sn| sn.service_address.as_ref().unwrap().clone()) .collect(); let expected_addresses = vec![ "1.1.1.1".to_string(), @@ -1362,7 +1371,7 @@ mod tests { let _: Vec<_> = response .iter() - .map(|sn| assert_eq!("dc1", sn.node.datacenter)) + .map(|sn| assert_eq!("dc1", sn.datacenter)) .collect(); } diff --git a/src/types.rs b/src/types.rs index b45f522..88cd77e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -534,24 +534,41 @@ pub struct Node { #[serde(rename_all = "PascalCase")] /// The node information as returned by the Consul Catalog API pub struct NodeFull { - id: String, - node: String, - address: String, - datacenter: String, - tagged_addresses: HashMap, - node_meta: HashMap, - create_index: u64, - modify_index: u64, - service_address: Option, - service_enable_tag_override: Option, + /// id + pub id: String, + /// node + pub node: String, + /// address + pub address: String, + /// datacenter + pub datacenter: String, + /// tagged_addresses + pub tagged_addresses: HashMap, + /// node_meta + pub node_meta: HashMap, + /// create_index + pub create_index: u64, + /// modify_index + pub modify_index: u64, + /// service_address + pub service_address: Option, + /// service_enable_tag_override + pub service_enable_tag_override: Option, #[serde(rename = "Service_ID")] - service_id: Option, - service_name: Option, - service_port: Option, - service_meta: HashMap, - service_tagged_addresses: HashMap, - service_tags: Vec, - namespace: Option, + /// service_id + pub service_id: Option, + /// service_name + pub service_name: Option, + /// service_port + pub service_port: Option, + /// service_meta + pub service_meta: HashMap, + /// service_tagged_addresses + pub service_tagged_addresses: HashMap, + /// service_tags + pub service_tags: Vec, + /// namespace + pub namespace: Option, } #[derive(Clone, Debug, SmartDefault, Serialize, Deserialize, PartialEq, Eq)] From 157301fd36d5669db593e9ad06bfd5ec5b3f6b1c Mon Sep 17 00:00:00 2001 From: Chris Holcombe Date: Tue, 27 Jun 2023 09:36:13 -0700 Subject: [PATCH 12/13] Make the Service port field optional --- src/lib.rs | 17 ++++++++--------- src/types.rs | 9 +++------ 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a30c0b7..9b3fdc4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,7 +34,7 @@ use std::{env, str::Utf8Error}; use base64::Engine; use hyper::{body::Buf, client::HttpConnector, Body, Method}; #[cfg(any(feature = "rustls-native", feature = "rustls-webpki"))] -use hyper_rustls::{ HttpsConnector, HttpsConnectorBuilder }; +use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; #[cfg(feature = "default-tls")] use hyper_tls::HttpsConnector; use lazy_static::lazy_static; @@ -251,7 +251,6 @@ where #[derive(Debug)] /// This struct defines the consul client and allows access to the consul api via method syntax. pub struct Consul { - https_client: hyper::Client, Body>, config: Config, #[cfg(feature = "trace")] @@ -275,7 +274,7 @@ fn https_connector() -> HttpsConnector { { let mut conn = HttpsConnector::new(); conn.https_only(false); - return conn; + return conn; } } @@ -865,7 +864,7 @@ impl Consul { &self, service_name: &str, query_opts: Option, - ) -> Result> { + ) -> Result)>> { let request = GetServiceNodesRequest { service: service_name, passing: true, @@ -903,12 +902,12 @@ impl Consul { /// in the health endpoint. These requests models are primarily for the /// health endpoints /// https://www.consul.io/api-docs/health#list-nodes-for-service - fn parse_host_port_from_service_node_response(sn: ServiceNode) -> (String, u16) { + fn parse_host_port_from_service_node_response(sn: ServiceNode) -> (String, Option) { ( if sn.service.address.is_empty() { info!( - "Consul service {service_name} instance had an empty Service address, with port:{port}", - service_name = &sn.service.service, port = sn.service.port + "Consul service {} instance had an empty Service address, with port:{:?}", + &sn.service.service, sn.service.port ); sn.node.address } else { @@ -1511,14 +1510,14 @@ mod tests { id: "node".to_string(), service: "node".to_string(), address: "2.2.2.2".to_string(), - port: 32, + port: Some(32), }; let empty_service = Service { id: "".to_string(), service: "".to_string(), address: "".to_string(), - port: 32, + port: Some(32), }; let sn = ServiceNode { diff --git a/src/types.rs b/src/types.rs index 88cd77e..a592842 100644 --- a/src/types.rs +++ b/src/types.rs @@ -25,12 +25,9 @@ SOFTWARE. use std::collections::HashMap; use std::time::Duration; +use base64::{engine::general_purpose::STANDARD as B64, Engine}; use serde::{self, de::Deserializer, de::Error as SerdeError, Deserialize, Serialize, Serializer}; use smart_default::SmartDefault; -use base64::{ - Engine, - engine::general_purpose::STANDARD as B64, -}; // TODO retrofit other get APIs to use this struct /// Query options for Consul endpoints. @@ -536,7 +533,7 @@ pub struct Node { pub struct NodeFull { /// id pub id: String, - /// node + /// node pub node: String, /// address pub address: String, @@ -583,7 +580,7 @@ pub struct Service { /// The address of the instance. pub address: String, /// The port of the instance. - pub port: u16, + pub port: Option, } pub(crate) fn serialize_duration_as_string( From e282cf34bc3992653b077528a3ef40071bacb558 Mon Sep 17 00:00:00 2001 From: Chris Holcombe Date: Tue, 27 Jun 2023 17:52:36 +0000 Subject: [PATCH 13/13] Bump docker compose to latest 1.15.x and fix service tagged addresses --- docker-compose.yml | 2 +- src/types.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 4abbe1d..3cf1bed 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,7 +3,7 @@ version: "3.8" services: consul: container_name: consul - image: consul:1.11.11 + image: consul:1.15.3 command: >- consul agent diff --git a/src/types.rs b/src/types.rs index a592842..eb8285c 100644 --- a/src/types.rs +++ b/src/types.rs @@ -532,6 +532,7 @@ pub struct Node { /// The node information as returned by the Consul Catalog API pub struct NodeFull { /// id + #[serde(rename = "ID")] pub id: String, /// node pub node: String, @@ -561,7 +562,7 @@ pub struct NodeFull { /// service_meta pub service_meta: HashMap, /// service_tagged_addresses - pub service_tagged_addresses: HashMap, + pub service_tagged_addresses: HashMap>, /// service_tags pub service_tags: Vec, /// namespace