Skip to content

Commit

Permalink
feat: add exponential backoff to API client
Browse files Browse the repository at this point in the history
  • Loading branch information
dszakallas committed May 18, 2017
1 parent b463c22 commit 9cf8e5d
Show file tree
Hide file tree
Showing 13 changed files with 422 additions and 51 deletions.
14 changes: 14 additions & 0 deletions lib/agent/api/httpError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

var http = require('http')
var inherits = require('util').inherits

function HttpError (statusCode, response) {
Error.captureStackTrace && Error.captureStackTrace(this, this.constructor)
this.message = String(statusCode) + ' - ' + http.STATUS_CODES[statusCode]
this.statusCode = statusCode
this.response = response
}
inherits(HttpError, Error)

module.exports = HttpError
51 changes: 51 additions & 0 deletions lib/agent/api/httpRetry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
'use strict'

var HttpError = require('./httpError')
var exponentialRetry = require('../../utils/exponentialRetry')

var DEFAULT_MAX_RETRIES = Infinity
var DEFAULT_MAX_WAIT = 10 * 60 * 1000
var DEFAULT_EXP_SCALE = 0.828
var DEFAULT_LIN_SCALE = 150
var DEFAULT_ERR_SCALE = 0.24 // +-12% error

function httpRetry (opts, cb) {
opts = opts || {}
var client = opts.client
var payload = opts.payload
var reqOpts = opts.reqOpts
var errorFilter = opts.errorFilter
var maxRetries = opts.maxRetries != null ? opts.maxRetries : DEFAULT_MAX_RETRIES
var maxWait = opts.maxWait != null ? opts.maxWait : DEFAULT_MAX_WAIT

function httpRequest (cb) {
var completed = false
var req = client.request(reqOpts, function (response) {
completed = true
if (response.statusCode >= 400) {
return cb(new HttpError(response.statusCode), response)
}
return cb(null, response)
})
req.on('error', function (err) {
if (!completed) {
completed = true
return cb(err)
}
})
if (payload) {
req.write(payload)
}
req.end()
}
return exponentialRetry({
maxRetries: maxRetries,
maxWait: maxWait,
expScale: DEFAULT_EXP_SCALE,
linScale: DEFAULT_LIN_SCALE,
errScale: DEFAULT_ERR_SCALE,
errorFilter: errorFilter
}, httpRequest, cb)
}

module.exports = httpRetry
68 changes: 68 additions & 0 deletions lib/agent/api/httpRetry.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
'use strict'

var http = require('http')
var HttpError = require('./httpError')
var httpRetry = require('./httpRetry')
var expect = require('chai').expect
var nock = require('nock')
var bl = require('bl')

describe('httpRetry', function (done) {
it('retries', function (done) {
nock('http://something.com')
.post('/', 'data')
.reply(500)
nock('http://something.com')
.post('/', 'data')
.reply(200, 'response')

this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) {
return process.nextTick(cb)
})

httpRetry({
client: http,
maxRetries: 1,
reqOpts: {
hostname: 'something.com',
method: 'POST',
path: '/'
},
payload: 'data'
}, function (err, data) {
expect(err).to.not.exist
data.pipe(bl(function (err, data) {
expect(err).not.to.exist
expect(data.toString()).to.eql('response')
done()
}))
})
})
it('returns error', function (done) {
nock('http://something.com')
.post('/', 'data')
.reply(500, 'bad')

this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) {
return process.nextTick(cb)
})

httpRetry({
client: http,
maxRetries: 0,
reqOpts: {
hostname: 'something.com',
method: 'POST',
path: '/'
},
payload: 'data'
}, function (err, data) {
expect(err).to.be.instanceof(HttpError)
data.pipe(bl(function (err, data) {
expect(err).to.not.exist
expect(data.toString()).to.eql('bad')
done()
}))
})
})
})
67 changes: 30 additions & 37 deletions lib/agent/api/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ var util = require('util')
var requestSync = require('sync-request')
var isNumber = require('lodash.isnumber')
var debug = require('../../utils/debug')('api')
var format = require('util').format
var assign = require('lodash.assign')
var HttpsProxyAgent = require('https-proxy-agent')
var stringify = require('json-stringify-safe')
var BufferStream = require('./bufferStream')
var httpRetry = require('./httpRetry')
var CompositeError = require('../../utils/compositeError')

var bl = require('bl')
var libPackage = require('../../../package')
Expand All @@ -34,6 +35,7 @@ function CollectorApi (options) {
this.serviceName = options.serviceName
this.baseRetryInterval = 1000 * 60 * 30 // 30 minutes
this.serviceKey = null
this.getServiceMaxRetries = Infinity

if (options.proxy) {
this.proxyAgent = new HttpsProxyAgent(options.proxy)
Expand Down Expand Up @@ -270,7 +272,8 @@ CollectorApi.prototype.getService = function (cb) {
cpus: self.system.cpus
}
})
var req = https.request({

var reqOpts = {
hostname: opts.hostname,
port: opts.port,
path: opts.path,
Expand All @@ -283,51 +286,41 @@ CollectorApi.prototype.getService = function (cb) {
'X-Reporter-Language': this.collectorLanguage,
'Content-Length': Buffer.byteLength(payload)
}
}, function (res) {
res.setEncoding('utf8')
res.pipe(bl(function (err, resBuffer) {
var response

var retryInterval = self.baseRetryInterval
}

if (err) {
debug.error('getService', err)
return setTimeout(function () {
debug.warn('getService', format('Retrying with %d ms', retryInterval))
self.getService()
}, retryInterval)
httpRetry({
client: https,
reqOpts: reqOpts,
payload: payload,
errorFilter: function shouldContinueRetrying (err) {
if (err.statusCode === 401) {
return false
}

var resText = resBuffer.toString('utf8')

if (res.statusCode === 401) {
debug.error('getService', 'Api key rejected')
return
return true
},
maxRetries: this.getServiceMaxRetries
},
function done (err, result) {
if (err) {
if (err.statusCode === 401) {
debug.error('getService', 'API key rejected')
}
if (res.statusCode > 399) {
debug.error('getService', 'Service responded with ' + res.statusCode)
return setTimeout(function () {
debug.warn('getService', format('Retrying with %d ms', retryInterval))
self.getService(cb)
}, retryInterval)
return cb(new CompositeError('Could not get service key', err))
}
var response
result.pipe(bl(function (err, data) {
if (err) {
cb(err)
}

try {
response = JSON.parse(resText)
} catch (ex) {
return
response = JSON.parse(data.toString('utf8'))
} catch (err) {
return cb(err)
}

self.serviceKey = response.key
cb(null, response.key)
}))
})

req.on('error', function (error) {
debug.error('getService', error)
})
req.write(payload)
req.end()
}

function logServiceKeyError (method) {
Expand Down
14 changes: 8 additions & 6 deletions lib/agent/api/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -275,16 +275,18 @@ describe('The Trace CollectorApi module', function () {
}
})
.post(defaultConfig.collectorApiServiceEndpoint, JSON.stringify(data))
.times(2)
.times(100)
.reply(409, {})

collectorApi.getService()

collectorApi.baseRetryInterval = 1
collectorApi.getServiceMaxRetries = 100

this.timeout(500)
global.setTimeout = this.sandbox.stub(global, 'setTimeout').callsFake(function (cb, int) {
return process.nextTick(cb)
})

this.sandbox.stub(collectorApi, 'getService').callsFake(function () {
collectorApi.getService(function (err) {
expect(setTimeout).to.have.callCount(100)
expect(err).to.exist
done()
})
})
Expand Down
9 changes: 2 additions & 7 deletions lib/agent/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,12 @@ Agent.prototype.stop = function (callback) {
debug.info('stop', 'Stopping agents...')
var agents = this.agents
var counter = 1
var error

agents.forEach(function (agent) {
agent.stop(function (err) {
if (!error && err) {
error = err
}

agent.stop(function () {
if (counter >= agents.length) {
if (callback && typeof callback === 'function') {
callback(error)
callback()
}
} else {
counter++
Expand Down
19 changes: 19 additions & 0 deletions lib/utils/compositeError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

var inherits = require('util').inherits

function CompositeError (message, cause) {
if (message instanceof Error) {
message = ''
cause = message
}
this.message = message ? message.toString() : ''
this.cause = cause
Error.captureStackTrace && Error.captureStackTrace(this, this.constructor)
if (this.stack != null && this.cause instanceof Error && this.cause.stack != null) {
this.stack += '\nCaused by: ' + this.cause.stack
}
}
inherits(CompositeError, Error)

module.exports = CompositeError
56 changes: 56 additions & 0 deletions lib/utils/exponentialRetry.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
'use strict'

var inherits = require('util').inherits
var retry = require('async/retry')

var DEFAULT_MAX_RETRIES = Infinity
var DEFAULT_MAX_WAIT = Infinity
var DEFAULT_EXP_SCALE = 1
var DEFAULT_LIN_SCALE = 1
var DEFAULT_TRANS = 0
var DEFAULT_ERR_SCALE = 0
var DEFAULT_ERR_TRANS = 0

function MaxRetriesExceededError (n, last) {
Error.captureStackTrace && Error.captureStackTrace(this, this.constructor)
this.message = 'Network request max retry limit reached after ' + n + ' attempts. Last error message was: ' + last.message
if (this.stack && last.stack) {
this.stack += '\nCaused by: ' + last.stack
}
}
inherits(MaxRetriesExceededError, Error)

function exponentialRetry (opts, task, cb) {
if (typeof opts === 'function') {
cb = task
task = opts
opts = {}
}
opts = opts || {}
var maxRetries = opts.maxRetries != null ? opts.maxRetries : DEFAULT_MAX_RETRIES
var maxWait = opts.maxWait != null ? opts.maxWait : DEFAULT_MAX_WAIT
var expScale = opts.expScale != null ? opts.expScale : DEFAULT_EXP_SCALE
var linScale = opts.linScale != null ? opts.linScale : DEFAULT_LIN_SCALE
var trans = opts.trans != null ? opts.trans : DEFAULT_TRANS
var errScale = opts.errScale != null ? opts.errScale : DEFAULT_ERR_SCALE
var errTrans = opts.errTrans != null ? opts.errTrans : DEFAULT_ERR_TRANS
var errorFilter = opts.errorFilter

return retry({
times: maxRetries + 1,
errorFilter: errorFilter,
interval: function (i) {
var wait = Math.exp((i - 1) * expScale) * linScale + trans
if (wait > maxWait) {
wait = maxWait
}
var rnd = 0.5 - Math.random()
wait = wait + (wait * rnd * errScale) + errTrans
var res = Math.floor(wait)
return res
}
}, task, cb)
}

module.exports = exponentialRetry
module.exports.MaxRetriesExceededError = MaxRetriesExceededError
Loading

0 comments on commit 9cf8e5d

Please sign in to comment.