Skip to content

Commit

Permalink
Merge pull request #103 from windingtree/feat/level-queue-test
Browse files Browse the repository at this point in the history
feat: add levelDB + queue test
  • Loading branch information
kostysh authored Mar 19, 2024
2 parents 5abd470 + 13e9c42 commit e27a402
Show file tree
Hide file tree
Showing 3 changed files with 2,653 additions and 7,284 deletions.
5 changes: 5 additions & 0 deletions packages/storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
"publishConfig": {
"access": "public"
},
"devDependencies": {
"@types/luxon": "^3.3.5",
"@windingtree/sdk-queue": "workspace:*",
"luxon": "^1.23.0"
},
"dependencies": {
"@windingtree/sdk-logger": "workspace:*",
"buffer": "^6.0.3",
Expand Down
85 changes: 85 additions & 0 deletions packages/storage/test/levelQueue.memory.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { describe, expect, it } from 'vitest';
import { GenericStorageOptions } from '../src/index.js';
import { createInitializer } from '../src/level.js';
import { JobHandler, Queue } from '@windingtree/sdk-queue';
import { DateTime } from 'luxon';

describe('Level Queue Test', () => {
const createJobHandler =
<JobData = unknown, HandlerOptions = unknown>(
handler: JobHandler<JobData, HandlerOptions>,
) =>
(options: HandlerOptions = {} as HandlerOptions) =>
(data: JobData) =>
handler(data, options);

it('Initializer queue jobs test', async () => {
const options: GenericStorageOptions = { scope: 'queueTestInitializer' };
const initializer = createInitializer(options);
const initializedStorage = await initializer();

let queue = new Queue({
storage: initializedStorage,
idsKeyName: 'jobsIds',
concurrencyLimit: 3,
});

const idsSet: Set<number> = new Set();
let hasDoubleJobs = false;
const testHandler = createJobHandler<{ id: number }>(async (data) => {
if (data?.id) {
const { id } = data;

if (idsSet.has(id)) {
hasDoubleJobs = true;
}

idsSet.add(data?.id);
}

return Promise.resolve(false);
});

queue.registerHandler('test', testHandler());

const id = Math.floor(Math.random() * 100);
const timeoutSeconds = 3;

DateTime.fromJSDate(new Date()).plus({ minutes: 5 }).toSeconds();

const jobId = queue.add({
handlerName: 'test',
data: { id },
maxRetries: 1,
expire: DateTime.fromJSDate(new Date()).plus({ minutes: 5 }).toSeconds(),
scheduledTime: DateTime.fromJSDate(new Date())
.plus({ seconds: timeoutSeconds })
.toMillis(),
});

await new Promise<void>((resolve) => {
return setTimeout(() => resolve(), 500);
});

await new Promise<void>((resolve) => {
return setTimeout(() => resolve(), (timeoutSeconds + 1) * 1000);
});

await queue.stop();

queue = new Queue({
storage: initializedStorage,
idsKeyName: 'jobsIds',
concurrencyLimit: 3,
});

await new Promise<void>((resolve) => {
return setTimeout(() => resolve(), 1000);
});

const data = (await queue.get(jobId))?.data;

expect((data as { id: number }).id).toBe(id);
expect(hasDoubleJobs).toBe(false);
});
});
Loading

0 comments on commit e27a402

Please sign in to comment.