Skip to content

Commit

Permalink
Monitor channels job (#242)
Browse files Browse the repository at this point in the history
* Monitor channels job

* Refactored reused code into function, fixed channel filters

* Lightning client reused in jobs

* Fixed test case

* Added unit tests for channel monitor job

* Monitor channel status every hour

* Reusing channels instead of lightning client
  • Loading branch information
RodriFS authored Jul 27, 2023
1 parent 111fbcc commit fff2667
Show file tree
Hide file tree
Showing 15 changed files with 690 additions and 174 deletions.
3 changes: 3 additions & 0 deletions src/Data/Repositories/Interfaces/INodeRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/

using FundsManager.Data.Models;
using FundsManager.Services;

namespace FundsManager.Data.Repositories.Interfaces;

Expand All @@ -27,6 +28,8 @@ public interface INodeRepository

Task<Node?> GetByPubkey(string key);

public Task<Node> GetOrCreateByPubKey(string pubKey, ILightningService lightningService);

Task<List<Node>> GetAll();

Task<List<Node>> GetAllManagedByUser(string userId);
Expand Down
36 changes: 32 additions & 4 deletions src/Data/Repositories/NodeRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using AutoMapper;
using FundsManager.Data.Models;
using FundsManager.Data.Repositories.Interfaces;
using FundsManager.Services;
using Microsoft.EntityFrameworkCore;

namespace FundsManager.Data.Repositories
Expand All @@ -29,7 +30,7 @@ public class NodeRepository : INodeRepository
private readonly IRepository<Node> _repository;
private readonly ILogger<NodeRepository> _logger;
private readonly IDbContextFactory<ApplicationDbContext> _dbContextFactory;
private readonly IMapper _mapper;
private readonly IMapper _mapper;

public NodeRepository(IRepository<Node> repository,
ILogger<NodeRepository> logger,
Expand All @@ -39,7 +40,7 @@ public NodeRepository(IRepository<Node> repository,
_repository = repository;
_logger = logger;
_dbContextFactory = dbContextFactory;
this._mapper = mapper;
_mapper = mapper;
}

public async Task<Node?> GetById(int id)
Expand Down Expand Up @@ -67,6 +68,33 @@ public NodeRepository(IRepository<Node> repository,
.SingleOrDefaultAsync(x => x.PubKey == key);
}

public async Task<Node> GetOrCreateByPubKey(string pubKey, ILightningService lightningService)
{
var node = await GetByPubkey(pubKey);

if (node == null)
{
var foundNode = await lightningService.GetNodeInfo(pubKey);
if (foundNode == null)
{
throw new Exception("Node info not found");
}

node = new Node()
{
Name = foundNode.Alias,
PubKey = foundNode.PubKey,
};
var addNode = await AddAsync(node);
if (!addNode.Item1)
{
throw new Exception(addNode.Item2);
}
}

return node;
}

public async Task<List<Node>> GetAll()
{
await using var applicationDbContext = await _dbContextFactory.CreateDbContextAsync();
Expand Down Expand Up @@ -141,13 +169,13 @@ public async Task<List<Node>> GetAllManagedByUser(string userId)
{
using var applicationDbContext = _dbContextFactory.CreateDbContext();
type.SetUpdateDatetime();

type.Users?.Clear();
type.ChannelOperationRequestsAsSource?.Clear();
type.ChannelOperationRequestsAsDestination?.Clear();

type = _mapper.Map<Node, Node>(type);

return _repository.Update(type, applicationDbContext);
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/Helpers/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class Constants

// Crons & Jobs
public static readonly string MONITOR_WITHDRAWALS_CRON = "10 0/5 * * * ?";
public static readonly string MONITOR_CHANNELS_CRON = "0 0 */1 * * ?";
public static readonly string JOB_RETRY_INTERVAL_LIST_IN_MINUTES = "1,2,5,10,20";


Expand Down Expand Up @@ -166,6 +167,8 @@ static Constants()
// Crons & Jobs
MONITOR_WITHDRAWALS_CRON = Environment.GetEnvironmentVariable("MONITOR_WITHDRAWALS_CRON") ?? MONITOR_WITHDRAWALS_CRON;

MONITOR_CHANNELS_CRON = Environment.GetEnvironmentVariable("MONITOR_CHANNELS_CRON") ?? MONITOR_CHANNELS_CRON;

JOB_RETRY_INTERVAL_LIST_IN_MINUTES = Environment.GetEnvironmentVariable("JOB_RETRY_INTERVAL_LIST_IN_MINUTES") ?? JOB_RETRY_INTERVAL_LIST_IN_MINUTES;


Expand Down
155 changes: 155 additions & 0 deletions src/Jobs/ChannelMonitorJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* NodeGuard
* Copyright (C) 2023 Elenpay
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*
*/

using FundsManager.Data;
using FundsManager.Data.Models;
using FundsManager.Data.Repositories.Interfaces;
using FundsManager.Services;
using Google.Protobuf;
using Grpc.Core;
using Lnrpc;
using Microsoft.EntityFrameworkCore;
using Quartz;
using Channel = Lnrpc.Channel;

namespace FundsManager.Jobs;

/// <summary>
/// Job for update the status of the channels
/// </summary>
/// <returns></returns>
[DisallowConcurrentExecution]
public class ChannelMonitorJob : IJob
{
private readonly ILogger<ChannelMonitorJob> _logger;
private readonly IDbContextFactory<ApplicationDbContext> _dbContextFactory;
private readonly INodeRepository _nodeRepository;
private readonly ILightningService _lightningService;
private readonly ILightningClientsStorageService _lightningClientsStorageService;

public ChannelMonitorJob(ILogger<ChannelMonitorJob> logger, IDbContextFactory<ApplicationDbContext> dbContextFactory, INodeRepository nodeRepository, ILightningService lightningService, ILightningClientsStorageService lightningClientsStorageService)
{
_logger = logger;
_dbContextFactory = dbContextFactory;
_nodeRepository = nodeRepository;
_lightningService = lightningService;
_lightningClientsStorageService = lightningClientsStorageService;
}

public async Task Execute(IJobExecutionContext context)
{
var data = context.JobDetail.JobDataMap;
var nodeId = data.GetInt("nodeId");
_logger.LogInformation("Starting {JobName} for node {nodeId}... ", nameof(ChannelMonitorJob), nodeId);
try
{
var node1 = await _nodeRepository.GetById(nodeId);

if (node1 == null)
{
_logger.LogInformation("The node {NodeId} was set up for monitoring but the node doesn't exist anymore", nodeId);
return;
}

var client = _lightningClientsStorageService.GetLightningClient(node1.Endpoint);
var result = client.ListChannels(new ListChannelsRequest(),
new Metadata
{
{ "macaroon", node1.ChannelAdminMacaroon }
});

foreach (var channel in result?.Channels)
{
var node2 = await _nodeRepository.GetOrCreateByPubKey(channel.RemotePubkey, _lightningService);

// Recover Operations on channels
await RecoverGhostChannels(node1, node2, channel);
await RecoverChannelInConfirmationPendingStatus(node1);
}

}
catch (Exception e)
{
_logger.LogError(e, "Error while subscribing for the channel updates of node {NodeId}", nodeId);
throw new JobExecutionException(e, true);
}

_logger.LogInformation("{JobName} ended", nameof(ChannelMonitorJob));
}

public async Task RecoverGhostChannels(Node source, Node destination, Channel? channel)
{
if (!channel.Initiator) return;
try
{
await using var dbContext = await _dbContextFactory.CreateDbContextAsync();

var channelExists = await dbContext.Channels.AnyAsync(c => c.ChanId == channel.ChanId);
if (channelExists) return;

var channelPoint = channel.ChannelPoint.Split(":");
var fundingTx = channelPoint[0];
var outputIndex = channelPoint[1];

var parsedChannelPoint = new ChannelPoint
{
FundingTxidStr = fundingTx, FundingTxidBytes = ByteString.CopyFrom(Convert.FromHexString(fundingTx).Reverse().ToArray()),
OutputIndex = Convert.ToUInt32(outputIndex)
};

var createdChannel = await LightningService.CreateChannel(source, destination.Id, parsedChannelPoint, channel.Capacity, channel.CloseAddress);

await dbContext.Channels.AddAsync(createdChannel);
await dbContext.SaveChangesAsync();
}
catch (Exception e)
{
_logger.LogError(e, "Error while recovering ghost channel, {SourceNodeId}, {ChannelId}: {Error}", source.Id, channel?.ChanId, e);
throw new JobExecutionException(e, true);
}
}

public async Task RecoverChannelInConfirmationPendingStatus(Node source)
{
await using var dbContext = await _dbContextFactory.CreateDbContextAsync();
try
{
var confirmationPendingRequests = dbContext.ChannelOperationRequests.Where(or => or.Status == ChannelOperationRequestStatus.OnChainConfirmationPending).ToList();
foreach (var request in confirmationPendingRequests)
{
if (request.SourceNodeId != source.Id) return;
if (request.TxId == null)
{
_logger.LogWarning("The channel operation request {RequestId} is in OnChainConfirmationPending status but the txId is null", request.Id);
return;
}

var channel = await dbContext.Channels.FirstAsync(c => c.FundingTx == request.TxId);
request.ChannelId = channel.Id;
request.Status = ChannelOperationRequestStatus.OnChainConfirmed;
await dbContext.SaveChangesAsync();
}
}
catch (Exception e)
{
_logger.LogError(e, "Error while recovering channel in OnChainConfirmationPending status, {SourceNodeId}: {Error}", source.Id, e);
throw new JobExecutionException(e, true);
}
}
}
56 changes: 56 additions & 0 deletions src/Jobs/MonitorChannelsJob.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using FundsManager.Data.Repositories.Interfaces;
using FundsManager.Helpers;
using Quartz;

namespace FundsManager.Jobs;

public class MonitorChannelsJob : IJob
{
private readonly ILogger<MonitorChannelsJob> _logger;
private readonly ISchedulerFactory _schedulerFactory;
private readonly INodeRepository _nodeRepository;

public MonitorChannelsJob(ILogger<MonitorChannelsJob> logger, ISchedulerFactory schedulerFactory, INodeRepository nodeRepository)
{
_logger = logger;
_schedulerFactory = schedulerFactory;
_nodeRepository = nodeRepository;
}

public async Task Execute(IJobExecutionContext context)
{
_logger.LogInformation("Starting {JobName}... ", nameof(MonitorChannelsJob));
try
{
var managedNodes = await _nodeRepository.GetAllManagedByNodeGuard();

var scheduler = await _schedulerFactory.GetScheduler();

foreach (var managedNode in managedNodes)
{
if (managedNode.ChannelAdminMacaroon != null)
{
var map = new JobDataMap();
map.Put("nodeId", managedNode.Id.ToString());
var job = SimpleJob.Create<ChannelMonitorJob>(map, managedNode.Id.ToString());
await scheduler.ScheduleJob(job.Job, job.Trigger);

var jobId = job.Job.Key.ToString();
managedNode.JobId = jobId;
var jobUpateResult = _nodeRepository.Update(managedNode);
if (!jobUpateResult.Item1)
{
_logger.LogWarning("Couldn't update Node {NodeId} with JobId {JobId}", managedNode.Id, jobId);
}
}
}
}
catch (Exception e)
{
_logger.LogError(e, "Error on {JobName}", nameof(MonitorChannelsJob));
throw new JobExecutionException(e, false);
}

_logger.LogInformation("{JobName} ended", nameof(MonitorChannelsJob));
}
}
Loading

0 comments on commit fff2667

Please sign in to comment.