Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/add job progress support #12

Merged
merged 15 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/build-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ jobs:
id: docker_build
uses: docker/build-push-action@v5
with:
context: .
push: true # push to registry
pull: true # always fetch the latest base images
tags: ghcr.io/taskforcesh/bullmq-proxy:latest

tags: ${{ steps.metadata.outputs.tags }}
cache-from: type=local,src=/tmp/.buildx-cache
cache-to: type=local,dest=/tmp/.buildx-cache-new
- name: Image digest
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
- run: bun install
- run: bun run tsc
- run: bun test
- run: bun test ./src/e2e-test.ts

test-dragonflydb:
runs-on: ubuntu-latest
Expand All @@ -56,3 +57,4 @@ jobs:
- run: bun install
- run: bun run tsc
- run: QUEUE_PREFIX={b} bun test
- run: QUEUE_PREFIX={b} bun test ./src/e2e-test.ts
21 changes: 21 additions & 0 deletions docker-compose-dfly.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version: '3'
services:
proxy:
image: ghcr.io/taskforcesh/bullmq-proxy:latest
ports:
- 8080:8080
environment:
PORT: 8080
REDIS_HOST: redis
REDIS_PORT: 6379
REDIS_PASSWORD: ${REDIS_PASSWORD}
REDIS_TLS: ${REDIS_TLS}
AUTH_TOKENS: ${AUTH_TOKENS}
QUEUE_PREFIX: '{b}'
dragonflydb:
image: docker.dragonflydb.io/dragonflydb/dragonfly
environment:
DFLY_cluster_mode: emulated
DFLY_lock_on_hashtags: true
ports:
- 6379:6379
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"build:declaration": "tsc --emitDeclarationOnly",
"tsc": "tsc --noEmit",
"prepare": "husky",
"commitlint": "commitlint --edit"
"commitlint": "commitlint --edit",
"test:e2e": "bun test ./src/e2e-test.ts"
},
"dependencies": {
"@sinclair/typebox": "^0.31.17",
Expand Down
62 changes: 62 additions & 0 deletions src/authware/auth-by-tokens.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { jest, mock, it, expect, describe, beforeEach, beforeAll } from "bun:test";
import { warn } from "../utils/log";
import { authByTokens } from "./auth-by-tokens";

import { config } from "../config";

describe('authByTokens', () => {

beforeAll(() => {
mock.module('../config', () => ({
config: {
...config,
authTokens: ['testToken1', 'testToken2']
}
}));

mock.module('../utils/log', () => ({
warn: jest.fn()
}));
});

beforeEach(() => {
jest.restoreAllMocks();
});

it('returns false and logs a warning if the token is missing', async () => {
const req = new Request("http://example.com/path", { headers: {} });
const url = new URL(req.url);

const result = await authByTokens(req, url, {});

expect(result).toBe(false);
expect(warn).toHaveBeenCalledWith(expect.stringContaining('missing token'));
});

it('returns false and logs a warning for an invalid token', async () => {
const req = new Request("http://example.com/path", {
headers: {
'authorization': 'Bearer invalidToken'
}
});
const url = new URL(req.url);

const result = await authByTokens(req, url, {});

expect(result).toBe(false);
expect(warn).toHaveBeenCalledWith(expect.stringContaining('invalid token'));
});

it('returns true for a valid token', async () => {
const req = new Request("http://example.com/path", {
headers: {
'authorization': 'Bearer testToken1'
}
});
const url = new URL(req.url);

const result = await authByTokens(req, url, {});

expect(result).toBe(true);
});
});
31 changes: 31 additions & 0 deletions src/authware/auth-by-tokens.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Cluster, Redis } from "ioredis";
import { config } from "../config";
import { warn } from "../utils/log";

export const authByTokens = async (
req: Request,
url: URL,
_params: Record<string, string>,
_connection?: Redis | Cluster): Promise<boolean> => {
const authTokens = config.authTokens;

const from =
req.headers.get("x-forwarded-for") || req.headers.get("host");

const token = req.headers.get("authorization")?.split("Bearer ")[1];
if (!token) {
warn(
`Unauthorized request (missing token) to path ${url.pathname.toString()} from ${from}`
);
return false;
}

if (!authTokens.includes(token!)) {
warn(
`Unauthorized request (invalid token) to path ${url.pathname.toString()} from ${from}`
);
return false;
}

return true;
}
77 changes: 77 additions & 0 deletions src/authware/auth-for-workers.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { afterAll, beforeAll, describe, expect, it, jest, mock } from "bun:test";
import { authForWorkers } from "./auth-for-workers";
import { Redis } from "ioredis";

let redisClient: Redis;

beforeAll(async () => {
// Reset mocks before each test
redisClient = new Redis({
maxRetriesPerRequest: null
});
});

afterAll(async () => {
// Restore all mocks after all tests
await redisClient.quit();
});

describe('authForWorkers', () => {
it('returns false if the token is missing', async () => {
const req = new Request("http://example.com?queueName=testQueue&jobId=123", {
headers: {}
});
const url = new URL(req.url);
const mockConnection = <unknown>{ get: jest.fn() } as Redis;

const result = await authForWorkers(req, url, {}, mockConnection);

expect(result).toBe(false);
});


it('returns false if jobId or queueName is missing', async () => {
const req = new Request("http://example.com?token=someToken", {
headers: {}
});
const url = new URL(req.url);
const mockConnection = <unknown>{ get: jest.fn() } as Redis;

const result = await authForWorkers(req, url, {}, mockConnection);
expect(result).toBe(false);
});

it('returns false if the token is invalid', async () => {
const req = new Request("http://example.com?queueName=testQueue&jobId=123", {
headers: {
"Authorization": "Bearer invalidToken"
}
});
const url = new URL(req.url);
const mockConnection = <unknown>{
get: jest.fn().mockResolvedValue('validToken')
} as Redis;

const result = await authForWorkers(req, url, {}, mockConnection);
expect(result).toBe(false);
});

it('returns true if the token is valid and all parameters are present', async () => {
const queueName = 'testQueue';
const jobId = '123';

const req = new Request(`http://example.com?queueName=${queueName}&jobId=${jobId}`, {
headers: {
"Authorization": "Bearer validToken"
}
});
const url = new URL(req.url);
const mockConnection = <unknown>{
get: jest.fn().mockResolvedValue('validToken')
} as Redis;

const result = await authForWorkers(req, url, { queueName, jobId }, mockConnection);

expect(result).toBe(true);
});
});
48 changes: 48 additions & 0 deletions src/authware/auth-for-workers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { getQueue } from "../utils/queue-factory";
import { warn } from "../utils/log";
import { Cluster, Redis } from "ioredis";

/**
* Authenticate and authorize job actions based on the job's lock token.
*/
export const authForWorkers = async (
req: Request,
url: URL,
params: Record<string, string>,
connection: Redis | Cluster): Promise<boolean> => {
const from =
req.headers.get("x-forwarded-for") || req.headers.get("host");

const token = req.headers.get("authorization")?.split("Bearer ")[1];
if (!token) {
warn(
`Unauthorized request (missing token) to path ${url.pathname.toString()} from ${from}`
);
return false;
}

// Check if the token is valid for the given job id.
const jobId = params.jobId;
const queueName = params.queueName;

if (!jobId || !queueName) {
warn(
`Unauthorized request (missing jobId or queueName) to path ${url.pathname.toString()} from ${from}`
);
return false;
}

const queue = await getQueue(queueName, connection);
const prefix = queue.opts.prefix;
const jobLockKey = `${prefix}:${queueName}:${jobId}:lock`;
const lockToken = await connection.get(jobLockKey);

if (lockToken !== token) {
warn(
`Unauthorized request (invalid token) to path ${url.pathname.toString()} from ${from}`
);
return false;
}

return true;
}
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const config = {
uri: process.env.REDIS_URI || undefined,
host: process.env.REDIS_HOST || "localhost",
port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT, 10) : 6379,
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls: process.env.REDIS_TLS === "true" ? {} : undefined,
},
Expand Down
51 changes: 43 additions & 8 deletions src/controllers/http/queue-http-controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@

import { describe, it, beforeEach, expect, jest, afterEach, mock } from 'bun:test';
import { describe, it, expect, jest, beforeEach, afterEach } from 'bun:test';
import { QueueHttpController } from './queue-http-controller';
import { JobJson } from 'bullmq';
import { JobJson, Queue } from 'bullmq';
import { Redis } from 'ioredis';
import { cleanCache } from '../../utils/queue-factory';
import { config } from '../../config';

describe('QueueHttpController.addJobs', () => {
let fakeReq;
let fakeReq: Request;
let opts: any;

let redisClient: Redis;
beforeEach(() => {
// Setup a fake request and options
fakeReq = {
fakeReq = <unknown>{
json: jest.fn().mockResolvedValue([{ name: 'jobName', data: 'jobData' }]), // Mock job data
};
} as Request;

redisClient = new Redis();

opts = {
req: fakeReq,
params: { queueName: 'testQueue' },
redisClient: new Redis(),
redisClient,
};
});

afterEach(() => {
mock.restore();
afterEach(async () => {
// Clean up
const queue = new Queue('testQueue', { connection: redisClient, prefix: config.defaultQueuePrefix });
await queue.obliterate({ force: true });

// We need to clean the cache to eliminate side-effects between tests
await cleanCache();

await queue.close();
await redisClient.quit();
});

it('should add jobs successfully', async () => {
Expand All @@ -36,4 +49,26 @@ describe('QueueHttpController.addJobs', () => {
expect(jobs[0]).toHaveProperty('name', 'jobName');
expect(jobs[0]).toHaveProperty('data', 'jobData');
});

it('should be possible to get a job after adding it', async () => {
const response = await QueueHttpController.addJobs(opts);
const jobs = await response!.json() as JobJson[];

expect(jobs).toBeArrayOfSize(1);
expect(jobs[0]).toHaveProperty('name', 'jobName');
expect(jobs[0]).toHaveProperty('data', 'jobData');

const optsGet = {
params: { queueName: 'testQueue', jobId: jobs[0].id },
req: <unknown>fakeReq as Request,
redisClient: new Redis(),
};

const getResponse = await QueueHttpController.getJob(optsGet);
expect(getResponse).toBeDefined();
expect(getResponse!.status).toBe(200);
const job = await getResponse!.json() as JobJson;
expect(job).toHaveProperty('name', 'jobName');
expect(job).toHaveProperty('data', 'jobData');
});
});
Loading