Skip to content

Commit

Permalink
Redial when tor is fully ready
Browse files Browse the repository at this point in the history
  • Loading branch information
ikoenigsknecht committed May 2, 2024
1 parent 914926c commit 4b592bb
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ describe('Connections manager', () => {

// Peer connected
await connectionsManagerService.init()
libp2pService.connectedPeers.set(peerId.toString(), DateTime.utc().valueOf())
libp2pService.connectedPeers.set(peerId.toString(), {
connectedAtSeconds: DateTime.utc().valueOf(),
address: peerId.toString(),
})

// Peer disconnected
const remoteAddr = `${peerId.toString()}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.tor.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
})
this.tor.on(SocketActionTypes.REDIAL_PEERS, async data => {
this.logger(`Socket - ${SocketActionTypes.REDIAL_PEERS}`)
const peerInfo = this.libp2pService.getCurrentPeerInfo()
await this.libp2pService.redialPeers(peerInfo)
})
this.socketService.on(SocketActionTypes.CONNECTION_PROCESS_INFO, data => {
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, data)
})
Expand All @@ -619,8 +624,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
// Update Frontend with Initialized Communities
if (this.communityId) {
this.serverIoProvider.io.emit(SocketActionTypes.COMMUNITY_LAUNCHED, { id: this.communityId })
console.log('this.libp2pService.connectedPeers', this.libp2pService.connectedPeers)
console.log('this.libp2pservice', this.libp2pService)
this.logger('this.libp2pService.connectedPeers', this.libp2pService.connectedPeers)
this.logger('this.libp2pservice', this.libp2pService)
this.serverIoProvider.io.emit(
SocketActionTypes.CONNECTED_PEERS,
Array.from(this.libp2pService.connectedPeers.keys())
Expand Down
62 changes: 56 additions & 6 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const'
import { ServerIoProviderTypes } from '../types'
import { webSockets } from '../websocketOverTor'
import { all } from '../websocketOverTor/filters'
import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { Libp2pConnectedPeer, Libp2pEvents, Libp2pNodeParams, Libp2pPeerInfo } from './libp2p.types'
import { ProcessInChunksService } from './process-in-chunks.service'

const KEY_LENGTH = 32
Expand All @@ -29,7 +29,7 @@ export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n'
@Injectable()
export class Libp2pService extends EventEmitter {
public libp2pInstance: Libp2p | null
public connectedPeers: Map<string, number> = new Map()
public connectedPeers: Map<string, Libp2pConnectedPeer> = new Map()
public dialedPeers: Set<string> = new Set()
private readonly logger = Logger(Libp2pService.name)
constructor(
Expand All @@ -48,6 +48,13 @@ export class Libp2pService extends EventEmitter {
await this.libp2pInstance?.dial(multiaddr(peerAddress))
}

public getCurrentPeerInfo = (): Libp2pPeerInfo => {
return {
dialed: Array.from(this.dialedPeers),
connected: Array.from(this.connectedPeers.values()).map(peer => peer.address),
}
}

public readonly createLibp2pAddress = (address: string, peerId: string): string => {
return createLibp2pAddress(address, peerId)
}
Expand All @@ -74,6 +81,45 @@ export class Libp2pService extends EventEmitter {
return { psk: psk.toString('base64'), fullKey }
}

public async hangUpPeers(peers: string[]) {
this.logger('Hanging up on all peers')
for (const peer of peers) {
await this.hangUpPeer(peer)
}
}

public async hangUpPeer(peerAddress: string) {
this.logger('Hanging up on peer', peerAddress)
await this.libp2pInstance?.hangUp(multiaddr(peerAddress))
this.dialedPeers.delete(peerAddress)
this.connectedPeers.delete(peerAddress)
}

/**
* Hang up existing peer connections and re-dial them. Specifically useful on
* iOS where Tor receives a new port when the app resumes from background and
* we want to close/re-open connections.
*/
public async redialPeers(peerInfo?: Libp2pPeerInfo) {
const dialed = peerInfo ? peerInfo.dialed : Array.from(this.dialedPeers)
const toDial = peerInfo
? [...peerInfo.connected, ...peerInfo.dialed]
: [...this.connectedPeers.keys(), ...this.dialedPeers]

if (dialed.length === 0) {
this.logger('No peers to redial!')
return
}

this.logger(`Re-dialing ${dialed.length} peers`)

// TODO: Sort peers
await this.hangUpPeers(dialed)

this.processInChunksService.updateData(toDial)
await this.processInChunksService.process()
}

public async createInstance(params: Libp2pNodeParams): Promise<Libp2p> {
if (this.libp2pInstance) {
return this.libp2pInstance
Expand All @@ -85,8 +131,8 @@ export class Libp2pService extends EventEmitter {
libp2p = await createLibp2p({
start: false,
connectionManager: {
minConnections: 3, // TODO: increase?
maxConnections: 8, // TODO: increase?
minConnections: 1, // TODO: increase?
maxConnections: 12, // TODO: increase?
dialTimeout: 120_000,
maxParallelDials: 10,
autoDial: true, // It's a default but let's set it to have explicit information
Expand Down Expand Up @@ -157,7 +203,11 @@ export class Libp2pService extends EventEmitter {
const localPeerId = peerId.toString()
this.logger(`${localPeerId} connected to ${remotePeerId}`)

this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf())
const connectedPeer: Libp2pConnectedPeer = {
address: peer.detail.remoteAddr.toString(),
connectedAtSeconds: DateTime.utc().valueOf(),
}
this.connectedPeers.set(remotePeerId, connectedPeer)
this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`)
this.logger(`${localPeerId} has ${this.libp2pInstance?.getConnections().length} open connections`)

Expand All @@ -176,7 +226,7 @@ export class Libp2pService extends EventEmitter {
}
this.logger(`${localPeerId} has ${this.libp2pInstance.getConnections().length} open connections`)

const connectionStartTime = this.connectedPeers.get(remotePeerId)
const connectionStartTime: number = this.connectedPeers.get(remotePeerId)!.connectedAtSeconds
if (!connectionStartTime) {
this.logger.error(`No connection start time for peer ${remotePeerId}`)
return
Expand Down
10 changes: 10 additions & 0 deletions packages/backend/src/nest/libp2p/libp2p.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,13 @@ export interface Libp2pNodeParams {
peers: string[]
psk: Uint8Array
}

export type Libp2pPeerInfo = {
dialed: string[]
connected: string[]
}

export type Libp2pConnectedPeer = {
address: string
connectedAtSeconds: number
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class ProcessInChunksService<T> extends EventEmitter {
}

public async process() {
this.logger(`Processing ${this.taskQueue.length} items`)
this.logger(`Processing ${this.taskQueue.length()} items`)
this.taskQueue.resume()
}

Expand Down
3 changes: 3 additions & 0 deletions packages/backend/src/nest/tor/tor-control.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ export class TorControl {
}

public async sendCommand(command: string): Promise<{ code: number; messages: string[] }> {
this.logger(`Sending tor command: ${command}`)
// Only send one command at a time.
if (this.isSending) {
this.logger('Tor connection already established, waiting...')
}

// Wait for existing command to finish.
while (this.isSending) {
const timeout = 750
this.logger(`Waiting for ${timeout}ms to retry command...`)
await new Promise(r => setTimeout(r, 750))
}

Expand Down
5 changes: 4 additions & 1 deletion packages/backend/src/nest/tor/tor.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,14 @@ export class Tor extends EventEmitter implements OnModuleInit {

this.interval = setInterval(async () => {
const log = await this.torControl.sendCommand('GETINFO status/bootstrap-phase')
this.logger(log.messages[0])
if (
log.messages[0] === '250-status/bootstrap-phase=NOTICE BOOTSTRAP PROGRESS=100 TAG=done SUMMARY="Done"'
) {
this.logger(`Sending ${SocketActionTypes.TOR_INITIALIZED}`)
this.serverIoProvider.io.emit(SocketActionTypes.TOR_INITIALIZED)

this.logger('Attempting to redial peers (if possible)')
this.emit(SocketActionTypes.REDIAL_PEERS)
clearInterval(this.interval)
}
}, 2500)
Expand Down
1 change: 1 addition & 0 deletions packages/types/src/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export enum SocketActionTypes {
PEER_CONNECTED = 'peerConnected',
PEER_DISCONNECTED = 'peerDisconnected',
TOR_INITIALIZED = 'torInitialized',
REDIAL_PEERS = 'redialPeers',

// ====== Misc ======

Expand Down

0 comments on commit 4b592bb

Please sign in to comment.