Skip to content

Commit

Permalink
Merge pull request #547 from breez/fast-sync
Browse files Browse the repository at this point in the history
Fast sync
  • Loading branch information
roeierez authored Oct 24, 2023
2 parents 3486c95 + 7c0e142 commit 20cd9dd
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 47 deletions.
15 changes: 10 additions & 5 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,11 +757,16 @@ impl BreezServices {
if let Ok(lsp_info) = self.lsp_info().await {
let node_id = lsp_info.pubkey;
let address = lsp_info.host;
debug!("connecting to lsp {}@{}", node_id.clone(), address.clone());
self.node_api
.connect_peer(node_id.clone(), address.clone())
.await
.map_err(anyhow::Error::msg)?;
let lsp_connected = self
.node_info()
.map(|info| info.connected_peers.iter().any(|e| e == node_id.as_str()))?;
if !lsp_connected {
debug!("connecting to lsp {}@{}", node_id.clone(), address.clone());
self.node_api
.connect_peer(node_id.clone(), address.clone())
.await
.map_err(anyhow::Error::msg)?;
}
debug!("connected to lsp {}@{}", node_id.clone(), address.clone());
}
Ok(())
Expand Down
105 changes: 63 additions & 42 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,51 @@ impl Greenlight {
Ok(node_client.clone().unwrap())
}

async fn fetch_channels_and_balance_with_retry(
cln_client: node::ClnClient,
persister: Arc<SqliteStorage>,
balance_changed: bool,
) -> Result<(
Vec<cln::ListpeersPeersChannels>,
Vec<cln::ListpeersPeersChannels>,
Vec<String>,
u64,
)> {
let (mut all_channels, mut opened_channels, mut connected_peers, mut channels_balance) =
Greenlight::fetch_channels_and_balance(cln_client.clone()).await?;
if balance_changed {
let node_state = persister.get_node_state()?;
if let Some(state) = node_state {
let mut retry_count = 0;
while state.channels_balance_msat == channels_balance && retry_count < 10 {
warn!("balance update was required but was not updated, retrying in 100ms...");
sleep(Duration::from_millis(100)).await;
(
all_channels,
opened_channels,
connected_peers,
channels_balance,
) = Greenlight::fetch_channels_and_balance(cln_client.clone()).await?;
retry_count += 1;
}
}
}
Ok((
all_channels,
opened_channels,
connected_peers,
channels_balance,
))
}

async fn fetch_channels_and_balance(
&self,
mut cln_client: node::ClnClient,
) -> Result<(
Vec<cln::ListpeersPeersChannels>,
Vec<cln::ListpeersPeersChannels>,
Vec<String>,
u64,
)> {
let mut cln_client = self.get_node_client().await?;
// list all peers
let peers = cln_client
.list_peers(cln::ListpeersRequest::default())
Expand Down Expand Up @@ -359,54 +395,40 @@ impl NodeAPI for Greenlight {
balance_changed: bool,
) -> Result<SyncResponse> {
info!("pull changed since {}", since_timestamp);
let mut node_client = self.get_node_client().await?;
let node_client = self.get_node_client().await?;

// get node info
let node_info = node_client
.getinfo(pb::cln::GetinfoRequest::default())
.await?
.into_inner();
let mut node_info_client = node_client.clone();
let node_info_future = node_info_client.getinfo(pb::cln::GetinfoRequest::default());

// list both off chain funds and on chain fudns
let funds: pb::cln::ListfundsResponse = node_client
.list_funds(pb::cln::ListfundsRequest::default())
.await?
.into_inner();
let onchain_funds = funds.outputs;
let mut funds_client = node_client.clone();
let funds_future = funds_client.list_funds(pb::cln::ListfundsRequest::default());

// Fetch closed channels from greenlight
let closed_channels = match node_client
.list_closed_channels(ListclosedchannelsRequest { id: None })
.await
{
Ok(c) => c.into_inner().closedchannels,
Err(e) => {
error!("list closed channels error {:?}", e);
vec![]
}
};
let mut closed_channels_client = node_client.clone();
let closed_channels_future =
closed_channels_client.list_closed_channels(ListclosedchannelsRequest { id: None });

// calculate the node new balance and in case the caller signals balance has changed
// keep polling until the balance is updated
let (mut all_channels, mut opened_channels, mut connected_peers, mut channels_balance) =
self.fetch_channels_and_balance().await?;
if balance_changed {
let node_state = self.persister.get_node_state()?;
if let Some(state) = node_state {
let mut retry_count = 0;
while state.channels_balance_msat == channels_balance && retry_count < 10 {
warn!("balance update was required but was not updated, retrying in 100ms...");
sleep(Duration::from_millis(100)).await;
(
all_channels,
opened_channels,
connected_peers,
channels_balance,
) = self.fetch_channels_and_balance().await?;
retry_count += 1;
}
}
}
let balance_future = Greenlight::fetch_channels_and_balance_with_retry(
node_client.clone(),
self.persister.clone(),
balance_changed,
);

let (node_info_res, funds_res, closed_channels_res, balance_res) = tokio::join!(
node_info_future,
funds_future,
closed_channels_future,
balance_future
);

let node_info = node_info_res?.into_inner();
let onchain_funds = funds_res?.into_inner().outputs;
let closed_channels = closed_channels_res?.into_inner().closedchannels;
let (all_channels, opened_channels, connected_peers, channels_balance) = balance_res?;

let forgotten_closed_channels: Result<Vec<Channel>> = closed_channels
.into_iter()
Expand Down Expand Up @@ -911,7 +933,6 @@ async fn pull_transactions(since_timestamp: u64, client: node::ClnClient) -> Res
.list_invoices(pb::cln::ListinvoicesRequest::default())
.await?
.into_inner();

// construct the received transactions by filtering the invoices to those paid and beyond the filter timestamp
let received_transactions: Result<Vec<Payment>> = invoices
.invoices
Expand Down

0 comments on commit 20cd9dd

Please sign in to comment.