Skip to content

Commit

Permalink
Merge pull request #96 from windingtree/develop
Browse files Browse the repository at this point in the history
feat: 🎸 Added sheduled kind of jobs to the Queue
  • Loading branch information
kostysh authored Feb 19, 2024
2 parents 6375b6d + 05b5f4b commit e425184
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 76 deletions.
205 changes: 129 additions & 76 deletions packages/queue/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const logger = createLogger('Queue');
*/
export enum JobStatus {
Pending,
Scheduled,
Started,
Done,
Cancelled,
Expand Down Expand Up @@ -101,6 +102,8 @@ export interface JobConfig<T extends JobData = JobData> {
retries?: number;
/** Retries delay */
retriesDelay?: number;
/** Scheduled time */
scheduledTime?: number;
/** The history of the job */
history?: JobHistoryInterface;
}
Expand Down Expand Up @@ -133,6 +136,8 @@ export class Job<T extends JobData = JobData> {
retries: number;
/** The period of time between retries */
retriesDelay: number;
/** Scheduled time */
scheduledTime?: number;
/** The history of the job */
history: JobHistory;

Expand All @@ -152,6 +157,11 @@ export class Job<T extends JobData = JobData> {
this.maxRetries = config.maxRetries ?? 0;
this.retries = config.retries ?? 0;
this.retriesDelay = config.retriesDelay ?? 0;

if (this.isRecurrent && config.scheduledTime) {
throw new Error('Job cannot be recurrent and scheduled at the same time');
}
this.scheduledTime = config.scheduledTime;
this.history = new JobHistory(config.history ?? {});
}

Expand Down Expand Up @@ -204,7 +214,7 @@ export class Job<T extends JobData = JobData> {
get executable() {
return (
!this.expired &&
this.status === JobStatus.Pending &&
[JobStatus.Pending, JobStatus.Scheduled].includes(this.status) &&
((!this.isRecurrent &&
(this.maxRetries === 0 ||
(this.maxRetries > 0 && this.retries < this.maxRetries))) ||
Expand Down Expand Up @@ -401,7 +411,9 @@ export class Queue extends EventEmitter<QueueEvents> {
// Only Pending jobs must be restored
if (
jobConfig.history &&
JobHistory.getStatus(jobConfig.history) === JobStatus.Pending
[JobStatus.Pending, JobStatus.Scheduled].includes(
JobHistory.getStatus(jobConfig.history),
)
) {
this.add(jobConfig);
}
Expand Down Expand Up @@ -512,6 +524,87 @@ export class Queue extends EventEmitter<QueueEvents> {
void this.storageUpdate(job.id, job);
}

/**
* Executes Job
* @param {Job} job Job to execute
* @returns {Promise<void>}
*/
private async executeJob(job: Job): Promise<void> {
try {
if (!job.executable) {
return;
}

this.changeJobStatus(job, JobStatus.Started);

const handler = this.handlers.getHandler(job.handlerName);

const result = await job.execute(handler);
logger.trace(`Job #${job.id} execution result: ${String(result)}`);

if (result && job.isRecurrent) {
// If the job is recurrent and the handler returned true, reschedule the job
if (!job.expired) {
logger.trace(`Job #${job.id} is done but new one is scheduled`);
this.changeJobStatus(job, JobStatus.Done);
setTimeout(() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
isRecurrent: job.isRecurrent,
recurrenceInterval: job.recurrenceInterval,
maxRecurrences: job.maxRecurrences,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
}, job.recurrenceInterval);
} else {
logger.trace(`Job #${job.id} is expired`);
this.changeJobStatus(job, JobStatus.Expired);
}
} else {
logger.trace(`Job #${job.id} is done`);
this.changeJobStatus(job, JobStatus.Done);
}
} catch (error) {
logger.error(`Job #${job.id} is errored`, error);
job.history.errors.push(String(error));

if (job.maxRetries > 0 && job.retries < job.maxRetries) {
// If the job hasn't reached the maximum number of retries, retry it
job.retries++;

if (job.retriesDelay > 0) {
logger.trace(`Job #${job.id} filed but scheduled for restart`);
this.changeJobStatus(job, JobStatus.Failed);
setTimeout(
() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
},
backoffWithJitter(
job.retriesDelay,
job.retries,
job.retriesDelay * 10,
),
);
} else {
logger.trace(`Job #${job.id} failed and immediately restarted`);
this.changeJobStatus(job, JobStatus.Pending);
}
} else {
logger.trace(`Job #${job.id} filed`);
this.changeJobStatus(job, JobStatus.Failed);
}
}
}

/**
* Starts processing jobs in the queue.
* It finds executable jobs and runs them concurrently up to the concurrency limit.
Expand All @@ -524,13 +617,37 @@ export class Queue extends EventEmitter<QueueEvents> {
*/
private async start() {
try {
const now = Date.now();

const activeJobs = this.jobs.filter(
(job) => job.status === JobStatus.Started,
);
const pendingJobs = this.jobs.filter((job) => job.executable);
logger.trace(`Active jobs: ${activeJobs.length}`);

// Select all pending jobs except for scheduled
const pendingJobs = this.jobs.filter(
(job) =>
job.executable &&
(!job.scheduledTime ||
(job.scheduledTime && job.scheduledTime <= now)),
);
logger.trace(`Pending jobs: ${pendingJobs.length}`);

// Select all scheduled jobs
const scheduledJobs = this.jobs.filter(
(job) => job.executable && job.scheduledTime,
);

if (scheduledJobs.length > 0) {
scheduledJobs.forEach((job) => {
if (job.scheduledTime && job.scheduledTime > now) {
const delay = job.scheduledTime - now;
this.changeJobStatus(job, JobStatus.Scheduled);
setTimeout(() => void this.executeJob(job), delay);
}
});
}

const availableSlots = this.concurrencyLimit - activeJobs.length;
logger.trace(`Available slots: ${availableSlots}`);

Expand All @@ -546,79 +663,9 @@ export class Queue extends EventEmitter<QueueEvents> {
);

// Start all the selected jobs concurrently
const promises = jobsToStart.map(async (job) => {
try {
this.changeJobStatus(job, JobStatus.Started);

const handler = this.handlers.getHandler(job.handlerName);

const result = await job.execute(handler);
logger.trace(`Job #${job.id} execution result: ${String(result)}`);

if (result && job.isRecurrent) {
// If the job is recurrent and the handler returned true, reschedule the job
if (!job.expired) {
logger.trace(`Job #${job.id} is done but new one is scheduled`);
this.changeJobStatus(job, JobStatus.Done);
setTimeout(() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
isRecurrent: job.isRecurrent,
recurrenceInterval: job.recurrenceInterval,
maxRecurrences: job.maxRecurrences,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
}, job.recurrenceInterval);
} else {
logger.trace(`Job #${job.id} is expired`);
this.changeJobStatus(job, JobStatus.Expired);
}
} else {
logger.trace(`Job #${job.id} is done`);
this.changeJobStatus(job, JobStatus.Done);
}
} catch (error) {
logger.error(`Job #${job.id} is errored`, error);
job.history.errors.push(String(error));

if (job.maxRetries > 0 && job.retries < job.maxRetries) {
// If the job hasn't reached the maximum number of retries, retry it
job.retries++;

if (job.retriesDelay > 0) {
logger.trace(`Job #${job.id} filed but scheduled for restart`);
this.changeJobStatus(job, JobStatus.Failed);
setTimeout(
() => {
this.add({
handlerName: job.handlerName,
data: job.data,
expire: job.expire,
maxRetries: job.maxRetries,
retries: job.retries + 1,
});
},
backoffWithJitter(
job.retriesDelay,
job.retries,
job.retriesDelay * 10,
),
);
} else {
logger.trace(`Job #${job.id} failed and immediately restarted`);
this.changeJobStatus(job, JobStatus.Pending);
}
} else {
logger.trace(`Job #${job.id} filed`);
this.changeJobStatus(job, JobStatus.Failed);
}
}
});

await Promise.allSettled(promises);
await Promise.allSettled(
jobsToStart.map(async (job) => this.executeJob(job)),
);

// After these jobs are done, check if there are any more jobs to process
logger.trace('Trying to restart queue');
Expand Down Expand Up @@ -650,6 +697,12 @@ export class Queue extends EventEmitter<QueueEvents> {
*/
add<T extends JobData = JobData>(config: JobConfig<T>): string {
const job = new Job<T>(config);

// In case of restored Scheduled jobs we need to bring them to Pending again
if (job.status === JobStatus.Scheduled) {
job.status = JobStatus.Pending;
}

this.jobs.push(job);
logger.trace('Job added:', job);
void this.storageUpdate(job.id, job);
Expand Down
72 changes: 72 additions & 0 deletions packages/queue/test/queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,3 +289,75 @@ describe('Queue', function () {
});
});
});

describe('Scheduled jobs', () => {
let queue: Queue;
let handler: JobHandler<JobData>;

beforeEach(() => {
queue = new Queue({ concurrencyLimit: 5 });
handler = async () => Promise.resolve(true);
queue.registerHandler('scheduledHandler', handler);
});

it('should correctly schedule a job for future execution', async function () {
const futureTime = Date.now() + 100;
const jobId = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: futureTime,
});

expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Scheduled);

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Done);
resolve(true);
}, 150);
});
});

it('should immediately execute a job scheduled for the past', async function () {
const pastTime = Date.now() - 1000;
const jobId = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: pastTime,
});

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Done);
resolve(true);
}, 50);
});
});

it('should handle the execution of multiple scheduled jobs', async function () {
const futureTimeShort = Date.now() + 50;
const futureTimeLong = Date.now() + 200;

const jobIdShort = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: futureTimeShort,
});
const jobIdLong = queue.add({
handlerName: 'scheduledHandler',
scheduledTime: futureTimeLong,
});

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobIdShort)?.status).to.equal(JobStatus.Done);
expect(queue.getLocal(jobIdLong)?.status).to.equal(JobStatus.Scheduled);
resolve(true);
}, 100);
});

await new Promise((resolve) => {
setTimeout(() => {
expect(queue.getLocal(jobIdLong)?.status).to.equal(JobStatus.Done);
resolve(true);
}, 250);
});
});
});

0 comments on commit e425184

Please sign in to comment.