diff --git a/libs/sdk-core/src/breez_services.rs b/libs/sdk-core/src/breez_services.rs index 7da47c959..dfdb1f09c 100644 --- a/libs/sdk-core/src/breez_services.rs +++ b/libs/sdk-core/src/breez_services.rs @@ -641,13 +641,20 @@ impl BreezServices { /// * channels - The list of channels and their status /// * payments - The incoming/outgoing payments pub async fn sync(&self) -> Result<()> { + self.do_sync(false).await + } + + async fn do_sync(&self, balance_changed: bool) -> Result<()> { let start = Instant::now(); self.start_node().await?; self.connect_lsp_peer().await?; // First query the changes since last sync time. let since_timestamp = self.persister.last_payment_timestamp().unwrap_or(0); - let new_data = &self.node_api.pull_changed(since_timestamp).await?; + let new_data = &self + .node_api + .pull_changed(since_timestamp, balance_changed) + .await?; debug!( "pull changed time={:?} {:?}", @@ -714,12 +721,9 @@ impl BreezServices { return Err(payment_res.err().unwrap()); } let payment = payment_res.unwrap(); - self.sync().await?; + self.do_sync(true).await?; - match self - .persister - .get_completed_payment_by_hash(&payment.payment_hash)? - { + match self.persister.get_payment_by_hash(&payment.payment_hash)? { Some(p) => { self.notify_event_listeners(BreezEvent::PaymentSucceed { details: p.clone() }) .await?; @@ -929,7 +933,7 @@ impl BreezServices { .insert_or_update_payments(&vec![payment.unwrap()]); debug!("paid invoice was added to payments list {:?}", res); } - if let Err(e) = cloned.sync().await { + if let Err(e) = cloned.do_sync(true).await { error!("failed to sync after paid invoice: {:?}", e); } _ = cloned.on_event(BreezEvent::InvoicePaid { diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 1c1b3d49f..328a76e82 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -1,7 +1,7 @@ use std::cmp::{max, min}; use std::str::FromStr; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, Result}; use bitcoin::bech32::{u5, ToBase32}; @@ -28,6 +28,7 @@ use lightning_invoice::{RawInvoice, SignedRawInvoice}; use serde::{Deserialize, Serialize}; use strum_macros::{Display, EnumString}; use tokio::sync::{mpsc, Mutex}; +use tokio::time::sleep; use tonic::Streaming; use crate::invoice::parse_invoice; @@ -44,6 +45,7 @@ pub(crate) struct Greenlight { tls_config: TlsConfig, gl_client: Mutex>, node_client: Mutex>, + persister: Arc, } impl Greenlight { @@ -123,7 +125,7 @@ impl Greenlight { match encryptd_creds { Some(c) => { persister.set_gl_credentials(c)?; - Greenlight::new(config, seed, creds).await + Greenlight::new(config, seed, creds, persister).await } None => { return Err(anyhow!("Failed to encrypt credentials")); @@ -139,6 +141,7 @@ impl Greenlight { sdk_config: Config, seed: Vec, connection_credentials: GreenlightCredentials, + persister: Arc, ) -> Result { let greenlight_network = sdk_config.network.into(); let tls_config = TlsConfig::new()?.identity( @@ -153,6 +156,7 @@ impl Greenlight { tls_config, gl_client: Mutex::new(None), node_client: Mutex::new(None), + persister, }) } @@ -233,6 +237,51 @@ impl Greenlight { } Ok(node_client.clone().unwrap()) } + + async fn fetch_channels_and_balance( + &self, + ) -> Result<(Vec, Vec, Vec, u64)> { + let mut client = self.get_client().await?; + // list all peers + let peers = client + .list_peers(pb::ListPeersRequest::default()) + .await? + .into_inner(); + + // filter only connected peers + let connected_peers: Vec = peers + .peers + .iter() + .filter(|p| p.connected) + .map(|p| hex::encode(p.id.clone())) + .collect(); + let mut all_channels: Vec = vec![]; + peers.peers.iter().for_each(|p| { + let peer_channels = &mut p.channels.clone(); + all_channels.append(peer_channels); + }); + + // filter only opened channels + let opened_channels: Vec = all_channels + .iter() + .cloned() + .filter(|c| c.state == *"CHANNELD_NORMAL") + .collect(); + + // calculate channels balance only from opened channels + let channels_balance = opened_channels + .iter() + .map(|c: &pb::Channel| { + amount_to_msat(&parse_amount(c.spendable.clone()).unwrap_or_default()) + }) + .sum::(); + Ok(( + all_channels, + opened_channels, + connected_peers, + channels_balance, + )) + } } #[tonic::async_trait] @@ -272,17 +321,15 @@ impl NodeAPI for Greenlight { } // implemenet pull changes from greenlight - async fn pull_changed(&self, since_timestamp: i64) -> Result { + async fn pull_changed( + &self, + since_timestamp: i64, + balance_changed: bool, + ) -> Result { info!("pull changed since {}", since_timestamp); let mut client = self.get_client().await?; let mut node_client = self.get_node_client().await?; - // list all peers - let peers = client - .list_peers(pb::ListPeersRequest::default()) - .await? - .into_inner(); - // get node info let node_info = client .get_info(pb::GetInfoRequest::default()) @@ -296,28 +343,6 @@ impl NodeAPI for Greenlight { .into_inner(); let onchain_funds = funds.outputs; - // filter only connected peers - let connected_peers: Vec = peers - .peers - .clone() - .iter() - .filter(|p| p.connected) - .map(|p| hex::encode(p.id.clone())) - .collect(); - - // make a vector of all channels by searching in peers - let all_channels: &mut Vec = &mut Vec::new(); - peers.peers.clone().iter().for_each(|p| { - let peer_channels = &mut p.channels.clone(); - all_channels.append(peer_channels); - }); - - // filter only opened channels - let opened_channels: &mut Vec<&pb::Channel> = &mut all_channels - .iter() - .filter(|c| c.state == *"CHANNELD_NORMAL") - .collect(); - // Fetch closed channels from greenlight let closed_channels = match node_client .list_closed_channels(ListclosedchannelsRequest { id: None }) @@ -330,6 +355,28 @@ impl NodeAPI for Greenlight { } }; + // calculate the node new balance and in case the caller signals balance has changed + // keep polling until the balance is updated + let (mut all_channels, mut opened_channels, mut connected_peers, mut channels_balance) = + self.fetch_channels_and_balance().await?; + if balance_changed { + let node_state = self.persister.get_node_state()?; + if let Some(state) = node_state { + let mut retry_count = 0; + while state.channels_balance_msat == channels_balance && retry_count < 3 { + warn!("balance update was required but was not updated, retrying in 100ms..."); + sleep(Duration::from_millis(100)).await; + ( + all_channels, + opened_channels, + connected_peers, + channels_balance, + ) = self.fetch_channels_and_balance().await?; + retry_count += 1; + } + } + } + let forgotten_closed_channels: Result> = closed_channels .into_iter() .filter(|c| { @@ -345,14 +392,6 @@ impl NodeAPI for Greenlight { all_channels.clone().into_iter().map(|c| c.into()).collect(); all_channel_models.extend(forgotten_closed_channels?); - // calculate channels balance only from opened channels - let channels_balance = opened_channels - .iter() - .map(|c: &&pb::Channel| { - amount_to_msat(&parse_amount(c.spendable.clone()).unwrap_or_default()) - }) - .sum::(); - // calculate onchain balance let onchain_balance = onchain_funds.iter().fold(0, |a, b| { if b.reserved { diff --git a/libs/sdk-core/src/models.rs b/libs/sdk-core/src/models.rs index 8b4feea2a..21c94b546 100644 --- a/libs/sdk-core/src/models.rs +++ b/libs/sdk-core/src/models.rs @@ -54,7 +54,11 @@ pub trait NodeAPI: Send + Sync { expiry: Option, cltv: Option, ) -> Result; - async fn pull_changed(&self, since_timestamp: i64) -> Result; + async fn pull_changed( + &self, + since_timestamp: i64, + balance_changed: bool, + ) -> Result; /// As per the `pb::PayRequest` docs, `amount_sats` is only needed when the invoice doesn't specify an amount async fn send_payment( &self, diff --git a/libs/sdk-core/src/test_utils.rs b/libs/sdk-core/src/test_utils.rs index 639da4c9f..c36f14207 100644 --- a/libs/sdk-core/src/test_utils.rs +++ b/libs/sdk-core/src/test_utils.rs @@ -262,7 +262,11 @@ impl NodeAPI for MockNodeAPI { Ok(invoice.bolt11) } - async fn pull_changed(&self, _since_timestamp: i64) -> Result { + async fn pull_changed( + &self, + _since_timestamp: i64, + _balance_changed: bool, + ) -> Result { Ok(SyncResponse { node_state: self.node_state.clone(), payments: self @@ -574,7 +578,7 @@ pub fn rand_vec_u8(len: usize) -> Vec { pub fn create_test_config() -> crate::models::Config { let mut conf = Config { - ..Config::staging( + ..Config::production( "".into(), crate::NodeConfig::Greenlight { config: crate::GreenlightNodeConfig {