From cf27151f243ddb2729844a3f951badd33734fb26 Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Fri, 31 May 2024 07:43:46 -0300 Subject: [PATCH] Listen for signals in the main task (#2086) There's no need to spawn a separate task for it, because as soon as the signals are registered, any received signals lead to events being sent to the streams. And since the channel used to notify that a shutdown signal was received is never dropped, the future waiting for a signal can just be polled directly. --- linera-service/src/linera/net_up_utils.rs | 41 +++++++++++------------ 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/linera-service/src/linera/net_up_utils.rs b/linera-service/src/linera/net_up_utils.rs index 2cedf761036..9648a513d20 100644 --- a/linera-service/src/linera/net_up_utils.rs +++ b/linera-service/src/linera/net_up_utils.rs @@ -1,7 +1,7 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::time::Duration; +use std::{future::Future, time::Duration}; use colored::Colorize as _; use linera_base::data_types::Amount; @@ -17,7 +17,7 @@ use linera_storage_service::{ child::{get_free_endpoint, StorageService}, common::get_service_storage_binary, }; -use tokio::{signal::unix, sync::mpsc}; +use tokio::signal::unix; use tracing::info; #[cfg(feature = "kubernetes")] use { @@ -42,8 +42,7 @@ pub async fn handle_net_up_kubernetes( panic!("The local test network must have at least one shard per validator."); } - let (shutdown_sender, mut shutdown_receiver) = mpsc::channel(1); - tokio::spawn(handle_signals(shutdown_sender)); + let shutdown_receiver = handle_signals(); let config = LocalKubernetesNetConfig { network: Network::Grpc, @@ -57,7 +56,7 @@ pub async fn handle_net_up_kubernetes( }; let (mut net, client1) = config.instantiate().await?; net_up(extra_wallets, &mut net, client1).await?; - listen_for_signals(&mut shutdown_receiver, &mut net).await + listen_for_signals(shutdown_receiver, &mut net).await } pub async fn handle_net_up_service( @@ -76,8 +75,7 @@ pub async fn handle_net_up_service( panic!("The local test network must have at least one shard per validator."); } - let (shutdown_sender, mut shutdown_receiver) = mpsc::channel(1); - tokio::spawn(handle_signals(shutdown_sender)); + let shutdown_receiver = handle_signals(); let tmp_dir = tempfile::tempdir()?; let path = tmp_dir.path(); @@ -107,10 +105,10 @@ pub async fn handle_net_up_service( }; let (mut net, client1) = config.instantiate().await?; net_up(extra_wallets, &mut net, client1).await?; - listen_for_signals(&mut shutdown_receiver, &mut net).await + listen_for_signals(shutdown_receiver, &mut net).await } -async fn handle_signals(shutdown_sender: mpsc::Sender<()>) { +fn handle_signals() -> impl Future { let mut sigint = unix::signal(unix::SignalKind::interrupt()).expect("Failed to set up SIGINT handler"); let mut sigterm = @@ -120,25 +118,24 @@ async fn handle_signals(shutdown_sender: mpsc::Sender<()>) { let mut sighup = unix::signal(unix::SignalKind::hangup()).expect("Failed to set up SIGHUP handler"); - tokio::select! { - _ = sigint.recv() => (), - _ = sigterm.recv() => (), - _ = sigpipe.recv() => (), - _ = sighup.recv() => (), + async move { + tokio::select! { + _ = sigint.recv() => (), + _ = sigterm.recv() => (), + _ = sigpipe.recv() => (), + _ = sighup.recv() => (), + } } - - let _ = shutdown_sender.send(()).await; } async fn listen_for_signals( - shutdown_receiver: &mut tokio::sync::mpsc::Receiver<()>, + shutdown_receiver: impl Future, net: &mut impl LineraNet, ) -> anyhow::Result<()> { - if shutdown_receiver.recv().await.is_some() { - eprintln!("\nTerminating the local test network"); - net.terminate().await?; - eprintln!("\nDone."); - } + shutdown_receiver.await; + eprintln!("\nTerminating the local test network"); + net.terminate().await?; + eprintln!("\nDone."); Ok(()) }