Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(src/classes/job.ts): add custom serializer/deserializer for job data #2591

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/gitbook/patterns/data-serialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Data Serialization and Deserialization

It can be convenient to use custom serializers and deserializers when working with complex data types. By default, only JSON-like data can be passed in as job data. If you need to pass data that doesn't conform to JSON standards (like a Map, Set, or Date), you can define custom serializers and deserializers for your queues and workers:

```typescript
import { Queue, Worker } from 'bullmq';
import superjson from 'superjson';

const queue = new Queue('my-queue', {
serializer: data => superjson.serialize(data),
});

await queue.add('my-job', {
date: new Date(),
map: new Map([['my-key', 'my-value']]),
});

const worker = new Worker(
'my-queue',
async job => {
console.log(job.data.date.getSeconds());
console.log(job.data.map.get('my-key'));
},
{
deserializer: data => superjson.deserialize(data),
},
);
```

{% hint style="warning" %}
If you are using third-party BullMQ integrations, such as dashboard or other monitoring solutions, passing custom serializers and deserializers to your queues and workers may have an adverse effect on the way these integrations operate. Defining your serializer to return a JSON compatible object is the best way to ensure that these integrations continue to work as expected.
{% endhint %}
12 changes: 10 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ export class Job<
json: JobJsonRaw,
jobId?: string,
): Job<T, R, N> {
const data = JSON.parse(json.data || '{}');
const rawData = json.data || '{}';
const { deserializer } = queue.opts;
const parsedData = JSON.parse(rawData);
const data = deserializer ? deserializer(parsedData) : parsedData;
const opts = Job.optsFromJSON(json.opts);

const job = new this<T, R, N>(
Expand Down Expand Up @@ -430,10 +433,15 @@ export class Job<
* @returns
*/
asJSON(): JobJson {
const { serializer } = this.queue.opts;

const rawData = typeof this.data === 'undefined' ? {} : this.data;
const data = serializer ? serializer(rawData) : rawData;

return {
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
data: JSON.stringify(data),
opts: this.optsAsJSON(this.opts),
parent: this.parent ? { ...this.parent } : undefined,
parentKey: this.parentKey,
Expand Down
11 changes: 11 additions & 0 deletions src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { AdvancedRepeatOptions } from './advanced-options';
import { DefaultJobOptions } from './base-job-options';
import { ConnectionOptions } from './redis-options';
import { DeserializeFn, SerializeFn } from './serialize';

export enum ClientType {
blocking = 'blocking',
Expand Down Expand Up @@ -31,6 +32,16 @@ export interface QueueBaseOptions {
* @defaultValue false
*/
skipVersionCheck?: boolean;

/**
* Pass a custom serializer to serialize job data into Redis
*/
serializer?: SerializeFn;

/**
* Pass a custom deserializer to deserialize job data
*/
deserializer?: DeserializeFn;
}

/**
Expand Down
25 changes: 25 additions & 0 deletions src/interfaces/serialize.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
export type JsonObject = { [key: string]: JsonValue };

export type JsonValue =
| null
| boolean
| number
| string
| JsonValue[]
| JsonObject;

/**
* Serialize job data into a custom JSON compatible object
*
* @param data - the job data
* @returns a JSON compatible object
*/
export type SerializeFn = (data: any) => JsonValue;

/**
* Deserialize job data into a custom JSON compatible object
*
* @param data - the job data
* @returns a JSON compatible object
*/
export type DeserializeFn = (data: any) => JsonValue;
68 changes: 68 additions & 0 deletions tests/test_job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { v4 } from 'uuid';
import { Job, Queue, QueueEvents, Worker } from '../src/classes';
import { JobsOptions } from '../src/types';
import { delay, getParentKey, removeAllQueueData } from '../src/utils';
import * as sinon from 'sinon';
import { DeserializeFn, SerializeFn } from '../src/interfaces/serialize';

describe('Job', function () {
const redisHost = process.env.REDIS_HOST || 'localhost';
Expand Down Expand Up @@ -229,6 +231,72 @@ describe('Job', function () {
});
});

describe('serialize/deserialize', () => {
let queueName: string;
let queue: Queue;
const serializer: SerializeFn = data => ({ ...data, bar: 'foo' });
const deserializer: DeserializeFn = data => ({
...data,
abc: 'xyz',
});
const data = { foo: 'bar' };

beforeEach(() => {
queueName = `test-${v4()}`;
queue = new Queue(queueName, { connection, prefix, serializer });
});

afterEach(async () => {
await queue.close();
});

it('should serialize the job data with the queue serializer', async () => {
const spy = sinon.spy(queue.opts, 'serializer');
const job = await Job.create(queue, 'test', data);

expect(spy.callCount).to.be.equal(1);
expect(job.asJSON().data).to.be.equal('{"foo":"bar","bar":"foo"}');
});

it('should still be parsable by JSON.parse', async () => {
const job = await Job.create(queue, 'test', data);

const jobData = job.asJSON().data;
expect(JSON.parse(jobData)).to.deep.equal({ foo: 'bar', bar: 'foo' });
});

it('should deserialize the job data with the worker deserializer', async () => {
const data = { foo: 'bar' };
await Job.create(queue, 'test', data);

let worker: Worker;
const promise = new Promise<void>(async (resolve, reject) => {
worker = new Worker(
queueName,
async job => {
try {
expect(job.data).to.deep.equal({
foo: 'bar',
bar: 'foo',
abc: 'xyz',
});
} catch (err) {
reject(err);
}
resolve();
},
{ connection, prefix, deserializer },
);
});

try {
await promise;
} finally {
worker && (await worker.close());
}
});
});

describe('.update', function () {
it('should allow updating job data', async function () {
const job = await Job.create<{ foo?: string; baz?: string }>(
Expand Down