Skip to content

Commit

Permalink
feat: terminateVisibilityTimeout = number to customise the timeout
Browse files Browse the repository at this point in the history
This allows users to introduce a delay before the message can be
received again. In some cases this can improve the overall success rate.
  • Loading branch information
Frederick888 committed Oct 14, 2024
1 parent 7609799 commit 092c3b9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
18 changes: 13 additions & 5 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export class Consumer extends TypedEventEmitter {
private alwaysAcknowledge: boolean;
private batchSize: number;
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
private terminateVisibilityTimeout: boolean | number;
private waitTimeSeconds: number;
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
Expand Down Expand Up @@ -362,8 +362,12 @@ export class Consumer extends TypedEventEmitter {
} catch (err) {
this.emitError(err, message);

if (this.terminateVisibilityTimeout) {
await this.changeVisibilityTimeout(message, 0);
if (this.terminateVisibilityTimeout !== false) {
const timeout =
this.terminateVisibilityTimeout === true
? 0
: this.terminateVisibilityTimeout;
await this.changeVisibilityTimeout(message, timeout);
}
} finally {
if (this.heartbeatInterval) {
Expand Down Expand Up @@ -400,8 +404,12 @@ export class Consumer extends TypedEventEmitter {
} catch (err) {
this.emit("error", err, messages);

if (this.terminateVisibilityTimeout) {
await this.changeVisibilityTimeoutBatch(messages, 0);
if (this.terminateVisibilityTimeout !== false) {
const timeout =
this.terminateVisibilityTimeout === true
? 0
: this.terminateVisibilityTimeout;
await this.changeVisibilityTimeoutBatch(messages, timeout);
}
} finally {
clearInterval(heartbeatTimeoutId);
Expand Down
5 changes: 3 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ export interface ConsumerOptions {
*/
pollingCompleteWaitTimeMs?: number;
/**
* If true, sets the message visibility timeout to 0 after a `processing_error`.
* If true, sets the message visibility timeout to 0 after a `processing_error`. You can
* also specify a different timeout using a number.
* @defaultvalue `false`
*/
terminateVisibilityTimeout?: boolean;
terminateVisibilityTimeout?: boolean | number;
/**
* The interval (in seconds) between requests to extend the message visibility timeout.
*
Expand Down
23 changes: 23 additions & 0 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,29 @@ describe("Consumer", () => {
);
});

it("changes message visibility timeout on processing error", async () => {
handleMessage.rejects(new Error("Processing error"));

consumer.terminateVisibilityTimeout = 10;

consumer.start();
await pEvent(consumer, "processing_error");
consumer.stop();

sandbox.assert.calledWith(
sqs.send.secondCall,
mockChangeMessageVisibility,
);
sandbox.assert.match(
sqs.send.secondCall.args[0].input,
sinon.match({
QueueUrl: QUEUE_URL,
ReceiptHandle: "receipt-handle",
VisibilityTimeout: 10,
}),
);
});

it("does not terminate visibility timeout when `terminateVisibilityTimeout` option is false", async () => {
handleMessage.rejects(new Error("Processing error"));
consumer.terminateVisibilityTimeout = false;
Expand Down

0 comments on commit 092c3b9

Please sign in to comment.