Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: runner: reconnect after parachain connection error #462

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 4 additions & 11 deletions runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ use clap::Parser;

use error::Error;

use futures::{FutureExt, TryFutureExt};
use runner::ClientType;
use signal_hook::consts::*;
use signal_hook_tokio::Signals;
use std::{fmt::Debug, path::PathBuf};

use crate::runner::{retry_with_log_async, subxt_api, Runner};
use crate::runner::Runner;

#[derive(Parser, Debug, Clone)]
#[clap(version, author, about, trailing_var_arg = true)]
Expand All @@ -38,15 +37,9 @@ async fn main() -> Result<(), Error> {
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, log::LevelFilter::Info.as_str()),
);
let opts: Opts = Opts::parse();
let rpc_client = retry_with_log_async(
|| subxt_api(&opts.parachain_ws).into_future().boxed(),
"Error fetching executable".to_string(),
)
.await?;
log::info!("Connected to the parachain");

let runner = Runner::new(rpc_client, opts);
let shutdown_signals = Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT])?;

let runner = Runner::new(opts);
let shutdown_signals = Signals::new(&[SIGHUP, SIGTERM, SIGINT, SIGQUIT])?;
Runner::run(Box::new(runner), shutdown_signals).await?;
Ok(())
}
104 changes: 59 additions & 45 deletions runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub const PARACHAIN_MODULE: &str = "ClientsInfo";
pub const CURRENT_RELEASES_STORAGE_ITEM: &str = "CurrentClientReleases";

/// Parachain block time
pub const BLOCK_TIME: Duration = Duration::from_secs(6);
pub const BLOCK_TIME: Duration = Duration::from_secs(12);

/// Timeout used by the retry utilities: One minute
pub const RETRY_TIMEOUT: Duration = Duration::from_millis(60_000);
Expand Down Expand Up @@ -110,7 +110,7 @@ fn sha256sum(bytes: &[u8]) -> Vec<u8> {
/// Per-network manager of the client executable
pub struct Runner {
/// `subxt` api to the parachain
subxt_api: OnlineClient<PolkadotConfig>,
subxt_api: Option<OnlineClient<PolkadotConfig>>,
/// The child process (interbtc client) spawned by this runner
child_proc: Option<Child>,
/// Details about the currently run release
Expand All @@ -120,9 +120,9 @@ pub struct Runner {
}

impl Runner {
pub fn new(subxt_api: OnlineClient<PolkadotConfig>, opts: Opts) -> Self {
pub fn new(opts: Opts) -> Self {
Self {
subxt_api,
subxt_api: None,
child_proc: None,
downloaded_release: None,
opts,
Expand Down Expand Up @@ -252,17 +252,16 @@ impl Runner {
Ok(pid)
}

async fn try_get_release<T: RunnerExt + StorageReader>(runner: &T) -> Result<Option<ClientRelease>, Error> {
retry_with_log_async(
|| {
runner
.read_chain_storage::<ClientRelease>(runner.subxt_api())
.into_future()
.boxed()
},
"Error fetching executable".to_string(),
)
.await
/// A single attempt to get the release
async fn try_get_release<T: RunnerExt + StorageReader>(runner: &mut T) -> Result<Option<ClientRelease>, Error> {
let api = runner.subxt_api().await?;
match runner.read_chain_storage::<ClientRelease>(&api).into_future().await {
Ok(x) => Ok(x),
Err(err) => {
runner.drop_subxt_api(); // we might have lost connection. Rebuild connection next time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible to check whether the error is related to a lost connection; otherwise this might lead to a lot of bandwidth usage if the chain storage read fails for another reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree we should distinguish these errors if possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree we should distinguish these errors if possible.

Isn't it best to just rebuild the connection anyway, in case the error matching turns out to be too specific (e.g., an error raised at the jsonrpsee level rather than at the socket level)?

Err(err)
}
}
}

/// Read parachain storage via an RPC call, and decode the result
Expand Down Expand Up @@ -326,26 +325,32 @@ impl Runner {

loop {
runner.maybe_restart_client()?;
if let Some(new_release) = runner.try_get_release().await? {
let maybe_downloaded_release = runner.downloaded_release();
let downloaded_release = maybe_downloaded_release.as_ref().ok_or(Error::NoDownloadedRelease)?;
if new_release.checksum != downloaded_release.checksum {
log::info!("Found new client release, updating...");

// Wait for child process to finish completely.
// To ensure there can't be two client processes using the same resources (such as the Bitcoin
// wallet for vaults).
runner.terminate_proc_and_wait()?;

// Delete old release
runner.delete_downloaded_release()?;

// Download new release
runner.download_binary(new_release).await?;

// Run the downloaded release
runner.run_binary()?;
match runner.try_get_release().await {
Ok(Some(new_release)) => {
let maybe_downloaded_release = runner.downloaded_release();
let downloaded_release = maybe_downloaded_release.as_ref().ok_or(Error::NoDownloadedRelease)?;
if new_release.checksum != downloaded_release.checksum {
log::info!("Found new client release, updating...");

// Wait for child process to finish completely.
// To ensure there can't be two client processes using the same resources (such as the Bitcoin
// wallet for vaults).
runner.terminate_proc_and_wait()?;

// Delete old release
runner.delete_downloaded_release()?;

// Download new release
runner.download_binary(new_release).await?;

// Run the downloaded release
runner.run_binary()?;
}
}
Err(err) => {
log::warn!("Failed to get new release: {err:?}");
}
_ => {} // no new release
}
tokio::time::sleep(BLOCK_TIME).await;
}
Expand Down Expand Up @@ -407,7 +412,7 @@ impl Drop for Runner {

#[async_trait]
pub trait RunnerExt {
fn subxt_api(&self) -> &OnlineClient<PolkadotConfig>;
async fn subxt_api(&mut self) -> Result<OnlineClient<PolkadotConfig>, Error>;
fn client_args(&self) -> &Vec<String>;
fn child_proc(&mut self) -> &mut Option<Child>;
fn set_child_proc(&mut self, child_proc: Option<Child>);
Expand All @@ -417,7 +422,7 @@ pub trait RunnerExt {
fn parachain_url(&self) -> String;
fn client_type(&self) -> ClientType;
/// Read the current client release from the parachain in a single attempt. On a storage read error the cached
/// `subxt` api is dropped, so the next call rebuilds the connection.
async fn try_get_release(&self) -> Result<Option<ClientRelease>, Error>;
async fn try_get_release(&mut self) -> Result<Option<ClientRelease>, Error>;
/// Download the client binary and make it executable, retrying for `RETRY_TIMEOUT` if there is a network error.
async fn download_binary(&mut self, release: ClientRelease) -> Result<(), Error>;
/// Convert a release URI (e.g. a GitHub link) to an executable name and OS path (after download)
Expand All @@ -441,12 +446,20 @@ pub trait RunnerExt {
fn maybe_restart_client(&mut self) -> Result<(), Error>;
/// If a client binary exists on disk, load it.
fn try_load_downloaded_binary(&mut self, release: &ClientRelease) -> Result<(), Error>;
/// Remove the subxt api from cache.
fn drop_subxt_api(&mut self);
}

#[async_trait]
impl RunnerExt for Runner {
fn subxt_api(&self) -> &OnlineClient<PolkadotConfig> {
&self.subxt_api
async fn subxt_api(&mut self) -> Result<OnlineClient<PolkadotConfig>, Error> {
if let Some(ref subxt_api) = self.subxt_api {
Ok(subxt_api.clone())
} else {
let client = OnlineClient::from_url(&self.opts.parachain_ws).await?;
self.subxt_api = Some(client.clone());
Ok(client)
}
}

fn client_args(&self) -> &Vec<String> {
Expand Down Expand Up @@ -481,7 +494,7 @@ impl RunnerExt for Runner {
self.opts.client_type.clone()
}

async fn try_get_release(&self) -> Result<Option<ClientRelease>, Error> {
async fn try_get_release(&mut self) -> Result<Option<ClientRelease>, Error> {
Runner::try_get_release(self).await
}

Expand Down Expand Up @@ -532,6 +545,10 @@ impl RunnerExt for Runner {
fn try_load_downloaded_binary(&mut self, release: &ClientRelease) -> Result<(), Error> {
Runner::try_load_downloaded_binary(self, release)
}

fn drop_subxt_api(&mut self) {
self.subxt_api = None;
}
}

#[async_trait]
Expand All @@ -552,10 +569,6 @@ impl StorageReader for Runner {
}
}

pub async fn subxt_api(url: &str) -> Result<OnlineClient<PolkadotConfig>, Error> {
Ok(OnlineClient::from_url(url).await?)
}

pub fn custom_retry_config() -> ExponentialBackoff {
ExponentialBackoff {
initial_interval: RETRY_INTERVAL,
Expand Down Expand Up @@ -639,7 +652,7 @@ mod tests {

#[async_trait]
pub trait RunnerExt {
fn subxt_api(&self) -> &OnlineClient<PolkadotConfig>;
async fn subxt_api(&mut self) -> Result<OnlineClient<PolkadotConfig>, Error>;
fn client_args(&self) -> &Vec<String>;
fn child_proc(&mut self) -> &mut Option<Child>;
fn set_child_proc(&mut self, child_proc: Option<Child>);
Expand All @@ -648,7 +661,7 @@ mod tests {
fn download_path(&self) -> &PathBuf;
fn parachain_url(&self) -> String;
fn client_type(&self) -> ClientType;
async fn try_get_release(&self) -> Result<Option<ClientRelease>, Error>;
async fn try_get_release(&mut self) -> Result<Option<ClientRelease>, Error>;
async fn download_binary(&mut self, release: ClientRelease) -> Result<(), Error>;
fn get_bin_path(&self, uri: &str) -> Result<(String, PathBuf), Error>;
fn delete_downloaded_release(&mut self) -> Result<(), Error>;
Expand All @@ -659,6 +672,7 @@ mod tests {
fn check_child_proc_alive(&mut self) -> Result<bool, Error>;
fn maybe_restart_client(&mut self) -> Result<(), Error>;
fn try_load_downloaded_binary(&mut self, release: &ClientRelease) -> Result<(), Error>;
fn drop_subxt_api(&mut self);
}

#[async_trait]
Expand Down