From 358ab16d442e3f2103cffd7259301966518b41e4 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 5 Sep 2022 22:14:26 +0000 Subject: [PATCH 1/2] Update babel monorepo to v7.19.0 --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 3ba3d9c0..2cde72d5 100644 --- a/package.json +++ b/package.json @@ -48,9 +48,9 @@ }, "devDependencies": { "@babel/cli": "7.18.10", - "@babel/core": "7.18.10", + "@babel/core": "7.19.0", "@babel/node": "7.18.10", - "@babel/preset-env": "7.18.10", + "@babel/preset-env": "7.19.0", "@types/node": "16.11.52", "babel-loader": "8.2.5", "chalk": "5.0.1", From 6f963c452071b8d775da9c974670a3b129088255 Mon Sep 17 00:00:00 2001 From: tysonrm Date: Tue, 6 Sep 2022 15:03:36 -0500 Subject: [PATCH 2/2] change broker and model.create to synchroous --- src/aegis.js | 13 +++- src/domain/model-factory.js | 3 +- src/domain/model.js | 72 +++++++++++---------- src/domain/orchestrator.js | 6 +- src/domain/thread-pool.js | 7 +- src/domain/use-cases/add-model.js | 33 +++++----- src/domain/use-cases/create-service-mesh.js | 18 ++---- src/domain/use-cases/invoke-port.js | 9 +-- src/domain/util/app-error.js | 15 +++-- 9 files changed, 98 insertions(+), 78 deletions(-) diff --git a/src/aegis.js b/src/aegis.js index b4aa6842..fc8bc886 100644 --- a/src/aegis.js +++ b/src/aegis.js @@ -8,6 +8,8 @@ const { badUserRoute, reload } = DomainEvents const { StorageService } = services const { StorageAdapter } = adapters const { pathToRegexp, match } = require('path-to-regexp') +const { nanoid } = require('nanoid') +const { AsyncLocalStorage } = require('async_hooks') const { find, save } = StorageAdapter const overrides = { find, save, ...StorageService } const broker = EventBrokerFactory.getInstance() @@ -168,12 +170,19 @@ async function handle (path, method, req, res) { } } +const newStore = () => nanoid() + +const asyncLocalStorage = new AsyncLocalStorage() + +const handler = (path, method, req, res) => + asyncLocalStorage.run(newStore(), handle, path, method, req, res) + /** * Tell components to clean up any system resources */ exports.dispose = async function () { console.log('system hot-reloading') - await broker.notify(reload, 'system reload') + broker.notify(reload, 'system reload') } /** @@ -194,5 +203,5 @@ exports.init = async function (remotes) { // load from storage await cache.load() // controllers - return handle + return handler } diff --git a/src/domain/model-factory.js b/src/domain/model-factory.js index 7d11863e..c70a42d3 100644 --- a/src/domain/model-factory.js +++ b/src/domain/model-factory.js @@ -129,7 +129,8 @@ const ModelFactory = { * @param {import('./datasource').default} datasource - persistence/cache * @returns {Promise>} the model instance */ - createModel: async function (broker, datasource, modelName, ...args) { + // createModel: async function (broker, datasource, modelName, ...args) { + createModel: function (broker, datasource, modelName, ...args) { const name = checkModelName(modelName) const spec = modelFactories.get(name) diff --git a/src/domain/model.js b/src/domain/model.js index 71fa4385..37645660 100644 --- a/src/domain/model.js +++ b/src/domain/model.js @@ -341,6 +341,13 @@ const Model = (() => { return datasource.find(id) }, + /** + * Returns writable stream for upserting the datasource. + * Use in combination with {@link Model}.list(), which will pipe + * its output to it. + * + * @returns {WritableStream} + */ createWriteStream () { return datasource.createWriteStream() }, @@ -357,32 +364,19 @@ const Model = (() => { }, /** - * Search existing model instances (asynchronously). * Searches cache first, then persistent storage if not found. * - * @param {{key1, keyN}} filter + * @param {{ + * writable:WritableStream, + * transform:import('stream').Transform, + * filter:{k:'v'}, + * sort:'asc'|'dsc'|*, + * limit:number + * }} options * @returns {Model[]} */ - async list ({ - filter, - writable = null, - transform = null, - cache = false, - serialize = true, - sort, - limit, - aggregate - }) { - return datasource.list({ - filter, - writable, - transform, - cache, - serialize, - sort, - limit, - aggregate - }) + async list (options) { + return datasource.list(options) }, /** @@ -483,24 +477,31 @@ const Model = (() => { * spec: import('./index').ModelSpecification * }} modelInfo Contains model specification and user input to build a model instance */ - const Model = async modelInfo => - Promise.resolve( - // Call factory with data from request payload - modelInfo.spec.factory(...modelInfo.args) - ).then(model => - make({ - model, - args: modelInfo.args, - spec: modelInfo.spec - }) - ) + // const Model = async modelInfo => + // Promise.resolve( + // // Call factory with data from request payload + // modelInfo.spec.factory(...modelInfo.args) + // ).then(model => + // make({ + // model, + // args: modelInfo.args, + // spec: modelInfo.spec + // }) + // ) + + const Model = modelInfo => + make({ + model: modelInfo.spec.factory(...modelInfo.args), + args: modelInfo.args, + spec: modelInfo.spec + }) const validate = event => model => model[VALIDATE]({}, event) /** * Create model instance */ - const makeModel = asyncPipe( + const makeModel = pipe( Model, withTimestamp(CREATETIME), withSerializers( @@ -535,7 +536,8 @@ const Model = (() => { * }} modelInfo * @returns {Promise>} */ - create: async modelInfo => makeModel(modelInfo), + // create: async modelInfo => makeModel(modelInfo), + create: modelInfo => makeModel(modelInfo), /** * Load a saved model diff --git a/src/domain/orchestrator.js b/src/domain/orchestrator.js index 92b34d16..80d67af4 100644 --- a/src/domain/orchestrator.js +++ b/src/domain/orchestrator.js @@ -29,8 +29,10 @@ export async function generateWorkflow (options) { ModelFactory.registerModel(workflow) } -export async function runWorkflow ({ wfName }) { - const model = await ModelFactory.createModel( +export function runWorkflow ({ wfName }) { + // export async function runWorkflow ({ wfName }) { + // const model = await ModelFactory.createModel( + const model = ModelFactory.createModel( EventBrokerFactory.getInstance(), DataSourceFactory.getSharedDataSource(wfName), wfName diff --git a/src/domain/thread-pool.js b/src/domain/thread-pool.js index 0dca1783..bc95d7a8 100644 --- a/src/domain/thread-pool.js +++ b/src/domain/thread-pool.js @@ -6,11 +6,12 @@ import { Worker, BroadcastChannel } from 'worker_threads' import domainEvents from './domain-events' import ModelFactory from '.' import { performance as perf } from 'perf_hooks' -import { AsyncResource } from 'async_hooks' +import { AsyncResource, AsyncLocalStorage } from 'async_hooks' import os from 'os' const { poolOpen, poolClose, poolDrain, poolAbort } = domainEvents const broker = EventBrokerFactory.getInstance() +const asyncLocalStorage = new AsyncLocalStorage() const NOJOBS = 'noJobsRunning' const MAINCHANNEL = 'mainChannel' const EVENTCHANNEL = 'eventChannel' @@ -54,7 +55,6 @@ class Job extends AsyncResource { this.options = options this.callback = (error, result) => this.runInAsyncScope(options.callback, this, error, result) - //options.callback(error, result) } } @@ -177,6 +177,7 @@ export class ThreadPool extends EventEmitter { transfer = [] } = job.options const startTime = perf.now() + console.log({ kAsyncStore: asyncLocalStorage.getStore() }) this[channel].once('message', result => { pool.jobTime(perf.now() - startTime) @@ -264,6 +265,7 @@ export class ThreadPool extends EventEmitter { * @returns {Promise<*>} anything that can be cloned */ runJob (jobName, jobData, options = {}) { + console.log({ asyncLocalStorage: asyncLocalStorage.getStore() }) return new Promise((resolve, reject) => { this.jobsRequested++ @@ -276,6 +278,7 @@ export class ThreadPool extends EventEmitter { jobName, jobData, ...options, + asyncStore: asyncLocalStorage.getStore(), callback: (error, result) => { if (error) return reject(error) if (result) return resolve(result) diff --git a/src/domain/use-cases/add-model.js b/src/domain/use-cases/add-model.js index 4b4b1440..7ced153b 100644 --- a/src/domain/use-cases/add-model.js +++ b/src/domain/use-cases/add-model.js @@ -48,25 +48,26 @@ export default function makeAddModel ({ return result } else { - const model = await models.createModel( - broker, - repository, - modelName, - input - ) - await repository.save(model.getId(), model) - try { - const event = models.createEvent(eventType, modelName, model) - await broker.notify(eventName, event) + // const model = await models.createModel( + const model = models.createModel(broker, repository, modelName, input) + await repository.save(model.getId(), model) + + try { + const event = models.createEvent(eventType, modelName, model) + broker.notify(eventName, event) + } catch (error) { + // remote the object if not processed + await repository.delete(model.getId()) + broker.emitError(error) + } + + // Return the latest changes + return repository.find(model.getId()) } catch (error) { - // remote the object if not processed - await repository.delete(model.getId()) - throw error + console.log({ fn: addModel.name, error }) + broker.notify('error', error) } - - // Return the latest changes - return repository.find(model.getId()) } } diff --git a/src/domain/use-cases/create-service-mesh.js b/src/domain/use-cases/create-service-mesh.js index 94350a1e..a2e1f6a3 100644 --- a/src/domain/use-cases/create-service-mesh.js +++ b/src/domain/use-cases/create-service-mesh.js @@ -4,19 +4,15 @@ const requiredMethods = ['connect', 'publish', 'subscribe', 'close'] const pluginName = process.env.SERVICE_MESH_PLUGIN || 'webswitch' export default function makeServiceMesh ({ broker, models, repository }) { - return async function (options) { - const plugin = await models.createModel( - broker, - repository, - pluginName, - options - ) + return function (options) { + const plugin = models.createModel(broker, repository, pluginName, options) const missingMethods = requiredMethods.filter(method => !plugin[method]) + const msg = `ServiceMesh plug-in is missing required methods ${missingMethods}` - if (missingMethods.length > 0) - throw new Error( - `ServiceMesh plug-in is missing required methods ${missingMethods}` - ) + if (missingMethods.length > 0) { + console.error({ fn: makeServiceMesh.n, msg }) + //throw new Error(msg) + } return plugin } diff --git a/src/domain/use-cases/invoke-port.js b/src/domain/use-cases/invoke-port.js index f57c726b..e25223d7 100644 --- a/src/domain/use-cases/invoke-port.js +++ b/src/domain/use-cases/invoke-port.js @@ -37,14 +37,15 @@ export default function makeInvokePort ({ } else { try { const { id = null, port, args } = input - const model = typeof(id) === undefined - ? await repository.find(id) - : await models.createModel(broker, repository, modelName, args) + const model = + typeof id === undefined + ? await repository.find(id) + : models.createModel(broker, repository, modelName, args) if (!model) throw new Error('no such id') const callback = model.getPorts()[port].callback || (x => x) - return await model[port](input,callback) + return await model[port](input, callback) } catch (error) { return new Error(error) } diff --git a/src/domain/util/app-error.js b/src/domain/util/app-error.js index 428a2875..2e074ba0 100644 --- a/src/domain/util/app-error.js +++ b/src/domain/util/app-error.js @@ -1,10 +1,15 @@ 'use strict' -class AppError { +class AppError extends Error { constructor (error) { - this.message = error?.message - this.stack = error?.stack - this.name = error?.name - this.hasError = true + super(error.message) + this.name = error.name + this.stack = error.stack + this.cause = error.cause || 'unknown' + this.time = date.now() + } + + toJSON () { + return { ...this, time: new Date(this.time).toUTCString() } } }