Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use worker to prevent timer throttling #1557

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions packages/client/generate-timer-worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

npx tsc src/timers/worker.ts \
--skipLibCheck \
--removeComments \
--module preserve \
--lib ES2020,WebWorker \
--outDir worker-dist

cat <<EOF >src/timers/worker.build.ts
export const timerWorker = {
src: \`$(<worker-dist/worker.js)\`,
};
EOF

rm -r worker-dist
5 changes: 3 additions & 2 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
"scripts": {
"clean": "rimraf dist",
"start": "rollup -w -c",
"build": "yarn clean && rollup -c",
"build": "yarn clean && ./generate-timer-worker.sh && rollup -c",
"test": "vitest",
"clean:docs": "rimraf generated-docs",
"test-ci": "vitest --coverage",
"generate:open-api": "./generate-openapi.sh protocol",
"generate:open-api:dev": "./generate-openapi.sh chat"
"generate:open-api:dev": "./generate-openapi.sh chat",
"generate:timer-worker": "./generate-timer-worker.sh"
},
"files": [
"dist",
Expand Down
10 changes: 6 additions & 4 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
promiseWithResolvers,
PromiseWithResolvers,
} from './helpers/withResolvers';
import { getTimers } from './timers';

export type StreamSfuClientConstructor = {
/**
Expand Down Expand Up @@ -110,7 +111,7 @@ export class StreamSfuClient {
isLeaving = false;

private readonly rpc: SignalServerClient;
private keepAliveInterval?: NodeJS.Timeout;
private keepAliveInterval?: number;
private connectionCheckTimeout?: NodeJS.Timeout;
private migrateAwayTimeout?: NodeJS.Timeout;
private pingIntervalInMs = 10 * 1000;
Expand Down Expand Up @@ -263,7 +264,7 @@ export class StreamSfuClient {

private handleWebSocketClose = () => {
this.signalWs.removeEventListener('close', this.handleWebSocketClose);
clearInterval(this.keepAliveInterval);
getTimers().clearInterval(this.keepAliveInterval);
clearTimeout(this.connectionCheckTimeout);
this.onSignalClose?.();
};
Expand Down Expand Up @@ -489,8 +490,9 @@ export class StreamSfuClient {
};

private keepAlive = () => {
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = setInterval(() => {
const timers = getTimers();
timers.clearInterval(this.keepAliveInterval);
this.keepAliveInterval = timers.setInterval(() => {
this.ping().catch((e) => {
this.logger('error', 'Error sending healthCheckRequest to SFU', e);
});
Expand Down
10 changes: 6 additions & 4 deletions packages/client/src/coordinator/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
UR,
} from './types';
import type { ConnectedEvent, WSAuthMessage } from '../../gen/coordinator';
import { getTimers } from '../../timers';

// Type guards to check WebSocket error type
const isCloseEvent = (
Expand Down Expand Up @@ -58,7 +59,7 @@ export class StableWSConnection {
authenticationSent: boolean;
consecutiveFailures: number;
pingInterval: number;
healthCheckTimeoutRef?: NodeJS.Timeout;
healthCheckTimeoutRef?: number;
isConnecting: boolean;
isDisconnected: boolean;
isHealthy: boolean;
Expand Down Expand Up @@ -249,7 +250,7 @@ export class StableWSConnection {

// start by removing all the listeners
if (this.healthCheckTimeoutRef) {
clearInterval(this.healthCheckTimeoutRef);
getTimers().clearInterval(this.healthCheckTimeoutRef);
}
if (this.connectionCheckTimeoutRef) {
clearInterval(this.connectionCheckTimeoutRef);
Expand Down Expand Up @@ -757,12 +758,13 @@ export class StableWSConnection {
* Schedules a next health check ping for websocket.
*/
scheduleNextPing = () => {
const timers = getTimers();
if (this.healthCheckTimeoutRef) {
clearTimeout(this.healthCheckTimeoutRef);
timers.clearTimeout(this.healthCheckTimeoutRef);
}

// 30 seconds is the recommended interval (messenger uses this)
this.healthCheckTimeoutRef = setTimeout(() => {
this.healthCheckTimeoutRef = timers.setTimeout(() => {
// send the healthcheck..., server replies with a health check event
const data = [{ type: 'health.check', client_id: this.client.clientID }];
// try to send on the connection
Expand Down
126 changes: 126 additions & 0 deletions packages/client/src/timers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import { lazy } from '../helpers/lazy';
import { getLogger } from '../logger';
import { TimerWorkerEvent, TimerWorkerRequest } from './types';
import { timerWorker } from './worker.build';

class TimerWorker {
private currentTimerId = 1;
private callbacks = new Map<number, () => void>();
private worker: Worker | undefined;
private fallback = false;

setup(): void {
try {
const source = timerWorker.src;
const blob = new Blob([source], {
type: 'application/javascript; charset=utf-8',
});
const script = URL.createObjectURL(blob);
this.worker = new Worker(script, { name: 'str-timer-worker' });
this.worker.addEventListener('message', (event) => {
const { type, id } = event.data as TimerWorkerEvent;
if (type === 'tick') {
this.callbacks.get(id)?.();
}
});
} catch (err: any) {
getLogger(['timer-worker'])('error', err);
this.fallback = true;
}
}

destroy(): void {
this.callbacks.clear();
this.worker?.terminate();
this.worker = undefined;
this.fallback = false;
}

get ready() {
return this.fallback || Boolean(this.worker);
}

setInterval(callback: () => void, timeout: number): number {
return this.setTimer('setInterval', callback, timeout);
}

clearInterval(id?: number): void {
this.clearTimer('clearInterval', id);
}

setTimeout(callback: () => void, timeout: number): number {
return this.setTimer('setTimeout', callback, timeout);
}

clearTimeout(id?: number): void {
this.clearTimer('clearTimeout', id);
}

private setTimer(
type: 'setTimeout' | 'setInterval',
callback: () => void,
timeout: number,
) {
if (!this.ready) {
this.setup();
}

if (this.fallback) {
return (type === 'setTimeout' ? setTimeout : setInterval)(
callback,
timeout,
) as unknown as number;
}

const id = this.getTimerId();

this.callbacks.set(id, () => {
callback();

// Timeouts are one-off operations, so no need to keep callback reference
// after timer has fired
if (type === 'setTimeout') {
this.callbacks.delete(id);
}
});

this.sendMessage({ type, id, timeout });
return id;
}

private clearTimer(type: 'clearTimeout' | 'clearInterval', id?: number) {
if (!id) {
return;
}

if (!this.ready) {
this.setup();
}

if (this.fallback) {
(type === 'clearTimeout' ? clearTimeout : clearInterval)(id);
return;
}

this.callbacks.delete(id);
this.sendMessage({ type, id });
}

private getTimerId() {
return this.currentTimerId++;
}

private sendMessage(message: TimerWorkerRequest) {
if (!this.worker) {
throw new Error("Cannot use timer worker before it's set up");
}

this.worker.postMessage(message);
}
}

export const getTimers = lazy(() => {
const instance = new TimerWorker();
instance.setup();
return instance;
});
15 changes: 15 additions & 0 deletions packages/client/src/timers/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export type TimerWorkerRequest =
| {
type: 'setInterval' | 'setTimeout';
id: number;
timeout: number;
}
| {
type: 'clearInterval' | 'clearTimeout';
id: number;
};

export type TimerWorkerEvent = {
type: 'tick';
id: number;
};
9 changes: 9 additions & 0 deletions packages/client/src/timers/worker.build.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Do not modify this file manually. You can edit worker.ts if necessary
// and the run ./generate-timer-worker.sh
export const timerWorker = {
get src(): string {
throw new Error(
'Timer worker source missing. Did you forget to run generate-timer-worker.sh?',
);
},
};
40 changes: 40 additions & 0 deletions packages/client/src/timers/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/* eslint-disable */

import type { TimerWorkerEvent, TimerWorkerRequest } from './types';

const timerIdMapping = new Map<number, NodeJS.Timeout>();

self.addEventListener('message', (event: MessageEvent) => {
const request = event.data as TimerWorkerRequest;

switch (request.type) {
case 'setTimeout':
case 'setInterval':
timerIdMapping.set(
request.id,
(request.type === 'setTimeout' ? setTimeout : setInterval)(() => {
tick(request.id);

if (request.type === 'setTimeout') {
timerIdMapping.delete(request.id);
}
}, request.timeout),
);
break;

case 'clearTimeout':
case 'clearInterval':
(request.type === 'clearTimeout' ? clearTimeout : clearInterval)(
timerIdMapping.get(request.id),
);
timerIdMapping.delete(request.id);
break;
}
});

function tick(id: number) {
const message: TimerWorkerEvent = { type: 'tick', id };
self.postMessage(message);
}

/* eslint-enable */
Loading