Skip to content

Commit

Permalink
fix docs + implement ungraceful varients of the byte-transport services
Browse files Browse the repository at this point in the history
  • Loading branch information
glendc committed Jul 22, 2023
1 parent cdfac6b commit 30e715b
Show file tree
Hide file tree
Showing 7 changed files with 256 additions and 192 deletions.
31 changes: 20 additions & 11 deletions src/transport/bytes/service/echo.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
use std::io::{Error, ErrorKind, Result};
use std::io::{Error, ErrorKind};

use tower_async::Service;

use crate::transport::{bytes::ByteStream, connection::Connection};

async fn echo<T, B>(conn: Connection<B, T>) -> Result<u64>
/// Creates an async service which echoes the incoming bytes back on the same connection,
/// and which respects the graceful shutdown, by shutting down the connection when requested.
pub fn echo_service<B, T>() -> impl Service<Connection<B, T>>
where
B: ByteStream,
{
let (socket, token, _) = conn.into_parts();
let (mut reader, mut writer) = tokio::io::split(socket);
tokio::select! {
_ = token.shutdown() => Err(Error::new(ErrorKind::Interrupted, "echo: graceful shutdown requested")),
res = tokio::io::copy(&mut reader, &mut writer) => res,
}
crate::transport::connection::service_fn(|conn: Connection<B, T>| async {
let (socket, token, _) = conn.into_parts();
let (mut reader, mut writer) = tokio::io::split(socket);
tokio::select! {
_ = token.shutdown() => Err(Error::new(ErrorKind::Interrupted, "echo: graceful shutdown requested")),
res = tokio::io::copy(&mut reader, &mut writer) => res,
}
})
}

/// Crates an async service which echoes the incoming bytes back on the same connection.
pub fn echo_service<T, B>() -> impl Service<Connection<B, T>>
/// Creates an async service which echoes the incoming bytes back on the same connection,
/// and which does not respect the graceful shutdown, by not shutting down the connection when requested,
/// and instead keeps echoing bytes until the connection is closed or other error.
pub fn echo_service_ungraceful<B, T>() -> impl Service<Connection<B, T>>
where
B: ByteStream,
{
crate::transport::connection::service_fn(echo)
crate::transport::connection::service_fn(|stream: B| async {
let (mut reader, mut writer) = tokio::io::split(stream);
tokio::io::copy(&mut reader, &mut writer).await
})
}
50 changes: 47 additions & 3 deletions src/transport/bytes/service/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,48 @@ use crate::transport::{bytes::ByteStream, connection::Connection};
/// Crates an async service which forwards the incoming connection bytes to the given destination,
/// and forwards the response back from the destination to the incoming connection.
#[derive(Debug)]
pub struct ForwardService<D> {
pub struct ForwardService<D, K> {
destination: Pin<Box<D>>,
_kind: std::marker::PhantomData<K>,
}

impl<D> ForwardService<D> {
mod marker {
/// Marker type for the graceful variant of [`super::ForwardService`].
pub(super) struct Graceful;
/// Marker type for the ungraceful variant of [`super::ForwardService`].
pub(super) struct Ungraceful;
}

impl<D> ForwardService<D, marker::Graceful> {
/// Creates a new [`ForwardService`] which respects the graceful shutdown,
/// by being an alias of [`ForwardService::graceful`].
pub fn new(destination: D) -> Self {
Self::graceful(destination)
}

/// Creates a new [`ForwardService`] which respects the graceful shutdown,
/// and stops bidirectionally copying bytes as soon as the shutdown is requested.
pub fn graceful(destination: D) -> Self {
ForwardService {
destination: Box::pin(destination),
_kind: std::marker::PhantomData,
}
}
}

impl<T, S, D> Service<Connection<S, T>> for ForwardService<D>
impl<D> ForwardService<D, marker::Ungraceful> {
/// Creates a new [`ForwardService`] which does not respect the graceful shutdown,
/// and keeps bidirectionally copying bytes until the connection is closed or other error,
/// even if the shutdown was requested already way before.
pub fn ungraceful(destination: D) -> Self {
ForwardService {
destination: Box::pin(destination),
_kind: std::marker::PhantomData,
}
}
}

impl<T, S, D> Service<Connection<S, T>> for ForwardService<D, marker::Graceful>
where
S: ByteStream,
D: ByteStream,
Expand All @@ -39,3 +68,18 @@ where
}
}
}

impl<T, S, D> Service<Connection<S, T>> for ForwardService<D, marker::Ungraceful>
where
S: ByteStream,
D: ByteStream,
{
type Response = (u64, u64);
type Error = Error;

async fn call(&mut self, conn: Connection<S, T>) -> Result<Self::Response, Self::Error> {
let (source, _, _) = conn.into_parts();
tokio::pin!(source);
tokio::io::copy_bidirectional(&mut source, &mut self.destination).await
}
}
2 changes: 1 addition & 1 deletion src/transport/bytes/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! Useful for testing and very specific purposes.

mod echo;
pub use echo::echo_service;
pub use echo::{echo_service, echo_service_ungraceful};

mod forward;
pub use forward::ForwardService;
4 changes: 2 additions & 2 deletions src/transport/tcp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Provides the Tcp transport functionality
//! Provides the Tcp transport functionality
//! for Rama, which at the very least will be used
//! as the entrypoint of pretty much any Rama server.
//!
//!
//! See [`server`] for more information about the functionalities
//! provided for the server side of the Tcp transport.

Expand Down
177 changes: 6 additions & 171 deletions src/transport/tcp/server/listener.rs
Original file line number Diff line number Diff line change
@@ -1,176 +1,11 @@
use std::{
net::{TcpListener as StdTcpListener, ToSocketAddrs},
time::Duration, future::Future, convert::Infallible,
};

use tokio::net::TcpListener;

use crate::{transport::graceful, service::{Service, ServiceFactory}};

mod marker {
#[derive(Debug)]
pub(super) struct None;

#[derive(Debug)]
pub(super) struct Some<T>(pub(super) T);
}

#[derive(Debug)]
pub struct Listener<F, State> {
tcp: TcpListener,
service_factory: F,
shutdown_timeout: Option<Duration>,
graceful: graceful::GracefulService,
state: State,
}

#[derive(Debug)]
pub struct Builder<I, G, S> {
incoming: I,
graceful: G,
state: S,
}
//! TCP server listener, TODO.

/// TCP server listener, TODO.
#[derive(Debug)]
pub struct SocketConfig<L> {
listener: L,
ttl: Option<u32>,
}
pub struct TcpListener;

#[derive(Debug)]
pub struct GracefulConfig<S> {
shutdown: S,
timeout: Option<Duration>,
}

pub trait IntoTcpListener {
fn into_tcp_listener(self) -> Result<TcpListener, std::io::Error>;
}

impl IntoTcpListener for TcpListener {
fn into_tcp_listener(self) -> Result<TcpListener, std::io::Error> {
Ok(self)
impl TcpListener {
pub async fn listen() {
todo!();
}
}

impl IntoTcpListener for StdTcpListener {
fn into_tcp_listener(self) -> Result<TcpListener, std::io::Error> {
TcpListener::from_std(self)
}
}

impl Listener<marker::None, marker::None> {
pub fn bind<A: ToSocketAddrs>(addr: A) -> Builder<SocketConfig<TcpListener>, marker::None, marker::None> {
match Self::try_bind(addr) {
Ok(incoming) => incoming,
Err(err) => panic!("failed to bind tcp listener: {}", err),
}
}

pub fn try_bind<A: ToSocketAddrs>(
addr: A,
) -> Result<Builder<SocketConfig<TcpListener>, marker::None, marker::None>, std::io::Error> {
let incoming = StdTcpListener::bind(addr)?;
incoming.set_nonblocking(true)?;
Self::build(incoming)
}

pub fn build(incoming: impl IntoTcpListener) -> Result<Builder<SocketConfig<TcpListener>, marker::None, marker::None>, std::io::Error> {
let listener = incoming.into_tcp_listener()?;
Ok(Builder::new(listener, marker::None, marker::None))
}
}

impl<G, S> Builder<SocketConfig<TcpListener>, G, S> {
/// Create a new `Builder` with the specified address.
fn new(listener: TcpListener, graceful: G, state: S) -> Self {
Self {
incoming: SocketConfig {
listener,
ttl: None,
},
graceful,
state,
}
}

/// Set the value of `IP_TTL` option for accepted connections.
///
/// If `None` is specified, ttl is not explicitly set.
pub fn ttl(mut self, ttl: Option<u32>) -> Self {
self.incoming.ttl = ttl;
self
}
}

impl<I, State> Builder<I, marker::None, State> {
/// Upgrade the builder to one which builds
/// a graceful TCP listener which will shutdown once the given future resolves.
pub fn graceful<S: Future<Output = ()>>(
self,
shutdown: S,
) -> Builder<I, GracefulConfig<S>, State> {
Builder {
incoming: self.incoming,
graceful: GracefulConfig {
shutdown,
timeout: None,
},
state: self.state,
}
}

/// Upgrade the builder to one which builds
/// a graceful TCP listener which will shutdown once the "ctrl+c" signal is received (SIGINT).
pub fn graceful_ctrl_c(self) -> Builder<I, GracefulConfig<impl Future<Output = ()>>, State> {
self.graceful(async {
let _ = tokio::signal::ctrl_c().await;
})
}
}

impl<I, G> Builder<I, G, marker::None> {
pub fn state<S>(self, state: S) -> Builder<I, G, marker::Some<S>> {
Builder {
incoming: self.incoming,
graceful: self.graceful,
state: marker::Some(state),
}
}
}

impl<I, S, State> Builder<I, GracefulConfig<S>, State> {
/// Set the timeout for graceful shutdown.
///
/// If `None` is specified, the default timeout is used.
pub fn timeout(mut self, timeout: Option<Duration>) -> Self {
self.graceful.timeout = timeout;
self
}
}

impl<S, State> Builder<SocketConfig<TcpListener>, GracefulConfig<S>, State>
where
S: Future + Send + 'static,
{
pub async fn serve<F>(self, service_factory: F) -> Result<(), Box<dyn std::error::Error>>
where
F: ServiceFactory<State>,
F::Service: Service<State> + Send + 'static,
{
// create and configure the tcp listener...
let listener = self.incoming.listener;
if let Some(ttl) = self.incoming.ttl {
listener.set_ttl(ttl)?;
}
// listen gracefully..
Listener::new(
listener,
service_factory,
self.graceful.shutdown,
self.graceful.timeout,
)
.serve()
.await
}
}
Loading

0 comments on commit 30e715b

Please sign in to comment.