Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glide-core UDS Socket Handling Rework #2482

Open
wants to merge 1 commit into
base: release-1.2
Choose a base branch
from
Open

Conversation

ikolomi
Copy link
Collaborator

@ikolomi ikolomi commented Oct 21, 2024

1.Introduced a user-land mechanism for ensuring singleton behavior of the socket, rather than relying on OS-specific semantics. This addresses the issue where macOS and Linux report different errors when the socket path already exists.

2.Simplified the implementation by removing unnecessary abstractions, including redundant connection retry logic.

Issue link

This Pull Request is linked to issue (URL): [https://github.com//issues/2433]

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Commits will be squashed upon merging.

@ikolomi ikolomi added bug Something isn't working Core changes Used to label a PR as PR with significant changes that should trigger a full matrix tests. labels Oct 21, 2024
@ikolomi ikolomi added this to the 1.2 milestone Oct 21, 2024
@ikolomi ikolomi requested a review from a team as a code owner October 21, 2024 09:42
1.Introduced a user-land mechanism for ensuring singleton behavior of the socket, rather than relying on OS-specific semantics. This addresses the issue where macOS and Linux report different errors when the socket path already exists.

2.Simplified the implementation by removing unnecessary abstractions, including redundant connection retry logic.

Signed-off-by: ikolomi <[email protected]>
{
static INITIALIZED_SOCKETS: Lazy<Arc<RwLock<HashSet<String>>>> =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the Arc?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought it is needed because the lock is shared between the threads. Will check


{
// Optimize for already initialized
let initialized_sockets = INITIALIZED_SOCKETS
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question:

Can we have more than 1 socket? (looking at the socket path it is hard coded to:

let socket_name = format!("{}-{}", SOCKET_FILE_NAME, std::process::id());

So it seems 1 per process (unless it is needed for testing purposes)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its odd but multiple calls with different socket_path are allowed, leading to multiple UDS sockets

let socket_path = socket_path.unwrap_or_else(get_socket_path);

{
// Optimize for already initialized
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think its needed since this method is mostly called once in the lifetime of a client creation

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not true. multiple sockets are allowed (dont want to change it )

Comment on lines +844 to 851
let runtime = Builder::new_current_thread().enable_all().build();
if let Err(err) = runtime {
log_error(
"listen_on_socket",
format!("Error failed to create a new tokio thread: {err}"),
);
return Err(err);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let runtime = Builder::new_current_thread().enable_all().build();
if let Err(err) = runtime {
log_error(
"listen_on_socket",
format!("Error failed to create a new tokio thread: {err}"),
);
return Err(err);
}
let runtime = match Builder::new_current_thread().enable_all().build() {
Err(err) => {
log_error(
"listen_on_socket",
format!("Error failed to create a new tokio thread: {err}"),
);
return Err(err);
}
Ok(runtime) => runtime,
};

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i dont see why its better

}
Err(err) => init_callback(Err(err.to_string())),

runtime.unwrap().block_on(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you accept my suggestion above, we can drop the unwrap() here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think your suggestion is a less elegant. unwrap() is legit, otherwise it would not be in the language or clippy would complain

Comment on lines +854 to +862
let listener_socket = UnixListener::bind(socket_path_cloned.clone());
if let Err(err) = listener_socket {
log_error(
"listen_on_socket",
format!("Error failed to bind listening socket: {err}"),
);
return Err(err);
}
let listener_socket = listener_socket.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let listener_socket = UnixListener::bind(socket_path_cloned.clone());
if let Err(err) = listener_socket {
log_error(
"listen_on_socket",
format!("Error failed to bind listening socket: {err}"),
);
return Err(err);
}
let listener_socket = listener_socket.unwrap();
let listener_socket = match UnixListener::bind(socket_path_cloned.clone()) {
Err(err) ==> {
log_error(
"listen_on_socket",
format!("Error failed to bind listening socket: {err}"),
);
return Err(err);
}
Ok(listener_socket) => listener_socket,
};

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above


// signal initialization success
init_callback(Ok(socket_path_cloned.clone()));
let _ = tx.send(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only reason a tx.send will fail means that the receiving end has dropped - we should handle this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you mean by handle ? we could log maybe, but its so unimportant I would not bother
Otherwise, it is safer to process to the accept loop since it might serve other clients

@@ -924,23 +806,109 @@ pub fn start_socket_listener_internal<InitCallback>(
init_callback: InitCallback,
socket_path: Option<String>,
) where
InitCallback: FnOnce(Result<String, String>) + Send + 'static,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function should return a Result. This will reduce the code below by 1/2

Comment on lines +870 to +883
match listener_socket.accept().await {
Ok((stream, _addr)) => {
local_set_pool
.spawn_pinned(move || listen_on_client_stream(stream));
}
Err(err) => {
log_error(
"listen_on_socket",
format!("Error accepting connection: {err}"),
);
break;
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should consider making this method returning a Result.

This highlighted lines above can then become:

let stream = listener_socket.accept().await?.0;
local_set_pool.spawn_pinned(move || listen_on_client_stream(stream));

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isnt having an await? will make it impossible to log "error accepting connection"

Comment on lines +886 to +894
drop(listener_socket);
let _ = std::fs::remove_file(socket_path_cloned.clone());

// no more listening on socket - update the sockets db
let mut sockets_write_guard = INITIALIZED_SOCKETS
.write()
.expect("Failed to acquire sockets db write guard");
sockets_write_guard.remove(&socket_path_cloned);
Ok(())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a new client will be created between the time you drop the listener_socket/when we break from the accept loop to the time we remove it from the sockets_write_guard, the new client would get that there's an existing socket listener and return its path although it isn't available anymore. how do we handle that? it would probably make the wrapper to fail trying to connect to a bad socket path

Copy link
Collaborator Author

@ikolomi ikolomi Oct 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it does not differ from the previous behavior, which I find odd, but I do not want to change the design at this time.
The weak point of the design is that the init callback is called upon the socket creation. Thus, if the loop terminates by failing to accept the connection - the callback might be called with a success but the connection will not be accepted.

With the previous implementation what you are describing might happen like this:

  1. client A is created, started thread_a, binding and accepting on the socket
  2. client B creation started, creating thread_b, which detects an existing socket which he could connect to
  3. thread_a failed to accept the connection and terminated
  4. client B init callback called with a success, thread_b does not continue (since it detected a connectable socket) but the creation eventually fails since the connection cannot be established

To summarize - the new implementation does not degrade, but even improves the situation by having the explicit remove_file and user-land socket db update upon the accept loop termination (in the orig implementation the socket file will remain causing the following creations to fail)

@Yury-Fridlyand
Copy link
Collaborator

Add a changelog please

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working Core changes Used to label a PR as PR with significant changes that should trigger a full matrix tests.
Projects
Status: No status
Development

Successfully merging this pull request may close these issues.

4 participants