diff --git a/.github/workflows/build-docker.yml b/.github/workflows/build-docker.yml index 0d6121d..2516aa2 100644 --- a/.github/workflows/build-docker.yml +++ b/.github/workflows/build-docker.yml @@ -51,9 +51,11 @@ jobs: id: docker_build uses: docker/build-push-action@v5 with: + context: . push: true # push to registry pull: true # always fetch the latest base images - tags: ghcr.io/taskforcesh/bullmq-proxy:latest + + tags: ${{ steps.metadata.outputs.tags }} cache-from: type=local,src=/tmp/.buildx-cache cache-to: type=local,dest=/tmp/.buildx-cache-new - name: Image digest diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 4ccd3c1..3e7cc6b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,6 +34,7 @@ jobs: - run: bun install - run: bun run tsc - run: bun test + - run: bun test ./src/e2e-test.ts test-dragonflydb: runs-on: ubuntu-latest @@ -56,3 +57,4 @@ jobs: - run: bun install - run: bun run tsc - run: QUEUE_PREFIX={b} bun test + - run: QUEUE_PREFIX={b} bun test ./src/e2e-test.ts diff --git a/docker-compose-dfly.yaml b/docker-compose-dfly.yaml new file mode 100644 index 0000000..35126ed --- /dev/null +++ b/docker-compose-dfly.yaml @@ -0,0 +1,21 @@ +version: '3' +services: + proxy: + image: ghcr.io/taskforcesh/bullmq-proxy:latest + ports: + - 8080:8080 + environment: + PORT: 8080 + REDIS_HOST: dragonflydb + REDIS_PORT: 6379 + REDIS_PASSWORD: ${REDIS_PASSWORD} + REDIS_TLS: ${REDIS_TLS} + AUTH_TOKENS: ${AUTH_TOKENS} + QUEUE_PREFIX: '{b}' + dragonflydb: + image: docker.dragonflydb.io/dragonflydb/dragonfly + environment: + DFLY_cluster_mode: emulated + DFLY_lock_on_hashtags: true + ports: + - 6379:6379 diff --git a/package.json b/package.json index 7c218a5..974d605 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,8 @@ "build:declaration": "tsc --emitDeclarationOnly", "tsc": "tsc --noEmit", "prepare": "husky", - "commitlint": "commitlint --edit" + "commitlint": "commitlint --edit", + "test:e2e": "bun test 
./src/e2e-test.ts" }, "dependencies": { "@sinclair/typebox": "^0.31.17", diff --git a/src/authware/auth-by-tokens.spec.ts b/src/authware/auth-by-tokens.spec.ts new file mode 100644 index 0000000..b486e33 --- /dev/null +++ b/src/authware/auth-by-tokens.spec.ts @@ -0,0 +1,62 @@ +import { jest, mock, it, expect, describe, beforeEach, beforeAll } from "bun:test"; +import { warn } from "../utils/log"; +import { authByTokens } from "./auth-by-tokens"; + +import { config } from "../config"; + +describe('authByTokens', () => { + + beforeAll(() => { + mock.module('../config', () => ({ + config: { + ...config, + authTokens: ['testToken1', 'testToken2'] + } + })); + + mock.module('../utils/log', () => ({ + warn: jest.fn() + })); + }); + + beforeEach(() => { + jest.restoreAllMocks(); + }); + + it('returns false and logs a warning if the token is missing', async () => { + const req = new Request("http://example.com/path", { headers: {} }); + const url = new URL(req.url); + + const result = await authByTokens(req, url, {}); + + expect(result).toBe(false); + expect(warn).toHaveBeenCalledWith(expect.stringContaining('missing token')); + }); + + it('returns false and logs a warning for an invalid token', async () => { + const req = new Request("http://example.com/path", { + headers: { + 'authorization': 'Bearer invalidToken' + } + }); + const url = new URL(req.url); + + const result = await authByTokens(req, url, {}); + + expect(result).toBe(false); + expect(warn).toHaveBeenCalledWith(expect.stringContaining('invalid token')); + }); + + it('returns true for a valid token', async () => { + const req = new Request("http://example.com/path", { + headers: { + 'authorization': 'Bearer testToken1' + } + }); + const url = new URL(req.url); + + const result = await authByTokens(req, url, {}); + + expect(result).toBe(true); + }); +}); \ No newline at end of file diff --git a/src/authware/auth-by-tokens.ts b/src/authware/auth-by-tokens.ts new file mode 100644 index 0000000..1739b0f --- 
/dev/null +++ b/src/authware/auth-by-tokens.ts @@ -0,0 +1,31 @@ +import { Cluster, Redis } from "ioredis"; +import { config } from "../config"; +import { warn } from "../utils/log"; + +export const authByTokens = async ( + req: Request, + url: URL, + _params: Record<string, string>, + _connection?: Redis | Cluster): Promise<boolean> => { + const authTokens = config.authTokens; + + const from = + req.headers.get("x-forwarded-for") || req.headers.get("host"); + + const token = req.headers.get("authorization")?.split("Bearer ")[1]; + if (!token) { + warn( + `Unauthorized request (missing token) to path ${url.pathname.toString()} from ${from}` + ); + return false; + } + + if (!authTokens.includes(token!)) { + warn( + `Unauthorized request (invalid token) to path ${url.pathname.toString()} from ${from}` + ); + return false; + } + + return true; +} diff --git a/src/authware/auth-for-workers.spec.ts b/src/authware/auth-for-workers.spec.ts new file mode 100644 index 0000000..b867499 --- /dev/null +++ b/src/authware/auth-for-workers.spec.ts @@ -0,0 +1,77 @@ +import { afterAll, beforeAll, describe, expect, it, jest, mock } from "bun:test"; +import { authForWorkers } from "./auth-for-workers"; +import { Redis } from "ioredis"; + +let redisClient: Redis; + +beforeAll(async () => { + // Reset mocks before each test + redisClient = new Redis({ + maxRetriesPerRequest: null + }); +}); + +afterAll(async () => { + // Restore all mocks after all tests + await redisClient.quit(); +}); + +describe('authForWorkers', () => { + it('returns false if the token is missing', async () => { + const req = new Request("http://example.com?queueName=testQueue&jobId=123", { + headers: {} + }); + const url = new URL(req.url); + const mockConnection = { get: jest.fn() } as Redis; + + const result = await authForWorkers(req, url, {}, mockConnection); + + expect(result).toBe(false); + }); + + + it('returns false if jobId or queueName is missing', async () => { + const req = new Request("http://example.com?token=someToken", { + 
headers: {} + }); + const url = new URL(req.url); + const mockConnection = { get: jest.fn() } as Redis; + + const result = await authForWorkers(req, url, {}, mockConnection); + expect(result).toBe(false); + }); + + it('returns false if the token is invalid', async () => { + const req = new Request("http://example.com?queueName=testQueue&jobId=123", { + headers: { + "Authorization": "Bearer invalidToken" + } + }); + const url = new URL(req.url); + const mockConnection = { + get: jest.fn().mockResolvedValue('validToken') + } as Redis; + + const result = await authForWorkers(req, url, {}, mockConnection); + expect(result).toBe(false); + }); + + it('returns true if the token is valid and all parameters are present', async () => { + const queueName = 'testQueue'; + const jobId = '123'; + + const req = new Request(`http://example.com?queueName=${queueName}&jobId=${jobId}`, { + headers: { + "Authorization": "Bearer validToken" + } + }); + const url = new URL(req.url); + const mockConnection = { + get: jest.fn().mockResolvedValue('validToken') + } as Redis; + + const result = await authForWorkers(req, url, { queueName, jobId }, mockConnection); + + expect(result).toBe(true); + }); +}); diff --git a/src/authware/auth-for-workers.ts b/src/authware/auth-for-workers.ts new file mode 100644 index 0000000..8665ba5 --- /dev/null +++ b/src/authware/auth-for-workers.ts @@ -0,0 +1,48 @@ +import { getQueue } from "../utils/queue-factory"; +import { warn } from "../utils/log"; +import { Cluster, Redis } from "ioredis"; + +/** + * Authenticate and authorize job actions based on the job's lock token. 
+ */ +export const authForWorkers = async ( + req: Request, + url: URL, + params: Record<string, string>, + connection: Redis | Cluster): Promise<boolean> => { + const from = + req.headers.get("x-forwarded-for") || req.headers.get("host"); + + const token = req.headers.get("authorization")?.split("Bearer ")[1]; + if (!token) { + warn( + `Unauthorized request (missing token) to path ${url.pathname.toString()} from ${from}` + ); + return false; + } + + // Check if the token is valid for the given job id. + const jobId = params.jobId; + const queueName = params.queueName; + + if (!jobId || !queueName) { + warn( + `Unauthorized request (missing jobId or queueName) to path ${url.pathname.toString()} from ${from}` + ); + return false; + } + + const queue = await getQueue(queueName, connection); + const prefix = queue.opts.prefix; + const jobLockKey = `${prefix}:${queueName}:${jobId}:lock`; + const lockToken = await connection.get(jobLockKey); + + if (lockToken !== token) { + warn( + `Unauthorized request (invalid token) to path ${url.pathname.toString()} from ${from}` + ); + return false; + } + + return true; +} diff --git a/src/config.ts b/src/config.ts index d426d2b..b9e2b7a 100644 --- a/src/config.ts +++ b/src/config.ts @@ -7,6 +7,7 @@ export const config = { uri: process.env.REDIS_URI || undefined, host: process.env.REDIS_HOST || "localhost", port: process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT, 10) : 6379, + username: process.env.REDIS_USERNAME || undefined, password: process.env.REDIS_PASSWORD || undefined, tls: process.env.REDIS_TLS === "true" ? 
{} : undefined, }, diff --git a/src/controllers/http/queue-http-controller.spec.ts b/src/controllers/http/queue-http-controller.spec.ts index bc4828f..f5da511 100644 --- a/src/controllers/http/queue-http-controller.spec.ts +++ b/src/controllers/http/queue-http-controller.spec.ts @@ -1,28 +1,41 @@ -import { describe, it, beforeEach, expect, jest, afterEach, mock } from 'bun:test'; +import { describe, it, expect, jest, beforeEach, afterEach } from 'bun:test'; import { QueueHttpController } from './queue-http-controller'; -import { JobJson } from 'bullmq'; +import { JobJson, Queue } from 'bullmq'; import { Redis } from 'ioredis'; +import { cleanCache } from '../../utils/queue-factory'; +import { config } from '../../config'; describe('QueueHttpController.addJobs', () => { - let fakeReq; + let fakeReq: Request; let opts: any; + let redisClient: Redis; beforeEach(() => { // Setup a fake request and options - fakeReq = { + fakeReq = { json: jest.fn().mockResolvedValue([{ name: 'jobName', data: 'jobData' }]), // Mock job data - }; + } as Request; + + redisClient = new Redis(); opts = { req: fakeReq, params: { queueName: 'testQueue' }, - redisClient: new Redis(), + redisClient, }; }); - afterEach(() => { - mock.restore(); + afterEach(async () => { + // Clean up + const queue = new Queue('testQueue', { connection: redisClient, prefix: config.defaultQueuePrefix }); + await queue.obliterate({ force: true }); + + // We need to clean the cache to eliminate side-effects between tests + await cleanCache(); + + await queue.close(); + await redisClient.quit(); }); it('should add jobs successfully', async () => { @@ -36,4 +49,26 @@ describe('QueueHttpController.addJobs', () => { expect(jobs[0]).toHaveProperty('name', 'jobName'); expect(jobs[0]).toHaveProperty('data', 'jobData'); }); + + it('should be possible to get a job after adding it', async () => { + const response = await QueueHttpController.addJobs(opts); + const jobs = await response!.json() as JobJson[]; + + 
expect(jobs).toBeArrayOfSize(1); + expect(jobs[0]).toHaveProperty('name', 'jobName'); + expect(jobs[0]).toHaveProperty('data', 'jobData'); + + const optsGet = { + params: { queueName: 'testQueue', jobId: jobs[0].id }, + req: fakeReq as Request, + redisClient: new Redis(), + }; + + const getResponse = await QueueHttpController.getJob(optsGet); + expect(getResponse).toBeDefined(); + expect(getResponse!.status).toBe(200); + const job = await getResponse!.json() as JobJson; + expect(job).toHaveProperty('name', 'jobName'); + expect(job).toHaveProperty('data', 'jobData'); + }); }); diff --git a/src/controllers/http/queue-http-controller.ts b/src/controllers/http/queue-http-controller.ts index 13caab5..8ddcfa5 100644 --- a/src/controllers/http/queue-http-controller.ts +++ b/src/controllers/http/queue-http-controller.ts @@ -1,13 +1,8 @@ import { JobState, Queue } from "bullmq"; -import { LRUCache } from "../../cache"; import { HttpHandlerOpts } from "../../interfaces/http-handler-opts"; -import { validateJob, validateQueueName } from "../../validators"; -import { config } from "../../config"; - -const cache = new LRUCache(config.queueCacheSize, async (queueName, queue) => { - await queue.close(); -}); +import { validateJob, validatePagination, validateQueueName } from "../../validators"; +import { getQueue } from "../../utils/queue-factory"; export const QueueHttpController = { /** @@ -23,11 +18,7 @@ export const QueueHttpController = { return new Response((err).message, { status: 400 }); } - let queue = cache.get(queueName); - if (!queue) { - queue = new Queue(queueName, { connection: opts.redisClient, prefix: config.defaultQueuePrefix }); - cache.put(queueName, queue); - } + const queue = await getQueue(queueName, opts.redisClient); try { const body = await opts.req.json(); @@ -60,26 +51,21 @@ export const QueueHttpController = { */ getJobs: async (opts: HttpHandlerOpts) => { const queueName = opts.params.queueName; - let queue = cache.get(queueName); - if (!queue) { - 
queue = new Queue(queueName, { connection: opts.redisClient }); - cache.put(queueName, queue); - } + let start; + let length; - const start = parseInt(opts.searchParams?.get("start") || "0"); - const length = parseInt(opts.searchParams?.get("length") || "10"); + try { + validateQueueName(queueName); - if (isNaN(start) || isNaN(length)) { - return new Response("Invalid start or length", { status: 400 }); - } + start = parseInt(opts.searchParams?.get("start") || "0"); + length = parseInt(opts.searchParams?.get("length") || "10"); - if (start < 0 || length < 0) { - return new Response("Start and length must be positive", { status: 400 }); + validatePagination(start, length); + } catch (err) { + return new Response((err).message, { status: 400 }); } - if (length > 100) { - return new Response("Length must be less than or equal to 100", { status: 400 }); - } + const queue = await getQueue(queueName, opts.redisClient); const statuses: JobState[] = (opts.searchParams?.get("statuses") || "waiting,active,completed,failed").split(",").filter((s) => s) as JobState[]; @@ -94,9 +80,35 @@ export const QueueHttpController = { const [counts, jobs] = await Promise.all([ queue.getJobCounts(...statuses), - queue.getJobs(statuses, start, start + length) + queue.getJobs(statuses, start, start + length - 1) ]); return new Response(JSON.stringify({ counts, jobs, start, length }), { status: 200 }); + }, + + /** + * getJob + * @param opts + * @returns + */ + getJob: async (opts: HttpHandlerOpts) => { + const queueName = opts.params.queueName; + try { + validateQueueName(queueName); + } catch (err) { + return new Response((err).message, { status: 400 }); + } + + const queue = await getQueue(queueName, opts.redisClient); + + const jobId = opts.params.jobId; + const job = await queue.getJob(jobId); + + if (!job) { + return new Response("Job not found", { status: 404 }); + } + + return new Response(JSON.stringify(job), { status: 200 }); } + } diff --git 
a/src/controllers/http/worker-http-controller.spec.ts b/src/controllers/http/worker-http-controller.spec.ts index 1b27689..e1bce1b 100644 --- a/src/controllers/http/worker-http-controller.spec.ts +++ b/src/controllers/http/worker-http-controller.spec.ts @@ -1,5 +1,5 @@ import { Redis } from 'ioredis'; -import { describe, it, jest, mock, expect, beforeAll } from "bun:test"; +import { describe, it, jest, mock, expect, beforeAll, afterAll } from "bun:test"; import { WorkerHttpController } from './worker-http-controller'; const fakeAddValidReq = { @@ -37,6 +37,10 @@ describe('WorkerHttpController.addWorker', () => { }); }); + afterAll(async () => { + await redisClient.quit(); + }); + it('should add a worker with valid metadata', async () => { const response = await WorkerHttpController.addWorker({ req: fakeAddValidReq, redisClient, params: {} }); diff --git a/src/controllers/http/worker-http-controller.ts b/src/controllers/http/worker-http-controller.ts index a15f69d..b9ab716 100644 --- a/src/controllers/http/worker-http-controller.ts +++ b/src/controllers/http/worker-http-controller.ts @@ -1,5 +1,5 @@ -import { Worker } from "bullmq"; +import { Job, Worker } from "bullmq"; import { Redis, Cluster } from "ioredis"; import { debug } from "../../utils/log"; @@ -25,7 +25,7 @@ const workerFromMetadata = (queueName: string, workerMetadata: WorkerMetadata, c debugEnabled && debug(`Starting worker for queue ${queueName} with endpoint ${workerMetadata.endpoint.url} and options ${workerMetadata.opts || 'default'}`); - const worker = new Worker(queueName, async (job) => { + const worker = new Worker(queueName, async (job: Job, token?: string) => { debugEnabled && debug(`Processing job ${job.id} from queue ${queueName} with endpoint ${workerEndpoint.url}`); // Process job by calling an external service using the worker endpoint @@ -39,7 +39,7 @@ const workerFromMetadata = (queueName: string, workerMetadata: WorkerMetadata, c const response = await fetch(workerEndpoint.url, { 
method: workerEndpoint.method, headers: workerEndpoint.headers, - body: JSON.stringify(job.toJSON()), + body: JSON.stringify({ job: job.toJSON(), token }), signal: controller.signal }); diff --git a/src/controllers/http/worker-job-http-controller.spec.ts b/src/controllers/http/worker-job-http-controller.spec.ts new file mode 100644 index 0000000..669eca0 --- /dev/null +++ b/src/controllers/http/worker-job-http-controller.spec.ts @@ -0,0 +1,147 @@ + +import { Redis } from 'ioredis'; +import { describe, it, expect, beforeAll, afterAll } from "bun:test"; +import { WorkerJobHttpController } from './worker-job-http-controller'; +import { config } from '../../config'; + +let redisClient: Redis; + +const queuePrefix = config.defaultQueuePrefix; + +beforeAll(async () => { + redisClient = new Redis({ + maxRetriesPerRequest: null + }); +}); + +afterAll(async () => { + await redisClient.quit(); +}); + +describe('WorkerJobHttpController.updateProgress', () => { + + it('returns a 500 response if the job does not exist', async () => { + const opts = { + params: { + queueName: 'invalid', + jobId: '1' + }, + req: { + json: () => Promise.resolve({ progress: 50 }) + } as Request, + redisClient: new Redis({ + maxRetriesPerRequest: null + }) + }; + + const response = await WorkerJobHttpController.updateProgress(opts); + expect(response.status).toBe(500); + expect(await response.text()).toBe('Missing key for job 1. 
updateProgress'); + }); + + it('updates job progress and returns a 200 response', async () => { + const authToken = 'test-123'; + const opts = { + params: { + queueName: 'valid', + jobId: '1' + }, + req: { + json: () => Promise.resolve({ progress: 50 }) + } as Request, + redisClient + }; + + await redisClient.hset(`${queuePrefix}:valid:1`, 'progress', 0); + await redisClient.set(`${queuePrefix}:valid:1:lock`, authToken); + + const response = await WorkerJobHttpController.updateProgress(opts); + + expect(response.status).toBe(200); + expect(await response.text()).toBe('OK'); + + // Verify that queue.updateJobProgress was called + const progress = await redisClient.hget(`${queuePrefix}:valid:1`, 'progress'); + expect(progress).toBe('{\"progress\":50}'); + + // cleanup + await redisClient.del(`${queuePrefix}:valid:1`); + await redisClient.del(`${queuePrefix}:valid:1:lock`); + }); +}); + + +describe('WorkerJobHttpController.addLog', () => { + it('adds a job log and returns a 200 response', async () => { + const authToken = 'test-123'; + const jobId = "42"; + const logMessage = "Log message"; + + const logsKey = `${queuePrefix}:valid:${jobId}:logs`; + + await redisClient.del(logsKey); + + // Assuming validateQueueName and getQueue are properly mocked above + const opts = { + params: { + queueName: 'valid', + jobId + }, + req: { + json: () => Promise.resolve(logMessage) + } as Request, + redisClient + }; + + await redisClient.hset(`${queuePrefix}:valid:${jobId}`, 'progress', 0); + await redisClient.set(`${queuePrefix}:valid:${jobId}:lock`, authToken); + + const response = await WorkerJobHttpController.addLog(opts); + expect(response.status).toBe(200); + expect(await response.text()).toBe('OK'); + + const logs = await redisClient.lrange(logsKey, 0, -1); + expect(logs).toBeArrayOfSize(1); + expect(logs[0]).toBe(logMessage); + }); +}); + + +describe('WorkerJobHttpController.getLogs', () => { + it('returns a 400 response if the start or length query params are invalid', 
async () => { + const opts = { + params: { + queueName: 'valid' + }, + searchParams: new URLSearchParams('start=invalid&length=invalid'), + req: {} as Request, + redisClient + }; + + const response = await WorkerJobHttpController.getLogs(opts); + expect(response.status).toBe(400); + expect(await response.text()).toBe('Invalid start or length'); + }); + + it('returns a 200 response with the logs', async () => { + const jobId = "42"; + const logsKey = `${queuePrefix}:valid:${jobId}:logs`; + + await redisClient.del(logsKey); + await redisClient.rpush(logsKey, "log1", "log2", "log3"); + + const opts = { + params: { + queueName: 'valid', + jobId + }, + searchParams: new URLSearchParams('start=0&length=2'), + req: {} as Request, + redisClient + }; + + const response = await WorkerJobHttpController.getLogs(opts); + expect(response.status).toBe(200); + expect(await response.json()).toEqual({ count: 3, logs: ["log1", "log2"] }); + }); +}); diff --git a/src/controllers/http/worker-job-http-controller.ts b/src/controllers/http/worker-job-http-controller.ts new file mode 100644 index 0000000..ca902d8 --- /dev/null +++ b/src/controllers/http/worker-job-http-controller.ts @@ -0,0 +1,77 @@ +import { HttpHandlerOpts } from "../../interfaces"; +import { getQueue } from "../../utils/queue-factory"; +import { validatePagination, validateQueueName } from "../../validators"; + +/** + * This controller is responsible for performing operations on a job that + * is currently being processed by a worker. For example, updating the job's + * progress or adding logs. 
+ * + */ +export const WorkerJobHttpController = { + updateProgress: async (opts: HttpHandlerOpts) => { + const queueName = opts.params.queueName; + try { + validateQueueName(queueName); + } catch (err) { + return new Response((err).message, { status: 400 }); + } + + const jobId = opts.params.jobId; + try { + const queue = await getQueue(queueName, opts.redisClient); + await queue.updateJobProgress(jobId, await opts.req.json()); + return new Response('OK', { status: 200 }); + } catch (err) { + return new Response((err).message, { status: 500 }); + } + }, + + addLog: async (opts: HttpHandlerOpts) => { + const queueName = opts.params.queueName; + try { + validateQueueName(queueName); + } catch (err) { + return new Response((err).message, { status: 400 }); + } + + const jobId = opts.params.jobId; + try { + const queue = await getQueue(queueName, opts.redisClient); + + const log = await opts.req.json(); + + await queue.addJobLog(jobId, log); + + return new Response('OK', { status: 200 }); + } catch (err) { + console.log(err); + return new Response((err).message, { status: 500 }); + } + }, + + getLogs: async (opts: HttpHandlerOpts) => { + const queueName = opts.params.queueName; + let start, length; + + try { + validateQueueName(queueName); + + start = parseInt(opts.searchParams?.get("start") || "0"); + length = parseInt(opts.searchParams?.get("length") || "10"); + + validatePagination(start, length); + } catch (err) { + return new Response((err).message, { status: 400 }); + } + + const jobId = opts.params.jobId; + try { + const queue = await getQueue(queueName, opts.redisClient); + const logs = await queue.getJobLogs(jobId, start, start + length - 1); + return new Response(JSON.stringify(logs), { status: 200 }); + } catch (err) { + return new Response((err).message, { status: 500 }); + } + } +} diff --git a/src/e2e-test.ts b/src/e2e-test.ts new file mode 100644 index 0000000..38bbb68 --- /dev/null +++ b/src/e2e-test.ts @@ -0,0 +1,170 @@ +import { afterAll, afterEach, 
beforeAll, beforeEach, describe, expect, it, jest, mock } from "bun:test"; +import { Server } from "bun"; +import { startProxy } from "./proxy"; +import { Redis } from "ioredis"; +import { config } from "./config"; +import { JobJson, Queue } from "bullmq"; +import { cleanCache } from "./utils/queue-factory"; + +const token = 'test-token'; + +describe("e2e", () => { + + const queueName = 'testQueue-e2e'; + + beforeAll(() => { + mock.module('./config', () => ({ + config: { + ...config, + authTokens: [token] + } + })); + }); + + afterAll(() => { + jest.restoreAllMocks(); + }); + + let redisClient: Redis; + beforeEach(() => { + redisClient = new Redis({ + maxRetriesPerRequest: null + }); + }); + + afterEach(async () => { + const queue = new Queue(queueName, { connection: redisClient, prefix: config.defaultQueuePrefix }); + await queue.obliterate({ force: true }); + + // We need to clean the cache to eliminate side-effects between tests + await cleanCache(); + + await queue.close(); + await redisClient.quit(); + }); + + it("process a job updating progress and adding logs", async () => { + const proxy = await startProxy(0, redisClient, { skipInitWorkers: true }); + const proxyPort = proxy.port; + + let server: Server; + const processingJob = new Promise<void>((resolve, reject) => { + server = Bun.serve({ + // Typescript requires this dummy websocket + websocket: undefined as any, + port: 0, + async fetch(req: Request) { + try { + const { job, token } = await req.json(); + expect(job).toHaveProperty('name', 'test-job'); + expect(job).toHaveProperty('data', 'test'); + expect(job).toHaveProperty('opts'); + expect(token).toBeDefined(); + + const updateProgress = await fetch(`http://localhost:${proxyPort}/queues/${queueName}/jobs/${job.id}/progress`, { + method: 'POST', + body: JSON.stringify(100), + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${token}` + } + }); + + expect(updateProgress.status).toBe(200); + + const addLogs = await 
fetch(`http://localhost:${proxyPort}/queues/${queueName}/jobs/${job.id}/logs`, { + method: 'POST', + body: JSON.stringify("log message"), + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${token}` + } + }); + + expect(addLogs.status).toBe(200); + + resolve(); + return new Response("foo bar"); + } + catch (e) { + console.error(e); + reject(e); + } + } + }) + }); + + // Add a job to a queue + const addJobResponse = await fetch(`http://localhost:${proxyPort}/queues/${queueName}/jobs`, { + method: 'POST', + body: JSON.stringify([{ name: "test-job", data: 'test' }]), + headers: { + 'Content-Type': 'application/json', + "Authorization": `Bearer ${token}` + }, + }); + expect(addJobResponse.status).toBe(200); + const jobsAdded = await addJobResponse.json(); + expect(jobsAdded).toHaveLength(1); + expect(jobsAdded[0]).toHaveProperty('id'); + expect(jobsAdded[0]).toHaveProperty('name', 'test-job'); + expect(jobsAdded[0]).toHaveProperty('data', 'test'); + expect(jobsAdded[0]).toHaveProperty('opts'); + + // Register a worker + const workerResponse = await fetch(`http://localhost:${proxyPort}/workers`, { + method: 'POST', + body: JSON.stringify({ + queue: queueName, + endpoint: { + url: `http://localhost:${server!.port}`, + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + }, + }), + headers: { + 'Content-Type': 'application/json', + "Authorization": `Bearer ${token}` + }, + }); + + expect(workerResponse.status).toBe(200); + await processingJob; + + // Wait so that the job has a chance to return its value + await new Promise(resolve => setTimeout(resolve, 1000)); + + const getJobResponse = await fetch(`http://localhost:${proxyPort}/queues/${queueName}/jobs/${jobsAdded[0].id}`, { + headers: { + "Authorization": `Bearer ${token}` + } + }); + + expect(getJobResponse.status).toBe(200); + const job = await getJobResponse.json() as JobJson; + + expect(job).toHaveProperty('id', jobsAdded[0].id); + expect(job).toHaveProperty('name', 
'test-job'); + expect(job).toHaveProperty('data', 'test'); + expect(job).toHaveProperty('opts'); + expect(job.returnvalue).toBe("foo bar"); + expect(job.progress).toBe(100); + + const getJobLogsResponse = await fetch(`http://localhost:${proxyPort}/queues/${queueName}/jobs/${jobsAdded[0].id}/logs`, { + headers: { + "Authorization": `Bearer ${token}` + } + }); + + expect(getJobLogsResponse.status).toBe(200); + const { logs, count } = await getJobLogsResponse.json() as { count: number, logs: string[] }; + expect(count).toBe(1); + expect(logs).toHaveLength(1); + expect(logs[0]).toBe("log message"); + + server!.stop(); + proxy.stop(); + }) +}); diff --git a/src/fetch-handler.ts b/src/fetch-handler.ts index 5864597..4143741 100644 --- a/src/fetch-handler.ts +++ b/src/fetch-handler.ts @@ -1,4 +1,4 @@ -import IORedis from "ioredis"; +import { Redis, Cluster } from "ioredis"; import { QueueController, @@ -11,6 +11,7 @@ import { Server } from "bun"; import queuesRoutes from "./routes/queues-routes"; import asciiArt from "./ascii-art"; import workersRoutes from "./routes/workers-routes"; +import jobsRoutes from "./routes/jobs-routes"; const pkg = require("../package.json"); @@ -19,6 +20,7 @@ const routeMatcher = new RouteMatcher(); // Standard HTTP Routes queuesRoutes(routeMatcher); workersRoutes(routeMatcher); +jobsRoutes(routeMatcher); // WebSocket Routes routeMatcher.addWebSocketRoute<{ queueName: string }>("queue", "/ws/queues/:queueName", QueueController); @@ -31,7 +33,7 @@ routeMatcher.addWebSocketRoute<{ queueName: string }>( "/ws/queues/:queueName/events", QueueEventsController ); -export const fetchHandler = (connection: IORedis, authTokens: string[] = []) => async (req: Request, server: Server) => { +export const fetchHandler = (connection: Redis | Cluster, authTokens: string[] = []) => async (req: Request, server: Server) => { const url = new URL(req.url); const { searchParams } = url; const { method } = req; @@ -44,24 +46,6 @@ export const fetchHandler = 
(connection: IORedis, authTokens: string[] = []) => ); } - const from = - req.headers.get("x-forwarded-for") || req.headers.get("host"); - - const token = searchParams.get("token") || req.headers.get("authorization")?.split("Bearer ")[1]; - if (!token) { - warn( - `Unauthorized request (missing token) to path ${url.pathname.toString()} from ${from}` - ); - return new Response("Unauthorized", { status: 401 }); - } - - if (!authTokens.includes(token!)) { - warn( - `Unauthorized request (invalid token) to path ${url.pathname.toString()} from ${from}` - ); - return new Response("Unauthorized", { status: 401 }); - } - // Choose controller based on path const route = routeMatcher.match(url.pathname, method); if (!route) { @@ -72,6 +56,10 @@ export const fetchHandler = (connection: IORedis, authTokens: string[] = []) => return new Response("Not found", { status: 404 }); } + if (route.auth && !await route.auth(req, url, route.params, connection)) { + return new Response("Unauthorized", { status: 401 }); + } + const queueName = route.params?.queueName; let controller; let events; diff --git a/src/index.ts b/src/index.ts index ac1c0ca..9600ee0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +import { Server } from "bun"; import IORedis, { Cluster, Redis } from "ioredis"; import { startProxy } from "./proxy"; @@ -18,6 +19,7 @@ if (config.redis.uri) { host: config.redis.host, port: config.redis.port, password: config.redis.password, + username: config.redis.username, tls: config.redis.tls, retryStrategy: () => 1000, maxRetriesPerRequest: null, @@ -28,8 +30,8 @@ connection.on("error", (err) => { console.error("Redis connection error", err); }) -startProxy(config.port, connection, config.authTokens).then(() => { - info(`Running BullMQ Proxy on port ${config.port} (c) ${new Date().getFullYear()} Taskforce.sh Inc. 
v${pkg.version}`); +startProxy(config.port, connection).then((server: Server) => { + info(`Running BullMQ Proxy on port ${server.port} (c) ${new Date().getFullYear()} Taskforce.sh Inc. v${pkg.version}`); }).catch((err) => { error(`Error starting server ${(err).message}`); }); diff --git a/src/proxy.spec.ts b/src/proxy.spec.ts index e95fcb9..712f827 100644 --- a/src/proxy.spec.ts +++ b/src/proxy.spec.ts @@ -63,7 +63,7 @@ describe('Proxy', () => { }) } - await startProxy(3000, redisClientMock as Redis, ['validToken']); + await startProxy(3000, redisClientMock as Redis, { skipInitWorkers: true }); expect(Bun.serve).toHaveBeenCalledTimes(1); expect(Bun.serve).toHaveBeenCalledWith( diff --git a/src/proxy.ts b/src/proxy.ts index 58a7d30..151046b 100644 --- a/src/proxy.ts +++ b/src/proxy.ts @@ -50,18 +50,26 @@ const websocket = { perMessageDeflate: false, }; +export interface ProxyOpts { + skipInitWorkers?: boolean; +} + export const startProxy = async ( port: number, connection: IORedis, - authTokens: string[] = [], + opts: ProxyOpts = {}, ) => { console.log(chalk.gray(asciiArt)) - await WorkerHttpController.init(connection); + if (opts.skipInitWorkers !== true) { + await WorkerHttpController.init(connection); + } - Bun.serve({ + const server = Bun.serve({ port, - fetch: fetchHandler(connection, authTokens), + fetch: fetchHandler(connection), websocket, }); + + return server; }; diff --git a/src/routes/jobs-routes.ts b/src/routes/jobs-routes.ts new file mode 100644 index 0000000..7dd73f5 --- /dev/null +++ b/src/routes/jobs-routes.ts @@ -0,0 +1,28 @@ +import { JobJson } from "bullmq"; +import { RouteMatcher } from "../utils/router-matcher"; +import { authForWorkers } from "../authware/auth-for-workers"; +import { WorkerJobHttpController } from "../controllers/http/worker-job-http-controller"; +import { authByTokens } from "../authware/auth-by-tokens"; + +export default (routeMatcher: RouteMatcher) => { + routeMatcher.addHttpRoute( + "updateProgress", + 
"/queues/:queueName/jobs/:jobId/progress", + WorkerJobHttpController.updateProgress, + "post", + authForWorkers); + + routeMatcher.addHttpRoute<{ counts: number, jobs: JobJson[] }>( + "addLog", + "/queues/:queueName/jobs/:jobId/logs", + WorkerJobHttpController.addLog, + "post", + authForWorkers); + + routeMatcher.addHttpRoute<{ counts: number, jobs: JobJson[] }>( + "getLogs", + "/queues/:queueName/jobs/:jobId/logs", + WorkerJobHttpController.getLogs, + "get", + authByTokens); +} diff --git a/src/routes/queues-routes.ts b/src/routes/queues-routes.ts index d04cdeb..030837a 100644 --- a/src/routes/queues-routes.ts +++ b/src/routes/queues-routes.ts @@ -1,16 +1,27 @@ import { JobJson } from "bullmq"; import { QueueHttpController } from "../controllers/http/queue-http-controller"; import { RouteMatcher } from "../utils/router-matcher"; +import { authByTokens } from "../authware/auth-by-tokens"; export default (routeMatcher: RouteMatcher) => { routeMatcher.addHttpRoute( "addJobs", - "/queues/:queueName", + "/queues/:queueName/jobs", QueueHttpController.addJobs, - "post"); + "post", + authByTokens); routeMatcher.addHttpRoute<{ counts: number, jobs: JobJson[] }>( "getJobs", - "/queues/:queueName", - QueueHttpController.getJobs); + "/queues/:queueName/jobs", + QueueHttpController.getJobs, + "get", + authByTokens); + + routeMatcher.addHttpRoute( + "getJob", + "/queues/:queueName/jobs/:jobId", + QueueHttpController.getJob, + "get", + authByTokens); } diff --git a/src/routes/workers-routes.ts b/src/routes/workers-routes.ts index 1ae7af0..ff9b5ef 100644 --- a/src/routes/workers-routes.ts +++ b/src/routes/workers-routes.ts @@ -3,22 +3,27 @@ import { WorkerHttpController } from "../controllers/http/worker-http-controller import { RouteMatcher } from "../utils/router-matcher"; import { WorkerMetadata } from "../interfaces"; +import { authByTokens } from "../authware/auth-by-tokens"; + export default (routeMatcher: RouteMatcher) => { routeMatcher.addHttpRoute( "addWorker", 
"/workers", WorkerHttpController.addWorker, - "post"); + "post", + authByTokens); routeMatcher.addHttpRoute( "getWorkers", "/workers", WorkerHttpController.getWorkers, - "get") + "get", + authByTokens) routeMatcher.addHttpRoute( "removeWorker", "/workers/:queueName", WorkerHttpController.removeWorker, - "delete") + "delete", + authByTokens) } diff --git a/src/utils/queue-factory.ts b/src/utils/queue-factory.ts new file mode 100644 index 0000000..0599d99 --- /dev/null +++ b/src/utils/queue-factory.ts @@ -0,0 +1,27 @@ +import { Cluster, Redis } from "ioredis"; + +import { LRUCache } from "../cache"; +import { config } from "../config"; +import { Queue } from "bullmq"; + +const cache = new LRUCache(config.queueCacheSize, async (queueName, queue) => { + await queue.close(); +}); + +export const getQueue = async (queueName: string, redisClient: Redis | Cluster) => { + let queue = cache.get(queueName); + if (!queue) { + queue = new Queue(queueName, { connection: redisClient, prefix: config.defaultQueuePrefix }); + cache.put(queueName, queue); + } + return queue; +} + +/** + * Clear the cache. + * + * Closes all queues and clears the cache. 
+ */
+export const cleanCache = async () => {
+  await cache.clear();
+}
diff --git a/src/utils/router-matcher.ts b/src/utils/router-matcher.ts
index 8e99997..ea1cbd9 100644
--- a/src/utils/router-matcher.ts
+++ b/src/utils/router-matcher.ts
@@ -1,23 +1,29 @@
 import { WebSocketHandler } from "bun";
-
-import { URL } from "url";
 import { HttpHandlerOpts } from "../interfaces/http-handler-opts";
+import { Cluster, Redis } from "ioredis";
 
-type HttpHandler = (opts: HttpHandlerOpts) => Promise<Response>
+type HttpHandler = (opts: HttpHandlerOpts) => Promise<Response>;
+type AuthFn = (
+  req: Request,
+  url: URL,
+  params: Record<string, string>,
+  connection: Redis | Cluster) => Promise<boolean>;
 
 type RoutePattern = {
   name: string;
   pattern: RegExp;
   paramNames: string[];
   method?: string;
+  auth?: AuthFn;
   websocketHandler?: WebSocketHandler;
   httpHandler?: HttpHandler;
 };
 
 type MatchResult = {
   name: string;
-  params: any;
+  params: Record<string, string>;
   query?: any;
+  auth?: AuthFn;
   websocketHandler?: WebSocketHandler;
   httpHandler?: HttpHandler;
 };
 
@@ -25,7 +31,7 @@ type MatchResult = {
 export class RouteMatcher {
   private routes: RoutePattern[] = [];
 
-  addRoute(name: string, pattern: string, handler?: WebSocketHandler | HttpHandler): void {
+  addRoute(name: string, pattern: string, handler?: WebSocketHandler | HttpHandler) {
     const paramNames: string[] = [];
     const regexPattern = new RegExp(
       "^" +
@@ -46,16 +52,20 @@ export class RouteMatcher {
         websocketHandler = handler;
       }
     }
-    this.routes.push({ name, pattern: regexPattern, paramNames, httpHandler, websocketHandler });
+    const route: RoutePattern = { name, pattern: regexPattern, paramNames, httpHandler, websocketHandler };
+    this.routes.push(route);
+    return route;
   }
 
-  addWebSocketRoute(name: string, pattern: string, handler: WebSocketHandler): void {
-    this.addRoute(name, pattern, handler);
+  addWebSocketRoute(name: string, pattern: string, handler: WebSocketHandler) {
+    return this.addRoute(name, pattern, handler);
   }
 
-  addHttpRoute(name: string, pattern: string, handler: 
HttpHandler, method: string = "get"): void { - this.addRoute(name, pattern, handler); - this.routes[this.routes.length - 1].method = method.toLowerCase(); + addHttpRoute(name: string, pattern: string, handler: HttpHandler, method: string = "get", auth?: AuthFn) { + const route = this.addRoute(name, pattern, handler); + route.method = method.toLowerCase(); + route.auth = auth; + return route; } match(url: string, method: string = "get"): MatchResult | null { @@ -77,6 +87,7 @@ export class RouteMatcher { name: route.name, params, websocketHandler: route.websocketHandler, + auth: route.auth, httpHandler: route.httpHandler, }; diff --git a/src/validators/queue.validators.ts b/src/validators/queue.validators.ts index ae003ab..584c862 100644 --- a/src/validators/queue.validators.ts +++ b/src/validators/queue.validators.ts @@ -117,3 +117,17 @@ export const validateJobOpts = (opts: JobsOptions) => { validateRepeatOpts(opts.repeat); } } + +export const validatePagination = (start: number, length: number) => { + if (isNaN(start) || isNaN(length)) { + throw new Error("Invalid start or length"); + } + + if (start < 0 || length < 0) { + throw new Error("Start and length must be positive"); + } + + if (length > 100) { + throw new Error("Length must be less than or equal to 100"); + } +}