From 4b592bbe8a2a400861d929f83e4728e96465c95f Mon Sep 17 00:00:00 2001 From: Isla Koenigsknecht Date: Thu, 2 May 2024 16:55:20 -0400 Subject: [PATCH] Redial when tor is fully ready --- .../connections-manager.service.tor.spec.ts | 5 +- .../connections-manager.service.ts | 9 ++- .../backend/src/nest/libp2p/libp2p.service.ts | 62 +++++++++++++++++-- .../backend/src/nest/libp2p/libp2p.types.ts | 10 +++ .../nest/libp2p/process-in-chunks.service.ts | 2 +- .../src/nest/tor/tor-control.service.ts | 3 + packages/backend/src/nest/tor/tor.service.ts | 5 +- packages/types/src/socket.ts | 1 + 8 files changed, 86 insertions(+), 11 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts index 98d717762..99567d552 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts @@ -138,7 +138,10 @@ describe('Connections manager', () => { // Peer connected await connectionsManagerService.init() - libp2pService.connectedPeers.set(peerId.toString(), DateTime.utc().valueOf()) + libp2pService.connectedPeers.set(peerId.toString(), { + connectedAtSeconds: DateTime.utc().valueOf(), + address: peerId.toString(), + }) // Peer disconnected const remoteAddr = `${peerId.toString()}` diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index be759d57f..5c098fad1 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -608,6 +608,11 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.tor.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => { this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data) }) + this.tor.on(SocketActionTypes.REDIAL_PEERS, async data => { + this.logger(`Socket - ${SocketActionTypes.REDIAL_PEERS}`) + const peerInfo = this.libp2pService.getCurrentPeerInfo() + await this.libp2pService.redialPeers(peerInfo) + }) this.socketService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => { this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data) }) @@ -619,8 +624,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI // Update Frontend with Initialized Communities if (this.communityId) { this.serverIoProvider.io.emit(SocketActionTypes.COMMUNITY_LAUNCHED, { id: this.communityId }) - console.log('this.libp2pService.connectedPeers', this.libp2pService.connectedPeers) - console.log('this.libp2pservice', this.libp2pService) + this.logger('this.libp2pService.connectedPeers', this.libp2pService.connectedPeers) + this.logger('this.libp2pservice', this.libp2pService) this.serverIoProvider.io.emit( SocketActionTypes.CONNECTED_PEERS, Array.from(this.libp2pService.connectedPeers.keys()) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index c4129e988..dbbfa7208 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -20,7 +20,7 @@ import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const' import { ServerIoProviderTypes } from '../types' import { webSockets } from '../websocketOverTor' import { all } from '../websocketOverTor/filters' -import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' +import { Libp2pConnectedPeer, Libp2pEvents, Libp2pNodeParams, Libp2pPeerInfo } from './libp2p.types' import { ProcessInChunksService } from './process-in-chunks.service' const KEY_LENGTH = 32 @@ -29,7 +29,7 @@ export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' @Injectable() export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null - public connectedPeers: Map = new Map() + public connectedPeers: Map = new Map() public dialedPeers: Set = new Set() private readonly logger = Logger(Libp2pService.name) constructor( @@ -48,6 +48,13 @@ export class Libp2pService extends EventEmitter { await this.libp2pInstance?.dial(multiaddr(peerAddress)) } + public getCurrentPeerInfo = (): Libp2pPeerInfo => { + return { + dialed: Array.from(this.dialedPeers), + connected: Array.from(this.connectedPeers.values()).map(peer => peer.address), + } + } + public readonly createLibp2pAddress = (address: string, peerId: string): string => { return createLibp2pAddress(address, peerId) } @@ -74,6 +81,45 @@ export class Libp2pService extends EventEmitter { return { psk: psk.toString('base64'), fullKey } } + public async hangUpPeers(peers: string[]) { + this.logger('Hanging up on all peers') + for (const peer of peers) { + await this.hangUpPeer(peer) + } + } + + public async hangUpPeer(peerAddress: string) { + this.logger('Hanging up on peer', peerAddress) + await this.libp2pInstance?.hangUp(multiaddr(peerAddress)) + this.dialedPeers.delete(peerAddress) + this.connectedPeers.delete(peerAddress) + } + + /** + * Hang up existing peer connections and re-dial them. Specifically useful on + * iOS where Tor receives a new port when the app resumes from background and + * we want to close/re-open connections. + */ + public async redialPeers(peerInfo?: Libp2pPeerInfo) { + const dialed = peerInfo ? peerInfo.dialed : Array.from(this.dialedPeers) + const toDial = peerInfo + ? [...peerInfo.connected, ...peerInfo.dialed] + : [...this.connectedPeers.keys(), ...this.dialedPeers] + + if (dialed.length === 0) { + this.logger('No peers to redial!') + return + } + + this.logger(`Re-dialing ${dialed.length} peers`) + + // TODO: Sort peers + await this.hangUpPeers(dialed) + + this.processInChunksService.updateData(toDial) + await this.processInChunksService.process() + } + public async createInstance(params: Libp2pNodeParams): Promise { if (this.libp2pInstance) { return this.libp2pInstance @@ -85,8 +131,8 @@ export class Libp2pService extends EventEmitter { libp2p = await createLibp2p({ start: false, connectionManager: { - minConnections: 3, // TODO: increase? - maxConnections: 8, // TODO: increase? + minConnections: 1, // TODO: increase? + maxConnections: 12, // TODO: increase? dialTimeout: 120_000, maxParallelDials: 10, autoDial: true, // It's a default but let's set it to have explicit information @@ -157,7 +203,11 @@ export class Libp2pService extends EventEmitter { const localPeerId = peerId.toString() this.logger(`${localPeerId} connected to ${remotePeerId}`) - this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) + const connectedPeer: Libp2pConnectedPeer = { + address: peer.detail.remoteAddr.toString(), + connectedAtSeconds: DateTime.utc().valueOf(), + } + this.connectedPeers.set(remotePeerId, connectedPeer) this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) this.logger(`${localPeerId} has ${this.libp2pInstance?.getConnections().length} open connections`) @@ -176,7 +226,7 @@ export class Libp2pService extends EventEmitter { } this.logger(`${localPeerId} has ${this.libp2pInstance.getConnections().length} open connections`) - const connectionStartTime = this.connectedPeers.get(remotePeerId) + const connectionStartTime: number = this.connectedPeers.get(remotePeerId)!.connectedAtSeconds if (!connectionStartTime) { this.logger.error(`No connection start time for peer ${remotePeerId}`) return diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 985c3a490..34b7056db 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -17,3 +17,13 @@ export interface Libp2pNodeParams { peers: string[] psk: Uint8Array } + +export type Libp2pPeerInfo = { + dialed: string[] + connected: string[] +} + +export type Libp2pConnectedPeer = { + address: string + connectedAtSeconds: number +} diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index e57e68122..bad92b873 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -71,7 +71,7 @@ export class ProcessInChunksService extends EventEmitter { } public async process() { - this.logger(`Processing ${this.taskQueue.length} items`) + this.logger(`Processing ${this.taskQueue.length()} items`) this.taskQueue.resume() } diff --git a/packages/backend/src/nest/tor/tor-control.service.ts b/packages/backend/src/nest/tor/tor-control.service.ts index b83040c7a..596b246a1 100644 --- a/packages/backend/src/nest/tor/tor-control.service.ts +++ b/packages/backend/src/nest/tor/tor-control.service.ts @@ -104,6 +104,7 @@ export class TorControl { } public async sendCommand(command: string): Promise<{ code: number; messages: string[] }> { + this.logger(`Sending tor command: ${command}`) // Only send one command at a time. if (this.isSending) { this.logger('Tor connection already established, waiting...') @@ -111,6 +112,8 @@ export class TorControl { // Wait for existing command to finish. while (this.isSending) { + const timeout = 750 + this.logger(`Waiting for ${timeout}ms to retry command...`) await new Promise(r => setTimeout(r, 750)) } diff --git a/packages/backend/src/nest/tor/tor.service.ts b/packages/backend/src/nest/tor/tor.service.ts index 1c7700ebf..a266b40ec 100644 --- a/packages/backend/src/nest/tor/tor.service.ts +++ b/packages/backend/src/nest/tor/tor.service.ts @@ -105,11 +105,14 @@ export class Tor extends EventEmitter implements OnModuleInit { this.interval = setInterval(async () => { const log = await this.torControl.sendCommand('GETINFO status/bootstrap-phase') + this.logger(log.messages[0]) if ( log.messages[0] === '250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=100 TAG=done SUMMARY="Done"' ) { + this.logger(`Sending ${SocketActionTypes.TOR_INITIALIZED}`) this.serverIoProvider.io.emit(SocketActionTypes.TOR_INITIALIZED) - + this.logger('Attempting to redial peers (if possible)') + this.emit(SocketActionTypes.REDIAL_PEERS) clearInterval(this.interval) } }, 2500) diff --git a/packages/types/src/socket.ts b/packages/types/src/socket.ts index c74ba5dd1..4fcb44cfd 100644 --- a/packages/types/src/socket.ts +++ b/packages/types/src/socket.ts @@ -71,6 +71,7 @@ export enum SocketActionTypes { PEER_CONNECTED = 'peerConnected', PEER_DISCONNECTED = 'peerDisconnected', TOR_INITIALIZED = 'torInitialized', + REDIAL_PEERS = 'redialPeers', // ====== Misc ======