Glide-core UDS Socket Handling Rework #2482

Open · wants to merge 1 commit into base: release-1.2
1 change: 1 addition & 0 deletions glide-core/src/retry_strategies.rs
@@ -56,6 +56,7 @@ pub(crate) fn get_exponential_backoff(
}

#[cfg(feature = "socket-layer")]
#[allow(dead_code)]
pub(crate) fn get_fixed_interval_backoff(
fixed_interval: u32,
number_of_retries: u32,
230 changes: 99 additions & 131 deletions glide-core/src/socket_listener.rs
@@ -11,30 +11,29 @@ use crate::connection_request::ConnectionRequest;
use crate::errors::{error_message, error_type, RequestErrorType};
use crate::response;
use crate::response::Response;
use crate::retry_strategies::get_fixed_interval_backoff;
use bytes::Bytes;
use directories::BaseDirs;
use dispose::{Disposable, Dispose};
use logger_core::{log_debug, log_error, log_info, log_trace, log_warn};
use once_cell::sync::Lazy;
use protobuf::{Chars, Message};
use redis::cluster_routing::{
MultipleNodeRoutingInfo, Route, RoutingInfo, SingleNodeRoutingInfo, SlotAddr,
};
use redis::cluster_routing::{ResponsePolicy, Routable};
use redis::{Cmd, PushInfo, RedisError, ScanStateRC, Value};
use std::cell::Cell;
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::{Arc, RwLock};
use std::{env, str};
use std::{io, thread};
use thiserror::Error;
use tokio::io::ErrorKind::AddrInUse;
use tokio::net::{UnixListener, UnixStream};
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{channel, Sender};
use tokio::sync::Mutex;
use tokio::task;
use tokio_retry::Retry;
use tokio_util::task::LocalPoolHandle;
use ClosingReason::*;
use PipeListeningResult::*;
@@ -53,20 +52,6 @@ pub const ZSET: &str = "zset";
pub const HASH: &str = "hash";
pub const STREAM: &str = "stream";

/// struct containing all objects needed to bind to a socket and clean it.
struct SocketListener {
socket_path: String,
cleanup_socket: bool,
}

impl Dispose for SocketListener {
fn dispose(self) {
if self.cleanup_socket {
close_socket(&self.socket_path);
}
}
}

/// struct containing all objects needed to read from a unix stream.
struct UnixStreamListener {
read_socket: Rc<UnixStream>,
@@ -734,109 +719,6 @@ async fn listen_on_client_stream(socket: UnixStream) {
log_trace("client closing", "closing connection");
}

enum SocketCreationResult {
// Socket creation was successful, returned a socket listener.
Created(UnixListener),
// There's an existing socket listener.
PreExisting,
// Socket creation failed with an error.
Err(io::Error),
}

impl SocketListener {
fn new(socket_path: String) -> Self {
SocketListener {
socket_path,
// Don't cleanup the socket resources unless we know that the socket is in use, and owned by this listener.
cleanup_socket: false,
}
}

/// Return true if it's possible to connect to socket.
async fn socket_is_available(&self) -> bool {
if UnixStream::connect(&self.socket_path).await.is_ok() {
return true;
}

let retry_strategy = get_fixed_interval_backoff(10, 3);

let action = || async {
UnixStream::connect(&self.socket_path)
.await
.map(|_| ())
.map_err(|_| ())
};
let result = Retry::spawn(retry_strategy.get_iterator(), action).await;
result.is_ok()
}

async fn get_socket_listener(&self) -> SocketCreationResult {
const RETRY_COUNT: u8 = 3;
let mut retries = RETRY_COUNT;
while retries > 0 {
match UnixListener::bind(self.socket_path.clone()) {
Ok(listener) => {
return SocketCreationResult::Created(listener);
}
Err(err) if err.kind() == AddrInUse => {
if self.socket_is_available().await {
return SocketCreationResult::PreExisting;
} else {
// socket file might still exist, even if nothing is listening on it.
close_socket(&self.socket_path);
retries -= 1;
continue;
}
}
Err(err) => {
return SocketCreationResult::Err(err);
}
}
}
SocketCreationResult::Err(io::Error::new(
io::ErrorKind::Other,
"Failed to connect to socket",
))
}

pub(crate) async fn listen_on_socket<InitCallback>(&mut self, init_callback: InitCallback)
where
InitCallback: FnOnce(Result<String, String>) + Send + 'static,
{
// Bind to socket
let listener = match self.get_socket_listener().await {
SocketCreationResult::Created(listener) => listener,
SocketCreationResult::Err(err) => {
log_info("listen_on_socket", format!("failed with error: {err}"));
init_callback(Err(err.to_string()));
return;
}
SocketCreationResult::PreExisting => {
init_callback(Ok(self.socket_path.clone()));
return;
}
};

self.cleanup_socket = true;
init_callback(Ok(self.socket_path.clone()));
let local_set_pool = LocalPoolHandle::new(num_cpus::get());
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
local_set_pool.spawn_pinned(move || listen_on_client_stream(stream));
}
Err(err) => {
log_debug(
"listen_on_socket",
format!("Socket closed with error: `{err}`"),
);
return;
}
}
}
}
}

#[derive(Debug)]
/// Enum describing the reason that a socket listener stopped listening on a socket.
pub enum ClosingReason {
@@ -924,23 +806,109 @@ pub fn start_socket_listener_internal<InitCallback>(
init_callback: InitCallback,
socket_path: Option<String>,
) where
InitCallback: FnOnce(Result<String, String>) + Send + 'static,
Contributor: This function should return a Result. This will reduce the code below by 1/2.

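A rough sketch of what the reviewer may have in mind (the Result-returning signature, the io::Error type, and the body are assumptions for illustration, not part of this PR):

    // Hypothetical sketch: the internal entry point reports setup failures through
    // its return value, so internal error paths can use `?` instead of duplicating
    // callback-plus-return boilerplate.
    pub fn start_socket_listener_internal<InitCallback>(
        init_callback: InitCallback,
        socket_path: Option<String>,
    ) -> Result<String, std::io::Error>
    where
        InitCallback: FnOnce(Result<String, String>) + Send + Clone + 'static,
    {
        // ...bind the socket, spawn the listener thread, invoke the callback,
        // and return the socket path, propagating bind/runtime errors with `?`.
        unimplemented!()
    }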
InitCallback: FnOnce(Result<String, String>) + Send + Clone + 'static,
{
static INITIALIZED_SOCKETS: Lazy<Arc<RwLock<HashSet<String>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashSet::new())));
Contributor: Why the Arc?
Collaborator (Author): I thought it is needed because the lock is shared between the threads. Will check.

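For context on the Arc question: a `static` already has the 'static lifetime and is visible to every thread, so the RwLock alone provides the shared synchronization; Arc is only needed to hand out owned handles to the same allocation. A minimal sketch of the Arc-free form (an illustration, not part of this diff):

    use once_cell::sync::Lazy;
    use std::collections::HashSet;
    use std::sync::RwLock;

    // Sketch: the same sockets db without the Arc wrapper.
    static INITIALIZED_SOCKETS: Lazy<RwLock<HashSet<String>>> =
        Lazy::new(|| RwLock::new(HashSet::new()));

    fn socket_already_initialized(path: &str) -> bool {
        INITIALIZED_SOCKETS
            .read()
            .expect("Failed to acquire sockets db read guard")
            .contains(path)
    }

Whether dropping the Arc is actually wanted is the open question in the thread above.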
let socket_path = socket_path.unwrap_or_else(get_socket_path);

{
// Optimize for already initialized
Contributor: I don't think it's needed, since this method is mostly called once in the lifetime of a client creation.
Collaborator (Author): This is not true. Multiple sockets are allowed (I don't want to change it).

let initialized_sockets = INITIALIZED_SOCKETS
.read()
.expect("Failed to acquire sockets db read guard");
Contributor: Question: can we have more than one socket? Looking at the socket path, it is hard-coded to:
let socket_name = format!("{}-{}", SOCKET_FILE_NAME, std::process::id());
so it seems to be one per process (unless it is needed for testing purposes).
Collaborator (Author): It's odd, but multiple calls with different socket_path values are allowed, leading to multiple UDS sockets.
if initialized_sockets.contains(&socket_path) {
init_callback(Ok(socket_path.clone()));
return;
}
}

// Retry with write lock, will be dropped upon the function completion
let mut sockets_write_guard = INITIALIZED_SOCKETS
.write()
.expect("Failed to acquire sockets db write guard");
if sockets_write_guard.contains(&socket_path) {
init_callback(Ok(socket_path.clone()));
return;
}

let (tx, rx) = std::sync::mpsc::channel();
let socket_path_cloned = socket_path.clone();
let init_callback_cloned = init_callback.clone();
let tx_cloned = tx.clone();
thread::Builder::new()
.name("socket_listener_thread".to_string())
.spawn(move || {
let runtime = Builder::new_current_thread().enable_all().build();
match runtime {
Ok(runtime) => {
let mut listener = Disposable::new(SocketListener::new(
socket_path.unwrap_or_else(get_socket_path),
));
runtime.block_on(listener.listen_on_socket(init_callback));
let init_result = {
let runtime = Builder::new_current_thread().enable_all().build();
if let Err(err) = runtime {
log_error(
"listen_on_socket",
format!("Error failed to create a new tokio thread: {err}"),
);
return Err(err);
}
Comment on lines +844 to 851
Contributor: Suggested change (replacing the highlighted lines):
let runtime = match Builder::new_current_thread().enable_all().build() {
Err(err) => {
log_error(
"listen_on_socket",
format!("Error failed to create a new tokio thread: {err}"),
);
return Err(err);
}
Ok(runtime) => runtime,
};
Collaborator (Author): I don't see why it's better.

Err(err) => init_callback(Err(err.to_string())),

runtime.unwrap().block_on(async move {
Contributor: Should you accept my suggestion above, we can drop the unwrap() here.
Collaborator (Author): I think your suggestion is less elegant. unwrap() is legitimate; otherwise it would not be in the language, or Clippy would complain.

let listener_socket = UnixListener::bind(socket_path_cloned.clone());
if let Err(err) = listener_socket {
log_error(
"listen_on_socket",
format!("Error failed to bind listening socket: {err}"),
);
return Err(err);
}
let listener_socket = listener_socket.unwrap();
Comment on lines +854 to +862
Contributor: Suggested change (replacing the highlighted lines):
let listener_socket = match UnixListener::bind(socket_path_cloned.clone()) {
Err(err) => {
log_error(
"listen_on_socket",
format!("Error failed to bind listening socket: {err}"),
);
return Err(err);
}
Ok(listener_socket) => listener_socket,
};
Collaborator (Author): Same as above.


// signal initialization success
init_callback(Ok(socket_path_cloned.clone()));
let _ = tx.send(true);
Contributor: The only reason a tx.send will fail is that the receiving end has dropped, so we should handle this.
Collaborator (Author): What do you mean by handle? We could log it, maybe, but it's so unimportant I would not bother. Otherwise, it is safer to proceed to the accept loop, since it might serve other clients.

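If logging the dropped receiver were wanted here, a minimal sketch (reusing the already-imported log_warn; the message text is made up) could check the send result:

    // Sketch only: a failed send means the receiver in the spawning thread was
    // dropped, so only the init signal is lost; a warning seems sufficient.
    if tx.send(true).is_err() {
        log_warn(
            "listen_on_socket",
            "init signal receiver dropped before initialization completed",
        );
    }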

let local_set_pool = LocalPoolHandle::new(num_cpus::get());
loop {
match listener_socket.accept().await {
Ok((stream, _addr)) => {
local_set_pool
.spawn_pinned(move || listen_on_client_stream(stream));
}
Err(err) => {
log_error(
"listen_on_socket",
format!("Error accepting connection: {err}"),
);
break;
}
}
}
Comment on lines +870 to +883
Contributor: You should consider making this method return a Result. The highlighted lines above can then become:
let stream = listener_socket.accept().await?.0;
local_set_pool.spawn_pinned(move || listen_on_client_stream(stream));
Collaborator (Author): Wouldn't having an await? make it impossible to log "Error accepting connection"?

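On the `await?` question: logging and `?` are not mutually exclusive. A sketch (assuming the surrounding async block returns a Result, as it does in this change) could log inside map_err and still propagate the error:

    // Sketch only: log the accept failure and still bubble it up with `?`.
    let (stream, _addr) = listener_socket.accept().await.map_err(|err| {
        log_error(
            "listen_on_socket",
            format!("Error accepting connection: {err}"),
        );
        err
    })?;
    local_set_pool.spawn_pinned(move || listen_on_client_stream(stream));

Note that returning early this way would also skip the socket-file cleanup that currently runs after the break, which may be why the loop breaks instead of propagating.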

// ensure socket file removal
drop(listener_socket);
let _ = std::fs::remove_file(socket_path_cloned.clone());

// no more listening on socket - update the sockets db
let mut sockets_write_guard = INITIALIZED_SOCKETS
.write()
.expect("Failed to acquire sockets db write guard");
sockets_write_guard.remove(&socket_path_cloned);
Ok(())
Comment on lines +886 to +894
Collaborator: If a new client is created between the time you drop the listener_socket (when we break from the accept loop) and the time we remove it from the sockets_write_guard, the new client will see that there is an existing socket listener and get its path back, even though it isn't available anymore. How do we handle that? It would probably make the wrapper fail trying to connect to a bad socket path.

Collaborator (Author) (ikolomi, Oct 21, 2024): I think it does not differ from the previous behavior, which I find odd, but I do not want to change the design at this time. The weak point of the design is that the init callback is called upon socket creation; thus, if the loop terminates by failing to accept a connection, the callback might report success even though the connection will never be accepted.

With the previous implementation, what you are describing might happen like this:

  1. Client A is created, starting thread_a, which binds and accepts on the socket.
  2. Client B creation starts, creating thread_b, which detects an existing socket it could connect to.
  3. thread_a fails to accept the connection and terminates.
  4. Client B's init callback is called with success; thread_b does not continue (since it detected a connectable socket), but the creation eventually fails since the connection cannot be established.

To summarize: the new implementation does not degrade the situation, and even improves it, by doing the explicit remove_file and user-land socket db update upon accept-loop termination (in the original implementation the socket file would remain, causing subsequent creations to fail).

})
};

if let Err(err) = init_result {
init_callback_cloned(Err(err.to_string()));
let _ = tx_cloned.send(false);
}
Ok(())
})
.expect("Thread spawn failed. Cannot report error because callback was moved.");

// Wait for the thread initialization signal; the callback invocation is done in the thread.
let _ = rx.recv().map(|res| {
if res {
sockets_write_guard.insert(socket_path);
}
});
}

/// Creates a new thread with a main loop task listening on the socket for new connections.
@@ -950,7 +918,7 @@ pub fn start_socket_listener_internal<InitCallback>(
/// * `init_callback` - called when the socket listener fails to initialize, with the reason for the failure.
pub fn start_socket_listener<InitCallback>(init_callback: InitCallback)
where
InitCallback: FnOnce(Result<String, String>) + Send + 'static,
InitCallback: FnOnce(Result<String, String>) + Send + Clone + 'static,
{
start_socket_listener_internal(init_callback, None);
}
15 changes: 10 additions & 5 deletions glide-core/tests/test_socket_listener.rs
@@ -518,8 +518,10 @@ mod socket_listener {
#[rstest]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_working_after_socket_listener_was_dropped() {
let socket_path =
get_socket_path_from_name("test_working_after_socket_listener_was_dropped".to_string());
let socket_path = get_socket_path_from_name(format!(
"{}_test_working_after_socket_listener_was_dropped",
std::process::id()
));
close_socket(&socket_path);
// create a socket listener and drop it, to simulate a panic in a previous iteration.
Builder::new_current_thread()
@@ -528,6 +530,8 @@
.unwrap()
.block_on(async {
let _ = UnixListener::bind(socket_path.clone()).unwrap();
// UDS sockets require explicit removal of the socket file
close_socket(&socket_path);
});

const CALLBACK_INDEX: u32 = 99;
@@ -554,9 +558,10 @@
#[rstest]
#[timeout(SHORT_STANDALONE_TEST_TIMEOUT)]
fn test_multiple_listeners_competing_for_the_socket() {
let socket_path = get_socket_path_from_name(
"test_multiple_listeners_competing_for_the_socket".to_string(),
);
let socket_path = get_socket_path_from_name(format!(
"{}_test_multiple_listeners_competing_for_the_socket",
std::process::id()
));
close_socket(&socket_path);
let server = Arc::new(RedisServer::new(ServerType::Tcp { tls: false }));
