From d727dad5c594582c7811830dce219bad7ec30359 Mon Sep 17 00:00:00 2001 From: Sander Bosma Date: Mon, 20 Mar 2023 15:34:57 +0100 Subject: [PATCH] fix: runner: reconnect after parachain connection error --- Cargo.lock | 10 ++--- runner/src/main.rs | 15 ++----- runner/src/runner.rs | 104 ++++++++++++++++++++++++------------------- 3 files changed, 68 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67dced928..c28defba4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15984,12 +15984,12 @@ dependencies = [ "pkg-config", ] -[[patch.unused]] -name = "sp-serializer" -version = "4.0.0-dev" -source = "git+https://github.com/paritytech//substrate?branch=polkadot-v0.9.37#f38bd6671d460293c93062cc1e4fe9e9e490cb29" - [[patch.unused]] name = "orml-xcm" version = "0.4.1-dev" source = "git+https://github.com/open-web3-stack//open-runtime-module-library?rev=24f0a8b6e04e1078f70d0437fb816337cdf4f64c#24f0a8b6e04e1078f70d0437fb816337cdf4f64c" + +[[patch.unused]] +name = "sp-serializer" +version = "4.0.0-dev" +source = "git+https://github.com/paritytech//substrate?branch=polkadot-v0.9.37#f38bd6671d460293c93062cc1e4fe9e9e490cb29" diff --git a/runner/src/main.rs b/runner/src/main.rs index 2a31ad7a8..f9d2c7897 100644 --- a/runner/src/main.rs +++ b/runner/src/main.rs @@ -5,13 +5,12 @@ use clap::Parser; use error::Error; -use futures::{FutureExt, TryFutureExt}; use runner::ClientType; use signal_hook::consts::*; use signal_hook_tokio::Signals; use std::{fmt::Debug, path::PathBuf}; -use crate::runner::{retry_with_log_async, subxt_api, Runner}; +use crate::runner::Runner; #[derive(Parser, Debug, Clone)] #[clap(version, author, about, trailing_var_arg = true)] @@ -38,15 +37,9 @@ async fn main() -> Result<(), Error> { env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log::LevelFilter::Info.as_str()), ); let opts: Opts = Opts::parse(); - let rpc_client = retry_with_log_async( - || subxt_api(&opts.parachain_ws).into_future().boxed(), - "Error fetching executable".to_string(), - ) - .await?; - log::info!("Connected to the parachain"); - - let runner = Runner::new(rpc_client, opts); - let shutdown_signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?; + + let runner = Runner::new(opts); + let shutdown_signals = Signals::new(&[SIGHUP, SIGTERM, SIGINT, SIGQUIT])?; Runner::run(Box::new(runner), shutdown_signals).await?; Ok(()) } diff --git a/runner/src/runner.rs b/runner/src/runner.rs index b7429bd4b..2b8d6f524 100644 --- a/runner/src/runner.rs +++ b/runner/src/runner.rs @@ -68,7 +68,7 @@ pub const PARACHAIN_MODULE: &str = "ClientsInfo"; pub const CURRENT_RELEASES_STORAGE_ITEM: &str = "CurrentClientReleases"; /// Parachain block time -pub const BLOCK_TIME: Duration = Duration::from_secs(6); +pub const BLOCK_TIME: Duration = Duration::from_secs(12); /// Timeout used by the retry utilities: One minute pub const RETRY_TIMEOUT: Duration = Duration::from_millis(60_000); @@ -110,7 +110,7 @@ fn sha256sum(bytes: &[u8]) -> Vec { /// Per-network manager of the client executable pub struct Runner { /// `subxt` api to the parachain - subxt_api: OnlineClient, + subxt_api: Option>, /// The child process (interbtc client) spawned by this runner child_proc: Option, /// Details about the currently run release @@ -120,9 +120,9 @@ pub struct Runner { } impl Runner { - pub fn new(subxt_api: OnlineClient, opts: Opts) -> Self { + pub fn new(opts: Opts) -> Self { Self { - subxt_api, + subxt_api: None, child_proc: None, downloaded_release: None, opts, @@ -252,17 +252,16 @@ impl Runner { Ok(pid) } - async fn try_get_release(runner: &T) -> Result, Error> { - retry_with_log_async( - || { - runner - .read_chain_storage::(runner.subxt_api()) - .into_future() - .boxed() - }, - "Error fetching executable".to_string(), - ) - .await + /// A single attempt to the the release + async fn try_get_release(runner: &mut T) -> Result, Error> { + let api = runner.subxt_api().await?; + match runner.read_chain_storage::(&api).into_future().await { + Ok(x) => Ok(x), + Err(err) => { + runner.drop_subxt_api(); // we might have lost connection. Rebuild connection next time + Err(err) + } + } } /// Read parachain storage via an RPC call, and decode the result @@ -326,26 +325,32 @@ impl Runner { loop { runner.maybe_restart_client()?; - if let Some(new_release) = runner.try_get_release().await? { - let maybe_downloaded_release = runner.downloaded_release(); - let downloaded_release = maybe_downloaded_release.as_ref().ok_or(Error::NoDownloadedRelease)?; - if new_release.checksum != downloaded_release.checksum { - log::info!("Found new client release, updating..."); - - // Wait for child process to finish completely. - // To ensure there can't be two client processes using the same resources (such as the Bitcoin - // wallet for vaults). - runner.terminate_proc_and_wait()?; - - // Delete old release - runner.delete_downloaded_release()?; - - // Download new release - runner.download_binary(new_release).await?; - - // Run the downloaded release - runner.run_binary()?; + match runner.try_get_release().await { + Ok(Some(new_release)) => { + let maybe_downloaded_release = runner.downloaded_release(); + let downloaded_release = maybe_downloaded_release.as_ref().ok_or(Error::NoDownloadedRelease)?; + if new_release.checksum != downloaded_release.checksum { + log::info!("Found new client release, updating..."); + + // Wait for child process to finish completely. + // To ensure there can't be two client processes using the same resources (such as the Bitcoin + // wallet for vaults). + runner.terminate_proc_and_wait()?; + + // Delete old release + runner.delete_downloaded_release()?; + + // Download new release + runner.download_binary(new_release).await?; + + // Run the downloaded release + runner.run_binary()?; + } } + Err(err) => { + log::warn!("Failed to get new release: {err:?}"); + } + _ => {} // no new release } tokio::time::sleep(BLOCK_TIME).await; } @@ -407,7 +412,7 @@ impl Drop for Runner { #[async_trait] pub trait RunnerExt { - fn subxt_api(&self) -> &OnlineClient; + async fn subxt_api(&mut self) -> Result, Error>; fn client_args(&self) -> &Vec; fn child_proc(&mut self) -> &mut Option; fn set_child_proc(&mut self, child_proc: Option); @@ -417,7 +422,7 @@ pub trait RunnerExt { fn parachain_url(&self) -> String; fn client_type(&self) -> ClientType; /// Read the current client release from the parachain, retrying for `RETRY_TIMEOUT` if there is a network error. - async fn try_get_release(&self) -> Result, Error>; + async fn try_get_release(&mut self) -> Result, Error>; /// Download the client binary and make it executable, retrying for `RETRY_TIMEOUT` if there is a network error. async fn download_binary(&mut self, release: ClientRelease) -> Result<(), Error>; /// Convert a release URI (e.g. a GitHub link) to an executable name and OS path (after download) @@ -441,12 +446,20 @@ pub trait RunnerExt { fn maybe_restart_client(&mut self) -> Result<(), Error>; /// If a client binary exists on disk, load it. fn try_load_downloaded_binary(&mut self, release: &ClientRelease) -> Result<(), Error>; + /// Remove the subxt api from cache. + fn drop_subxt_api(&mut self); } #[async_trait] impl RunnerExt for Runner { - fn subxt_api(&self) -> &OnlineClient { - &self.subxt_api + async fn subxt_api(&mut self) -> Result, Error> { + if let Some(ref subxt_api) = self.subxt_api { + Ok(subxt_api.clone()) + } else { + let client = OnlineClient::from_url(&self.opts.parachain_ws).await?; + self.subxt_api = Some(client.clone()); + Ok(client) + } } fn client_args(&self) -> &Vec { @@ -481,7 +494,7 @@ impl RunnerExt for Runner { self.opts.client_type.clone() } - async fn try_get_release(&self) -> Result, Error> { + async fn try_get_release(&mut self) -> Result, Error> { Runner::try_get_release(self).await } @@ -532,6 +545,10 @@ impl RunnerExt for Runner { fn try_load_downloaded_binary(&mut self, release: &ClientRelease) -> Result<(), Error> { Runner::try_load_downloaded_binary(self, release) } + + fn drop_subxt_api(&mut self) { + self.subxt_api = None; + } } #[async_trait] @@ -552,10 +569,6 @@ impl StorageReader for Runner { } } -pub async fn subxt_api(url: &str) -> Result, Error> { - Ok(OnlineClient::from_url(url).await?) -} - pub fn custom_retry_config() -> ExponentialBackoff { ExponentialBackoff { initial_interval: RETRY_INTERVAL, @@ -639,7 +652,7 @@ mod tests { #[async_trait] pub trait RunnerExt { - fn subxt_api(&self) -> &OnlineClient; + async fn subxt_api(&mut self) -> Result, Error>; fn client_args(&self) -> &Vec; fn child_proc(&mut self) -> &mut Option; fn set_child_proc(&mut self, child_proc: Option); @@ -648,7 +661,7 @@ mod tests { fn download_path(&self) -> &PathBuf; fn parachain_url(&self) -> String; fn client_type(&self) -> ClientType; - async fn try_get_release(&self) -> Result, Error>; + async fn try_get_release(&mut self) -> Result, Error>; async fn download_binary(&mut self, release: ClientRelease) -> Result<(), Error>; fn get_bin_path(&self, uri: &str) -> Result<(String, PathBuf), Error>; fn delete_downloaded_release(&mut self) -> Result<(), Error>; @@ -659,6 +672,7 @@ mod tests { fn check_child_proc_alive(&mut self) -> Result; fn maybe_restart_client(&mut self) -> Result<(), Error>; fn try_load_downloaded_binary(&mut self, release: &ClientRelease) -> Result<(), Error>; + fn drop_subxt_api(&mut self); } #[async_trait]