Skip to content

Commit

Permalink
Listen for signals in the main task (#2086)
Browse files Browse the repository at this point in the history
There's no need to spawn a separate task for it, because as soon as the
signals are registered, any received signals lead to events being sent
to the streams. And since the channel used to notify that a shutdown
signal was received is never dropped, the future waiting for a signal
can just be polled directly.
  • Loading branch information
jvff authored May 31, 2024
1 parent a79bd1c commit cf27151
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions linera-service/src/linera/net_up_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::time::Duration;
use std::{future::Future, time::Duration};

use colored::Colorize as _;
use linera_base::data_types::Amount;
Expand All @@ -17,7 +17,7 @@ use linera_storage_service::{
child::{get_free_endpoint, StorageService},
common::get_service_storage_binary,
};
use tokio::{signal::unix, sync::mpsc};
use tokio::signal::unix;
use tracing::info;
#[cfg(feature = "kubernetes")]
use {
Expand All @@ -42,8 +42,7 @@ pub async fn handle_net_up_kubernetes(
panic!("The local test network must have at least one shard per validator.");
}

let (shutdown_sender, mut shutdown_receiver) = mpsc::channel(1);
tokio::spawn(handle_signals(shutdown_sender));
let shutdown_receiver = handle_signals();

let config = LocalKubernetesNetConfig {
network: Network::Grpc,
Expand All @@ -57,7 +56,7 @@ pub async fn handle_net_up_kubernetes(
};
let (mut net, client1) = config.instantiate().await?;
net_up(extra_wallets, &mut net, client1).await?;
listen_for_signals(&mut shutdown_receiver, &mut net).await
listen_for_signals(shutdown_receiver, &mut net).await
}

pub async fn handle_net_up_service(
Expand All @@ -76,8 +75,7 @@ pub async fn handle_net_up_service(
panic!("The local test network must have at least one shard per validator.");
}

let (shutdown_sender, mut shutdown_receiver) = mpsc::channel(1);
tokio::spawn(handle_signals(shutdown_sender));
let shutdown_receiver = handle_signals();

let tmp_dir = tempfile::tempdir()?;
let path = tmp_dir.path();
Expand Down Expand Up @@ -107,10 +105,10 @@ pub async fn handle_net_up_service(
};
let (mut net, client1) = config.instantiate().await?;
net_up(extra_wallets, &mut net, client1).await?;
listen_for_signals(&mut shutdown_receiver, &mut net).await
listen_for_signals(shutdown_receiver, &mut net).await
}

async fn handle_signals(shutdown_sender: mpsc::Sender<()>) {
fn handle_signals() -> impl Future<Output = ()> {
let mut sigint =
unix::signal(unix::SignalKind::interrupt()).expect("Failed to set up SIGINT handler");
let mut sigterm =
Expand All @@ -120,25 +118,24 @@ async fn handle_signals(shutdown_sender: mpsc::Sender<()>) {
let mut sighup =
unix::signal(unix::SignalKind::hangup()).expect("Failed to set up SIGHUP handler");

tokio::select! {
_ = sigint.recv() => (),
_ = sigterm.recv() => (),
_ = sigpipe.recv() => (),
_ = sighup.recv() => (),
async move {
tokio::select! {
_ = sigint.recv() => (),
_ = sigterm.recv() => (),
_ = sigpipe.recv() => (),
_ = sighup.recv() => (),
}
}

let _ = shutdown_sender.send(()).await;
}

async fn listen_for_signals(
shutdown_receiver: &mut tokio::sync::mpsc::Receiver<()>,
shutdown_receiver: impl Future<Output = ()>,
net: &mut impl LineraNet,
) -> anyhow::Result<()> {
if shutdown_receiver.recv().await.is_some() {
eprintln!("\nTerminating the local test network");
net.terminate().await?;
eprintln!("\nDone.");
}
shutdown_receiver.await;
eprintln!("\nTerminating the local test network");
net.terminate().await?;
eprintln!("\nDone.");

Ok(())
}
Expand Down

0 comments on commit cf27151

Please sign in to comment.