Skip to content

Commit

Permalink
wrap read_key in a ResponseMeta so callers can access the index
Browse files Browse the repository at this point in the history
when a delete occurs we need this index as the ModifyIndex of all the
ReadKeyResponses will still be the same (and lower). which causes blocking
requests to be "broken" as future read_key request immediatly returns until
either a new key, or a update occuries to a key contained in the
ReadKeyResponse Vec.

per #30
  • Loading branch information
badalex committed Jul 18, 2024
1 parent 1b4141a commit a544363
Showing 1 changed file with 37 additions and 25 deletions.
62 changes: 37 additions & 25 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,29 +267,35 @@ impl Consul {
/// - request - the [ReadKeyRequest](consul::types::ReadKeyRequest)
/// # Errors:
/// [ConsulError](consul::ConsulError) describes all possible errors returned by this api.
pub async fn read_key(&self, request: ReadKeyRequest<'_>) -> Result<Vec<ReadKeyResponse>> {
pub async fn read_key(
&self,
request: ReadKeyRequest<'_>,
) -> Result<ResponseMeta<Vec<ReadKeyResponse>>> {
let req = self.build_read_key_req(request);
let (mut response_body, _index) = self
let (mut response_body, index) = self
.execute_request(req, hyper::Body::empty(), None, READ_KEY_METHOD_NAME)
.await?;
let bytes = response_body.copy_to_bytes(response_body.remaining());
serde_json::from_slice::<Vec<ReadKeyResponse>>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?
.into_iter()
.map(|mut r| {
r.value = match r.value {
Some(val) => Some(
std::str::from_utf8(
&base64::engine::general_purpose::STANDARD.decode(val)?,
)?
.to_string(),
),
None => None,
};

Ok(r)
})
.collect()
Ok(ResponseMeta {
response: serde_json::from_slice::<Vec<ReadKeyResponse>>(&bytes)
.map_err(ConsulError::ResponseDeserializationFailed)?
.into_iter()
.map(|mut r| {
r.value = match r.value {
Some(val) => Some(
std::str::from_utf8(
&base64::engine::general_purpose::STANDARD.decode(val)?,
)?
.to_string(),
),
None => None,
};

Ok(r)
})
.collect::<Result<Vec<_>>>()?,
index,
})
}

/// Creates or updates a key in Consul's KV store. See the [consul docs](https://www.consul.io/api-docs/kv#create-update-key) for more information.
Expand Down Expand Up @@ -461,7 +467,7 @@ impl Consul {
pub async fn watch_lock<'a>(
&self,
request: LockWatchRequest<'_>,
) -> Result<Vec<ReadKeyResponse>> {
) -> Result<ResponseMeta<Vec<ReadKeyResponse>>> {
let req = ReadKeyRequest {
key: request.key,
namespace: request.namespace,
Expand Down Expand Up @@ -1293,7 +1299,7 @@ mod tests {
.await
}

async fn read_key(consul: &Consul, key: &str) -> Result<Vec<ReadKeyResponse>> {
async fn read_key(consul: &Consul, key: &str) -> Result<ResponseMeta<Vec<ReadKeyResponse>>> {
let req = ReadKeyRequest {
key,
..Default::default()
Expand Down Expand Up @@ -1322,14 +1328,20 @@ mod tests {

async fn get_single_key_value_with_index(consul: &Consul, key: &str) -> (Option<String>, i64) {
let res = read_key(consul, key).await.expect("failed to read key");
let r = res.into_iter().next().unwrap();
(r.value, r.modify_index)
let r = res.response.into_iter().next().unwrap();
(r.value, res.index as i64)
}

fn verify_single_value_matches(res: Result<Vec<ReadKeyResponse>>, value: &str) {
fn verify_single_value_matches(res: Result<ResponseMeta<Vec<ReadKeyResponse>>>, value: &str) {
assert!(res.is_ok());
assert_eq!(
res.unwrap().into_iter().next().unwrap().value.unwrap(),
res.unwrap()
.response
.into_iter()
.next()
.unwrap()
.value
.unwrap(),
value
)
}
Expand Down

0 comments on commit a544363

Please sign in to comment.