From a544363f02e3ccf0deba44d30c39e32a9f970d6e Mon Sep 17 00:00:00 2001 From: Alex Hunsaker Date: Thu, 18 Jul 2024 08:16:12 -0600 Subject: [PATCH] wrap read_key in a ResponseMeta so callers can access the index when a delete occurs we need this index as the ModifyIndex of all the ReadKeyResponses will still be the same (and lower). which causes blocking requests to be "broken" as future read_key request immediatly returns until either a new key, or a update occuries to a key contained in the ReadKeyResponse Vec. per Roblox/rs-consul#30 --- src/lib.rs | 62 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 9cd57fe..827455d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -267,29 +267,35 @@ impl Consul { /// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest) /// # Errors: /// [ConsulError](consul::ConsulError) describes all possible errors returned by this api. - pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result> { + pub async fn read_key( + &self, + request: ReadKeyRequest<'_>, + ) -> Result>> { let req = self.build_read_key_req(request); - let (mut response_body, _index) = self + let (mut response_body, index) = self .execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME) .await?; let bytes = response_body.copy_to_bytes(response_body.remaining()); - serde_json::from_slice::>(&bytes) - .map_err(ConsulError::ResponseDeserializationFailed)? - .into_iter() - .map(|mut r| { - r.value = match r.value { - Some(val) => Some( - std::str::from_utf8( - &base64::engine::general_purpose::STANDARD.decode(val)?, - )? - .to_string(), - ), - None => None, - }; - - Ok(r) - }) - .collect() + Ok(ResponseMeta { + response: serde_json::from_slice::>(&bytes) + .map_err(ConsulError::ResponseDeserializationFailed)? + .into_iter() + .map(|mut r| { + r.value = match r.value { + Some(val) => Some( + std::str::from_utf8( + &base64::engine::general_purpose::STANDARD.decode(val)?, + )? + .to_string(), + ), + None => None, + }; + + Ok(r) + }) + .collect::>>()?, + index, + }) } /// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information. @@ -461,7 +467,7 @@ impl Consul { pub async fn watch_lock<'a>( &self, request: LockWatchRequest<'_>, - ) -> Result> { + ) -> Result>> { let req = ReadKeyRequest { key: request.key, namespace: request.namespace, @@ -1293,7 +1299,7 @@ mod tests { .await } - async fn read_key(consul: &Consul, key: &str) -> Result> { + async fn read_key(consul: &Consul, key: &str) -> Result>> { let req = ReadKeyRequest { key, ..Default::default() @@ -1322,14 +1328,20 @@ mod tests { async fn get_single_key_value_with_index(consul: &Consul, key: &str) -> (Option, i64) { let res = read_key(consul, key).await.expect("failed to read key"); - let r = res.into_iter().next().unwrap(); - (r.value, r.modify_index) + let r = res.response.into_iter().next().unwrap(); + (r.value, res.index as i64) } - fn verify_single_value_matches(res: Result>, value: &str) { + fn verify_single_value_matches(res: Result>>, value: &str) { assert!(res.is_ok()); assert_eq!( - res.unwrap().into_iter().next().unwrap().value.unwrap(), + res.unwrap() + .response + .into_iter() + .next() + .unwrap() + .value + .unwrap(), value ) }