diff --git a/src/consumer.ts b/src/consumer.ts index c70a5c9..b96a367 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -51,7 +51,7 @@ export class Consumer extends TypedEventEmitter { private alwaysAcknowledge: boolean; private batchSize: number; private visibilityTimeout: number; - private terminateVisibilityTimeout: boolean; + private terminateVisibilityTimeout: boolean | number; private waitTimeSeconds: number; private authenticationErrorTimeout: number; private pollingWaitTimeMs: number; @@ -362,8 +362,12 @@ export class Consumer extends TypedEventEmitter { } catch (err) { this.emitError(err, message); - if (this.terminateVisibilityTimeout) { - await this.changeVisibilityTimeout(message, 0); + if (this.terminateVisibilityTimeout !== false) { + const timeout = + this.terminateVisibilityTimeout === true + ? 0 + : this.terminateVisibilityTimeout; + await this.changeVisibilityTimeout(message, timeout); } } finally { if (this.heartbeatInterval) { @@ -400,8 +404,12 @@ export class Consumer extends TypedEventEmitter { } catch (err) { this.emit("error", err, messages); - if (this.terminateVisibilityTimeout) { - await this.changeVisibilityTimeoutBatch(messages, 0); + if (this.terminateVisibilityTimeout !== false) { + const timeout = + this.terminateVisibilityTimeout === true + ? 0 + : this.terminateVisibilityTimeout; + await this.changeVisibilityTimeoutBatch(messages, timeout); } } finally { clearInterval(heartbeatTimeoutId); diff --git a/src/types.ts b/src/types.ts index aeec5c8..a1ed7aa 100644 --- a/src/types.ts +++ b/src/types.ts @@ -66,10 +66,11 @@ export interface ConsumerOptions { */ pollingCompleteWaitTimeMs?: number; /** - * If true, sets the message visibility timeout to 0 after a `processing_error`. + * If true, sets the message visibility timeout to 0 after a `processing_error`. You can + * also specify a different timeout using a number. * @defaultvalue `false` */ - terminateVisibilityTimeout?: boolean; + terminateVisibilityTimeout?: boolean | number; /** * The interval (in seconds) between requests to extend the message visibility timeout. * diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index df81845..871bcfa 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -1031,6 +1031,29 @@ describe("Consumer", () => { ); }); + it("changes message visibility timeout on processing error", async () => { + handleMessage.rejects(new Error("Processing error")); + + consumer.terminateVisibilityTimeout = 10; + + consumer.start(); + await pEvent(consumer, "processing_error"); + consumer.stop(); + + sandbox.assert.calledWith( + sqs.send.secondCall, + mockChangeMessageVisibility, + ); + sandbox.assert.match( + sqs.send.secondCall.args[0].input, + sinon.match({ + QueueUrl: QUEUE_URL, + ReceiptHandle: "receipt-handle", + VisibilityTimeout: 10, + }), + ); + }); + it("does not terminate visibility timeout when `terminateVisibilityTimeout` option is false", async () => { handleMessage.rejects(new Error("Processing error")); consumer.terminateVisibilityTimeout = false;