Skip to content

Commit

Permalink
Merge branch 'release-1.2' into CI/redis-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh authored Oct 21, 2024
2 parents 70f99b0 + 914fd60 commit c952ec8
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 123 deletions.
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::connection::{
resp2_is_pub_sub_state_cleared, resp3_is_pub_sub_state_cleared, ConnectionAddr, ConnectionInfo,
Msg, RedisConnectionInfo,
};
#[cfg(any(feature = "tokio-comp"))]
#[cfg(feature = "tokio-comp")]
use crate::parser::ValueCodec;
use crate::types::{ErrorKind, FromRedisValue, RedisError, RedisFuture, RedisResult, Value};
use crate::{from_owned_redis_value, ProtocolVersion, ToRedisArgs};
Expand Down
10 changes: 5 additions & 5 deletions glide-core/redis-rs/redis/src/aio/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ macro_rules! reconnect_if_dropped {
};
}

/// Handle a connection result. If there's an I/O error, reconnect.
/// Handle a connection result. If the connection has dropped, reconnect.
/// Propagate any error.
macro_rules! reconnect_if_io_error {
macro_rules! reconnect_if_conn_dropped {
($self:expr, $result:expr, $current:expr) => {
if let Err(e) = $result {
if e.is_io_error() {
if e.is_connection_dropped() {
$self.reconnect($current);
}
return Err(e);
Expand Down Expand Up @@ -249,7 +249,7 @@ impl ConnectionManager {
.clone()
.await
.map_err(|e| e.clone_mostly("Reconnecting failed"));
reconnect_if_io_error!(self, connection_result, guard);
reconnect_if_conn_dropped!(self, connection_result, guard);
let result = connection_result?.send_packed_command(cmd).await;
reconnect_if_dropped!(self, &result, guard);
result
Expand All @@ -270,7 +270,7 @@ impl ConnectionManager {
.clone()
.await
.map_err(|e| e.clone_mostly("Reconnecting failed"));
reconnect_if_io_error!(self, connection_result, guard);
reconnect_if_conn_dropped!(self, connection_result, guard);
let result = connection_result?
.send_packed_commands(cmd, offset, count)
.await;
Expand Down
42 changes: 25 additions & 17 deletions glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ where
&mut self,
item: SinkItem,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
self.send_recv(item, None, timeout).await
}

Expand All @@ -359,7 +359,7 @@ where
// If `None`, this is a single request, not a pipeline of multiple requests.
pipeline_response_count: Option<usize>,
timeout: Duration,
) -> Result<Value, Option<RedisError>> {
) -> Result<Value, RedisError> {
let (sender, receiver) = oneshot::channel();

self.sender
Expand All @@ -369,15 +369,29 @@ where
output: sender,
})
.await
.map_err(|_| None)?;
.map_err(|err| {
// If an error occurs here, it means the request never reached the server, as guaranteed
// by the 'send' function. Since the server did not receive the data, it is safe to retry
// the request.
RedisError::from((
crate::ErrorKind::FatalSendError,
"Failed to send the request to the server",
err.to_string(),
))
})?;
match Runtime::locate().timeout(timeout, receiver).await {
Ok(Ok(result)) => result.map_err(Some),
Ok(Err(_)) => {
// The `sender` was dropped which likely means that the stream part
// failed for one reason or another
Err(None)
Ok(Ok(result)) => result,
Ok(Err(err)) => {
// The `sender` was dropped, likely indicating a failure in the stream.
// This error suggests that it's unclear whether the server received the request before the connection failed,
// making it unsafe to retry. For example, retrying an INCR request could result in double increments.
Err(RedisError::from((
crate::ErrorKind::FatalReceiveError,
"Failed to receive a response due to a fatal error",
err.to_string(),
)))
}
Err(elapsed) => Err(Some(elapsed.into())),
Err(elapsed) => Err(elapsed.into()),
}
}

Expand Down Expand Up @@ -503,10 +517,7 @@ impl MultiplexedConnection {
let result = self
.pipeline
.send_single(cmd.get_packed_command(), self.response_timeout)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;
if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
if e.is_connection_dropped() {
Expand Down Expand Up @@ -537,10 +548,7 @@ impl MultiplexedConnection {
Some(offset + count),
self.response_timeout,
)
.await
.map_err(|err| {
err.unwrap_or_else(|| RedisError::from(io::Error::from(io::ErrorKind::BrokenPipe)))
});
.await;

if self.protocol != ProtocolVersion::RESP2 {
if let Err(e) = &result {
Expand Down
21 changes: 10 additions & 11 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ use std::time::Duration;

use rand::{seq::IteratorRandom, thread_rng};

pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
use crate::cluster_pipeline::UNROUTABLE_ERROR;
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};
use crate::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo,
};
Expand All @@ -54,17 +56,14 @@ use crate::connection::{
connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo,
};
use crate::parser::parse_redis_value;
use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, Value};
use crate::types::{ErrorKind, HashMap, RedisError, RedisResult, RetryMethod, Value};
pub use crate::TlsMode; // Pub for backwards compatibility
use crate::{
cluster_client::ClusterParams,
cluster_routing::{Redirect, Route, RoutingInfo},
IntoConnectionInfo, PushInfo,
};

pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};

use tokio::sync::mpsc;

#[cfg(feature = "tls-rustls")]
Expand Down Expand Up @@ -749,29 +748,29 @@ where
retries += 1;

match err.retry_method() {
crate::types::RetryMethod::AskRedirect => {
RetryMethod::AskRedirect => {
redirected = err
.redirect_node()
.map(|(node, _slot)| Redirect::Ask(node.to_string()));
}
crate::types::RetryMethod::MovedRedirect => {
RetryMethod::MovedRedirect => {
// Refresh slots.
self.refresh_slots()?;
// Request again.
redirected = err
.redirect_node()
.map(|(node, _slot)| Redirect::Moved(node.to_string()));
}
crate::types::RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica
| crate::types::RetryMethod::WaitAndRetry => {
RetryMethod::WaitAndRetryOnPrimaryRedirectOnReplica
| RetryMethod::WaitAndRetry => {
// Sleep and retry.
let sleep_time = self
.cluster_params
.retry_params
.wait_time_for_retry(retries);
thread::sleep(sleep_time);
}
crate::types::RetryMethod::Reconnect => {
RetryMethod::Reconnect | RetryMethod::ReconnectAndRetry => {
if *self.auto_reconnect.borrow() {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
Expand All @@ -780,10 +779,10 @@ where
}
}
}
crate::types::RetryMethod::NoRetry => {
RetryMethod::NoRetry => {
return Err(err);
}
crate::types::RetryMethod::RetryImmediately => {}
RetryMethod::RetryImmediately => {}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,18 @@ where
&self,
amount: usize,
conn_type: ConnectionType,
) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
) -> Option<impl Iterator<Item = ConnectionAndAddress<Connection>> + '_> {
(!self.connection_map.is_empty()).then_some({
self.connection_map
.iter()
.choose_multiple(&mut rand::thread_rng(), amount)
.into_iter()
.map(move |item| {
let (address, node) = (item.key(), item.value());
let conn = node.get_connection(&conn_type);
(address.clone(), conn)
})
})
}

pub(crate) fn replace_or_add_connection_for_address(
Expand Down Expand Up @@ -633,6 +635,7 @@ mod tests {

let random_connections: HashSet<_> = container
.random_connections(3, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();

Expand All @@ -647,12 +650,9 @@ mod tests {
let container = create_container();
remove_all_connections(&container);

assert_eq!(
0,
container
.random_connections(1, ConnectionType::User)
.count()
);
assert!(container
.random_connections(1, ConnectionType::User)
.is_none());
}

#[test]
Expand All @@ -665,6 +665,7 @@ mod tests {
);
let random_connections: Vec<_> = container
.random_connections(1, ConnectionType::User)
.expect("No connections found")
.collect();

assert_eq!(vec![(address, 4)], random_connections);
Expand All @@ -675,6 +676,7 @@ mod tests {
let container = create_container();
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::User)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand All @@ -687,6 +689,7 @@ mod tests {
let container = create_container_with_strategy(ReadFromReplicaStrategy::RoundRobin, true);
let mut random_connections: Vec<_> = container
.random_connections(1000, ConnectionType::PreferManagement)
.expect("No connections found")
.map(|pair| pair.1)
.collect();
random_connections.sort();
Expand Down
Loading

0 comments on commit c952ec8

Please sign in to comment.