Skip to content

Commit

Permalink
Add health check mixin
Browse files Browse the repository at this point in the history
  • Loading branch information
Americas committed Mar 21, 2024
1 parent 348ea73 commit c3b41cf
Show file tree
Hide file tree
Showing 8 changed files with 509 additions and 133 deletions.
4 changes: 3 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ Create a reader:
- `maxConnectionAttempts` max reconnection attempts [Infinity]
- `maxInFlight` max messages distributed across connections [10]
- `msgTimeout` session-specific msg timeout
- `pollInterval` nsqlookupd poll interval[10000]
- `pollInterval` nsqlookupd poll interval [10000]
- `ready` when `false` auto-RDY maintenance will be disabled
- `trace` trace function
- `healthCheck` setup health check [false]

Events:

Expand All @@ -133,6 +134,7 @@ an address string is passed, or an object with the nsqd option:

- `nsqd` array of nsqd addresses
- `maxConnectionAttempts` max reconnection attempts [Infinity]
- `healthCheck` setup health check [false]

Events:

Expand Down
3 changes: 1 addition & 2 deletions lib/mixins/close.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ module.exports = reader => {
return;
}

reader.emit('close');
reader.conns.forEach(conn => conn.end());
reader.end();
}

reader.once('closing', () => {
Expand Down
31 changes: 31 additions & 0 deletions lib/mixins/health-check.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict';

/**
* Health check handling mixin.
*
* @param {Reader|Writer} client
* @api private
*/

module.exports = client => {
if (!client.healthCheck) {
return;
}

client.getHealthStatus = () => {
const healthData = { connections: { active: client.conns.size } };

if (client.nsqd) {
healthData.connections.expected = client.nsqd.length;
}

if (client.nsqlookupd) {
healthData.lookups = {
active: client.nsqlookupd.length - client.lookupErrors,
expected: client.nsqlookupd.length
};
}

return healthData;
};
};
32 changes: 22 additions & 10 deletions lib/reader.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Connection = require('./connection');
const assert = require('node:assert');
const close = require('./mixins/close');
const debug = require('debug')('nsq:reader');
const healthCheck = require('./mixins/health-check');
const lookup = require('nsq-lookup');
const ready = require('./mixins/ready');
const reconnect = require('./mixins/reconnect');
Expand Down Expand Up @@ -36,6 +37,7 @@ class Reader extends EventEmitter {
* @param {Function} [options.trace] - trace function
* @param {Number} [options.maxConnectionAttempts=Infinity] - max reconnection attempts
* @param {String} [options.id] - client identifier
* @param {Boolean} [options.healthCheck=false] - setup health check
* @api public
*/

Expand All @@ -51,6 +53,7 @@ class Reader extends EventEmitter {
this.trace = options.trace || function() {};
this.maxConnectionAttempts = options.maxConnectionAttempts ?? Infinity;
this.pollInterval = options.pollInterval || 20000;
this.healthCheck = options.healthCheck ?? false;
this.maxAttempts = options.maxAttempts || Infinity;
this.maxInFlight = options.maxInFlight || 10;
this.msgTimeout = options.msgTimeout;
Expand All @@ -67,6 +70,9 @@ class Reader extends EventEmitter {
// Add close mixin.
close(this);

// Add health check mixin.
healthCheck(this);

// Defer connecting to nodes.
setImmediate(() => this.connect());
}
Expand All @@ -90,7 +96,9 @@ class Reader extends EventEmitter {

// If we have a list of nsqlookupd servers,
// do a lookup for relevant nodes and connect to them.
this.lookup((_, nodes) => {
this.lookup((errors, nodes) => {
this.lookupErrors = errors?.length ?? 0;

for (const node of nodes) {
this.connectTo(node);
}
Expand All @@ -117,6 +125,10 @@ class Reader extends EventEmitter {
for (const error of errors) {
this.emit('error lookup', error);
}

this.lookupErrors = errors.length;
} else {
this.lookupErrors = 0;
}

for (const node of nodes) {
Expand Down Expand Up @@ -328,13 +340,9 @@ class Reader extends EventEmitter {

if (this.conns.size === 0) {
this.emit('close');

return;
}

this.conns.forEach(conn => {
conn.close();
});
this.conns.forEach(conn => conn.close());
}

/**
Expand All @@ -347,20 +355,24 @@ class Reader extends EventEmitter {
end(fn) {
debug('end');

if (fn) {
this.once('close', fn);
}

clearInterval(this.timer);

let n = this.conns.size;

if (n === 0 && fn) {
fn();
if (n === 0) {
this.emit('close');
}

this.conns.forEach(conn => {
conn.end(() => {
debug('%s - conn ended', conn.addr);

if (fn && !--n) {
fn();
if (--n === 0) {
this.emit('close');
}
});
});
Expand Down
29 changes: 21 additions & 8 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
const { EventEmitter } = require('node:events');
const Connection = require('./connection');
const debug = require('debug')('nsq:writer');
const healthCheck = require('./mixins/health-check');
const reconnect = require('./mixins/reconnect');
const utils = require('./utils');

Expand All @@ -17,6 +18,7 @@ class Writer extends EventEmitter {
* @param {String|Object} options - can be either the nsqd address string or an object with the following options
* @param {String[]} [options.nsqd] - nsqd addresses
* @param {Number} [options.maxConnectionAttempts=Infinity] - max reconnection attempts
* @param {Boolean} [options.healthCheck=false] - setup health check
* @api public
*/

Expand All @@ -32,9 +34,13 @@ class Writer extends EventEmitter {
this.maxConnectionAttempts = options.maxConnectionAttempts ?? Infinity;
this.nsqd = options.nsqd || [''];
this.conns = new Set();
this.healthCheck = options.healthCheck ?? false;
this.pendingConns = new Set();
this.publishQueue = [];

// Add health check mixin.
healthCheck(this);

this.connect();
this.n = 0;
}
Expand Down Expand Up @@ -104,7 +110,10 @@ class Writer extends EventEmitter {

conn.on('close', () => {
this.conns.delete(conn);
this.pendingConns.add(conn);

if (!conn.closing) {
this.pendingConns.add(conn);
}

debug('%s - remove from pool (total: %s)', address, this.conns.size);
});
Expand Down Expand Up @@ -162,22 +171,26 @@ class Writer extends EventEmitter {
*/

close(fn) {
debug('start close');

this.pendingConns.forEach(conn => conn.destroy());
debug('close');

if (!this.conns.size && fn) {
fn();
if (fn) {
this.once('close', fn);
}

this.pendingConns.forEach(conn => conn.destroy());

let n = this.conns.size;

if (n === 0) {
this.emit('close');
}

this.conns.forEach(conn => {
conn.end(() => {
debug('%s - conn ended', conn.addr);

if (!--n && fn) {
fn();
if (--n === 0) {
this.emit('close');
}
});
});
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
"backo": "^1.1.0",
"debug": "^4.3.4",
"ms": "^2.1.3",
"nsq-lookup": "https://github.com/uphold-forks/nsq-lookup#1e6e4eb37a07649a5dc430dbf69b65c1b05253be"
"nsq-lookup": "https://github.com/uphold-forks/nsq-lookup#b01fdd518459785b9561424d17c4eb5be4bd85dd"
},
"devDependencies": {
"bytes": "^2.4.0",
"eslint": "^8.56.0",
"jstrace": "^0.3.0",
"mocha": "^3.1.0",
"sinon": "^17.0.1",
"superagent": "~3.8.3",
"superagent": "^8.1.2",
"uid": "^0.0.2"
},
"engines": {
Expand Down
Loading

0 comments on commit c3b41cf

Please sign in to comment.