From fff266772ad0201f9e0000c332cf9d6bad1a0aca Mon Sep 17 00:00:00 2001 From: Rodrigo <39995243+RodriFS@users.noreply.github.com> Date: Thu, 27 Jul 2023 15:59:44 +0200 Subject: [PATCH] Monitor channels job (#242) * Monitor channels job * Refactored reused code into function, fixed channel filters * Lightning client reused in jobs * Fixed test case * Added unit tests for channel monitor job * Monitor channel status every hour * Reusing channels instead of lightning client --- .../Interfaces/INodeRepository.cs | 3 + src/Data/Repositories/NodeRepository.cs | 36 ++- src/Helpers/Constants.cs | 3 + src/Jobs/ChannelMonitorJob.cs | 155 +++++++++++++ src/Jobs/MonitorChannelsJob.cs | 56 +++++ src/Jobs/NodeChannelSubscribeJob.cs | 55 ++--- src/Pages/ChannelRequests.razor | 21 +- src/Program.cs | 10 + src/Properties/launchSettings.json | 1 + .../LightningClientsStorageService.cs | 60 +++++ src/Services/LightningService.cs | 171 ++++++++------- .../ChannelOperationRequestRepositoryTests.cs | 1 + .../Data/Repositories/NodeRepositoryTests.cs | 49 +++++ .../Jobs/ChannelMonitorJobTests.cs | 205 ++++++++++++++++++ .../Jobs/NodeChannelSubscribeJobTests.cs | 38 +--- 15 files changed, 690 insertions(+), 174 deletions(-) create mode 100644 src/Jobs/ChannelMonitorJob.cs create mode 100644 src/Jobs/MonitorChannelsJob.cs create mode 100644 src/Services/LightningClientsStorageService.cs create mode 100644 test/FundsManager.Tests/Data/Repositories/NodeRepositoryTests.cs create mode 100644 test/FundsManager.Tests/Jobs/ChannelMonitorJobTests.cs diff --git a/src/Data/Repositories/Interfaces/INodeRepository.cs b/src/Data/Repositories/Interfaces/INodeRepository.cs index b3020365..b7e2ad58 100644 --- a/src/Data/Repositories/Interfaces/INodeRepository.cs +++ b/src/Data/Repositories/Interfaces/INodeRepository.cs @@ -18,6 +18,7 @@ */ using FundsManager.Data.Models; +using FundsManager.Services; namespace FundsManager.Data.Repositories.Interfaces; @@ -27,6 +28,8 @@ public interface INodeRepository Task GetByPubkey(string key); + public Task GetOrCreateByPubKey(string pubKey, ILightningService lightningService); + Task> GetAll(); Task> GetAllManagedByUser(string userId); diff --git a/src/Data/Repositories/NodeRepository.cs b/src/Data/Repositories/NodeRepository.cs index 8c18f523..28284003 100644 --- a/src/Data/Repositories/NodeRepository.cs +++ b/src/Data/Repositories/NodeRepository.cs @@ -20,6 +20,7 @@ using AutoMapper; using FundsManager.Data.Models; using FundsManager.Data.Repositories.Interfaces; +using FundsManager.Services; using Microsoft.EntityFrameworkCore; namespace FundsManager.Data.Repositories @@ -29,7 +30,7 @@ public class NodeRepository : INodeRepository private readonly IRepository _repository; private readonly ILogger _logger; private readonly IDbContextFactory _dbContextFactory; - private readonly IMapper _mapper; + private readonly IMapper _mapper; public NodeRepository(IRepository repository, ILogger logger, @@ -39,7 +40,7 @@ public NodeRepository(IRepository repository, _repository = repository; _logger = logger; _dbContextFactory = dbContextFactory; - this._mapper = mapper; + _mapper = mapper; } public async Task GetById(int id) @@ -67,6 +68,33 @@ public NodeRepository(IRepository repository, .SingleOrDefaultAsync(x => x.PubKey == key); } + public async Task GetOrCreateByPubKey(string pubKey, ILightningService lightningService) + { + var node = await GetByPubkey(pubKey); + + if (node == null) + { + var foundNode = await lightningService.GetNodeInfo(pubKey); + if (foundNode == null) + { + throw new Exception("Node info not found"); + } + + node = new Node() + { + Name = foundNode.Alias, + PubKey = foundNode.PubKey, + }; + var addNode = await AddAsync(node); + if (!addNode.Item1) + { + throw new Exception(addNode.Item2); + } + } + + return node; + } + public async Task> GetAll() { await using var applicationDbContext = await _dbContextFactory.CreateDbContextAsync(); @@ -141,13 +169,13 @@ public async Task> GetAllManagedByUser(string userId) { using var applicationDbContext = _dbContextFactory.CreateDbContext(); type.SetUpdateDatetime(); - + type.Users?.Clear(); type.ChannelOperationRequestsAsSource?.Clear(); type.ChannelOperationRequestsAsDestination?.Clear(); type = _mapper.Map(type); - + return _repository.Update(type, applicationDbContext); } } diff --git a/src/Helpers/Constants.cs b/src/Helpers/Constants.cs index c8611322..9c2ec667 100644 --- a/src/Helpers/Constants.cs +++ b/src/Helpers/Constants.cs @@ -59,6 +59,7 @@ public class Constants // Crons & Jobs public static readonly string MONITOR_WITHDRAWALS_CRON = "10 0/5 * * * ?"; + public static readonly string MONITOR_CHANNELS_CRON = "0 0 */1 * * ?"; public static readonly string JOB_RETRY_INTERVAL_LIST_IN_MINUTES = "1,2,5,10,20"; @@ -166,6 +167,8 @@ static Constants() // Crons & Jobs MONITOR_WITHDRAWALS_CRON = Environment.GetEnvironmentVariable("MONITOR_WITHDRAWALS_CRON") ?? MONITOR_WITHDRAWALS_CRON; + MONITOR_CHANNELS_CRON = Environment.GetEnvironmentVariable("MONITOR_CHANNELS_CRON") ?? MONITOR_CHANNELS_CRON; + JOB_RETRY_INTERVAL_LIST_IN_MINUTES = Environment.GetEnvironmentVariable("JOB_RETRY_INTERVAL_LIST_IN_MINUTES") ?? JOB_RETRY_INTERVAL_LIST_IN_MINUTES; diff --git a/src/Jobs/ChannelMonitorJob.cs b/src/Jobs/ChannelMonitorJob.cs new file mode 100644 index 00000000..91a650a4 --- /dev/null +++ b/src/Jobs/ChannelMonitorJob.cs @@ -0,0 +1,155 @@ +/* + * NodeGuard + * Copyright (C) 2023 Elenpay + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + * + */ + +using FundsManager.Data; +using FundsManager.Data.Models; +using FundsManager.Data.Repositories.Interfaces; +using FundsManager.Services; +using Google.Protobuf; +using Grpc.Core; +using Lnrpc; +using Microsoft.EntityFrameworkCore; +using Quartz; +using Channel = Lnrpc.Channel; + +namespace FundsManager.Jobs; + +/// +/// Job for update the status of the channels +/// +/// +[DisallowConcurrentExecution] +public class ChannelMonitorJob : IJob +{ + private readonly ILogger _logger; + private readonly IDbContextFactory _dbContextFactory; + private readonly INodeRepository _nodeRepository; + private readonly ILightningService _lightningService; + private readonly ILightningClientsStorageService _lightningClientsStorageService; + + public ChannelMonitorJob(ILogger logger, IDbContextFactory dbContextFactory, INodeRepository nodeRepository, ILightningService lightningService, ILightningClientsStorageService lightningClientsStorageService) + { + _logger = logger; + _dbContextFactory = dbContextFactory; + _nodeRepository = nodeRepository; + _lightningService = lightningService; + _lightningClientsStorageService = lightningClientsStorageService; + } + + public async Task Execute(IJobExecutionContext context) + { + var data = context.JobDetail.JobDataMap; + var nodeId = data.GetInt("nodeId"); + _logger.LogInformation("Starting {JobName} for node {nodeId}... ", nameof(ChannelMonitorJob), nodeId); + try + { + var node1 = await _nodeRepository.GetById(nodeId); + + if (node1 == null) + { + _logger.LogInformation("The node {NodeId} was set up for monitoring but the node doesn't exist anymore", nodeId); + return; + } + + var client = _lightningClientsStorageService.GetLightningClient(node1.Endpoint); + var result = client.ListChannels(new ListChannelsRequest(), + new Metadata + { + { "macaroon", node1.ChannelAdminMacaroon } + }); + + foreach (var channel in result?.Channels) + { + var node2 = await _nodeRepository.GetOrCreateByPubKey(channel.RemotePubkey, _lightningService); + + // Recover Operations on channels + await RecoverGhostChannels(node1, node2, channel); + await RecoverChannelInConfirmationPendingStatus(node1); + } + + } + catch (Exception e) + { + _logger.LogError(e, "Error while subscribing for the channel updates of node {NodeId}", nodeId); + throw new JobExecutionException(e, true); + } + + _logger.LogInformation("{JobName} ended", nameof(ChannelMonitorJob)); + } + + public async Task RecoverGhostChannels(Node source, Node destination, Channel? channel) + { + if (!channel.Initiator) return; + try + { + await using var dbContext = await _dbContextFactory.CreateDbContextAsync(); + + var channelExists = await dbContext.Channels.AnyAsync(c => c.ChanId == channel.ChanId); + if (channelExists) return; + + var channelPoint = channel.ChannelPoint.Split(":"); + var fundingTx = channelPoint[0]; + var outputIndex = channelPoint[1]; + + var parsedChannelPoint = new ChannelPoint + { + FundingTxidStr = fundingTx, FundingTxidBytes = ByteString.CopyFrom(Convert.FromHexString(fundingTx).Reverse().ToArray()), + OutputIndex = Convert.ToUInt32(outputIndex) + }; + + var createdChannel = await LightningService.CreateChannel(source, destination.Id, parsedChannelPoint, channel.Capacity, channel.CloseAddress); + + await dbContext.Channels.AddAsync(createdChannel); + await dbContext.SaveChangesAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Error while recovering ghost channel, {SourceNodeId}, {ChannelId}: {Error}", source.Id, channel?.ChanId, e); + throw new JobExecutionException(e, true); + } + } + + public async Task RecoverChannelInConfirmationPendingStatus(Node source) + { + await using var dbContext = await _dbContextFactory.CreateDbContextAsync(); + try + { + var confirmationPendingRequests = dbContext.ChannelOperationRequests.Where(or => or.Status == ChannelOperationRequestStatus.OnChainConfirmationPending).ToList(); + foreach (var request in confirmationPendingRequests) + { + if (request.SourceNodeId != source.Id) return; + if (request.TxId == null) + { + _logger.LogWarning("The channel operation request {RequestId} is in OnChainConfirmationPending status but the txId is null", request.Id); + return; + } + + var channel = await dbContext.Channels.FirstAsync(c => c.FundingTx == request.TxId); + request.ChannelId = channel.Id; + request.Status = ChannelOperationRequestStatus.OnChainConfirmed; + await dbContext.SaveChangesAsync(); + } + } + catch (Exception e) + { + _logger.LogError(e, "Error while recovering channel in OnChainConfirmationPending status, {SourceNodeId}: {Error}", source.Id, e); + throw new JobExecutionException(e, true); + } + } +} \ No newline at end of file diff --git a/src/Jobs/MonitorChannelsJob.cs b/src/Jobs/MonitorChannelsJob.cs new file mode 100644 index 00000000..cd08878b --- /dev/null +++ b/src/Jobs/MonitorChannelsJob.cs @@ -0,0 +1,56 @@ +using FundsManager.Data.Repositories.Interfaces; +using FundsManager.Helpers; +using Quartz; + +namespace FundsManager.Jobs; + +public class MonitorChannelsJob : IJob +{ + private readonly ILogger _logger; + private readonly ISchedulerFactory _schedulerFactory; + private readonly INodeRepository _nodeRepository; + + public MonitorChannelsJob(ILogger logger, ISchedulerFactory schedulerFactory, INodeRepository nodeRepository) + { + _logger = logger; + _schedulerFactory = schedulerFactory; + _nodeRepository = nodeRepository; + } + + public async Task Execute(IJobExecutionContext context) + { + _logger.LogInformation("Starting {JobName}... ", nameof(MonitorChannelsJob)); + try + { + var managedNodes = await _nodeRepository.GetAllManagedByNodeGuard(); + + var scheduler = await _schedulerFactory.GetScheduler(); + + foreach (var managedNode in managedNodes) + { + if (managedNode.ChannelAdminMacaroon != null) + { + var map = new JobDataMap(); + map.Put("nodeId", managedNode.Id.ToString()); + var job = SimpleJob.Create(map, managedNode.Id.ToString()); + await scheduler.ScheduleJob(job.Job, job.Trigger); + + var jobId = job.Job.Key.ToString(); + managedNode.JobId = jobId; + var jobUpateResult = _nodeRepository.Update(managedNode); + if (!jobUpateResult.Item1) + { + _logger.LogWarning("Couldn't update Node {NodeId} with JobId {JobId}", managedNode.Id, jobId); + } + } + } + } + catch (Exception e) + { + _logger.LogError(e, "Error on {JobName}", nameof(MonitorChannelsJob)); + throw new JobExecutionException(e, false); + } + + _logger.LogInformation("{JobName} ended", nameof(MonitorChannelsJob)); + } +} \ No newline at end of file diff --git a/src/Jobs/NodeChannelSubscribeJob.cs b/src/Jobs/NodeChannelSubscribeJob.cs index c09b4c12..229eac2a 100644 --- a/src/Jobs/NodeChannelSubscribeJob.cs +++ b/src/Jobs/NodeChannelSubscribeJob.cs @@ -17,12 +17,10 @@ * */ -using FundsManager.Data.Repositories; using FundsManager.Data.Repositories.Interfaces; using FundsManager.Services; using Grpc.Core; using Lnrpc; -using NBitcoin.Protocol; using Quartz; using Channel = FundsManager.Data.Models.Channel; using Node = FundsManager.Data.Models.Node; @@ -40,17 +38,17 @@ public class NodeChannelSuscribeJob : IJob private readonly ILightningService _lightningService; private readonly INodeRepository _nodeRepository; private readonly IChannelRepository _channelRepository; - private readonly ISchedulerFactory _schedulerFactory; - - public NodeChannelSuscribeJob(ILogger logger, ILightningService lightningService, INodeRepository nodeRepository, IChannelRepository channelRepository, ISchedulerFactory schedulerFactory) + private readonly ILightningClientsStorageService _lightningClientsStorageService; + + public NodeChannelSuscribeJob(ILogger logger, ILightningService lightningService, INodeRepository nodeRepository, IChannelRepository channelRepository, ILightningClientsStorageService lightningClientsStorageService) { _logger = logger; _lightningService = lightningService; _nodeRepository = nodeRepository; _channelRepository = channelRepository; - _schedulerFactory = schedulerFactory; + _lightningClientsStorageService = lightningClientsStorageService; } - + public async Task Execute(IJobExecutionContext context) { var data = context.JobDetail.JobDataMap; @@ -59,29 +57,29 @@ public async Task Execute(IJobExecutionContext context) try { var node = await _nodeRepository.GetById(nodeId); - + if (node == null) { _logger.LogInformation("The node: {@Node} is no longer ready to be supported quartz jobs", node); return; } - var client = LightningService.CreateLightningClient(node.Endpoint); - var result = client.Execute(x => x.SubscribeChannelEvents(new ChannelEventSubscription(), + var client = _lightningClientsStorageService.GetLightningClient(node.Endpoint); + var result = client.SubscribeChannelEvents(new ChannelEventSubscription(), new Metadata { {"macaroon", node.ChannelAdminMacaroon} - }, null, default)); + }); while (await result.ResponseStream.MoveNext()) { node = await _nodeRepository.GetById(nodeId); - + if (node == null) { _logger.LogInformation("The node: {@Node} is no longer ready to be supported quartz jobs", node); return; } - + try { var channelEventUpdate = result.ResponseStream.Current; _logger.LogInformation("Channel event update received for node {@NodeId}", node.Id); @@ -100,7 +98,7 @@ public async Task Execute(IJobExecutionContext context) _logger.LogError(e, "Error while subscribing for the channel updates of node {NodeId}", nodeId); throw new JobExecutionException(e, true); } - + _logger.LogInformation("{JobName} ended", nameof(NodeChannelSuscribeJob)); } @@ -127,31 +125,12 @@ public async Task NodeUpdateManagement(ChannelEventUpdate channelEventUpdate, No IsPrivate = channelOpened.Private }; - var remoteNode = await _nodeRepository.GetByPubkey(channelOpened.RemotePubkey); - if (remoteNode == null) - { - var foundNode = await _lightningService.GetNodeInfo(channelOpened.RemotePubkey); - if (foundNode == null) - { - throw new Exception("Node info not found"); - } - - remoteNode = new Node() - { - Name = foundNode.Alias, - PubKey = foundNode.PubKey, - }; - var addNode = await _nodeRepository.AddAsync(remoteNode); - if (!addNode.Item1) - { - throw new Exception(addNode.Item2); - } - } - else if (remoteNode.IsManaged && channelOpened.Initiator) + var remoteNode = await _nodeRepository.GetOrCreateByPubKey(channelOpened.RemotePubkey, _lightningService); + if (remoteNode.IsManaged && channelOpened.Initiator) { return; } - + remoteNode = await _nodeRepository.GetByPubkey(channelOpened.RemotePubkey); channelToOpen.SourceNodeId = channelOpened.Initiator ? node.Id : remoteNode.Id; channelToOpen.DestinationNodeId = channelOpened.Initiator ? remoteNode.Id : node.Id; @@ -171,9 +150,9 @@ public async Task NodeUpdateManagement(ChannelEventUpdate channelEventUpdate, No { _logger.LogInformation("Channel with id: {ChannelId} already exists in the system", channelToOpen.Id); } - + break; - + case ChannelEventUpdate.Types.UpdateType.ClosedChannel: var channelClosed = channelEventUpdate.ClosedChannel; var channelToClose = await _channelRepository.GetByChanId(channelClosed.ChanId); diff --git a/src/Pages/ChannelRequests.razor b/src/Pages/ChannelRequests.razor index 045f67ef..4a16f7e7 100644 --- a/src/Pages/ChannelRequests.razor +++ b/src/Pages/ChannelRequests.razor @@ -588,26 +588,9 @@ var foundNode = await LightningService.GetNodeInfo(_destNodeName); if (foundNode != null) { - _selectedDestNode = (await NodeRepository.GetByPubkey(_destNodeName)); + _selectedDestNode = await NodeRepository.GetOrCreateByPubKey(_destNodeName, LightningService); - //if not found we create it.. - if (_selectedDestNode == null) - { - _selectedDestNode = new Node - { - Name = foundNode.Alias, - PubKey = _destNodeName, - }; - - var nodeAddResult = await NodeRepository.AddAsync(_selectedDestNode); - - if (nodeAddResult.Item1) - { - _selectedDestNode = await NodeRepository.GetByPubkey(_selectedDestNode.PubKey); - } - } - - // Refresh the list of available source nodes and take out the one selected + // Refresh the list of available source nodes and take out the one selected _manageableNodes = await NodeRepository.GetAllManagedByUser(LoggedUser.Id); _manageableNodes = _manageableNodes.Where(node => node.Id != _selectedDestNode?.Id).ToList(); _destNodeValidation.Clear(); diff --git a/src/Program.cs b/src/Program.cs index e7e8dad2..de3ea136 100644 --- a/src/Program.cs +++ b/src/Program.cs @@ -106,6 +106,7 @@ public static void Main(string[] args) builder.Services.AddTransient(); builder.Services.AddTransient(); builder.Services.AddTransient(); + builder.Services.AddSingleton(); //BlazoredToast builder.Services.AddBlazoredToast(); @@ -233,6 +234,15 @@ public static void Main(string[] args) opts.ForJob(nameof(NodeSubscriptorJob)).WithIdentity($"{nameof(NodeSubscriptorJob)}Trigger") .StartNow(); }); + + // MonitorChannelsJob + q.AddJob(opts => { opts.WithIdentity(nameof(MonitorChannelsJob)); }); + + q.AddTrigger(opts => + { + opts.ForJob(nameof(MonitorChannelsJob)).WithIdentity($"{nameof(MonitorChannelsJob)}Trigger") + .StartNow().WithCronSchedule(Constants.MONITOR_CHANNELS_CRON); + }); }); // ASP.NET Core hosting diff --git a/src/Properties/launchSettings.json b/src/Properties/launchSettings.json index 752ebfc1..e1b904b5 100644 --- a/src/Properties/launchSettings.json +++ b/src/Properties/launchSettings.json @@ -44,6 +44,7 @@ "AMBOSS_ENDPOINT": "https://amboss.space", "TRANSACTION_CONFIRMATION_MINIMUM_BLOCKS": "6", "MONITOR_WITHDRAWALS_CRON": "0 */1 * * * ?", + "MONITOR_CHANNELS_CRON": "0 0 */1 * * ?", "COINGECKO_ENDPOINT": "https://pro-api.coingecko.com/api/v3/coins/markets?vs_currency=usd&ids=bitcoin", "COINGECKO_KEY": "TBD", "HTTP1_LISTEN_PORT": "38080" diff --git a/src/Services/LightningClientsStorageService.cs b/src/Services/LightningClientsStorageService.cs new file mode 100644 index 00000000..dfd051ca --- /dev/null +++ b/src/Services/LightningClientsStorageService.cs @@ -0,0 +1,60 @@ +using System.Collections.Concurrent; +using Grpc.Net.Client; +using Lnrpc; + +namespace FundsManager.Services; + +public interface ILightningClientsStorageService +{ + public Lightning.LightningClient GetLightningClient(string? endpoint); +} +public class LightningClientsStorageService: ILightningClientsStorageService +{ + private readonly ILogger _logger; + private readonly ConcurrentDictionary _clients = new(); + + public LightningClientsStorageService(ILogger logger) + { + _logger = logger; + } + + private static GrpcChannel CreateLightningClient(string? endpoint) + { + //Setup of grpc lnd api client (Lightning.proto) + //Hack to allow self-signed https grpc calls + var httpHandler = new HttpClientHandler + { + ServerCertificateCustomValidationCallback = + HttpClientHandler.DangerousAcceptAnyServerCertificateValidator + }; + + var grpcChannel = GrpcChannel.ForAddress($"https://{endpoint}", + new GrpcChannelOptions { HttpHandler = httpHandler }); + + return grpcChannel; + } + + public Lightning.LightningClient GetLightningClient(string? endpoint) + { + if (endpoint == null) + { + throw new ArgumentNullException(nameof(endpoint)); + } + + // Atomic operation for TryGetValue and TryAdd + lock(_clients) + { + var found = _clients.TryGetValue(endpoint, out var client); + if (!found) + { + _logger.LogInformation("Client not found for endpoint {endpoint}, creating a new one", endpoint); + var newClient = CreateLightningClient(endpoint); + var added = _clients.TryAdd(endpoint, newClient); + _logger.LogDebug("Client for endpoint {endpoint} was added: {added}", endpoint, added ? "true" : "false"); + return new Lightning.LightningClient(newClient); + } + _logger.LogInformation("Client found for endpoint {endpoint}", endpoint); + return new Lightning.LightningClient(client); + } + } +} \ No newline at end of file diff --git a/src/Services/LightningService.cs b/src/Services/LightningService.cs index 67a0d800..47cafa17 100644 --- a/src/Services/LightningService.cs +++ b/src/Services/LightningService.cs @@ -325,83 +325,7 @@ public async Task OpenChannel(ChannelOperationRequest channelOperationRequest) break; case OpenStatusUpdate.UpdateOneofCase.ChanOpen: - _logger.LogInformation( - "Channel opened for channel operation request request id: {RequestId}, channel point: {ChannelPoint}", - channelOperationRequest.Id, response.ChanOpen.ChannelPoint.ToString()); - - channelOperationRequest.Status = ChannelOperationRequestStatus.OnChainConfirmed; - if (channelOperationRequest.StatusLogs.Count > 0) - { - channelOperationRequest.StatusLogs.Add( - ChannelStatusLog.Info($"Channel opened successfully 🎉")); - } - - _channelOperationRequestRepository.Update(channelOperationRequest); - - var fundingTx = - LightningHelper.DecodeTxId(response.ChanOpen.ChannelPoint.FundingTxidBytes); - - //Get the channels to find the channelId, not the temporary one - var channels = await client.Execute(x => x.ListChannelsAsync(new ListChannelsRequest(), - new Metadata {{"macaroon", source.ChannelAdminMacaroon}}, null, default)); - var currentChannel = channels.Channels.SingleOrDefault(x => - x.ChannelPoint == $"{fundingTx}:{response.ChanOpen.ChannelPoint.OutputIndex}"); - - if (currentChannel == null) - { - _logger.LogError("Error, channel not found for channel point: {ChannelPoint}", - response.ChanOpen.ChannelPoint.ToString()); - throw new InvalidOperationException(); - } - - var channel = new Channel - { - ChanId = currentChannel.ChanId, - CreationDatetime = DateTimeOffset.Now, - FundingTx = fundingTx, - FundingTxOutputIndex = response.ChanOpen.ChannelPoint.OutputIndex, - BtcCloseAddress = openChannelRequest.CloseAddress, - SatsAmount = channelOperationRequest.SatsAmount, - UpdateDatetime = DateTimeOffset.Now, - Status = Channel.ChannelStatus.Open, - SourceNodeId = channelOperationRequest.SourceNode.Id, - DestinationNodeId = channelOperationRequest.DestNode.Id, - CreatedByNodeGuard = true, - IsPrivate = currentChannel.Private - }; - - var channelExists = await _channelRepository.GetByChanId(channel.ChanId); - if (channelExists == null) - await context.AddAsync(channel); - else - { - channel.Id = channelExists.Id; - context.Update(channel); - } - - var addChannelResult = (await context.SaveChangesAsync()) > 0; - - if (addChannelResult == false) - { - _logger.LogError( - "Channel for channel operation request id: {RequestId} could not be created, reason: {Reason}", - channelOperationRequest.Id, - "Could not persist to db"); - } - - channelOperationRequest.ChannelId = channel.Id; - channelOperationRequest.DestNode = null; - - var channelUpdate = _channelOperationRequestRepository.Update(channelOperationRequest); - - if (channelUpdate.Item1 == false) - { - _logger.LogError( - "Could not assign channel id to channel operation request: {RequestId} reason: {Reason}", - channelOperationRequest.Id, - channelUpdate.Item2); - } - + await OnStatusChannelOpened(channelOperationRequest, source, response.ChanOpen.ChannelPoint, openChannelRequest.CloseAddress); break; case OpenStatusUpdate.UpdateOneofCase.PsbtFund: @@ -697,8 +621,97 @@ await _channelOperationRequestRepository.GetById(channelOperationRequest return new Lightning.LightningClient(grpcChannel).Wrap(); }; - public long GetFundingAmount(ChannelOperationRequest channelOperationRequest, PSBT combinedPSBT, - decimal initialFeeRate) + public async Task OnStatusChannelOpened(ChannelOperationRequest channelOperationRequest, Node source, ChannelPoint channelPoint, string? closeAddress = null) + { + await using var context = await _dbContextFactory.CreateDbContextAsync(); + + _logger.LogInformation( + "Channel opened for channel operation request request id: {RequestId}, channel point: {ChannelPoint}", + channelOperationRequest.Id, channelPoint.ToString()); + + channelOperationRequest.Status = ChannelOperationRequestStatus.OnChainConfirmed; + if (channelOperationRequest.StatusLogs.Count > 0) + { + channelOperationRequest.StatusLogs.Add(ChannelStatusLog.Info($"Channel opened successfully 🎉")); + } + + var (isSuccess, error) = _channelOperationRequestRepository.Update(channelOperationRequest); + if (!isSuccess) + { + _logger.LogWarning("Request is in OnChainConfirmed, but could not update status for request id: {RequestId} reason: {Reason}", channelOperationRequest.Id, error); + } + + var channel = await CreateChannel(source, channelOperationRequest.DestNode.Id, channelPoint, channelOperationRequest.SatsAmount, closeAddress); + + var channelExists = await _channelRepository.GetByChanId(channel.ChanId); + if (channelExists == null) + await context.AddAsync(channel); + else + { + channel.Id = channelExists.Id; + context.Update(channel); + } + + var addChannelResult = (await context.SaveChangesAsync()) > 0; + + if (addChannelResult == false) + { + _logger.LogError( + "Channel for channel operation request id: {RequestId} could not be created, reason: {Reason}", + channelOperationRequest.Id, + "Could not persist to db"); + } + + channelOperationRequest.ChannelId = channel?.Id; + channelOperationRequest.DestNode = null; + + var channelUpdate = _channelOperationRequestRepository.Update(channelOperationRequest); + + if (channelUpdate.Item1 == false) + { + _logger.LogError( + "Could not assign channel id to channel operation request: {RequestId} reason: {Reason}", + channelOperationRequest.Id, + channelUpdate.Item2); + } + } + + public static async Task CreateChannel(Node source, int destId, ChannelPoint channelPoint, long satsAmount, string? closeAddress = null) + { + var fundingTx = LightningHelper.DecodeTxId(channelPoint.FundingTxidBytes); + + var client= CreateLightningClient(source.Endpoint); + + //Get the channels to find the channelId, not the temporary one + var channels = await client.Execute(x => x.ListChannelsAsync(new ListChannelsRequest(), + new Metadata { { "macaroon", source.ChannelAdminMacaroon } }, null, default)); + var currentChannel = channels.Channels.SingleOrDefault(x => x.ChannelPoint == $"{fundingTx}:{channelPoint.OutputIndex}"); + + if (currentChannel == null) + { + throw new InvalidOperationException($"Error, channel not found for channel point: {channelPoint}"); + } + + var channel = new Channel + { + ChanId = currentChannel.ChanId, + CreationDatetime = DateTimeOffset.Now, + FundingTx = fundingTx, + FundingTxOutputIndex = channelPoint.OutputIndex, + BtcCloseAddress = closeAddress, + SatsAmount = satsAmount, + UpdateDatetime = DateTimeOffset.Now, + Status = Channel.ChannelStatus.Open, + SourceNodeId = source.Id, + DestinationNodeId = destId, + CreatedByNodeGuard = true, + IsPrivate = currentChannel.Private + }; + + return channel; + } + + public long GetFundingAmount(ChannelOperationRequest channelOperationRequest, PSBT combinedPSBT, decimal initialFeeRate) { if (!combinedPSBT.TryGetVirtualSize(out var estimatedVsize)) { diff --git a/test/FundsManager.Tests/Data/Repositories/ChannelOperationRequestRepositoryTests.cs b/test/FundsManager.Tests/Data/Repositories/ChannelOperationRequestRepositoryTests.cs index c8852d48..696b9485 100644 --- a/test/FundsManager.Tests/Data/Repositories/ChannelOperationRequestRepositoryTests.cs +++ b/test/FundsManager.Tests/Data/Repositories/ChannelOperationRequestRepositoryTests.cs @@ -40,6 +40,7 @@ public async Task AddAsync_ChannelCloseOperations() Status = ChannelOperationRequestStatus.OnChainConfirmationPending }; await context.ChannelOperationRequests.AddAsync(request1); + await context.SaveChangesAsync(); var channelOperationRequestRepository = new ChannelOperationRequestRepository(repository.Object, null, dbContextFactory.Object, null, null); diff --git a/test/FundsManager.Tests/Data/Repositories/NodeRepositoryTests.cs b/test/FundsManager.Tests/Data/Repositories/NodeRepositoryTests.cs new file mode 100644 index 00000000..d260e0de --- /dev/null +++ b/test/FundsManager.Tests/Data/Repositories/NodeRepositoryTests.cs @@ -0,0 +1,49 @@ +using FluentAssertions; +using FundsManager.Data.Models; +using FundsManager.Data.Repositories.Interfaces; +using FundsManager.Services; +using Lnrpc; +using Microsoft.EntityFrameworkCore; + +namespace FundsManager.Data.Repositories; + +public class NodeRepositoryTests +{ + private readonly Random _random = new(); + + private Mock> SetupDbContextFactory() + { + var dbContextFactory = new Mock>(); + var options = new DbContextOptionsBuilder() + .UseInMemoryDatabase(databaseName: "NodeRepositoryTests" + _random.Next()) + .Options; + var context = ()=> new ApplicationDbContext(options); + dbContextFactory.Setup(x => x.CreateDbContext()).Returns(context); + dbContextFactory.Setup(x => x.CreateDbContextAsync(default)).ReturnsAsync(context); + return dbContextFactory; + } + + [Fact] + public async Task AddsNewNode_WhenRemoteNodeNotFound() + { + // Arrange + var dbContextFactory = SetupDbContextFactory(); + var lightningServiceMock = new Mock(); + var repositoryMock = new Mock>(); + + lightningServiceMock.Setup(service => service.GetNodeInfo(It.IsAny())) + .ReturnsAsync(new LightningNode() { Alias = "TestAlias", PubKey = "TestPubKey" }); + + repositoryMock.Setup(repository => repository.AddAsync(It.IsAny(), It.IsAny())) + .ReturnsAsync((true, null)); + + var nodeRepository = new NodeRepository(repositoryMock.Object, null, dbContextFactory.Object, null); + + // Act + var result = await nodeRepository.GetOrCreateByPubKey("abc", lightningServiceMock.Object); + + // Assert + result.Name.Should().Be("TestAlias"); + result.PubKey.Should().Be("TestPubKey"); + } +} \ No newline at end of file diff --git a/test/FundsManager.Tests/Jobs/ChannelMonitorJobTests.cs b/test/FundsManager.Tests/Jobs/ChannelMonitorJobTests.cs new file mode 100644 index 00000000..cf340838 --- /dev/null +++ b/test/FundsManager.Tests/Jobs/ChannelMonitorJobTests.cs @@ -0,0 +1,205 @@ +using FluentAssertions; +using FundsManager.Data; +using FundsManager.Data.Models; +using FundsManager.Helpers; +using FundsManager.Jobs; +using FundsManager.Services; +using FundsManager.TestHelpers; +using Google.Protobuf; +using Grpc.Core; +using Lnrpc; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Channel = FundsManager.Data.Models.Channel; + +namespace FundsManager.Tests.Jobs; + +public class ChannelMonitorJobTests +{ + private readonly Random _random = new(); + + private Mock> SetupDbContextFactory() + { + var dbContextFactory = new Mock>(); + var options = new DbContextOptionsBuilder() + .UseInMemoryDatabase(databaseName: "ChannelOperationRequestRepositoryTests" + _random.Next()) + .Options; + var context = ()=> new ApplicationDbContext(options); + dbContextFactory.Setup(x => x.CreateDbContext()).Returns(context); + dbContextFactory.Setup(x => x.CreateDbContextAsync(default)).ReturnsAsync(context); + return dbContextFactory; + } + + [Fact] + public async Task RecoverGhostChannels_ChannelIsNotInitiator() + { + // Arrange + var dbContextFactory = SetupDbContextFactory(); + var channelMonitorJob = new ChannelMonitorJob(null, dbContextFactory.Object, null, null, null); + + var channel = new Lnrpc.Channel() + { + Initiator = false + }; + // Act + var act = () => channelMonitorJob.RecoverGhostChannels(null, null, channel); + + // Assert + await act.Should().NotThrowAsync(); + dbContextFactory.Invocations.Count.Should().Be(0); + + } + + [Fact] + public async Task RecoverGhostChannels_ChannelAlreadyExists() + { + // Arrange + var dbContextFactory = SetupDbContextFactory(); + await using var context = await dbContextFactory.Object.CreateDbContextAsync(); + + var request1 = new Channel() + { + ChanId = 1, + FundingTx = "abc:0" + }; + await context.Channels.AddAsync(request1); + await context.SaveChangesAsync(); + + var channelMonitorJob = new ChannelMonitorJob(null, dbContextFactory.Object, null, null, null); + + var channel = new Lnrpc.Channel() + { + ChanId = 1, + Initiator = true + }; + // Act + var act = () => channelMonitorJob.RecoverGhostChannels(null, null, channel); + + // Assert + await act.Should().NotThrowAsync(); + dbContextFactory.Invocations.Count.Should().Be(2); + } + + [Fact] + public async Task RecoverGhostChannels_CreatesChannel() + { + // Arrange + var logger = new Mock>(); + var dbContextFactory = SetupDbContextFactory(); + var context = await dbContextFactory.Object.CreateDbContextAsync(); + //Mock lightning client with iunmockable methods + var channelPoint = new ChannelPoint{ FundingTxidBytes = ByteString.CopyFrom(Convert.FromHexString("a2dffe0545ae0ce9091949477a9a7d91bb9478eb054fd9fa142e73562287ca4e").Reverse().ToArray()), OutputIndex = 1}; + + var listChannelsResponse = new ListChannelsResponse + { + Channels = {new Lnrpc.Channel + { + Active = true, + RemotePubkey = "03b48034270e522e4033afdbe43383d66d426638927b940d09a8a7a0de4d96e807", + ChannelPoint = $"{LightningHelper.DecodeTxId(channelPoint.FundingTxidBytes)}:{channelPoint.OutputIndex}", + ChanId = 123, + Capacity = 1000, + LocalBalance = 100, + RemoteBalance = 900 + } + } + }; + + var lightningClient = Interceptor.For() + .Setup(x => x.ListChannelsAsync( + Arg.Ignore(), + Arg.Ignore(), + null, + Arg.Ignore() + )) + .Returns(MockHelpers.CreateAsyncUnaryCall(listChannelsResponse)); + var originalLightningClient = LightningService.CreateLightningClient; + LightningService.CreateLightningClient = (_) => lightningClient; + + var channelMonitorJob = new ChannelMonitorJob(logger.Object, dbContextFactory.Object, null, null, null); + + var source = new Node() + { + Endpoint = "localhost", + ChannelAdminMacaroon = "abc" + }; + var destination = new Node(); + var channel = new Lnrpc.Channel() + { + ChanId = 1, + Initiator = true, + ChannelPoint = "a2dffe0545ae0ce9091949477a9a7d91bb9478eb054fd9fa142e73562287ca4e:1" + }; + + // Act + await channelMonitorJob.RecoverGhostChannels(source, destination, channel); + + // Assert + context.Channels.Count().Should().Be(1); + LightningService.CreateLightningClient = originalLightningClient; + } + + [Fact] + public async Task RecoverChannelInConfirmationPendingStatus_RequestWithDifferentSource() + { + // Arrange + var dbContextFactory = SetupDbContextFactory(); + await using var context = await dbContextFactory.Object.CreateDbContextAsync(); + + var request1 = new ChannelOperationRequest() + { + Status = ChannelOperationRequestStatus.OnChainConfirmationPending, + SourceNodeId = 10 + }; + await context.ChannelOperationRequests.AddAsync(request1); + await context.SaveChangesAsync(); + + var channelMonitorJob = new ChannelMonitorJob(null, dbContextFactory.Object, null, null, null); + + var source = new Node() { Id = 3 }; + // Act + var act = () => channelMonitorJob.RecoverChannelInConfirmationPendingStatus(source); + + // Assert + await act.Should().NotThrowAsync(); + (await context.ChannelOperationRequests.FirstOrDefaultAsync()).Status.Should().Be(ChannelOperationRequestStatus.OnChainConfirmationPending); + } + + [Fact] + public async Task RecoverChannelInConfirmationPendingStatus_StuckRequestFound() + { + // Arrange + var logger = new Mock>(); + var dbContextFactory = SetupDbContextFactory(); + await using var context = await dbContextFactory.Object.CreateDbContextAsync(); + + var request1 = new ChannelOperationRequest() + { + Status = ChannelOperationRequestStatus.OnChainConfirmationPending, + SourceNodeId = 3, + TxId = "a2dffe0545ae0ce9091949477a9a7d91bb9478eb054fd9fa142e73562287ca4e" + }; + await context.ChannelOperationRequests.AddAsync(request1); + + var channel1 = new Channel() + { + Id = 4, + FundingTx = "a2dffe0545ae0ce9091949477a9a7d91bb9478eb054fd9fa142e73562287ca4e" + }; + await context.Channels.AddAsync(channel1); + await context.SaveChangesAsync(); + + var channelMonitorJob = new ChannelMonitorJob(logger.Object, dbContextFactory.Object, null, null, null); + + var source = new Node() { Id = 3 }; + // Act + var act = () => channelMonitorJob.RecoverChannelInConfirmationPendingStatus(source); + + // Assert + await using var newContext = await dbContextFactory.Object.CreateDbContextAsync(); + await act.Should().NotThrowAsync(); + var updatedRequest = await newContext.ChannelOperationRequests.FirstAsync(); + updatedRequest.Status.Should().Be(ChannelOperationRequestStatus.OnChainConfirmed); + updatedRequest.ChannelId.Should().Be(channel1.Id); + } +} \ No newline at end of file diff --git a/test/FundsManager.Tests/Jobs/NodeChannelSubscribeJobTests.cs b/test/FundsManager.Tests/Jobs/NodeChannelSubscribeJobTests.cs index a78e881f..3b77f9a3 100644 --- a/test/FundsManager.Tests/Jobs/NodeChannelSubscribeJobTests.cs +++ b/test/FundsManager.Tests/Jobs/NodeChannelSubscribeJobTests.cs @@ -18,7 +18,7 @@ public class NodeChannelSubscribeJobTests private Mock _nodeRepositoryMock; private Mock _channelRepositoryMock; private NodeChannelSuscribeJob _nodeUpdateManager; - private ISchedulerFactory _schedulerFactory; + private Mock _lightningClientsStorageService; public NodeChannelSubscribeJobTests() { @@ -26,13 +26,14 @@ public NodeChannelSubscribeJobTests() _nodeRepositoryMock = new Mock(); _channelRepositoryMock = new Mock(); _lightningServiceMock = new Mock(); - + _lightningClientsStorageService = new Mock(); + _nodeUpdateManager = new NodeChannelSuscribeJob( _loggerMock.Object, _lightningServiceMock.Object, _nodeRepositoryMock.Object, _channelRepositoryMock.Object, - _schedulerFactory); + _lightningClientsStorageService.Object); } [Fact] @@ -52,37 +53,6 @@ public async Task NodeUpdateManagement_ThrowsException_WhenCloseAddressIsEmpty() Assert.ThrowsAsync(async () => await _nodeUpdateManager.NodeUpdateManagement(channelEventUpdate, new Node())); } - [Fact] - public async Task NodeUpdateManagement_AddsNewNode_WhenRemoteNodeNotFound() - { - // Arrange - var channelEventUpdate = new ChannelEventUpdate() - { - Type = ChannelEventUpdate.Types.UpdateType.OpenChannel, - OpenChannel = new Lnrpc.Channel() - { - ChannelPoint = "1:1", - CloseAddress = "closeAddress", - RemotePubkey = "remotePubkey", - }, - }; - _nodeRepositoryMock.SetupSequence(repo => repo.GetByPubkey(channelEventUpdate.OpenChannel.RemotePubkey)) - .ReturnsAsync((Node?)null) - .ReturnsAsync(new Node(){Id = 1, Name = "TestAlias", PubKey = "TestPubKey"}); - _nodeRepositoryMock.Setup(repo => repo.AddAsync(It.IsAny())).ReturnsAsync((true, "")); - _channelRepositoryMock.Setup(repo => repo.AddAsync(It.IsAny())).ReturnsAsync((true, "")); - - _lightningServiceMock.Setup(service => service.GetNodeInfo(channelEventUpdate.OpenChannel.RemotePubkey)) - .ReturnsAsync(new LightningNode() { Alias = "TestAlias", PubKey = "TestPubKey" }); - - // Act - await _nodeUpdateManager.NodeUpdateManagement(channelEventUpdate, new Node(){Id = 1}); - - // Assert - _nodeRepositoryMock.Verify(repo => repo.AddAsync(It.IsAny()), Times.Once); - _channelRepositoryMock.Verify(repo => repo.AddAsync(It.IsAny()), Times.Once); - } - [Fact] public async Task NodeUpdateManagement_UpdatesChannelStatus_WhenClosedChannelEventReceived() {