Skip to content

Commit

Permalink
Merge pull request #409 from breez/correct-balance
Browse files Browse the repository at this point in the history
Poll for balance changes where hinted.
  • Loading branch information
roeierez authored Aug 28, 2023
2 parents 4fc3115 + 085ed74 commit 1c9152a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 49 deletions.
18 changes: 11 additions & 7 deletions libs/sdk-core/src/breez_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,13 +641,20 @@ impl BreezServices {
/// * channels - The list of channels and their status
/// * payments - The incoming/outgoing payments
pub async fn sync(&self) -> Result<()> {
self.do_sync(false).await
}

async fn do_sync(&self, balance_changed: bool) -> Result<()> {
let start = Instant::now();
self.start_node().await?;
self.connect_lsp_peer().await?;

// First query the changes since last sync time.
let since_timestamp = self.persister.last_payment_timestamp().unwrap_or(0);
let new_data = &self.node_api.pull_changed(since_timestamp).await?;
let new_data = &self
.node_api
.pull_changed(since_timestamp, balance_changed)
.await?;

debug!(
"pull changed time={:?} {:?}",
Expand Down Expand Up @@ -714,12 +721,9 @@ impl BreezServices {
return Err(payment_res.err().unwrap());
}
let payment = payment_res.unwrap();
self.sync().await?;
self.do_sync(true).await?;

match self
.persister
.get_completed_payment_by_hash(&payment.payment_hash)?
{
match self.persister.get_payment_by_hash(&payment.payment_hash)? {
Some(p) => {
self.notify_event_listeners(BreezEvent::PaymentSucceed { details: p.clone() })
.await?;
Expand Down Expand Up @@ -929,7 +933,7 @@ impl BreezServices {
.insert_or_update_payments(&vec![payment.unwrap()]);
debug!("paid invoice was added to payments list {:?}", res);
}
if let Err(e) = cloned.sync().await {
if let Err(e) = cloned.do_sync(true).await {
error!("failed to sync after paid invoice: {:?}", e);
}
_ = cloned.on_event(BreezEvent::InvoicePaid {
Expand Down
117 changes: 78 additions & 39 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cmp::{max, min};
use std::str::FromStr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Result};
use bitcoin::bech32::{u5, ToBase32};
Expand All @@ -28,6 +28,7 @@ use lightning_invoice::{RawInvoice, SignedRawInvoice};
use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumString};
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep;
use tonic::Streaming;

use crate::invoice::parse_invoice;
Expand All @@ -44,6 +45,7 @@ pub(crate) struct Greenlight {
tls_config: TlsConfig,
gl_client: Mutex<Option<node::Client>>,
node_client: Mutex<Option<ClnClient>>,
persister: Arc<SqliteStorage>,
}

impl Greenlight {
Expand Down Expand Up @@ -123,7 +125,7 @@ impl Greenlight {
match encryptd_creds {
Some(c) => {
persister.set_gl_credentials(c)?;
Greenlight::new(config, seed, creds).await
Greenlight::new(config, seed, creds, persister).await
}
None => {
return Err(anyhow!("Failed to encrypt credentials"));
Expand All @@ -139,6 +141,7 @@ impl Greenlight {
sdk_config: Config,
seed: Vec<u8>,
connection_credentials: GreenlightCredentials,
persister: Arc<SqliteStorage>,
) -> Result<Greenlight> {
let greenlight_network = sdk_config.network.into();
let tls_config = TlsConfig::new()?.identity(
Expand All @@ -153,6 +156,7 @@ impl Greenlight {
tls_config,
gl_client: Mutex::new(None),
node_client: Mutex::new(None),
persister,
})
}

Expand Down Expand Up @@ -233,6 +237,51 @@ impl Greenlight {
}
Ok(node_client.clone().unwrap())
}

async fn fetch_channels_and_balance(
&self,
) -> Result<(Vec<pb::Channel>, Vec<pb::Channel>, Vec<String>, u64)> {
let mut client = self.get_client().await?;
// list all peers
let peers = client
.list_peers(pb::ListPeersRequest::default())
.await?
.into_inner();

// filter only connected peers
let connected_peers: Vec<String> = peers
.peers
.iter()
.filter(|p| p.connected)
.map(|p| hex::encode(p.id.clone()))
.collect();
let mut all_channels: Vec<pb::Channel> = vec![];
peers.peers.iter().for_each(|p| {
let peer_channels = &mut p.channels.clone();
all_channels.append(peer_channels);
});

// filter only opened channels
let opened_channels: Vec<pb::Channel> = all_channels
.iter()
.cloned()
.filter(|c| c.state == *"CHANNELD_NORMAL")
.collect();

// calculate channels balance only from opened channels
let channels_balance = opened_channels
.iter()
.map(|c: &pb::Channel| {
amount_to_msat(&parse_amount(c.spendable.clone()).unwrap_or_default())
})
.sum::<u64>();
Ok((
all_channels,
opened_channels,
connected_peers,
channels_balance,
))
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -272,17 +321,15 @@ impl NodeAPI for Greenlight {
}

// implemenet pull changes from greenlight
async fn pull_changed(&self, since_timestamp: i64) -> Result<SyncResponse> {
async fn pull_changed(
&self,
since_timestamp: i64,
balance_changed: bool,
) -> Result<SyncResponse> {
info!("pull changed since {}", since_timestamp);
let mut client = self.get_client().await?;
let mut node_client = self.get_node_client().await?;

// list all peers
let peers = client
.list_peers(pb::ListPeersRequest::default())
.await?
.into_inner();

// get node info
let node_info = client
.get_info(pb::GetInfoRequest::default())
Expand All @@ -296,28 +343,6 @@ impl NodeAPI for Greenlight {
.into_inner();
let onchain_funds = funds.outputs;

// filter only connected peers
let connected_peers: Vec<String> = peers
.peers
.clone()
.iter()
.filter(|p| p.connected)
.map(|p| hex::encode(p.id.clone()))
.collect();

// make a vector of all channels by searching in peers
let all_channels: &mut Vec<pb::Channel> = &mut Vec::new();
peers.peers.clone().iter().for_each(|p| {
let peer_channels = &mut p.channels.clone();
all_channels.append(peer_channels);
});

// filter only opened channels
let opened_channels: &mut Vec<&pb::Channel> = &mut all_channels
.iter()
.filter(|c| c.state == *"CHANNELD_NORMAL")
.collect();

// Fetch closed channels from greenlight
let closed_channels = match node_client
.list_closed_channels(ListclosedchannelsRequest { id: None })
Expand All @@ -330,6 +355,28 @@ impl NodeAPI for Greenlight {
}
};

// calculate the node new balance and in case the caller signals balance has changed
// keep polling until the balance is updated
let (mut all_channels, mut opened_channels, mut connected_peers, mut channels_balance) =
self.fetch_channels_and_balance().await?;
if balance_changed {
let node_state = self.persister.get_node_state()?;
if let Some(state) = node_state {
let mut retry_count = 0;
while state.channels_balance_msat == channels_balance && retry_count < 3 {
warn!("balance update was required but was not updated, retrying in 100ms...");
sleep(Duration::from_millis(100)).await;
(
all_channels,
opened_channels,
connected_peers,
channels_balance,
) = self.fetch_channels_and_balance().await?;
retry_count += 1;
}
}
}

let forgotten_closed_channels: Result<Vec<Channel>> = closed_channels
.into_iter()
.filter(|c| {
Expand All @@ -345,14 +392,6 @@ impl NodeAPI for Greenlight {
all_channels.clone().into_iter().map(|c| c.into()).collect();
all_channel_models.extend(forgotten_closed_channels?);

// calculate channels balance only from opened channels
let channels_balance = opened_channels
.iter()
.map(|c: &&pb::Channel| {
amount_to_msat(&parse_amount(c.spendable.clone()).unwrap_or_default())
})
.sum::<u64>();

// calculate onchain balance
let onchain_balance = onchain_funds.iter().fold(0, |a, b| {
if b.reserved {
Expand Down
6 changes: 5 additions & 1 deletion libs/sdk-core/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ pub trait NodeAPI: Send + Sync {
expiry: Option<u64>,
cltv: Option<u32>,
) -> Result<String>;
async fn pull_changed(&self, since_timestamp: i64) -> Result<SyncResponse>;
async fn pull_changed(
&self,
since_timestamp: i64,
balance_changed: bool,
) -> Result<SyncResponse>;
/// As per the `pb::PayRequest` docs, `amount_sats` is only needed when the invoice doesn't specify an amount
async fn send_payment(
&self,
Expand Down
8 changes: 6 additions & 2 deletions libs/sdk-core/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ impl NodeAPI for MockNodeAPI {
Ok(invoice.bolt11)
}

async fn pull_changed(&self, _since_timestamp: i64) -> Result<SyncResponse> {
async fn pull_changed(
&self,
_since_timestamp: i64,
_balance_changed: bool,
) -> Result<SyncResponse> {
Ok(SyncResponse {
node_state: self.node_state.clone(),
payments: self
Expand Down Expand Up @@ -574,7 +578,7 @@ pub fn rand_vec_u8(len: usize) -> Vec<u8> {

pub fn create_test_config() -> crate::models::Config {
let mut conf = Config {
..Config::staging(
..Config::production(
"".into(),
crate::NodeConfig::Greenlight {
config: crate::GreenlightNodeConfig {
Expand Down

0 comments on commit 1c9152a

Please sign in to comment.