Skip to content

Commit

Permalink
Merge pull request #942 from MutinyWallet/less-aggresive-peer-reconnection
Browse files Browse the repository at this point in the history

Less aggressive peer reconnection
  • Loading branch information
TonyGiorgio authored Jan 9, 2024
2 parents ea3b4b0 + 83f62be commit 9777886
Showing 1 changed file with 86 additions and 23 deletions.
109 changes: 86 additions & 23 deletions mutiny-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use std::{
},
};

const INITIAL_RECONNECTION_DELAY: u64 = 5;
const INITIAL_RECONNECTION_DELAY: u64 = 10;
const MAX_RECONNECTION_DELAY: u64 = 60;

pub(crate) type BumpTxEventHandler<S: MutinyStorage> = BumpTransactionEventHandler<
Expand Down Expand Up @@ -1981,7 +1981,8 @@ async fn start_reconnection_handling<S: MutinyStorage>(
let stop_copy = stop.clone();
utils::spawn(async move {
// Now try to connect to the client's LSP
if let Some(lsp) = lsp_client_copy {
// This is here in case the LSP client node info has not saved to storage yet
if let Some(lsp) = lsp_client_copy.as_ref() {
let node_id = NodeId::from_pubkey(&lsp.get_lsp_pubkey());

let connect_res = connect_peer_if_necessary(
Expand Down Expand Up @@ -2014,29 +2015,91 @@ async fn start_reconnection_handling<S: MutinyStorage>(
log_error!(proxy_logger, "could not save connection to lsp: {e}");
}
};
});

// keep trying to connect each lightning peer if they get disconnected
let connect_peer_man = peer_man.clone();
let connect_fee_estimator = fee_estimator.clone();
let connect_logger = logger.clone();
let connect_storage = storage.clone();
utils::spawn(async move {
// Now try to connect to other nodes the client might have, skipping the LSP if necessary
let stored_peers = get_all_peers(&storage_copy).unwrap_or_default();
let lsp_node_id = lsp_client_copy.map(|lsp| NodeId::from_pubkey(&lsp.get_lsp_pubkey()));
let initial_peers: Vec<(NodeId, String)> = stored_peers
.into_iter()
.filter(|(_, d)| {
d.connection_string.is_some() && d.nodes.binary_search(&uuid.to_string()).is_ok()
})
.map(|(n, d)| (n, d.connection_string.unwrap()))
.filter(|(n, _)| lsp_node_id != Some(*n))
.collect();
for (pubkey, conn_str) in initial_peers.into_iter() {
log_trace!(
proxy_logger,
"starting initial connection to peer: {pubkey}"
);
let peer_connection_info = match PubkeyConnectionInfo::new(&conn_str) {
Ok(p) => p,
Err(e) => {
log_error!(proxy_logger, "could not parse connection info: {e}");
continue;
}
};

let connect_res = connect_peer_if_necessary(
#[cfg(target_arch = "wasm32")]
&websocket_proxy_addr,
&peer_connection_info,
&storage_copy,
proxy_logger.clone(),
peer_man_proxy.clone(),
proxy_fee_estimator.clone(),
stop.clone(),
)
.await;
match connect_res {
Ok(_) => {
log_trace!(proxy_logger, "initial connection to peer: {pubkey}");
}
Err(e) => {
log_warn!(
proxy_logger,
"could not start initial connection to peer: {e}"
);
}
}
}

// keep trying to connect each lightning peer if they get disconnected
// hashMap to store backoff times for each pubkey
let mut backoff_times = HashMap::new();

// Only begin this process after 30s of running
for _ in 0..30 {
if stop.load(Ordering::Relaxed) {
log_debug!(
proxy_logger,
"stopping connection component and disconnecting peers for node: {}",
node_pubkey.to_hex(),
);
peer_man_proxy.disconnect_all_peers();
stop_component(&stopped_components);
log_debug!(
proxy_logger,
"stopped connection component and disconnected peers for node: {}",
node_pubkey.to_hex(),
);
return;
}
sleep(1_000).await;
}

loop {
for _ in 0..5 {
for _ in 0..INITIAL_RECONNECTION_DELAY {
if stop.load(Ordering::Relaxed) {
log_debug!(
connect_logger,
proxy_logger,
"stopping connection component and disconnecting peers for node: {}",
node_pubkey.to_hex(),
);
connect_peer_man.disconnect_all_peers();
peer_man_proxy.disconnect_all_peers();
stop_component(&stopped_components);
log_debug!(
connect_logger,
proxy_logger,
"stopped connection component and disconnected peers for node: {}",
node_pubkey.to_hex(),
);
Expand All @@ -2045,8 +2108,8 @@ async fn start_reconnection_handling<S: MutinyStorage>(
sleep(1_000).await;
}

let peer_connections = get_all_peers(&connect_storage).unwrap_or_default();
let current_connections = connect_peer_man.get_peer_node_ids();
let peer_connections = get_all_peers(&storage_copy).unwrap_or_default();
let current_connections = peer_man_proxy.get_peer_node_ids();

let not_connected: Vec<(NodeId, String)> = peer_connections
.into_iter()
Expand Down Expand Up @@ -2078,11 +2141,11 @@ async fn start_reconnection_handling<S: MutinyStorage>(
// Update the last attempt time
backoff_entry.1 = now;

log_trace!(connect_logger, "going to auto connect to peer: {pubkey}");
log_trace!(proxy_logger, "going to auto connect to peer: {pubkey}");
let peer_connection_info = match PubkeyConnectionInfo::new(&conn_str) {
Ok(p) => p,
Err(e) => {
log_error!(connect_logger, "could not parse connection info: {e}");
log_error!(proxy_logger, "could not parse connection info: {e}");
continue;
}
};
Expand All @@ -2091,21 +2154,21 @@ async fn start_reconnection_handling<S: MutinyStorage>(
#[cfg(target_arch = "wasm32")]
&websocket_proxy_addr,
&peer_connection_info,
&connect_storage,
connect_logger.clone(),
connect_peer_man.clone(),
connect_fee_estimator.clone(),
&storage_copy,
proxy_logger.clone(),
peer_man_proxy.clone(),
proxy_fee_estimator.clone(),
stop.clone(),
)
.await;
match connect_res {
Ok(_) => {
log_trace!(connect_logger, "auto connected peer: {pubkey}");
log_trace!(proxy_logger, "auto connected peer: {pubkey}");
// reset backoff time to initial value if connection is successful
backoff_entry.0 = INITIAL_RECONNECTION_DELAY;
}
Err(e) => {
log_warn!(connect_logger, "could not auto connect peer: {e}");
log_warn!(proxy_logger, "could not auto connect peer: {e}");
// double the backoff time if connection fails, but do not exceed max
backoff_entry.0 = (backoff_entry.0 * 2).min(MAX_RECONNECTION_DELAY);
}
Expand Down

0 comments on commit 9777886

Please sign in to comment.