Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic queue implementation, early draft #54

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ on:
- "*" # matches every branch that doesn't contain a '/'
- "*/*" # matches every branch containing a single '/'
- "**" # matches every branch
# - "!main" # matches every branch except main

env:
MONGOMS_VERSION: ${{ inputs.mongo-version || '7.0.4' }}
Expand Down
4 changes: 4 additions & 0 deletions apps/minimal/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import GraphQL from '@zemble/graphql'
import GraphQLLogger from '@zemble/logger-graphql'
import Migrations from '@zemble/migrations'
import dryrunAdapter from '@zemble/migrations/adapters/dryrun'

// eslint-disable-next-line import/no-relative-packages
import Cron from '../../packages/queues-in-memory'
import Logger from '@zemble/pino'
import Routes from '@zemble/routes'

Expand All @@ -13,6 +16,7 @@ export default createApp({
plugins: [
Routes.configure(),
Logger.configure(),
Cron.configure(),
Bull.configure({
bullboard: {
nodeModulesRootPath: '../..',
Expand Down
17 changes: 8 additions & 9 deletions apps/minimal/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "minimal",
"version": "0.0.60",
"version": "0.0.39",
"description": "A minimal example app",
"private": true,
"main": "app.ts",
Expand All @@ -11,7 +11,7 @@
"lint": "eslint .",
"test": "bun test",
"typecheck": "tsc --noEmit",
"codegen": "graphql-codegen"
"graphql-codegen": "graphql-codegen"
},
"keywords": [
"zemble",
Expand Down Expand Up @@ -41,17 +41,16 @@
"@zemble/bull": "workspace:*",
"@zemble/bun": "workspace:*",
"@zemble/core": "workspace:*",
"@zemble/email-resend": "workspace:*",
"@zemble/graphql": "workspace:*",
"@zemble/logger-graphql": "workspace:*",
"@zemble/migrations": "workspace:*",
"@zemble/pino": "workspace:*",
"@zemble/queues-in-memory": "workspace:*",
"zemble-plugin-auth": "workspace:*",
"@zemble/routes": "workspace:*",
"@zemble/sms-46elks": "workspace:*",
"@zemble/sms-twilio": "workspace:*"
"@zemble/pino": "workspace:*",
"zemble-plugin-logger-graphql": "workspace:*"
},
"devDependencies": {
"@tsconfig/node20": "^20.1.4",
"typescript": "^5.4.5"
"@tsconfig/node20": "^20.1.2",
"typescript": "^5.3.3"
}
}
137 changes: 137 additions & 0 deletions packages/core/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/* eslint-disable @typescript-eslint/no-namespace */

// eslint-disable-next-line max-classes-per-file
import type { InitializeProvider, Plugin } from '.'
import type { JSON } from '@zemble/utils/JSON'
import type { WebSocketHandler } from 'bun'
Expand All @@ -16,6 +17,139 @@ export interface IEmail {
readonly name?: string
}

export class ZembleCron<DataType = unknown, ReturnType = unknown> implements IZembleCron<DataType, ReturnType> {
readonly worker: ZembleWorker<DataType, ReturnType>

readonly options: CronOptions

constructor(worker: ZembleWorker<DataType, ReturnType>, options: CronOptions) {
this.worker = worker
this.options = options
}

removeByIds(jobIds: readonly string[]): number {
return this.options.plugin.providers.cronProvider.removeByIds(jobIds)
}

getById<TDataType extends DataType, TReturnType extends ReturnType>(jobId: string): ZembleJob<TDataType, TReturnType> | undefined {
return this.options.plugin.providers.cronProvider.getById(
jobId,
)
}

getAllJobs<TDataType extends DataType, TReturnType extends ReturnType>(skip?: number | undefined, limit?: number | undefined): readonly ZembleJob<TDataType, TReturnType>[] {
return this.options.plugin.providers.cronProvider.getAllJobs(skip, limit)
}

obliterate(): number {
return this.options.plugin.providers.cronProvider.obliterate()
}
}

export class ZembleQueue<DataType, ReturnType> implements IZembleQueue<DataType, ReturnType> {
readonly worker: ZembleWorker<DataType, ReturnType>

constructor(worker: ZembleWorker<DataType, ReturnType>) {
this.worker = worker
}

add(data: DataType, opts?: JobOptions | undefined): Job<DataType, ReturnType> {
throw new Error('Method not implemented.')
}

addBulk(data: readonly DataType[], opts?: JobOptions | undefined): readonly ZembleJob<DataType, ReturnType>[] {
throw new Error('Method not implemented.')
}

removeByIds(jobId: readonly string[]): number {
throw new Error('Method not implemented.')
}

getById<TDataType extends DataType, TReturnType extends ReturnType>(jobId: string): ZembleJob<TDataType, TReturnType> {
throw new Error('Method not implemented.')
}

getAllJobs<TDataType extends DataType, TReturnType extends ReturnType>(skip?: number | undefined, limit?: number | undefined): readonly ZembleJob<TDataType, TReturnType>[] {
throw new Error('Method not implemented.')
}

obliterate(): number {
throw new Error('Method not implemented.')
}
}

export type ZembleJobOpts = {
readonly logger: IStandardLogger
}

export type ZembleWorker<DataType = unknown, ReturnType = unknown> = (job: ZembleJob<DataType, ReturnType>) => Promise<void> | void

type JobStatus = 'completed' | 'wait' | 'active' | 'delayed' | 'failed'

// queue with one recurring config - cron job?
// queue that is just a queue - queue?
// cron queue? - with possibility to add multiple cron jobs

export interface ZembleJob<DataType = unknown, ReturnType = unknown> {
// maybe force serialization handling of this type?
readonly data: DataType
readonly jobId: string
readonly repeatableId?: string
readonly runAtEarliest: Date
readonly createdAt: Date
readonly startedRunningAt: Date
readonly finishedRunningAt?: Date
readonly status: JobStatus
readonly returnValue?: ReturnType
readonly error?: Error
}

interface RepeatableJob<DataType = unknown> {
// maybe force serialization handling of this type?
readonly data: DataType
readonly repeatableId: string
readonly createdAt: Date
}

interface JobOptions {
readonly jobId?: string
readonly runAtEarliest?: Date
}

interface CronOptions {
readonly repeatPattern: string
readonly timezone?: string
}

interface RepeatableJobOptions {
readonly repeatPattern: string;
readonly repeatableId?: string
}

export interface IZembleCron<DataType, ReturnType> {
removeByIds(jobId: readonly string[]): number
getById<TDataType extends DataType, TReturnType extends ReturnType>(jobId: string): ZembleJob<TDataType, TReturnType> | undefined
getAllJobs<TDataType extends DataType, TReturnType extends ReturnType>(skip?: number, limit?: number): readonly ZembleJob<TDataType, TReturnType>[]
obliterate(): number
}

interface IZembleCronProvider extends IZembleCron<unknown, unknown> {
initialize(): void
}

export interface IZembleQueue<DataType, ReturnType> extends IZembleCron<DataType, ReturnType> {
add(data: DataType, opts?: JobOptions): ZembleJob<DataType, ReturnType>
addBulk(data: readonly DataType[], opts?: JobOptions): readonly ZembleJob<DataType, ReturnType>[]
}

export interface ZembleRepeatableQueue<DataType, ReturnType> extends IZembleCron<DataType, ReturnType> {
addRepeatable(data: DataType, opts: RepeatableJobOptions): RepeatableJob<DataType>
addRepeatableBulk(data: readonly DataType[], opts?: RepeatableJobOptions): readonly RepeatableJob<DataType>[]
removeRepeatableByIds(repeatableJobId: readonly string[]): number
getRepeatableById(repeatableJobId: string): RepeatableJob<DataType>
getAllRepeatableJobs(skip?: number, limit?: number): readonly RepeatableJob<DataType>[]
}

export type SendEmailParams = {
readonly to: readonly IEmail[] | IEmail | string | readonly string[],
readonly html?: string,
Expand Down Expand Up @@ -172,6 +306,9 @@ declare global {

// eslint-disable-next-line functional/prefer-readonly-type
logger: IStandardLogger

// eslint-disable-next-line functional/prefer-readonly-type
cronProvider: IZembleCronProvider
}

interface Providers extends DefaultProviders {
Expand Down
1 change: 1 addition & 0 deletions packages/push-apple/graphql/schema.generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export type MutationRegisterAppleStartLiveActivityPushTokenArgs = {
export type MutationRegisterAppleUpdateLiveActivityPushTokenArgs = {
appBundleId: Scalars['String']['input'];
isSandbox?: InputMaybe<Scalars['Boolean']['input']>;
liveActivityAttributes: Scalars['JSONObject']['input'];
liveActivityType: Scalars['String']['input'];
token: Scalars['String']['input'];
};
Expand Down
13 changes: 13 additions & 0 deletions packages/queues-in-memory/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# queues-in-memory

## Develop

```bash
bun dev
```

## Test

```bash
bun test
```
9 changes: 9 additions & 0 deletions packages/queues-in-memory/codegen.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import defaultConfig from '@zemble/graphql/codegen'

import type { CodegenConfig } from '@graphql-codegen/cli'

const config: CodegenConfig = {
...defaultConfig,
}

export default config
7 changes: 7 additions & 0 deletions packages/queues-in-memory/crons/hello-cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ZembleCron } from '@zemble/core'

export default new ZembleCron(() => {
console.log('hello cron!!')
}, {
repeatPattern: '*/5 * * * * *',
})
23 changes: 23 additions & 0 deletions packages/queues-in-memory/graphql/Mutation/randomNumber.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { createTestApp } from '@zemble/core/test-utils'
import {
it, expect,
} from 'bun:test'

import plugin from '../../plugin'
import { graphql } from '../client.generated'

const randomNumberMutation = graphql(`
mutation RandomNumber {
randomNumber
}
`)

it('Should return a number', async () => {
const app = await createTestApp(plugin)

const response = await app.gqlRequest(randomNumberMutation, {})
expect(response.data).toEqual({

randomNumber: expect.any(Number),
})
})
11 changes: 11 additions & 0 deletions packages/queues-in-memory/graphql/Mutation/randomNumber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { MutationResolvers } from '../schema.generated'

const randomNumber: MutationResolvers['randomNumber'] = (_, __, { pubsub }) => {
const randomNumber = Math.floor(Math.random() * 1000)

pubsub.publish('randomNumber', randomNumber)

return randomNumber
}

export default randomNumber
21 changes: 21 additions & 0 deletions packages/queues-in-memory/graphql/Query/hello.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { createTestApp } from '@zemble/core/test-utils'
import { it, expect } from 'bun:test'

import plugin from '../../plugin'
import { graphql } from '../client.generated'

const HelloWorldQuery = graphql(`
query Hello {
hello
}
`)

it('Should return world!', async () => {
const app = await createTestApp(plugin)

const response = await app.gqlRequest(HelloWorldQuery, {})

expect(response.data).toEqual({
hello: 'world!',
})
})
5 changes: 5 additions & 0 deletions packages/queues-in-memory/graphql/Query/hello.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import type { QueryResolvers } from '../schema.generated'

const hello: QueryResolvers['hello'] = () => 'world!'

export default hello
20 changes: 20 additions & 0 deletions packages/queues-in-memory/graphql/Subscription/countdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import type { SubscriptionResolvers } from '../schema.generated'

const countdown: SubscriptionResolvers['countdown'] = {
// This will return the value on every 1 sec until it reaches 0
// eslint-disable-next-line object-shorthand
subscribe: async function* (_, { from }, { logger }) {
// eslint-disable-next-line no-plusplus
for (let i = from; i >= 0; i--) {
// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
logger.info('countdown', { countdown: i })
setTimeout(resolve, 1000)
})
yield { countdown: i }
}
},
resolve: (payload: unknown) => (payload as { readonly countdown: number}).countdown,
}

export default countdown
15 changes: 15 additions & 0 deletions packages/queues-in-memory/graphql/Subscription/randomNumber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import type { SubscriptionResolvers } from '../schema.generated'

const randomNumber: SubscriptionResolvers['randomNumber'] = {
// subscribe to the randomNumber event
subscribe: (_, __, { pubsub }) => {
console.log('subscribing to randomNumber')
return pubsub.subscribe('randomNumber')
},
resolve: (payload: number) => {
console.log('resolving randomNumber', payload)
return payload
},
}

export default randomNumber
22 changes: 22 additions & 0 deletions packages/queues-in-memory/graphql/Subscription/tick.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { SubscriptionResolvers } from '../schema.generated'

let initialized = false
const initializeOnce = (pubsub: Zemble.PubSubType) => {
if (initialized) return
initialized = true
setInterval(() => {
pubsub.publish('tick', Date.now())
}, 1000)
}

const tick: SubscriptionResolvers['tick'] = {
// subscribe to the tick event
subscribe: (_, __, { pubsub }) => {
initializeOnce(pubsub)
console.log('subscribing to tick')
return pubsub.subscribe('tick')
},
resolve: (payload: number) => payload,
}

export default tick
Loading
Loading