diff --git a/.gitignore b/.gitignore index be2e166c..8f61d2f7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ node_modules test.js coverage -dist +# dist .nyc_output diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 00000000..3f584f60 --- /dev/null +++ b/.prettierrc @@ -0,0 +1,4 @@ +{ + "printWidth": 120, + "singleQuote": true +} diff --git a/dist/bind.d.ts b/dist/bind.d.ts new file mode 100644 index 00000000..f761ddfe --- /dev/null +++ b/dist/bind.d.ts @@ -0,0 +1 @@ +export declare function autoBind(obj: object): void; diff --git a/dist/bind.js b/dist/bind.js new file mode 100644 index 00000000..df71c05f --- /dev/null +++ b/dist/bind.js @@ -0,0 +1,15 @@ +'use strict'; +Object.defineProperty(exports, "__esModule", { value: true }); +function isMethod(propertyName, value) { + return propertyName !== 'constructor' && typeof value === 'function'; +} +function autoBind(obj) { + const propertyNames = Object.getOwnPropertyNames(obj.constructor.prototype); + propertyNames.forEach((propertyName) => { + const value = obj[propertyName]; + if (isMethod(propertyName, value)) { + obj[propertyName] = value.bind(obj); + } + }); +} +exports.autoBind = autoBind; diff --git a/dist/consumer.d.ts b/dist/consumer.d.ts new file mode 100644 index 00000000..4493e4b2 --- /dev/null +++ b/dist/consumer.d.ts @@ -0,0 +1,71 @@ +import * as SQS from 'aws-sdk/clients/sqs'; +import { EventEmitter } from 'events'; +declare type SQSMessage = SQS.Types.Message; +export interface ConsumerOptions { + queueUrl?: string; + attributeNames?: string[]; + messageAttributeNames?: string[]; + stopped?: boolean; + concurrencyLimit?: number; + batchSize?: number; + visibilityTimeout?: number; + waitTimeSeconds?: number; + authenticationErrorTimeout?: number; + pollingWaitTimeMs?: number; + msDelayOnEmptyBatchSize?: number; + terminateVisibilityTimeout?: boolean; + sqs?: SQS; + region?: string; + handleMessageTimeout?: number; + handleMessage?(message: SQSMessage): Promise; + handleMessageBatch?(messages: SQSMessage[], consumer: Consumer): Promise; + pollingStartedInstrumentCallback?(eventData: object): void; + pollingFinishedInstrumentCallback?(eventData: object): void; + batchStartedInstrumentCallBack?(eventData: object): void; + batchFinishedInstrumentCallBack?(eventData: object): void; + batchFailedInstrumentCallBack?(eventData: object): void; +} +export declare class Consumer extends EventEmitter { + private queueUrl; + private handleMessage; + private handleMessageBatch; + private pollingStartedInstrumentCallback?; + private pollingFinishedInstrumentCallback?; + private batchStartedInstrumentCallBack?; + private batchFinishedInstrumentCallBack?; + private batchFailedInstrumentCallBack?; + private handleMessageTimeout; + private attributeNames; + private messageAttributeNames; + private stopped; + private concurrencyLimit; + private freeConcurrentSlots; + private batchSize; + private visibilityTimeout; + private waitTimeSeconds; + private authenticationErrorTimeout; + private pollingWaitTimeMs; + private msDelayOnEmptyBatchSize; + private terminateVisibilityTimeout; + private sqs; + constructor(options: ConsumerOptions); + readonly isRunning: boolean; + static create(options: ConsumerOptions): Consumer; + start(): void; + stop(): void; + setBatchSize(newBatchSize: number): void; + setConcurrencyLimit(newConcurrencyLimit: number): void; + setPollingWaitTimeMs(newPollingWaitTimeMs: number): void; + reportMessageFromBatchFinished(message: SQSMessage, error: Error): Promise; + private reportNumberOfMessagesReceived; + private handleSqsResponse; + private processMessage; + private receiveMessage; + private deleteMessage; + private executeHandler; + private terminateVisabilityTimeout; + private emitError; + private poll; + private processMessageBatch; +} +export {}; diff --git a/dist/consumer.js b/dist/consumer.js new file mode 100644 index 00000000..b7dfe4eb --- /dev/null +++ b/dist/consumer.js @@ -0,0 +1,355 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const SQS = require("aws-sdk/clients/sqs"); +const Debug = require("debug"); +const crypto = require("crypto"); +const events_1 = require("events"); +const bind_1 = require("./bind"); +const errors_1 = require("./errors"); +const debug = Debug('sqs-consumer'); +const requiredOptions = [ + 'queueUrl', + // only one of handleMessage / handleMessagesBatch is required + 'handleMessage|handleMessageBatch', +]; +function generateUuid() { + return crypto.randomBytes(16).toString('hex'); +} +function createTimeout(duration) { + let timeout; + const pending = new Promise((_, reject) => { + timeout = setTimeout(() => { + reject(new errors_1.TimeoutError()); + }, duration); + }); + return [timeout, pending]; +} +function assertOptions(options) { + requiredOptions.forEach((option) => { + const possibilities = option.split('|'); + if (!possibilities.find((p) => options[p])) { + throw new Error(`Missing SQS consumer option [ ${possibilities.join(' or ')} ].`); + } + }); + if (options.batchSize > 10 || options.batchSize < 1) { + throw new Error('SQS batchSize option must be between 1 and 10.'); + } +} +function isConnectionError(err) { + if (err instanceof errors_1.SQSError) { + return err.statusCode === 403 || err.code === 'CredentialsError' || err.code === 'UnknownEndpoint'; + } + return false; +} +function isNonExistentQueueError(err) { + if (err instanceof errors_1.SQSError) { + return err.code === 'AWS.SimpleQueueService.NonExistentQueue'; + } + return false; +} +function toSQSError(err, message) { + const sqsError = new errors_1.SQSError(message); + sqsError.code = err.code; + sqsError.statusCode = err.statusCode; + sqsError.region = err.region; + sqsError.retryable = err.retryable; + sqsError.hostname = err.hostname; + sqsError.time = err.time; + return sqsError; +} +function hasMessages(response) { + return response.Messages && response.Messages.length > 0; +} +function addMessageUuidToError(error, message) { + try { + const messageBody = JSON.parse(message.Body); + const messageUuid = messageBody && messageBody.payload && messageBody.payload.uuid; + error.messageUuid = messageUuid; + } + catch (err) { } +} +class Consumer extends events_1.EventEmitter { + constructor(options) { + super(); + assertOptions(options); + this.queueUrl = options.queueUrl; + this.handleMessage = options.handleMessage; + this.handleMessageBatch = options.handleMessageBatch; + this.pollingStartedInstrumentCallback = options.pollingStartedInstrumentCallback; + this.pollingFinishedInstrumentCallback = options.pollingFinishedInstrumentCallback; + this.batchStartedInstrumentCallBack = options.batchStartedInstrumentCallBack; + this.batchFinishedInstrumentCallBack = options.batchFinishedInstrumentCallBack; + this.batchFailedInstrumentCallBack = options.batchFailedInstrumentCallBack; + this.handleMessageTimeout = options.handleMessageTimeout; + this.attributeNames = options.attributeNames || []; + this.messageAttributeNames = options.messageAttributeNames || []; + this.stopped = true; + this.batchSize = options.batchSize || 1; + this.concurrencyLimit = options.concurrencyLimit || 30; + this.freeConcurrentSlots = this.concurrencyLimit; + this.visibilityTimeout = options.visibilityTimeout; + this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false; + this.waitTimeSeconds = options.waitTimeSeconds || 20; + this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000; + this.pollingWaitTimeMs = options.pollingWaitTimeMs || 0; + this.msDelayOnEmptyBatchSize = options.msDelayOnEmptyBatchSize || 5; + this.sqs = + options.sqs || + new SQS({ + region: options.region || process.env.AWS_REGION || 'eu-west-1', + }); + bind_1.autoBind(this); + } + get isRunning() { + return !this.stopped; + } + static create(options) { + return new Consumer(options); + } + start() { + if (this.stopped) { + debug('Starting consumer'); + this.stopped = false; + this.poll(); + } + } + stop() { + debug('Stopping consumer'); + this.stopped = true; + } + setBatchSize(newBatchSize) { + this.batchSize = newBatchSize; + } + setConcurrencyLimit(newConcurrencyLimit) { + const concurrencyLimitDiff = newConcurrencyLimit - this.concurrencyLimit; + const newFreeConcurrentSlots = Math.max(0, this.freeConcurrentSlots + concurrencyLimitDiff); + this.concurrencyLimit = newConcurrencyLimit; + this.freeConcurrentSlots = newFreeConcurrentSlots; + } + setPollingWaitTimeMs(newPollingWaitTimeMs) { + this.pollingWaitTimeMs = newPollingWaitTimeMs; + } + async reportMessageFromBatchFinished(message, error) { + debug('Message from batch has finished'); + this.freeConcurrentSlots++; + try { + if (error) + throw error; + await this.deleteMessage(message); + this.emit('message_processed', message, this.queueUrl); + } + catch (err) { + this.emitError(err, message); + } + } + reportNumberOfMessagesReceived(numberOfMessages) { + debug('Reducing number of messages received from freeConcurrentSlots'); + this.freeConcurrentSlots = this.freeConcurrentSlots - numberOfMessages; + } + async handleSqsResponse(response) { + debug('Received SQS response'); + debug(response); + const hasResponseWithMessages = !!response && hasMessages(response); + const numberOfMessages = hasResponseWithMessages ? response.Messages.length : 0; + if (this.pollingFinishedInstrumentCallback) { + // instrument pod how many messages received + this.pollingFinishedInstrumentCallback({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + messagesReceived: numberOfMessages, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } + if (response) { + if (hasMessages(response)) { + if (this.handleMessageBatch) { + // prefer handling messages in batch when available + await this.processMessageBatch(response.Messages); + } + else { + await Promise.all(response.Messages.map(this.processMessage)); + } + this.emit('response_processed', this.queueUrl); + } + else { + this.emit('empty', this.queueUrl); + } + } + } + async processMessage(message) { + this.emit('message_received', message, this.queueUrl); + try { + await this.executeHandler(message); + await this.deleteMessage(message); + this.emit('message_processed', message, this.queueUrl); + } + catch (err) { + this.emitError(err, message); + if (this.terminateVisibilityTimeout) { + try { + await this.terminateVisabilityTimeout(message); + } + catch (err) { + this.emit('error', err, message, this.queueUrl); + } + } + } + } + async receiveMessage(params) { + try { + return await this.sqs.receiveMessage(params).promise(); + } + catch (err) { + throw toSQSError(err, `SQS receive message failed: ${err.message}`); + } + } + async deleteMessage(message) { + debug('Deleting message %s', message.MessageId); + const deleteParams = { + QueueUrl: this.queueUrl, + ReceiptHandle: message.ReceiptHandle, + }; + try { + await this.sqs.deleteMessage(deleteParams).promise(); + } + catch (err) { + throw toSQSError(err, `SQS delete message failed: ${err.message}`); + } + } + async executeHandler(message) { + let timeout; + let pending; + try { + if (this.handleMessageTimeout) { + [timeout, pending] = createTimeout(this.handleMessageTimeout); + await Promise.race([this.handleMessage(message), pending]); + } + else { + await this.handleMessage(message); + } + } + catch (err) { + addMessageUuidToError(err, message); + if (err instanceof errors_1.TimeoutError) { + err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`; + } + else { + err.message = `Unexpected message handler failure: ${err.message}`; + } + throw err; + } + finally { + clearTimeout(timeout); + } + } + async terminateVisabilityTimeout(message) { + return this.sqs + .changeMessageVisibility({ + QueueUrl: this.queueUrl, + ReceiptHandle: message.ReceiptHandle, + VisibilityTimeout: 0, + }) + .promise(); + } + emitError(err, message) { + if (err.name === errors_1.SQSError.name) { + this.emit('error', err, message, this.queueUrl); + } + else if (err instanceof errors_1.TimeoutError) { + this.emit('timeout_error', err, message, this.queueUrl); + } + else { + this.emit('processing_error', err, message, this.queueUrl); + } + } + poll() { + if (this.stopped) { + this.emit('stopped', this.queueUrl); + return; + } + const pollBatchSize = Math.min(this.batchSize, this.freeConcurrentSlots); + debug('Polling for messages'); + if (this.pollingStartedInstrumentCallback) { + this.pollingStartedInstrumentCallback({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + pollBatchSize, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } + let currentPollingTimeout = this.pollingWaitTimeMs; + if (pollBatchSize > 0) { + const receiveParams = { + QueueUrl: this.queueUrl, + AttributeNames: this.attributeNames, + MessageAttributeNames: this.messageAttributeNames, + MaxNumberOfMessages: pollBatchSize, + WaitTimeSeconds: this.waitTimeSeconds, + VisibilityTimeout: this.visibilityTimeout, + }; + this.receiveMessage(receiveParams) + .then(this.handleSqsResponse) + .catch((err) => { + this.emit('unhandled_error', err, this.queueUrl); + if (isNonExistentQueueError(err)) { + throw new Error(`Could not receive messages - non existent queue - ${this.queueUrl}`); + } + if (isConnectionError(err)) { + debug('There was an authentication error. Pausing before retrying.'); + currentPollingTimeout = this.authenticationErrorTimeout; + } + return; + }) + .then(() => { + setTimeout(this.poll, currentPollingTimeout); + }) + .catch((err) => { + this.emit('unhandled_error', err, this.queueUrl); + }); + } + else { + setTimeout(this.poll, this.msDelayOnEmptyBatchSize); + } + } + async processMessageBatch(messages) { + messages.forEach((message) => { + this.emit('message_received', message, this.queueUrl); + }); + this.reportNumberOfMessagesReceived(messages.length); + const batchUuid = generateUuid(); + if (this.batchStartedInstrumentCallBack) { + this.batchStartedInstrumentCallBack({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + batchUuid, + numberOfMessages: messages.length, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } + this.handleMessageBatch(messages, this) + .then(() => { + if (this.batchFinishedInstrumentCallBack) { + this.batchFinishedInstrumentCallBack({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + batchUuid, + numberOfMessages: messages.length, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } + }) + .catch((err) => { + if (this.batchFailedInstrumentCallBack) { + this.batchFailedInstrumentCallBack({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + batchUuid, + numberOfMessages: messages.length, + freeConcurrentSlots: this.freeConcurrentSlots, + error: err, + }); + } + }); + } +} +exports.Consumer = Consumer; diff --git a/dist/errors.d.ts b/dist/errors.d.ts new file mode 100644 index 00000000..f14df68c --- /dev/null +++ b/dist/errors.d.ts @@ -0,0 +1,13 @@ +declare class SQSError extends Error { + code: string; + statusCode: number; + region: string; + hostname: string; + time: Date; + retryable: boolean; + constructor(message: string); +} +declare class TimeoutError extends Error { + constructor(message?: string); +} +export { SQSError, TimeoutError }; diff --git a/dist/errors.js b/dist/errors.js new file mode 100644 index 00000000..1e30c249 --- /dev/null +++ b/dist/errors.js @@ -0,0 +1,17 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +class SQSError extends Error { + constructor(message) { + super(message); + this.name = this.constructor.name; + } +} +exports.SQSError = SQSError; +class TimeoutError extends Error { + constructor(message = 'Operation timed out.') { + super(message); + this.message = message; + this.name = 'TimeoutError'; + } +} +exports.TimeoutError = TimeoutError; diff --git a/dist/index.d.ts b/dist/index.d.ts new file mode 100644 index 00000000..84ff4197 --- /dev/null +++ b/dist/index.d.ts @@ -0,0 +1 @@ +export { Consumer, ConsumerOptions } from './consumer'; diff --git a/dist/index.js b/dist/index.js new file mode 100644 index 00000000..20a65164 --- /dev/null +++ b/dist/index.js @@ -0,0 +1,4 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +var consumer_1 = require("./consumer"); +exports.Consumer = consumer_1.Consumer; diff --git a/package-lock.json b/package-lock.json index f9b201a5..639fbc60 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "sqs-consumer", - "version": "5.4.0", + "version": "6.2.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 80a19372..b7e8e705 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "sqs-consumer", - "version": "5.4.0", + "version": "6.2.0", "description": "Build SQS-based Node applications without the boilerplate", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -50,7 +50,7 @@ "typescript": "^2.6.1" }, "dependencies": { - "aws-sdk": "^2.443.0", + "aws-sdk": "2.1490.0", "debug": "^4.1.1" }, "nyc": { diff --git a/src/consumer.ts b/src/consumer.ts index 681e1d74..2c0973f3 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -2,6 +2,7 @@ import { AWSError } from 'aws-sdk'; import * as SQS from 'aws-sdk/clients/sqs'; import { PromiseResult } from 'aws-sdk/lib/request'; import * as Debug from 'debug'; +import * as crypto from 'crypto'; import { EventEmitter } from 'events'; import { autoBind } from './bind'; import { SQSError, TimeoutError } from './errors'; @@ -15,7 +16,7 @@ type ReceiveMessageRequest = SQS.Types.ReceiveMessageRequest; const requiredOptions = [ 'queueUrl', // only one of handleMessage / handleMessagesBatch is required - 'handleMessage|handleMessageBatch' + 'handleMessage|handleMessageBatch', ]; interface TimeoutResponse { @@ -23,6 +24,10 @@ interface TimeoutResponse { pending: Promise; } +function generateUuid(): string { + return crypto.randomBytes(16).toString('hex'); +} + function createTimeout(duration: number): TimeoutResponse[] { let timeout; const pending = new Promise((_, reject) => { @@ -48,11 +53,19 @@ function assertOptions(options: ConsumerOptions): void { function isConnectionError(err: Error): Boolean { if (err instanceof SQSError) { - return (err.statusCode === 403 || err.code === 'CredentialsError' || err.code === 'UnknownEndpoint'); + return err.statusCode === 403 || err.code === 'CredentialsError' || err.code === 'UnknownEndpoint'; } return false; } +function isNonExistentQueueError(err: Error): Boolean { + if (err instanceof SQSError) { + return err.code === 'AWS.SimpleQueueService.NonExistentQueue'; + } + + return false; +} + function toSQSError(err: AWSError, message: string): SQSError { const sqsError = new SQSError(message); sqsError.code = err.code; @@ -69,37 +82,61 @@ function hasMessages(response: ReceieveMessageResponse): boolean { return response.Messages && response.Messages.length > 0; } +function addMessageUuidToError(error, message): void { + try { + const messageBody = JSON.parse(message.Body); + const messageUuid = messageBody && messageBody.payload && messageBody.payload.uuid; + + error.messageUuid = messageUuid; + } catch (err) {} +} + export interface ConsumerOptions { queueUrl?: string; attributeNames?: string[]; messageAttributeNames?: string[]; stopped?: boolean; + concurrencyLimit?: number; batchSize?: number; visibilityTimeout?: number; waitTimeSeconds?: number; authenticationErrorTimeout?: number; pollingWaitTimeMs?: number; + msDelayOnEmptyBatchSize?: number; terminateVisibilityTimeout?: boolean; sqs?: SQS; region?: string; handleMessageTimeout?: number; handleMessage?(message: SQSMessage): Promise; - handleMessageBatch?(messages: SQSMessage[]): Promise; + handleMessageBatch?(messages: SQSMessage[], consumer: Consumer): Promise; + pollingStartedInstrumentCallback?(eventData: object): void; + pollingFinishedInstrumentCallback?(eventData: object): void; + batchStartedInstrumentCallBack?(eventData: object): void; + batchFinishedInstrumentCallBack?(eventData: object): void; + batchFailedInstrumentCallBack?(eventData: object): void; } export class Consumer extends EventEmitter { private queueUrl: string; private handleMessage: (message: SQSMessage) => Promise; - private handleMessageBatch: (message: SQSMessage[]) => Promise; + private handleMessageBatch: (message: SQSMessage[], consumer: Consumer) => Promise; + private pollingStartedInstrumentCallback?: (eventData: object) => void; + private pollingFinishedInstrumentCallback?: (eventData: object) => void; + private batchStartedInstrumentCallBack?: (eventData: object) => void; + private batchFinishedInstrumentCallBack?: (eventData: object) => void; + private batchFailedInstrumentCallBack?: (eventData: object) => void; private handleMessageTimeout: number; private attributeNames: string[]; private messageAttributeNames: string[]; private stopped: boolean; + private concurrencyLimit: number; + private freeConcurrentSlots: number; private batchSize: number; private visibilityTimeout: number; private waitTimeSeconds: number; private authenticationErrorTimeout: number; private pollingWaitTimeMs: number; + private msDelayOnEmptyBatchSize: number; private terminateVisibilityTimeout: boolean; private sqs: SQS; @@ -109,20 +146,30 @@ export class Consumer extends EventEmitter { this.queueUrl = options.queueUrl; this.handleMessage = options.handleMessage; this.handleMessageBatch = options.handleMessageBatch; + this.pollingStartedInstrumentCallback = options.pollingStartedInstrumentCallback; + this.pollingFinishedInstrumentCallback = options.pollingFinishedInstrumentCallback; + this.batchStartedInstrumentCallBack = options.batchStartedInstrumentCallBack; + this.batchFinishedInstrumentCallBack = options.batchFinishedInstrumentCallBack; + this.batchFailedInstrumentCallBack = options.batchFailedInstrumentCallBack; this.handleMessageTimeout = options.handleMessageTimeout; this.attributeNames = options.attributeNames || []; this.messageAttributeNames = options.messageAttributeNames || []; this.stopped = true; this.batchSize = options.batchSize || 1; + this.concurrencyLimit = options.concurrencyLimit || 30; + this.freeConcurrentSlots = this.concurrencyLimit; this.visibilityTimeout = options.visibilityTimeout; this.terminateVisibilityTimeout = options.terminateVisibilityTimeout || false; this.waitTimeSeconds = options.waitTimeSeconds || 20; this.authenticationErrorTimeout = options.authenticationErrorTimeout || 10000; this.pollingWaitTimeMs = options.pollingWaitTimeMs || 0; + this.msDelayOnEmptyBatchSize = options.msDelayOnEmptyBatchSize || 5; - this.sqs = options.sqs || new SQS({ - region: options.region || process.env.AWS_REGION || 'eu-west-1' - }); + this.sqs = + options.sqs || + new SQS({ + region: options.region || process.env.AWS_REGION || 'eu-west-1', + }); autoBind(this); } @@ -148,10 +195,59 @@ export class Consumer extends EventEmitter { this.stopped = true; } + public setBatchSize(newBatchSize: number): void { + this.batchSize = newBatchSize; + } + + public setConcurrencyLimit(newConcurrencyLimit: number): void { + const concurrencyLimitDiff = newConcurrencyLimit - this.concurrencyLimit; + const newFreeConcurrentSlots = Math.max(0, this.freeConcurrentSlots + concurrencyLimitDiff); + + this.concurrencyLimit = newConcurrencyLimit; + this.freeConcurrentSlots = newFreeConcurrentSlots; + } + + public setPollingWaitTimeMs(newPollingWaitTimeMs: number): void { + this.pollingWaitTimeMs = newPollingWaitTimeMs; + } + + public async reportMessageFromBatchFinished(message: SQSMessage, error?: Error): Promise { + debug('Message from batch has finished'); + + this.freeConcurrentSlots++; + + try { + if (error) throw error; + + await this.deleteMessage(message); + this.emit('message_processed', message, this.queueUrl); + } catch (err) { + this.emitError(err, message); + } + } + + private reportNumberOfMessagesReceived(numberOfMessages: number): void { + debug('Reducing number of messages received from freeConcurrentSlots'); + this.freeConcurrentSlots = this.freeConcurrentSlots - numberOfMessages; + } + private async handleSqsResponse(response: ReceieveMessageResponse): Promise { debug('Received SQS response'); debug(response); + const hasResponseWithMessages = !!response && hasMessages(response); + const numberOfMessages = hasResponseWithMessages ? response.Messages.length : 0; + + if (this.pollingFinishedInstrumentCallback) { + // instrument pod how many messages received + this.pollingFinishedInstrumentCallback({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + messagesReceived: numberOfMessages, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } + if (response) { if (hasMessages(response)) { if (this.handleMessageBatch) { @@ -160,20 +256,20 @@ export class Consumer extends EventEmitter { } else { await Promise.all(response.Messages.map(this.processMessage)); } - this.emit('response_processed'); + this.emit('response_processed', this.queueUrl); } else { - this.emit('empty'); + this.emit('empty', this.queueUrl); } } } private async processMessage(message: SQSMessage): Promise { - this.emit('message_received', message); + this.emit('message_received', message, this.queueUrl); try { await this.executeHandler(message); await this.deleteMessage(message); - this.emit('message_processed', message); + this.emit('message_processed', message, this.queueUrl); } catch (err) { this.emitError(err, message); @@ -181,7 +277,7 @@ export class Consumer extends EventEmitter { try { await this.terminateVisabilityTimeout(message); } catch (err) { - this.emit('error', err, message); + this.emit('error', err, message, this.queueUrl); } } } @@ -189,9 +285,7 @@ export class Consumer extends EventEmitter { private async receiveMessage(params: ReceiveMessageRequest): Promise { try { - return await this.sqs - .receiveMessage(params) - .promise(); + return await this.sqs.receiveMessage(params).promise(); } catch (err) { throw toSQSError(err, `SQS receive message failed: ${err.message}`); } @@ -202,13 +296,11 @@ export class Consumer extends EventEmitter { const deleteParams = { QueueUrl: this.queueUrl, - ReceiptHandle: message.ReceiptHandle + ReceiptHandle: message.ReceiptHandle, }; try { - await this.sqs - .deleteMessage(deleteParams) - .promise(); + await this.sqs.deleteMessage(deleteParams).promise(); } catch (err) { throw toSQSError(err, `SQS delete message failed: ${err.message}`); } @@ -220,14 +312,12 @@ export class Consumer extends EventEmitter { try { if (this.handleMessageTimeout) { [timeout, pending] = createTimeout(this.handleMessageTimeout); - await Promise.race([ - this.handleMessage(message), - pending - ]); + await Promise.race([this.handleMessage(message), pending]); } else { await this.handleMessage(message); } } catch (err) { + addMessageUuidToError(err, message); if (err instanceof TimeoutError) { err.message = `Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`; } else { @@ -244,119 +334,117 @@ export class Consumer extends EventEmitter { .changeMessageVisibility({ QueueUrl: this.queueUrl, ReceiptHandle: message.ReceiptHandle, - VisibilityTimeout: 0 + VisibilityTimeout: 0, }) .promise(); } private emitError(err: Error, message: SQSMessage): void { if (err.name === SQSError.name) { - this.emit('error', err, message); + this.emit('error', err, message, this.queueUrl); } else if (err instanceof TimeoutError) { - this.emit('timeout_error', err, message); + this.emit('timeout_error', err, message, this.queueUrl); } else { - this.emit('processing_error', err, message); + this.emit('processing_error', err, message, this.queueUrl); } } private poll(): void { if (this.stopped) { - this.emit('stopped'); + this.emit('stopped', this.queueUrl); return; } + const pollBatchSize = Math.min(this.batchSize, this.freeConcurrentSlots); + debug('Polling for messages'); - const receiveParams = { - QueueUrl: this.queueUrl, - AttributeNames: this.attributeNames, - MessageAttributeNames: this.messageAttributeNames, - MaxNumberOfMessages: this.batchSize, - WaitTimeSeconds: this.waitTimeSeconds, - VisibilityTimeout: this.visibilityTimeout - }; + if (this.pollingStartedInstrumentCallback) { + this.pollingStartedInstrumentCallback({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + pollBatchSize, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } let currentPollingTimeout = this.pollingWaitTimeMs; - this.receiveMessage(receiveParams) - .then(this.handleSqsResponse) - .catch((err) => { - this.emit('error', err); - if (isConnectionError(err)) { - debug('There was an authentication error. Pausing before retrying.'); - currentPollingTimeout = this.authenticationErrorTimeout; - } - return; - }).then(() => { - setTimeout(this.poll, currentPollingTimeout); - }).catch((err) => { - this.emit('error', err); - }); + + if (pollBatchSize > 0) { + const receiveParams = { + QueueUrl: this.queueUrl, + AttributeNames: this.attributeNames, + MessageAttributeNames: this.messageAttributeNames, + MaxNumberOfMessages: pollBatchSize, + WaitTimeSeconds: this.waitTimeSeconds, + VisibilityTimeout: this.visibilityTimeout, + }; + + this.receiveMessage(receiveParams) + .then(this.handleSqsResponse) + .catch((err) => { + this.emit('unhandled_error', err, this.queueUrl); + if (isNonExistentQueueError(err)) { + throw new Error(`Could not receive messages - non existent queue - ${this.queueUrl}`); + } + if (isConnectionError(err)) { + debug('There was an authentication error. Pausing before retrying.'); + currentPollingTimeout = this.authenticationErrorTimeout; + } + + return; + }) + .then(() => { + setTimeout(this.poll, currentPollingTimeout); + }) + .catch((err) => { + this.emit('unhandled_error', err, this.queueUrl); + }); + } else { + setTimeout(this.poll, this.msDelayOnEmptyBatchSize); + } } private async processMessageBatch(messages: SQSMessage[]): Promise { messages.forEach((message) => { - this.emit('message_received', message); + this.emit('message_received', message, this.queueUrl); }); - try { - await this.executeBatchHandler(messages); - await this.deleteMessageBatch(messages); - messages.forEach((message) => { - this.emit('message_processed', message); - }); - } catch (err) { - this.emit('error', err, messages); - - if (this.terminateVisibilityTimeout) { - try { - await this.terminateVisabilityTimeoutBatch(messages); - } catch (err) { - this.emit('error', err, messages); - } - } - } - } - - private async deleteMessageBatch(messages: SQSMessage[]): Promise { - debug('Deleting messages %s', messages.map((msg) => msg.MessageId).join(' ,')); + this.reportNumberOfMessagesReceived(messages.length); + const batchUuid = generateUuid(); - const deleteParams = { - QueueUrl: this.queueUrl, - Entries: messages.map(message => ({ - Id: message.MessageId, - ReceiptHandle: message.ReceiptHandle - })) - }; - - try { - await this.sqs - .deleteMessageBatch(deleteParams) - .promise(); - } catch (err) { - throw toSQSError(err, `SQS delete message failed: ${err.message}`); - } - } - - private async executeBatchHandler(messages: SQSMessage[]): Promise { - try { - await this.handleMessageBatch(messages); - } catch (err) { - err.message = `Unexpected message handler failure: ${err.message}`; - throw err; + if (this.batchStartedInstrumentCallBack) { + this.batchStartedInstrumentCallBack({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + batchUuid, + numberOfMessages: messages.length, + freeConcurrentSlots: this.freeConcurrentSlots, + }); } - } - private async terminateVisabilityTimeoutBatch(messages: SQSMessage[]): Promise> { - const params = { - QueueUrl: this.queueUrl, - Entries: messages.map((message) => ({ - Id: message.MessageId, - ReceiptHandle: message.ReceiptHandle, - VisibilityTimeout: 0 - })) - }; - return this.sqs - .changeMessageVisibilityBatch(params) - .promise(); + this.handleMessageBatch(messages, this) + .then(() => { + if (this.batchFinishedInstrumentCallBack) { + this.batchFinishedInstrumentCallBack({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + batchUuid, + numberOfMessages: messages.length, + freeConcurrentSlots: this.freeConcurrentSlots, + }); + } + }) + .catch((err) => { + if (this.batchFailedInstrumentCallBack) { + this.batchFailedInstrumentCallBack({ + instanceId: process.env.HOSTNAME, + queueUrl: this.queueUrl, + batchUuid, + numberOfMessages: messages.length, + freeConcurrentSlots: this.freeConcurrentSlots, + error: err, + }); + } + }); } - }