Skip to content

Commit

Permalink
Connection mgr (#7031)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Sobolev <[email protected]>
Signed-off-by: Denis Bykhov <[email protected]>
Co-authored-by: Andrey Sobolev <[email protected]>
  • Loading branch information
BykhovDenis and haiodo authored Oct 24, 2024
1 parent 723026c commit 34d1b2b
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 101 deletions.
2 changes: 2 additions & 0 deletions server/core/src/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ export interface DbAdapter extends LowLevelStorage {

helper?: () => DomainHelperOperations

closeContext?: (ctx: MeasureContext) => Promise<void>

close: () => Promise<void>
findAll: <T extends Doc>(
ctx: MeasureContext,
Expand Down
12 changes: 12 additions & 0 deletions server/core/src/dbAdapterManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,18 @@ export class DbAdapterManagerImpl implements DBAdapterManager {
private readonly adapters: Map<string, DbAdapter>
) {}

async closeContext (ctx: MeasureContext): Promise<void> {
for (const adapter of this.adapters.values()) {
try {
if (adapter.closeContext !== undefined) {
await adapter.closeContext(ctx)
}
} catch (err: any) {
Analytics.handleError(err)
}
}
}

getDefaultAdapter (): DbAdapter {
return this.defaultAdapter
}
Expand Down
4 changes: 4 additions & 0 deletions server/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ export interface DBAdapterManager {
registerHelper: (helper: DomainHelper) => Promise<void>

initAdapters: (ctx: MeasureContext) => Promise<void>

closeContext: (ctx: MeasureContext) => Promise<void>
}

export interface PipelineContext {
Expand All @@ -157,6 +159,8 @@ export interface PipelineContext {
head?: Middleware

broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise<void>

endContext?: (ctx: MeasureContext) => Promise<void>
}
/**
* @public
Expand Down
32 changes: 32 additions & 0 deletions server/middleware/src/connectionMgr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { generateId, type MeasureContext, type Tx } from '@hcengineering/core'
import {
BaseMiddleware,
type Middleware,
type PipelineContext,
type TxMiddlewareResult
} from '@hcengineering/server-core'

/**
* Will support apply tx
* @public
*/
export class ConnectionMgrMiddleware extends BaseMiddleware implements Middleware {
static async create (
ctx: MeasureContext,
context: PipelineContext,
next?: Middleware
): Promise<Middleware | undefined> {
return new ConnectionMgrMiddleware(context, next)
}

async tx (ctx: MeasureContext, tx: Tx[]): Promise<TxMiddlewareResult> {
if (ctx.id === undefined) {
ctx.id = generateId()
}
const result = await this.provideTx(ctx, tx)
this.context.endContext = async (_ctx: MeasureContext) => {
await this.context.adapterManager?.closeContext?.(ctx)
}
return result
}
}
3 changes: 2 additions & 1 deletion server/middleware/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
export * from './applyTx'
export * from './broadcast'
export * from './configuration'
export * from './connectionMgr'
export * from './contextName'
export * from './dbAdapter'
export * from './dbAdapterHelper'
Expand All @@ -27,10 +28,10 @@ export * from './lookup'
export * from './lowLevel'
export * from './model'
export * from './modified'
export * from './notifications'
export * from './private'
export * from './queryJoin'
export * from './spacePermissions'
export * from './spaceSecurity'
export * from './triggers'
export * from './txPush'
export * from './notifications'
1 change: 1 addition & 0 deletions server/middleware/src/triggers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware {
// We need to send all to recipients
await this.context.head?.handleBroadcast(ctx)
}
await this.context.endContext?.(ctx)
}

private async processDerivedTxes (ctx: MeasureContext<SessionData>, derived: Tx[]): Promise<void> {
Expand Down
Loading

0 comments on commit 34d1b2b

Please sign in to comment.