diff --git a/Cargo.toml b/Cargo.toml index 42fd3745..cf3e1017 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,5 +22,6 @@ tower-async = { version = "0.1" } tracing = "0.1" [dev-dependencies] +anyhow = "1.0" tokio-test = "0.4" tracing-subscriber = "0.3" diff --git a/src/transport/tcp/server/listener.rs b/src/transport/tcp/server/listener.rs index 563f2133..4afe0d84 100644 --- a/src/transport/tcp/server/listener.rs +++ b/src/transport/tcp/server/listener.rs @@ -15,7 +15,7 @@ use crate::transport::{connection::Connection, graceful}; /// Listens to incoming TCP connections and serves them with a [`tower_async::Service`]. /// /// That [`tower_async::Service`] is created by a [`tower_async::Service`] for each incoming connection. -/// +/// /// [`tower_async::Service`]: https://docs.rs/tower-async/*/tower_async/trait.Service.html #[derive(Debug)] pub struct TcpListener { @@ -74,7 +74,7 @@ impl TryFrom impl TcpListener { /// Sets a state for the [`TcpListener`], /// which will be passed to the [`tower_async::Service`] for each incoming connection. - /// + /// /// [`tower_async::Service`]: https://docs.rs/tower-async/*/tower_async/trait.Service.html pub fn state(self, state: S) -> TcpListener where @@ -199,7 +199,7 @@ mod private { use crate::transport::tcp::server::error::{Error, ErrorHandler, ErrorKind}; - #[derive(Debug)] + #[derive(Debug, Clone, Copy, Default)] pub(super) struct NoState; #[derive(Debug, Clone, Copy, Default)] diff --git a/src/transport/tcp/server/listener_backup.rs b/src/transport/tcp/server/listener_backup.rs deleted file mode 100644 index 5c782e5f..00000000 --- a/src/transport/tcp/server/listener_backup.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::{ - net::{TcpListener as StdTcpListener, ToSocketAddrs}, - time::Duration, future::Future, convert::Infallible, -}; - -use tokio::net::TcpListener; - -use crate::{transport::graceful, service::{Service, ServiceFactory}}; - -mod marker { - #[derive(Debug)] - pub(super) struct None; - - #[derive(Debug)] - pub(super) struct Some(pub(super) T); -} - -#[derive(Debug)] -pub struct Listener { - tcp: TcpListener, - service_factory: F, - shutdown_timeout: Option, - graceful: graceful::GracefulService, - state: State, -} - -#[derive(Debug)] -pub struct Builder { - incoming: I, - graceful: G, - state: S, -} - -#[derive(Debug)] -pub struct SocketConfig { - listener: L, - ttl: Option, -} - -#[derive(Debug)] -pub struct GracefulConfig { - shutdown: S, - timeout: Option, -} - -pub trait IntoTcpListener { - fn into_tcp_listener(self) -> Result; -} - -impl IntoTcpListener for TcpListener { - fn into_tcp_listener(self) -> Result { - Ok(self) - } -} - -impl IntoTcpListener for StdTcpListener { - fn into_tcp_listener(self) -> Result { - TcpListener::from_std(self) - } -} - -impl Listener { - pub fn bind(addr: A) -> Builder, marker::None, marker::None> { - match Self::try_bind(addr) { - Ok(incoming) => incoming, - Err(err) => panic!("failed to bind tcp listener: {}", err), - } - } - - pub fn try_bind( - addr: A, - ) -> Result, marker::None, marker::None>, std::io::Error> { - let incoming = StdTcpListener::bind(addr)?; - incoming.set_nonblocking(true)?; - Self::build(incoming) - } - - pub fn build(incoming: impl IntoTcpListener) -> Result, marker::None, marker::None>, std::io::Error> { - let listener = incoming.into_tcp_listener()?; - Ok(Builder::new(listener, marker::None, marker::None)) - } -} - -impl Builder, G, S> { - /// Create a new `Builder` with the specified address. - fn new(listener: TcpListener, graceful: G, state: S) -> Self { - Self { - incoming: SocketConfig { - listener, - ttl: None, - }, - graceful, - state, - } - } - - /// Set the value of `IP_TTL` option for accepted connections. - /// - /// If `None` is specified, ttl is not explicitly set. - pub fn ttl(mut self, ttl: Option) -> Self { - self.incoming.ttl = ttl; - self - } -} - -impl Builder { - /// Upgrade the builder to one which builds - /// a graceful TCP listener which will shutdown once the given future resolves. - pub fn graceful>( - self, - shutdown: S, - ) -> Builder, State> { - Builder { - incoming: self.incoming, - graceful: GracefulConfig { - shutdown, - timeout: None, - }, - state: self.state, - } - } - - /// Upgrade the builder to one which builds - /// a graceful TCP listener which will shutdown once the "ctrl+c" signal is received (SIGINT). - pub fn graceful_ctrl_c(self) -> Builder>, State> { - self.graceful(async { - let _ = tokio::signal::ctrl_c().await; - }) - } -} - -impl Builder { - pub fn state(self, state: S) -> Builder> { - Builder { - incoming: self.incoming, - graceful: self.graceful, - state: marker::Some(state), - } - } -} - -impl Builder, State> { - /// Set the timeout for graceful shutdown. - /// - /// If `None` is specified, the default timeout is used. - pub fn timeout(mut self, timeout: Option) -> Self { - self.graceful.timeout = timeout; - self - } -} - -impl Builder, GracefulConfig, State> -where - S: Future + Send + 'static, -{ - pub async fn serve(self, service_factory: F) -> Result<(), Box> - where - F: ServiceFactory, - F::Service: Service + Send + 'static, - { - // create and configure the tcp listener... - let listener = self.incoming.listener; - if let Some(ttl) = self.incoming.ttl { - listener.set_ttl(ttl)?; - } - // listen gracefully.. - Listener::new( - listener, - service_factory, - self.graceful.shutdown, - self.graceful.timeout, - ) - .serve() - .await - } -} \ No newline at end of file diff --git a/src/transport/tcp/server/mod.rs b/src/transport/tcp/server/mod.rs index 96d37c92..c57654f9 100644 --- a/src/transport/tcp/server/mod.rs +++ b/src/transport/tcp/server/mod.rs @@ -3,6 +3,7 @@ //! as the entrypoint of pretty much any Rama server. pub mod error; +pub mod factory; mod listener; pub use listener::*;