Skip to content

Commit

Permalink
(core) Manage memory used for websocket responses to reduce the risk …
Browse files Browse the repository at this point in the history
…of server crashes.

Summary:
- Implements MemoryPool for waiting on memory reservations.
- Uses MemoryPool to control memory used for stringifying JSON responses in Client.ts
- Limits total size of _missedMessages that may be queued for a particular client.
- Upgrades ws library, which may reduce memory usage, and allows pausing the websocket for testing.
  - The upgrade subtly changed some corner cases of behavior, requiring various fixes to code and tests.

- dos.ts:
  - Includes Paul's fixes and updates to the dos.ts script for manual stress-testing.
  - Logging tweaks, to avoid excessive dumps on uncaughtError, and include timestamps.

Test Plan:
- Includes a test that measures heap size, and fails without memory management.
- Includes a unittest for MemoryPool
- Some cleanup and additions to TestServer helper; in particular adds makeUserApi() helper used in multiple tests.
- Some fixes related to ws upgrade.

Reviewers: paulfitz

Reviewed By: paulfitz

Differential Revision: https://phab.getgrist.com/D3974
  • Loading branch information
dsagal committed Aug 7, 2023
1 parent 7c114bf commit 526a5df
Show file tree
Hide file tree
Showing 14 changed files with 676 additions and 98 deletions.
1 change: 1 addition & 0 deletions app/client/components/GristWSConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export class GristWSSettingsBrowser implements GristWSSettings {
export class GristWSConnection extends Disposable {
public useCount: number = 0;
public on: BackboneEvents['on']; // set by Backbone
public off: BackboneEvents['off']; // set by Backbone

protected trigger: BackboneEvents['trigger']; // set by Backbone

Expand Down
146 changes: 106 additions & 40 deletions app/server/lib/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ import type {Comm} from 'app/server/lib/Comm';
import {DocSession} from 'app/server/lib/DocSession';
import log from 'app/server/lib/log';
import {LogMethods} from "app/server/lib/LogMethods";
import {MemoryPool} from 'app/server/lib/MemoryPool';
import {shortDesc} from 'app/server/lib/shortDesc';
import {fromCallback} from 'app/server/lib/serverUtils';
import {i18n} from 'i18next';
import * as crypto from 'crypto';
import moment from 'moment';
import * as WebSocket from 'ws';

/// How many messages to accumulate for a disconnected client before booting it.
// How many messages and bytes to accumulate for a disconnected client before booting it.
// The benefit is that a client who temporarily disconnects and reconnects without missing much,
// would not need to reload the document.
const clientMaxMissedMessages = 100;
const clientMaxMissedBytes = 1_000_000;

export type ClientMethod = (client: Client, ...args: any[]) => Promise<unknown>;

Expand All @@ -33,6 +37,14 @@ const clientRemovalTimeoutMs = 300 * 1000; // 300s = 5 minutes.
// A hook for dependency injection.
export const Deps = {clientRemovalTimeoutMs};

// How much memory to allow using for large JSON responses before waiting for some to clear.
// Max total across all clients and all JSON responses.
const jsonResponseTotalReservation = 500 * 1024 * 1024;
// Estimate of a single JSON response, used before we know how large it is. Together with the
// above, it works to limit parallelism (to 25 responses that can be started in parallel).
const jsonResponseReservation = 20 * 1024 * 1024;
export const jsonMemoryPool = new MemoryPool(jsonResponseTotalReservation);

/**
* Generates and returns a random string to use as a clientId. This is better
* than numbering clients with consecutive integers; otherwise a reconnecting
Expand Down Expand Up @@ -81,9 +93,11 @@ export class Client {
private _docFDs: Array<DocSession|null> = [];

private _missedMessages = new Map<number, string>();
private _missedMessagesTotalLength: number = 0;
private _destroyTimer: NodeJS.Timer|null = null;
private _destroyed: boolean = false;
private _websocket: WebSocket|null;
private _websocketEventHandlers: Array<{event: string, handler: (...args: any[]) => void}> = [];
private _org: string|null = null;
private _profile: UserProfile|null = null;
private _userId: number|null = null;
Expand Down Expand Up @@ -124,9 +138,13 @@ export class Client {
this._counter = counter;
this.browserSettings = browserSettings;

websocket.on('error', (err) => this._onError(err));
websocket.on('close', () => this._onClose());
websocket.on('message', (msg: string) => this._onMessage(msg));
const addHandler = (event: string, handler: (...args: any[]) => void) => {
websocket.on(event, handler);
this._websocketEventHandlers.push({event, handler});
};
addHandler('error', (err: unknown) => this._onError(err));
addHandler('close', () => this._onClose());
addHandler('message', (msg: string) => this._onMessage(msg));
}

/**
Expand Down Expand Up @@ -173,7 +191,7 @@ export class Client {

public interruptConnection() {
if (this._websocket) {
this._websocket.removeAllListeners();
this._removeWebsocketListeners();
this._websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
this._websocket = null;
}
Expand All @@ -187,36 +205,69 @@ export class Client {
return;
}

const seqId = this._nextSeqId++;
const message: string = JSON.stringify({...messageObj, seqId});

// Log something useful about the message being sent.
if ('error' in messageObj && messageObj.error) {
this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error);
}
// Large responses require memory; with many connected clients this can crash the server. We
// manage it using a MemoryPool, waiting for free space to appear. This only controls the
// memory used to hold the JSON.stringify result. Once sent, the reservation is released.
//
// Actual process memory will go up also as the outgoing data is sitting in socket buffers,
// but this isn't part of Node's heap. If an outgoing buffer is full, websocket.send may
// block, and MemoryPool will delay other responses. There is a risk here of unresponsive
// clients exhausting the MemoryPool, perhaps intentionally. To mitigate, we could destroy
// clients that are too slow in reading. This isn't currently done.
//
// Also, we do not manage memory of responses moved to a client's _missedMessages queue. But
// we do limit those in size.
//
// Overall, a better solution would be to stream large responses, or to have the client
// request data piecemeal (as we'd have to for handling large data).

await jsonMemoryPool.withReserved(jsonResponseReservation, async (updateReservation) => {
if (this._destroyed) {
// If this Client got destroyed while waiting, stop here and release the reservation.
return;
}
const seqId = this._nextSeqId++;
const message: string = JSON.stringify({...messageObj, seqId});
const size = Buffer.byteLength(message, 'utf8');
updateReservation(size);

// Log something useful about the message being sent.
if ('error' in messageObj && messageObj.error) {
this._log.warn(null, "responding to #%d ERROR %s", messageObj.reqId, messageObj.error);
}

if (this._websocket) {
// If we have a websocket, send the message.
try {
await this._sendToWebsocket(message);
} catch (err) {
// Sending failed. Add the message to missedMessages.
this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
if (this._websocket) {
// If we have a websocket, send the message.
try {
await this._sendToWebsocket(message);
// NOTE: A successful send does NOT mean the message was received. For a better system, see
// https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
// (keeping a copy of messages until acked). With our system, we are more likely to be
// lacking the needed messages on reconnect, and having to reset the client.
return;
} catch (err) {
// Sending failed. Add the message to missedMessages.
this._log.warn(null, "sendMessage: queuing after send error:", err.toString());
}
}
if (this._missedMessages.size < clientMaxMissedMessages &&
this._missedMessagesTotalLength + message.length <= clientMaxMissedBytes) {
// Queue up the message.
// TODO: this keeps the memory but releases jsonMemoryPool reservation, which is wrong --
// it may allow too much memory to be used. This situation is rare, however, so maybe OK
// as is. Ideally, the queued messages could reserve memory in a "nice to have" mode, and
// if memory is needed for something more important, the queue would get dropped.
// (Holding on to the memory reservation here would create a risk of freezing future
// responses, which seems *more* dangerous than a crash because a crash would at least
// lead to an eventual recovery.)
this._missedMessages.set(seqId, message);

// NOTE: A successful send does NOT mean the message was received. For a better system, see
// https://docs.microsoft.com/en-us/azure/azure-web-pubsub/howto-develop-reliable-clients
// (keeping a copy of messages until acked). With our system, we are more likely to be
// lacking the needed messages on reconnect, and having to reset the client.
this._missedMessagesTotalLength += message.length;
} else {
// Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
this._log.warn(null, "sendMessage: too many messages queued; booting client");
this.destroy();
}
} else if (this._missedMessages.size < clientMaxMissedMessages) {
// Queue up the message.
this._missedMessages.set(seqId, message);
} else {
// Too many messages queued. Boot the client now, to make it reset when/if it reconnects.
this._log.warn(null, "sendMessage: too many messages queued; booting client");
this.destroy();
}
});
}

/**
Expand Down Expand Up @@ -256,6 +307,7 @@ export class Client {

// We collected any missed messages we need; clear the stored map of them.
this._missedMessages.clear();
this._missedMessagesTotalLength = 0;

let docsClosed: number|null = null;
if (!seamlessReconnect) {
Expand Down Expand Up @@ -344,6 +396,7 @@ export class Client {
this._destroyTimer = null;
}
this._missedMessages.clear();
this._missedMessagesTotalLength = 0;
this._comm.removeClient(this);
this._destroyed = true;
}
Expand Down Expand Up @@ -557,18 +610,31 @@ export class Client {
* Processes the closing of a websocket.
*/
private _onClose() {
this._websocket?.removeAllListeners();
this._removeWebsocketListeners();

// Remove all references to the websocket.
this._websocket = null;

// Schedule the client to be destroyed after a timeout. The timer gets cleared if the same
// client reconnects in the interim.
if (this._destroyTimer) {
this._log.warn(null, "clearing previously scheduled destruction");
clearTimeout(this._destroyTimer);
if (!this._destroyed) {
// Schedule the client to be destroyed after a timeout. The timer gets cleared if the same
// client reconnects in the interim.
if (this._destroyTimer) {
this._log.warn(null, "clearing previously scheduled destruction");
clearTimeout(this._destroyTimer);
}
this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
}
}

private _removeWebsocketListeners() {
if (this._websocket) {
// Avoiding websocket.removeAllListeners() because WebSocket.Server registers listeners
// internally for websockets it keeps track of, and we should not accidentally remove those.
for (const {event, handler} of this._websocketEventHandlers) {
this._websocket.off(event, handler);
}
this._websocketEventHandlers = [];
}
this._log.info(null, "websocket closed; will discard client in %s sec", Deps.clientRemovalTimeoutMs / 1000);
this._destroyTimer = setTimeout(() => this.destroy(), Deps.clientRemovalTimeoutMs);
}
}
6 changes: 5 additions & 1 deletion app/server/lib/Comm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ export class Comm extends EventEmitter {
public async testServerShutdown() {
if (this._wss) {
for (const wssi of this._wss) {
// Terminate all clients. WebSocket.Server used to do it automatically in close() but no
// longer does (see https://github.com/websockets/ws/pull/1904#discussion_r668844565).
for (const ws of wssi.clients) {
ws.terminate();
}
await fromCallback((cb) => wssi.close(cb));
}
this._wss = null;
Expand Down Expand Up @@ -259,7 +264,6 @@ export class Comm extends EventEmitter {
await this._onWebSocketConnection(websocket, req);
} catch (e) {
log.error("Comm connection for %s threw exception: %s", req.url, e.message);
websocket.removeAllListeners();
websocket.terminate(); // close() is inadequate when ws routed via loadbalancer
}
});
Expand Down
2 changes: 2 additions & 0 deletions app/server/lib/ITestingHooks-ti.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export const ITestingHooks = t.iface([], {
"commShutdown": t.func("void"),
"commRestart": t.func("void"),
"commSetClientPersistence": t.func("number", t.param("ttlMs", "number")),
"commSetClientJsonMemoryLimit": t.func("number", t.param("newTotalSize", "number")),
"closeDocs": t.func("void"),
"setDocWorkerActivation": t.func("void", t.param("workerId", "string"), t.param("active", t.union(t.lit('active'), t.lit('inactive'), t.lit('crash')))),
"flushAuthorizerCache": t.func("void"),
Expand All @@ -21,6 +22,7 @@ export const ITestingHooks = t.iface([], {
"setActiveDocTimeout": t.func("number", t.param("seconds", "number")),
"setDiscourseConnectVar": t.func(t.union("string", "null"), t.param("varName", "string"), t.param("value", t.union("string", "null"))),
"setWidgetRepositoryUrl": t.func("void", t.param("url", "string")),
"getMemoryUsage": t.func("object"),
});

const exportedTypeSuite: t.ITypeSuite = {
Expand Down
2 changes: 2 additions & 0 deletions app/server/lib/ITestingHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export interface ITestingHooks {
commShutdown(): Promise<void>;
commRestart(): Promise<void>;
commSetClientPersistence(ttlMs: number): Promise<number>;
commSetClientJsonMemoryLimit(newTotalSize: number): Promise<number>;
closeDocs(): Promise<void>;
setDocWorkerActivation(workerId: string, active: 'active'|'inactive'|'crash'): Promise<void>;
flushAuthorizerCache(): Promise<void>;
Expand All @@ -17,4 +18,5 @@ export interface ITestingHooks {
setActiveDocTimeout(seconds: number): Promise<number>;
setDiscourseConnectVar(varName: string, value: string|null): Promise<string|null>;
setWidgetRepositoryUrl(url: string): Promise<void>;
getMemoryUsage(): Promise<object>; // actually NodeJS.MemoryUsage
}
114 changes: 114 additions & 0 deletions app/server/lib/MemoryPool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import Deque from 'double-ended-queue';

/**
* Usage:
*
* OPTION 1, using a callback, which may be async (but doesn't have to be).
*
* await mpool.withReserved(initialSize, async (updateReservation) => {
* ...
* updateReservation(newSize); // if needed
* ...
* });
*
* OPTION 2, lower-level.
*
* Note: dispose() MUST be called (e.g. using try/finally). If not called, other work will
* eventually deadlock waiting for it.
*
* const memoryReservation = await mpool.waitAndReserve(initialSize);
* try {
* ...
* memoryReservation.updateReservation(newSize1); // if needed
* memoryReservation.updateReservation(newSize2); // if needed
* ...
* } finally {
* memoryReservation.dispose();
* }
*
 * With both options, it's common for the initialSize to be a rough estimate. You may call
* updateReservation() to update it. If it lowers the estimate, other work may unblock. If it
* raises it, it may delay future work, but will have no impact on work that's already unblocked.
* So it's always safer for initialSize to be an overestimate.
*
* When it's hard to estimate initialSize in bytes, you may specify it as e.g.
* memPool.getTotalSize() / 20. This way at most 20 such parallel tasks may be unblocked at a
* time, and further ones will wait until some release their memory or revise down their estimate.
*/
export class MemoryPool {
  // Bytes currently reserved across all outstanding reservations.
  private _reservedSize: number = 0;
  // FIFO of waiters blocked until enough memory frees up; served strictly in arrival order.
  private _queue = new Deque<MemoryAwaiter>();

  constructor(private _totalSize: number) {}

  public getTotalSize(): number { return this._totalSize; }
  public getReservedSize(): number { return this._reservedSize; }
  public getAvailableSize(): number { return this._totalSize - this._reservedSize; }
  public isEmpty(): boolean { return this._reservedSize === 0; }
  public hasSpace(size: number): boolean { return this._reservedSize + size <= this._totalSize; }

  // To avoid failures, allow reserving more than totalSize when memory pool is empty.
  public hasSpaceOrIsEmpty(size: number): boolean { return this.hasSpace(size) || this.isEmpty(); }

  // Number of callers currently blocked waiting in waitAndReserve().
  public numWaiting(): number { return this._queue.length; }

  /**
   * Waits until `size` bytes are available (or the pool is empty), reserves them, and returns a
   * MemoryReservation. The caller MUST dispose() it (e.g. via try/finally); if not, other
   * waiters will eventually deadlock waiting for the memory to be released.
   */
  public async waitAndReserve(size: number): Promise<MemoryReservation> {
    if (this.hasSpaceOrIsEmpty(size)) {
      this._updateReserved(size);
    } else {
      // No space yet: queue up and wait; _checkWaiting() reserves our size before resolving us.
      await new Promise<void>(resolve => this._queue.push({size, resolve}));
    }
    return new MemoryReservation(size, this._updateReserved.bind(this));
  }

  /**
   * Runs callback() with `size` bytes reserved, releasing the reservation when it completes
   * (whether it returns or throws). The callback may revise the reservation via the passed-in
   * updateRes(newSize) function. Returns the callback's result.
   */
  public async withReserved<T>(size: number, callback: (updateRes: UpdateReservation) => T|Promise<T>): Promise<T> {
    const memRes = await this.waitAndReserve(size);
    try {
      return await callback(memRes.updateReservation.bind(memRes));
    } finally {
      memRes.dispose();
    }
  }

  // Update the total size. Returns the old size. This is intended for testing.
  public setTotalSize(newTotalSize: number): number {
    const oldTotalSize = this._totalSize;
    this._totalSize = newTotalSize;
    // A larger total (or an empty pool) may let queued waiters proceed.
    this._checkWaiting();
    return oldTotalSize;
  }

  // Unblock as many queued waiters (in FIFO order) as now fit in the available space.
  private _checkWaiting() {
    while (!this._queue.isEmpty() && this.hasSpaceOrIsEmpty(this._queue.peekFront()!.size)) {
      const item = this._queue.shift()!;
      this._updateReserved(item.size);
      item.resolve();
    }
  }

  // Adjust reserved size by sizeDelta (may be negative), then see if any waiters can proceed.
  private _updateReserved(sizeDelta: number): void {
    this._reservedSize += sizeDelta;
    this._checkWaiting();
  }
}

// Shared signature for size-update callbacks. NOTE(review): the argument's meaning depends on
// the call site — MemoryPool's internal _updateReserved takes a delta, while the callback that
// withReserved() hands out (MemoryReservation.updateReservation) takes an absolute new size.
type UpdateReservation = (sizeDelta: number) => void;

/**
 * A single reservation held against a MemoryPool. Obtained from MemoryPool.waitAndReserve();
 * must be dispose()d to return the reserved bytes to the pool.
 */
export class MemoryReservation {
  private _size: number;
  private _updateReserved: (sizeDelta: number) => void;

  constructor(size: number, updateReserved: (sizeDelta: number) => void) {
    this._size = size;
    this._updateReserved = updateReserved;
  }

  /**
   * Revise this reservation to newSize bytes (absolute, not a delta). Lowering it may unblock
   * other waiters on the pool; raising it only affects future waiters.
   */
  public updateReservation(newSize: number) {
    const delta = newSize - this._size;
    this._updateReserved(delta);
    this._size = newSize;
  }

  /**
   * Release the reservation entirely. The object must not be used after this call.
   */
  public dispose() {
    this.updateReservation(0);
    this._updateReserved = undefined as any; // Make sure we don't keep using it after dispose
  }
}

// An entry in MemoryPool's wait queue: the requested reservation size in bytes, and the
// resolve callback that unblocks the waiting waitAndReserve() promise once _checkWaiting()
// has reserved that size.
interface MemoryAwaiter {
  size: number;
  resolve: () => void;
}
Loading

0 comments on commit 526a5df

Please sign in to comment.