Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(worker): mitigate job's locks extensions #2970

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -835,14 +835,14 @@ will never work with more accuracy than 1ms. */
});

const handleCompleted = async (result: ResultType) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
const completed = await job.moveToCompleted(
result,
token,
fetchNextCallback() && !(this.closing || this.paused),
);
jobsInProgress.delete(inProgressItem);

this.emit('completed', job, result, 'active');

span?.addEvent('job completed', {
Expand All @@ -857,8 +857,6 @@ will never work with more accuracy than 1ms. */
};

const handleFailed = async (err: Error) => {
jobsInProgress.delete(inProgressItem);

if (!this.connection.closing) {
try {
// Check if the job was manually rate-limited
Expand All @@ -877,6 +875,8 @@ will never work with more accuracy than 1ms. */
}

const result = await job.moveToFailed(err, token, true);
jobsInProgress.delete(inProgressItem);

this.emit('failed', job, err, 'active');

span?.addEvent('job failed', {
Expand Down Expand Up @@ -911,6 +911,8 @@ will never work with more accuracy than 1ms. */
const failed = await handleFailed(<Error>err);
return failed;
} finally {
jobsInProgress.delete(inProgressItem);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this line be sufficient and the other extra references can be deleted as this line will be executed after completed and failed handlers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because there are other async calls in between, so the lock extender could run in between as well and the there will be no lock for this job.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

before line 864 we also need to delete that inProgressitem as well


span?.setAttributes({
[TelemetryAttributes.JobFinishedTimestamp]: Date.now(),
[TelemetryAttributes.JobProcessedTimestamp]: processedOn,
Expand Down Expand Up @@ -1121,6 +1123,9 @@ will never work with more accuracy than 1ms. */

for (const item of jobsInProgress) {
const { job, ts } = item;

// In theory ts should always be defined here so this
// check should not be necessary.
if (!ts) {
item.ts = now;
continue;
Expand All @@ -1141,7 +1146,7 @@ will never work with more accuracy than 1ms. */
}

this.startLockExtenderTimer(jobsInProgress);
}, this.opts.lockRenewTime / 2);
}, this.opts.lockRenewTime / 10);
}
}
}
Expand Down Expand Up @@ -1207,8 +1212,6 @@ will never work with more accuracy than 1ms. */
);

for (const jobId of erroredJobIds) {
// TODO: Send signal to process function that the job has been lost.

this.emit(
'error',
new Error(`could not renew lock for job ${jobId}`),
Expand Down
2 changes: 1 addition & 1 deletion tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1984,7 +1984,7 @@ describe('workers', function () {
connection,
prefix,
lockDuration: 1000,
lockRenewTime: 3000, // The lock will not be updated in time
lockRenewTime: 15000, // The lock will not be updated in time
},
);
await worker.waitUntilReady();
Expand Down
Loading