Skip to content

Commit

Permalink
fix: Sagas running multiple times and socket closing
Browse files Browse the repository at this point in the history
  • Loading branch information
leblowl committed May 6, 2024
1 parent e81192e commit 0060adc
Show file tree
Hide file tree
Showing 18 changed files with 298 additions and 130 deletions.
8 changes: 4 additions & 4 deletions packages/backend/src/backendManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ export const runBackendMobile = async () => {
{ logger: ['warn', 'error', 'log', 'debug', 'verbose'] }
)

rn_bridge.channel.on('close', async () => {
rn_bridge.channel.on('close', () => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
await connectionsManager.pause()
connectionsManager.pause()
})

rn_bridge.channel.on('open', async (msg: OpenServices) => {
rn_bridge.channel.on('open', (msg: OpenServices) => {
const connectionsManager = app.get<ConnectionsManagerService>(ConnectionsManagerService)
const torControl = app.get<TorControl>(TorControl)
const proxyAgent = app.get<{ proxy: { port: string } }>(SOCKS_PROXY_AGENT)
Expand All @@ -123,7 +123,7 @@ export const runBackendMobile = async () => {
torControl.torControlParams.auth.value = msg.authCookie
proxyAgent.proxy.port = msg.httpTunnelPort

await connectionsManager.resume()
connectionsManager.resume()
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,24 +242,20 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
}
}

public closeSocket() {
this.logger('Closing socket server')
// TODO: We should call this.socketService.close() instead
this.serverIoProvider.io.close()
public async closeSocket() {
await this.socketService.close()
}

public async pause() {
this.logger('Pausing!')
this.logger('Closing socket!')
this.closeSocket()
await this.closeSocket()
this.logger('Pausing libp2pService!')
this.peerInfo = await this.libp2pService?.pause()
this.logger('Found the following peer info on pause: ', this.peerInfo)
}

public async resume() {
this.logger('Resuming!')
this.logger('Reopening socket!')
await this.openSocket()
this.logger('Dialing peers with info: ', this.peerInfo)
await this.libp2pService?.redialPeers(this.peerInfo)
Expand Down
60 changes: 53 additions & 7 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
import { ConfigOptions, ServerIoProviderTypes } from '../types'
import { suspendableSocketEvents } from './suspendable.events'
import Logger from '../common/logger'
import { sleep } from '../common/sleep'
import type net from 'node:net'

@Injectable()
export class SocketService extends EventEmitter implements OnModuleInit {
private readonly logger = Logger(SocketService.name)

public resolveReadyness: (value: void | PromiseLike<void>) => void
public readyness: Promise<void>
private listening: boolean
private closeSockets: () => void

constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
Expand All @@ -44,12 +48,14 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.readyness = new Promise<void>(resolve => {
this.resolveReadyness = resolve
})

this.listening = false
this.closeSockets = this.attachListeners()
}

async onModuleInit() {
this.logger('init: Started')

this.attachListeners()
await this.init()

this.logger('init: Finished')
Expand All @@ -71,7 +77,7 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.logger('init: Frontend connected')
}

private readonly attachListeners = (): void => {
private readonly attachListeners = (): (() => void) => {
// Attach listeners here
this.serverIoProvider.io.on(SocketActionTypes.CONNECTION, socket => {
this.logger('Socket connection')
Expand Down Expand Up @@ -195,25 +201,65 @@ export class SocketService extends EventEmitter implements OnModuleInit {
this.emit(SocketActionTypes.LOAD_MIGRATION_DATA, data)
})
})

// Ensure the underlying connections get closed. See:
// https://github.com/socketio/socket.io/issues/1602
//
// I also tried `this.serverIoProvider.io.disconnectSockets(true)`
// which didn't work for me.
const sockets = new Set<net.Socket>()

this.serverIoProvider.server.on('connection', conn => {
sockets.add(conn)
conn.on('close', () => {
sockets.delete(conn)
})
})

return () => sockets.forEach(s => s.destroy())
}

public getConnections = (): Promise<number> => {
return new Promise(resolve => {
this.serverIoProvider.server.getConnections((err, count) => {
if (err) throw new Error(err.message)
resolve(count)
})
})
}

public listen = async (port = this.configOptions.socketIOPort): Promise<void> => {
this.logger(`Opening data server on port ${this.configOptions.socketIOPort}`)

// Sometimes socket.io closes the HTTP server but doesn't close
// all underlying connections. So it doesn't appear that
// `this.serverIoProvider.server.listening` is sufficient.
if (this.listening) {
const numConnections = await this.getConnections()
this.logger('Failed to listen. Connections still open:', numConnections)
return
}

return await new Promise(resolve => {
if (this.serverIoProvider.server.listening) resolve()
this.serverIoProvider.server.listen(this.configOptions.socketIOPort, '127.0.0.1', () => {
this.logger(`Data server running on port ${this.configOptions.socketIOPort}`)
this.listening = true
resolve()
})
})
}

public close = async (): Promise<void> => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)
return await new Promise(resolve => {
this.serverIoProvider.server.close(err => {
public close = (): Promise<void> => {
return new Promise(resolve => {
this.logger(`Closing data server on port ${this.configOptions.socketIOPort}`)
this.serverIoProvider.io.close(err => {
if (err) throw new Error(err.message)
this.logger('Data server closed')
this.listening = false
resolve()
})
this.logger('Disconnecting sockets')
this.closeSockets()
})
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
import { io } from 'socket.io-client'
import { select, put, call, cancel, fork, takeEvery, FixedTask, delay, apply } from 'typed-redux-saga'
import {
select,
put,
call,
cancel,
fork,
take,
takeLeading,
takeEvery,
FixedTask,
delay,
apply,
} from 'typed-redux-saga'
import { PayloadAction } from '@reduxjs/toolkit'
import { socket as stateManager, Socket } from '@quiet/state-manager'
import { encodeSecret } from '@quiet/common'
Expand Down Expand Up @@ -49,17 +61,20 @@ export function* startConnectionSaga(
})
yield* fork(handleSocketLifecycleActions, socket, action.payload)
// Handle opening/restoring connection
yield* takeEvery(initActions.setWebsocketConnected, setConnectedSaga, socket)
yield* takeLeading(initActions.setWebsocketConnected, setConnectedSaga, socket)
}

function* setConnectedSaga(socket: Socket): Generator {
console.log('Frontend is ready. Forking state-manager sagas and starting backend...')

const task = yield* fork(stateManager.useIO, socket)
console.log('WEBSOCKET', 'Forking state-manager sagas', task)
// Handle suspending current connection
yield* takeEvery(initActions.suspendWebsocketConnection, cancelRootTaskSaga, task)
console.log('Frontend is ready. Starting backend...')

// @ts-ignore - Why is this broken?
yield* apply(socket, socket.emit, [SocketActionTypes.START])

// Handle suspending current connection
const suspendAction = yield* take(initActions.suspendWebsocketConnection)
yield* call(cancelRootTaskSaga, task, suspendAction)
}

function* handleSocketLifecycleActions(socket: Socket, socketIOData: WebsocketConnectionPayload): Generator {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { eventChannel } from 'redux-saga'
import { call, put, take } from 'typed-redux-saga'
import { call, put, take, cancelled } from 'typed-redux-saga'
import { app, publicChannels, WEBSOCKET_CONNECTION_CHANNEL, INIT_CHECK_CHANNEL, network } from '@quiet/state-manager'
import { initActions, InitCheckPayload, WebsocketConnectionPayload } from '../../init/init.slice'
import { ScreenNames } from '../../../const/ScreenNames.enum'
Expand All @@ -9,10 +9,18 @@ import { navigationActions } from '../../navigation/navigation.slice'
import { nativeServicesActions } from '../nativeServices.slice'

export function* nativeServicesCallbacksSaga(): Generator {
const channel = yield* call(deviceEvents)
while (true) {
const action = yield* take(channel)
yield put(action)
console.log('nativeServicesCallbacksSaga starting')
try {
const channel = yield* call(deviceEvents)
while (true) {
const action = yield* take(channel)
yield put(action)
}
} finally {
console.log('nativeServicesCallbacksSaga stopping')
if (yield cancelled()) {
console.log('nativeServicesCallbacksSaga cancelled')
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { select, call, put, takeLeading } from 'typed-redux-saga'
import { select, call, put } from 'typed-redux-saga'
import { app } from '@quiet/state-manager'
import { persistor } from '../../store'
import { nativeServicesActions } from '../nativeServices.slice'
Expand All @@ -9,11 +9,8 @@ import { ScreenNames } from '../../../../src/const/ScreenNames.enum'

export function* leaveCommunitySaga(): Generator {
console.log('Leaving community')

// Restart backend
yield* put(app.actions.closeServices())

yield takeLeading(initActions.canceledRootTask.type, clearReduxStore)
}

export function* clearReduxStore(): Generator {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import { all, fork, takeEvery } from 'typed-redux-saga'
import { all, fork, takeEvery, cancelled } from 'typed-redux-saga'
import { nativeServicesCallbacksSaga } from './events/nativeServicesCallbacks'
import { leaveCommunitySaga } from './leaveCommunity/leaveCommunity.saga'
import { flushPersistorSaga } from './flushPersistor/flushPersistor.saga'
import { nativeServicesActions } from './nativeServices.slice'

export function* nativeServicesMasterSaga(): Generator {
yield all([
fork(nativeServicesCallbacksSaga),
takeEvery(nativeServicesActions.leaveCommunity.type, leaveCommunitySaga),
takeEvery(nativeServicesActions.flushPersistor.type, flushPersistorSaga),
])
console.log('nativeServicesMasterSaga starting')
try {
yield all([
fork(nativeServicesCallbacksSaga),
takeEvery(nativeServicesActions.leaveCommunity.type, leaveCommunitySaga),
takeEvery(nativeServicesActions.flushPersistor.type, flushPersistorSaga),
])
} finally {
console.log('nativeServicesMasterSaga stopping')
if (yield cancelled()) {
console.log('nativeServicesMasterSaga cancelled')
}
}
}
28 changes: 19 additions & 9 deletions packages/mobile/src/store/root.saga.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
import { all, takeEvery } from 'typed-redux-saga'
import { all, takeEvery, takeLeading, fork, cancelled } from 'typed-redux-saga'
import { nativeServicesMasterSaga } from './nativeServices/nativeServices.master.saga'
import { navigationMasterSaga } from './navigation/navigation.master.saga'
import { initMasterSaga } from './init/init.master.saga'
import { initActions } from './init/init.slice'
import { setupCryptoSaga } from './init/setupCrypto/setupCrypto.saga'
import { publicChannels } from '@quiet/state-manager'
import { showNotificationSaga } from './nativeServices/showNotification/showNotification.saga'
import { clearReduxStore } from './nativeServices/leaveCommunity/leaveCommunity.saga'

export function* rootSaga(): Generator {
yield all([
takeEvery(initActions.setStoreReady.type, setupCryptoSaga),
takeEvery(initActions.setStoreReady.type, initMasterSaga),
takeEvery(initActions.setStoreReady.type, navigationMasterSaga),
takeEvery(initActions.setStoreReady.type, nativeServicesMasterSaga),
// Below line is reponsible for displaying notifications about messages from channels other than currently viewing one
takeEvery(publicChannels.actions.markUnreadChannel.type, showNotificationSaga),
])
console.log('rootSaga starting')
try {
yield all([
fork(setupCryptoSaga),
fork(initMasterSaga),
fork(navigationMasterSaga),
fork(nativeServicesMasterSaga),
// Below line is reponsible for displaying notifications about messages from channels other than currently viewing one
takeEvery(publicChannels.actions.markUnreadChannel.type, showNotificationSaga),
takeLeading(initActions.canceledRootTask.type, clearReduxStore),
])
} finally {
console.log('rootSaga stopping')
if (yield cancelled()) {
console.log('rootSaga cancelled')
}
}
}
20 changes: 14 additions & 6 deletions packages/state-manager/src/sagas/app/app.master.saga.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import { Socket } from '../../types'
import { all, takeEvery, takeLeading } from 'typed-redux-saga'
import { all, takeEvery, takeLeading, cancelled } from 'typed-redux-saga'
import { appActions } from './app.slice'
import { closeServicesSaga } from './closeServices.saga'
import { stopBackendSaga } from './stopBackend/stopBackend.saga'
import { loadMigrationDataSaga } from './loadMigrationData/loadMigrationData.saga'

export function* appMasterSaga(socket: Socket): Generator {
yield* all([
takeLeading(appActions.closeServices.type, closeServicesSaga, socket),
takeEvery(appActions.stopBackend.type, stopBackendSaga, socket),
takeEvery(appActions.loadMigrationData.type, loadMigrationDataSaga, socket),
])
console.log('appMasterSaga starting')
try {
yield* all([
takeLeading(appActions.closeServices.type, closeServicesSaga, socket),
takeEvery(appActions.stopBackend.type, stopBackendSaga, socket),
takeEvery(appActions.loadMigrationData.type, loadMigrationDataSaga, socket),
])
} finally {
console.log('appMasterSaga stopping')
if (yield cancelled()) {
console.log('appMasterSaga cancelled')
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
import { all, fork } from 'typed-redux-saga'
import { all, fork, cancelled } from 'typed-redux-saga'
import { uptimeSaga } from './uptime/uptime.saga'

export function* connectionMasterSaga(): Generator {
yield all([fork(uptimeSaga)])
console.log('connectionMasterSaga starting')
try {
yield all([fork(uptimeSaga)])
} finally {
console.log('connectionMasterSaga stopping')
if (yield cancelled()) {
console.log('connectionMasterSaga cancelled')
}
}
}
Loading

0 comments on commit 0060adc

Please sign in to comment.