Skip to content

Commit

Permalink
refactor: improve error handling in Server type
Browse files Browse the repository at this point in the history
This refactoring improves readability and reduces coupling between
the `Server` and `Topology` types. Previously errors experienced
at the `Server` level would emit an `error` event, which needed to
be caught by the `Topology`. Since nothing about error handling
required access to the `Topology` type, all of that behavior was
moved back into the `Server` type.

NODE-2449
  • Loading branch information
mbroadst committed Feb 6, 2020
1 parent 21195ce commit 5bf0df8
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 89 deletions.
22 changes: 11 additions & 11 deletions lib/core/sdam/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const collationNotSupported = require('../utils').collationNotSupported;
const debugOptions = require('../connection/utils').debugOptions;
const isSDAMUnrecoverableError = require('../error').isSDAMUnrecoverableError;
const isNetworkTimeoutError = require('../error').isNetworkTimeoutError;
const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
const maxWireVersion = require('../utils').maxWireVersion;
const makeStateMachine = require('../utils').makeStateMachine;
const common = require('./common');

Expand Down Expand Up @@ -140,15 +142,7 @@ class Server extends EventEmitter {
this.s.pool.clear();
});

this[kMonitor].on('resetServer', error => {
// Revert to an `Unknown` state by emitting a default description with no isMaster, and the
// error from the heartbeat attempt
this.emit(
'descriptionReceived',
new ServerDescription(this.description.address, null, { error })
);
});

this[kMonitor].on('resetServer', error => markServerUnknown(this, error));
this[kMonitor].on('serverHeartbeatSucceeded', event => {
this.emit(
'descriptionReceived',
Expand Down Expand Up @@ -477,10 +471,16 @@ function makeOperationHandler(server, options, callback) {
}

if (!isNetworkTimeoutError(err)) {
server.emit('error', err);
markServerUnknown(server, err);
server.s.pool.clear();
}
} else if (isSDAMUnrecoverableError(err)) {
server.emit('error', err);
if (maxWireVersion(server) <= 7 || isNodeShuttingDownError(err)) {
server.s.pool.clear();
}

markServerUnknown(server, err);
process.nextTick(() => server.requestCheck());
}
}

Expand Down
74 changes: 6 additions & 68 deletions lib/core/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ const deprecate = require('util').deprecate;
const BSON = require('../connection/utils').retrieveBSON();
const createCompressionInfo = require('../topologies/shared').createCompressionInfo;
const isRetryableError = require('../error').isRetryableError;
const isNodeShuttingDownError = require('../error').isNodeShuttingDownError;
const maxWireVersion = require('../utils').maxWireVersion;
const ClientSession = require('../sessions').ClientSession;
const MongoError = require('../error').MongoError;
const MongoServerSelectionError = require('../error').MongoServerSelectionError;
Expand Down Expand Up @@ -55,7 +53,7 @@ const SERVER_RELAY_EVENTS = [
].concat(CMAP_EVENT_NAMES);

// all events we listen to from `Server` instances
const LOCAL_SERVER_EVENTS = ['error', 'connect', 'descriptionReceived', 'close', 'ended'];
const LOCAL_SERVER_EVENTS = ['connect', 'descriptionReceived', 'close', 'ended'];

const STATE_CLOSING = common.STATE_CLOSING;
const STATE_CLOSED = common.STATE_CLOSED;
Expand Down Expand Up @@ -279,7 +277,7 @@ class Topology extends EventEmitter {

translateReadPreference(options);
const readPreference = options.readPreference || ReadPreference.primary;
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
this.selectServer(readPreferenceServerSelector(readPreference), options, err => {
if (err) {
this.close();

Expand All @@ -292,28 +290,11 @@ class Topology extends EventEmitter {
return;
}

const errorHandler = err => {
stateTransition(this, STATE_CLOSED);
server.removeListener('connect', connectHandler);
if (typeof callback === 'function') callback(err, null);
};

const connectHandler = (_, err) => {
stateTransition(this, STATE_CONNECTED);
server.removeListener('error', errorHandler);
this.emit('open', err, this);
this.emit('connect', this);

if (typeof callback === 'function') callback(err, this);
};

if (server.s.state === STATE_CONNECTING) {
server.once('error', errorHandler);
server.once('connect', connectHandler);
return;
}
stateTransition(this, STATE_CONNECTED);
this.emit('open', err, this);
this.emit('connect', this);

connectHandler();
if (typeof callback === 'function') callback(err, this);
});
}

Expand Down Expand Up @@ -794,9 +775,6 @@ function destroyServer(server, topology, options, callback) {
options = options || {};
LOCAL_SERVER_EVENTS.forEach(event => server.removeAllListeners(event));

// register a no-op for errors, we don't care now that we are destroying the server
server.on('error', () => {});

server.destroy(options, () => {
topology.emit(
'serverClosed',
Expand Down Expand Up @@ -843,7 +821,6 @@ function createAndConnectServer(topology, serverDescription, connectDelay) {
relayEvents(server, topology, SERVER_RELAY_EVENTS);

server.on('descriptionReceived', topology.serverUpdateHandler.bind(topology));
server.on('error', serverErrorEventHandler(server, topology));

if (connectDelay) {
const connectTimer = setTimeout(() => {
Expand Down Expand Up @@ -904,21 +881,6 @@ function updateServers(topology, incomingServerDescription) {
}
}

function serverErrorEventHandler(server, topology) {
return function(err) {
if (topology.s.state === STATE_CLOSING || topology.s.state === STATE_CLOSED) {
return;
}

if (maxWireVersion(server) >= 8 && !isNodeShuttingDownError(err)) {
resetServerState(server, err);
return;
}

resetServerState(server, err, { clearPool: true });
};
}

function executeWriteOperation(args, options, callback) {
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};
Expand Down Expand Up @@ -972,30 +934,6 @@ function executeWriteOperation(args, options, callback) {
});
}

/**
* Resets the internal state of this server to `Unknown` by simulating an empty ismaster
*
* @private
* @param {Server} server
* @param {MongoError} error The error that caused the state reset
* @param {object} [options] Optional settings
* @param {boolean} [options.clearPool=false] Pool should be cleared out on state reset
*/
function resetServerState(server, error, options) {
options = Object.assign({}, { clearPool: false }, options);

if (options.clearPool && server.s.pool) {
server.s.pool.clear();
}

server.emit(
'descriptionReceived',
new ServerDescription(server.description.address, null, { error })
);

process.nextTick(() => server.requestCheck());
}

function translateReadPreference(options) {
if (options.readPreference == null) {
return;
Expand Down
2 changes: 1 addition & 1 deletion test/unit/sdam/srv_polling.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ describe('Mongos SRV Polling', function() {
});
});

srvPoller.trigger(recordSets[1]);
process.nextTick(() => srvPoller.trigger(recordSets[1]));
} catch (e) {
done(e);
}
Expand Down
18 changes: 9 additions & 9 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ describe('Topology (unit)', function() {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));
let serverDescription;
server.on('descriptionReceived', sd => (serverDescription = sd));

let poolCleared = false;
topology.on('connectionPoolCleared', () => (poolCleared = true));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(err).to.eql(serverDescription.error);
expect(poolCleared).to.be.true;
done();
});
Expand Down Expand Up @@ -176,16 +176,16 @@ describe('Topology (unit)', function() {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));
let serverDescription;
server.on('descriptionReceived', sd => (serverDescription = sd));

let poolCleared = false;
topology.on('connectionPoolCleared', () => (poolCleared = true));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(err).to.eql(serverDescription.error);
expect(poolCleared).to.be.false;
done();
});
Expand Down Expand Up @@ -213,13 +213,13 @@ describe('Topology (unit)', function() {
expect(err).to.not.exist;
this.defer(() => topology.close());

let serverError;
server.on('error', err => (serverError = err));
let serverDescription;
server.on('descriptionReceived', sd => (serverDescription = sd));

server.command('test.test', { insert: { a: 42 } }, (err, result) => {
expect(result).to.not.exist;
expect(err).to.exist;
expect(err).to.eql(serverError);
expect(err).to.eql(serverDescription.error);
expect(server.description.type).to.equal('Unknown');
done();
});
Expand Down

0 comments on commit 5bf0df8

Please sign in to comment.