Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial implementation endpoint based workers #7

Merged
merged 12 commits into from
Feb 25, 2024
Merged
36 changes: 33 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,45 @@ permissions:
contents: read # to fetch code (actions/checkout)

jobs:
test:
test-redis:
runs-on: ubuntu-latest

name: Testing with Bun
name: Testing with Bun, redis@${{ matrix.redis-version }}

strategy:
matrix:
redis-version: [7-alpine, 6-alpine]

steps:
- name: Checkout repository
uses: actions/checkout@v3 # v3
- uses: oven-sh/setup-bun@v1
- name: Start Redis
uses: supercharge/[email protected]
with:
redis-version: ${{ matrix.redis-version }}
- run: bun install
- run: cd node_modules/@taskforcesh/message-broker && bun run postinstall
- run: bun run tsc
- run: bun test

test-dragonflydb:
runs-on: ubuntu-latest

name: Testing with Bun, dragonflydb@latest

services:
dragonflydb:
image: docker.dragonflydb.io/dragonflydb/dragonfly
env:
DFLY_cluster_mode: emulated
DFLY_lock_on_hashtags: true
ports:
- 6379:6379

steps:
- name: Checkout repository
uses: actions/checkout@v3 # v3
- uses: oven-sh/setup-bun@v1
- run: bun install
- run: bun run tsc
- run: QUEUE_PREFIX={b} bun test
148 changes: 117 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,42 +1,61 @@
# BullMQ Proxy

This is a lightweight and fast proxy for BullMQ that allows you to access your queues via websockets. As BullMQ is currently a library available only for NodeJS and Python, this proxy allows you to access your queues from any language that supports websockets.
This lightweight service works as a proxy for BullMQ queues. It has applications in many useful cases:

Note: this is a work in progress and it is not ready for production yet.
- Work with your queues in any language or platform that supports HTTP.
- Run your workers in serverless environments.
- Isolate your Redis instances and allow BullMQ operations from untrusted sources (such as external service for example a web app)
- Implement Access Control for your queues (coming soon).

The proxy provides a simple Restful HTTP API that supports the most important features available in BullMQ. You
can add jobs with any options you like and instantiate workers, also with any BullMQ compatible options.

## Roadmap

[x] Initial support for adding and processing jobs for any queue.
[x] Queue getters (retrieve jobs in any status from any queue).
[ ] Support redundancy (multiple proxies running in parallel).
[ ] Queue actions: Pause, Resume, Clean and Obliterate.
[ ] Job actions: promote, retry, remove.
[ ] Support for adding flows.
[ ] Dynamic rate-limit.
[ ] Manually consume jobs.

Although the service is not yet feature complete, you are very welcome to try it out and give us
feedback and report any issues you may find.

## Installation

As we wanted to maximize performance and reduce overhead as much as possible, we are using [Bun](https://github.com/oven-sh/bun) as
our runtime.
As we wanted to maximize performance and reduce overhead as much as possible, we are using [Bun](https://github.com/oven-sh/bun) as our runtime instead of NodeJS. Bun has the fastest HTTP (and Websocket) server for any javascript runtime.

The proxy can be installed as a dependency in your project (if you use bun), but we also
provide a Dockerfile that can be used to just run the proxy easily on any system that supports Docker.

## Use as a Docker container

```bash
docker build -t bullmq-proxy github.com/taskforcesh/bullmq-proxy
docker run -p 8080:8080 -e REDIS_HOST=redis -e REDIS_PORT=6379 -e AUTH_KEYS=my-auth-key-1,my-auth-key-2 bullmq-proxy
```

## Use as a dependency (Not yet available)

```bash
bun add bullmq-proxy ioredis
```

```typescript
import IORedis from "ioredis";
import { startProxy } from "./proxy";
import IORedis from 'ioredis'
import { startProxy } from './proxy'

const connection = new IORedis({
host: "localhost",
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null,
});
})

// Start proxyn passing an IORedis connection and an array of auth keys
startProxy(connection, ["my-auth-key-1", "my-auth-key-2"]);
```

## Use as a Docker container

```bash
docker build -t bullmq-proxy github.com/taskforcesh/bullmq-proxy
docker run -p 8080:8080 -e REDIS_HOST=redis -e REDIS_PORT=6379 -e AUTH_KEYS=my-auth-key-1,my-auth-key-2 bullmq-proxy
startProxy(connection, ['my-auth-key-1', 'my-auth-key-2'])
```

## Developing
Expand All @@ -51,7 +70,7 @@ Which will start the proxy locally connected to a local Redis instance (on port
If you then use a websocket client to connect to the proxy, you will see the following output in the console:

```bash
Starting BullMQ Proxy on port 8080 (c) 2023 Taskforce.sh v0.1.0
Starting BullMQ Proxy on port 8080 (c) 2024 Taskforce.sh Inc. v0.1.0
[1696158613843] BullMQ Proxy: Worker connected for queue test-queue with concurrency 20
[1696158613849] BullMQ Proxy: Queue connected for queue test-queue
[1696158613850] BullMQ Proxy: Queue events connected for queue test-queue with events waiting,active
Expand All @@ -61,6 +80,73 @@ Starting BullMQ Proxy on port 8080 (c) 2023 Taskforce.sh v0.1.0

# Connecting to the proxy

## HTTP Protocol

It is possible to access all the proxy features by using a standard HTTP REST-inspired API.

### Add jobs

Jobs can be easily added to a queue using the `POST /queues/:queueName` endpoint. The endpoint expects
a JSON body with the following interface:

```ts
type PostJobsBody = PostJobBody[];

interface PostJobBody {
data: any;
opts: JobOpts;
}

interface JobOpts {
...
}
```

The interface accepts an array of one or more jobs, that are to be added to the queue. Note that the call is atomic
and thus all or none of the jobs will be added to the queue if the call succeeds or fails respectively.

### Utility

- Get jobs
- Remove jobs
- Clean queue
- Pause queue
- Resume queue
- Promote job
- etc.

### Register endpoints

Several mechanisms can be used to process jobs that have been added to the queue. The first one is by registering
an endpoint that will receive the job. The endpoint is normally a URL that the Proxy will call as soon as there are jobs to be processed. This is a powerful mechanism as it allows to processing of jobs serverless, for instance, the endpoint could be an AWS Lambda or Cloudflare function.

The endpoints are registered using the `POST /endpoints/:queueName` endpoint. The endpoint expects a JSON
body with the following interface:

```ts
interface Endpoint {
url: string
method: 'POST'
headers: { [index: string]: string }
opts: {
concurrency?: number // Default 1
rateLimit?: {
max: number
duration: number
}
}
}
```

Note that only one endpoint can be registered per queue, calling this API more than once for a given queue name
will just overwrite any existing endpoint.

When an endpoint is called to process a job, the call should not return until the job has been completed (or failed). A call that times out will be considered a failed job by the proxy.

### List endpoints

### Remove endpoint

## WebSocket protocol

The proxy defines a very simple protocol that is used to communicate with the proxy. The protocol is based on JSON messages that are sent back and forth between the client and the proxy.
Expand All @@ -69,12 +155,12 @@ All the messages have the following structure:

```typescript
interface Message {
id: number;
data: QueuePayload | WorkerPayload | EventPayload;
id: number
data: QueuePayload | WorkerPayload | EventPayload
}
```

The `id` field is used to identify the message and the `data` field contains the actual data of the message, which can be anything, and is defined below.
The `id` field is used to identify the message and the `data` field contains the actual data of the message, which can be anything and is defined below.

The id field must be unique for each message sent by the client, and it will be guaranteed unique for every message sent by the proxy. The easiest and most efficient way to implement this uniqueness is to use a counter that is incremented for each message sent by the client (this is the way the proxy does it internally).

Expand All @@ -95,31 +181,31 @@ The Queue API is accessible at the `/queue/:queueName` endpoint. It allows you t

```typescript
interface QueuePayload {
fn: QueueFunction; // Any function defined in BullMQ API (add, pause, resume, etc)
args: any[]; // Arguments for the function
fn: QueueFunction // Any function defined in BullMQ API (add, pause, resume, etc)
args: any[] // Arguments for the function
}
```

The proxy will validate the function and the arguments and will return the result of the function call to the client, or an error if the function call failed following this interface:

```typescript
interface QueueResult {
ok?: any; // Result of the function call if it was successful
ok?: any // Result of the function call if it was successful
err?: {
message: string;
stack: string;
};
message: string
stack: string
}
}
```

## [Worker API](#worker-api)

The Worker API is accessible at the `/queue/:queueName/process/:concurrency` endpoint. It allows you start consuming jobs from a queue with the specified concurrency. As soon as the websocket connection is stablished, the proxy will start sending websocket messages with the jobs that are supposed to be processed by this client and send a message back to the proxy with the result of the job.
The Worker API is accessible at the `/queue/:queueName/process/:concurrency` endpoint. It allows you to start consuming jobs from a queue with the specified concurrency. As soon as the WebSocket connection is established, the proxy will start sending WebSocket messages with the jobs that are supposed to be processed by this client and send a message back to the proxy with the result of the job.

```typescript
interface WorkerPayload {
type: "process";
payload: any; // Any job payload
type: 'process'
payload: any // Any job payload
}
```

Expand All @@ -135,7 +221,7 @@ Everytime such an event is produced in the queue, the proxy will send a message

```typescript
interface EventPayload {
event: string; // Event name
args: any[]; // Array of arguments for the event
event: string // Event name
args: any[] // Array of arguments for the event
}
```
Binary file modified bun.lockb
Binary file not shown.
20 changes: 7 additions & 13 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,23 @@
"scripts": {
"dev": "bun run --watch src/index.ts",
"start": "bun src/index.ts",
"test": "mocha --require ts-node/register tests/**/*.ts",
"build:declaration": "tsc --emitDeclarationOnly"
"build:declaration": "tsc --emitDeclarationOnly",
"tsc": "tsc --noEmit"
},
"dependencies": {
"@sinclair/typebox": "^0.31.17",
"@taskforcesh/message-broker": "https://github.com/taskforcesh/message-broker.git#master",
"bullmq": "latest",
"bullmq": "^5.3.2",
"chalk": "^5.3.0",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reasons to use chalk? faster options are available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Chalk is well maintained so thats a plus. Usually I pick "pino" though, as a fast and versatile alternative to console.log, maybe it is the best alternative here as well... 🤔

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on pino, it is awesome. it doesn't work in browsers, but from my understanding you only need it on the server

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can change it in a separate PR as it will not imply a breaking change.

"ioredis": "^5.3.2"
},
"trustedDependencies": [
"@taskforcesh/message-broker"
],
"devDependencies": {
"@tsconfig/node-lts": "^18.12.5",
"@types/chai": "^4.3.6",
"@types/mocha": "^10.0.1",
"@types/node": "^12.0.0",
"@types/sinon": "^10.0.17",
"bun-types": "^1.0.3",
"chai": "^4.3.8",
"mocha": "^10.2.0",
"sinon": "^16.0.0",
"ts-node": "^8.2.0",
"typescript": "^5.2.2"
"bun-types": "^1.0.29",
"prettier": "^3.2.5",
"typescript": "^5.3.3"
}
}
51 changes: 51 additions & 0 deletions src/ascii-art.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* ASCII Art generated with https://www.asciiart.eu/image-to-ascii
* from an image generated by ChatGPT v4:
* "Art depiction of a full-body bull facing the viewer with a menacing look.
* This image showcases the bull's large horns, muscular body, and aggressive posture,
* embodying power and strength."
*/
const asciiArt = `
;
;: ;:
:; :;++;. ::. ;:
.:;. :X&X$&$$$;$&&&X+: .;:
::$XxX$$&&&&&&&;+X$$&$++XX$XX++$$x+;:: :x.:
:::+XXXx+x+++++:XXXX$$+;X$X$++:XXXX$$&&&&&&&&X:;.
:;:::::;;:. +++$$X$X;xXXXX::..:...::;+++;::;
;+::. ;.:XX+XxXX;X$$Xx;.:.:;X: ...
x:::. :.++:X$XxxX$$xx :;
+$Xx: .XX+.:++$$$x;;:; .+X&:
X$x;:::+;+;.+X$$$X;:::.::+X&$.
xX+;+::+;x++++X$$x:;:;.;.:+X+XX;
xx:+:;.;;.++++$$$x;;;.:.;:.xx;++$$x::;;;.
x+;;;:::;+;X$xXXXXX: :::.::+$x .:+$&&&&&&&;
+$;;;;:::. +x:+++++;X .:::.+:+x$+&&X$;XX;++$&&;
+$X+::;::::.;+:: ::X ::. :.;$&x+$$X&$$&&; +$x++
;&$$X::;:; ::::.::: .:: ::;X$&$X+$:+&Xx$$+ :.;X:
X$X$$+:.::: . ::...++XX$Xx;X:X&$++x::XX;+++
.X$;x+;::.+:. : . .::;x+x:XXx:::X$$+++ :+X$;+x+
+Xx:X; :::::: :. : :: +;+X+;.+++. x+$X+++.X$+;:;xX:
;:;: ::.:. : :. . :::;+++:. ::. :;:+;;+:.X&$x;.;x;
:..:.::. : ... ...:;;::. :; ::.:.;+:..+XX;:::;:
:;.:::.:. .. .. .:::;. .;x$Xx ..::.. :;:.;+x:.
$&;. .:. :: :: .. :. ;x$x:$; .::. . ::+;::
:&x++. .:. ;: ::: ::+$X:.x; .:: ... ..:. ::+
$;+X;..: . . +: .. .+x;Xx:;.:. . . ..: .:: :+
;+:++; .:. :. ;. .;:.::X;.; .... . .:;; :x
:;:.. . ::: ..;.;: ;.. .: : . .:::XX. :x:
:::.:. .::. .:+::.::;. :: : ;. .:::: :+
.;;:... ::::... :+Xx. ::.;;: . + ::.. ... +x;
+$X; :+$$X:. :::. :++ .:: ++X++
;+::.;. :+. ;++: ::. :+: :: .:: :x;.;;.
;:... :::.:; .x:.: : .:.::;+;:+
;..: ::.. X;.. :.; ::;;;;;
:X;:. :x+:. +x:: ::; ;;:+:
.$$x:: .$$X:: +X:: :::; ;;;;.
.;;;::.. +;;;:: x+;:: :::::: .;:;
.X:Xx;.. X+.x+;: +;+;: .+:;+; ..:
.X;XX;::: X+:xX+:. x;;+:. X;:x;::
.:;+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++:..
..:::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::::`

export default asciiArt;
Loading
Loading