Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to update priority of a promise function on the fly #209

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ Default: `0`

Priority of operation. Operations with greater priority will be scheduled first.

##### id

Type `string`

Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned as an incrementing bigint starting from 1n.

##### signal

[`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) for cancellation of the operation. When aborted, it will be removed from the queue and the `queue.add()` call will reject with an [error](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/reason). If the operation is already running, the signal will need to be handled by the operation itself.
Expand Down Expand Up @@ -236,6 +242,44 @@ console.log(queue.sizeBy({priority: 0}));
//=> 1
```

#### .setPriority(id, priority)

Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.

For example, this can be used to prioritize a promise function to run earlier.

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 1});

queue.setPriority('🦀', 2);
```

In this case, the promise function with `id: '🦀'` runs second.

You can also deprioritize a promise function to delay its execution:

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
queue.add(async () => '🦄');
queue.add(async () => '🦄', {priority: 0});

queue.setPriority('🦀', -1);
```

Here, the promise function with `id: '🦀'` executes last.

#### .pending

Number of running items (no longer in the queue).
Expand Down
46 changes: 46 additions & 0 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@

readonly #throwOnTimeout: boolean;

// Use to assign a unique identifier to a promise function, if not explicitly specified
#idAssigner = 1n;

/**
Per-operation timeout in milliseconds. Operations fulfill once `timeout` elapses if they haven't already.

Expand Down Expand Up @@ -228,12 +231,55 @@
});
}

/**
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.

For example, this can be used to prioritize a promise function to run earlier.

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 1});

queue.setPriority('🦀', 2);
```

Check failure on line 251 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 20

Trailing spaces not allowed.

Check failure on line 251 in source/index.ts

View workflow job for this annotation

GitHub Actions / Node.js 18

Trailing spaces not allowed.
In this case, the promise function with `id: '🦀'` runs second.

You can also deprioritize a promise function to delay its execution:

```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
queue.add(async () => '🦄');
queue.add(async () => '🦄', {priority: 0});

queue.setPriority('🦀', -1);
```
Here, the promise function with `id: '🦀'` executes last.
*/
setPriority(id: string, priority: number) {
RaishavHanspal marked this conversation as resolved.
Show resolved Hide resolved
this.#queue.setPriority(id, priority);
}

/**
Adds a sync or async task to the queue. Always returns a promise.
*/
async add<TaskResultType>(function_: Task<TaskResultType>, options: {throwOnTimeout: true} & Exclude<EnqueueOptionsType, 'throwOnTimeout'>): Promise<TaskResultType>;
async add<TaskResultType>(function_: Task<TaskResultType>, options?: Partial<EnqueueOptionsType>): Promise<TaskResultType | void>;
async add<TaskResultType>(function_: Task<TaskResultType>, options: Partial<EnqueueOptionsType> = {}): Promise<TaskResultType | void> {
// In case `id` is not defined.
options.id ??= (this.#idAssigner++).toString();

options = {
timeout: this.timeout,
throwOnTimeout: this.#throwOnTimeout,
Expand Down
5 changes: 5 additions & 0 deletions source/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ export type QueueAddOptions = {
@default 0
*/
readonly priority?: number;

/**
Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned as an incrementing bigint starting from 1n.
*/
id?: string;
RaishavHanspal marked this conversation as resolved.
Show resolved Hide resolved
RaishavHanspal marked this conversation as resolved.
Show resolved Hide resolved
RaishavHanspal marked this conversation as resolved.
Show resolved Hide resolved
} & TaskOptions & TimeoutOptions;

export type TaskOptions = {
Expand Down
13 changes: 12 additions & 1 deletion source/priority-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp

const element = {
priority: options.priority,
id: options.id,
run,
};

if (this.size && this.#queue[this.size - 1]!.priority! >= options.priority!) {
if (this.size === 0 || this.#queue[this.size - 1]!.priority! >= options.priority!) {
this.#queue.push(element);
return;
}
Expand All @@ -32,6 +33,16 @@ export default class PriorityQueue implements Queue<RunFunction, PriorityQueueOp
this.#queue.splice(index, 0, element);
}

setPriority(id: string, priority: number) {
const existingIndex: number = this.#queue.findIndex((element: Readonly<PriorityQueueOptions>) => element.id === id);
if (existingIndex === -1) {
throw new Error('Invalid Index - No promise function of specified id available in the queue.');
}

const [item] = this.#queue.splice(existingIndex, 1);
this.enqueue(item!.run, {priority, id});
}

dequeue(): RunFunction | undefined {
const item = this.#queue.shift();
return item?.run;
Expand Down
1 change: 1 addition & 0 deletions source/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ export type Queue<Element, Options> = {
filter: (options: Readonly<Partial<Options>>) => Element[];
dequeue: () => Element | undefined;
enqueue: (run: Element, options?: Partial<Options>) => void;
setPriority: (id: string, priority: number) => void;
};
163 changes: 162 additions & 1 deletion test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import inRange from 'in-range';
import timeSpan from 'time-span';
import randomInt from 'random-int';
import pDefer from 'p-defer';
import PQueue, {AbortError} from '../source/index.js';
import PQueue from '../source/index.js';

const fixture = Symbol('fixture');

Expand Down Expand Up @@ -1134,3 +1134,164 @@ test('aborting multiple jobs at the same time', async t => {
await t.throwsAsync(task2, {instanceOf: DOMException});
t.like(queue, {size: 0, pending: 0});
});

test('.setPriority() - execute a promise before planned', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 1});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.setPriority('🐢', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🐢', '🦆']);
});

test('.setPriority() - execute a promise after planned', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 1});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.setPriority('🐢', -1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🦆', '🦆', '🦆', '🐢']);
});

test('.setPriority() - execute a promise before planned - concurrency 2', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 2});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('⚡️');
}, {id: '⚡️'});
queue.setPriority('⚡️', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '⚡️', '🐢']);
});

test('.setPriority() - execute a promise before planned - concurrency 3', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 3});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('⚡️');
}, {id: '⚡️'});
queue.add(async () => {
await delay(400);
result.push('🦀');
}, {id: '🦀'});
queue.setPriority('🦀', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
});

test('.setPriority() - execute a multiple promise before planned, with variable priority', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 2});
queue.add(async () => {
await delay(400);
result.push('🐌');
}, {id: '🐌'});
queue.add(async () => {
await delay(400);
result.push('🦆');
}, {id: '🦆'});
queue.add(async () => {
await delay(400);
result.push('🐢');
}, {id: '🐢'});
queue.add(async () => {
await delay(400);
result.push('⚡️');
}, {id: '⚡️'});
queue.add(async () => {
await delay(400);
result.push('🦀');
}, {id: '🦀'});
queue.setPriority('⚡️', 1);
queue.setPriority('🦀', 2);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🦀', '⚡️', '🐢']);
});

test('.setPriority() - execute a promise before planned - concurrency 3 and unspecified `id`', async t => {
const result: string[] = [];
const queue = new PQueue({concurrency: 3});
queue.add(async () => {
await delay(400);
result.push('🐌');
});
queue.add(async () => {
await delay(400);
result.push('🦆');
});
queue.add(async () => {
await delay(400);
result.push('🐢');
});
queue.add(async () => {
await delay(400);
result.push('⚡️');
});
queue.add(async () => {
await delay(400);
result.push('🦀');
});
queue.setPriority('5', 1);
await queue.onIdle();
t.deepEqual(result, ['🐌', '🦆', '🐢', '🦀', '⚡️']);
});
Loading