diff --git a/server/core/src/adapter.ts b/server/core/src/adapter.ts index 66629f47f1..25f53f4e86 100644 --- a/server/core/src/adapter.ts +++ b/server/core/src/adapter.ts @@ -69,6 +69,8 @@ export interface DbAdapter extends LowLevelStorage { helper?: () => DomainHelperOperations + closeContext?: (ctx: MeasureContext) => Promise + close: () => Promise findAll: ( ctx: MeasureContext, diff --git a/server/core/src/dbAdapterManager.ts b/server/core/src/dbAdapterManager.ts index a27bbd1fab..e03fff9417 100644 --- a/server/core/src/dbAdapterManager.ts +++ b/server/core/src/dbAdapterManager.ts @@ -42,6 +42,18 @@ export class DbAdapterManagerImpl implements DBAdapterManager { private readonly adapters: Map ) {} + async closeContext (ctx: MeasureContext): Promise { + for (const adapter of this.adapters.values()) { + try { + if (adapter.closeContext !== undefined) { + await adapter.closeContext(ctx) + } + } catch (err: any) { + Analytics.handleError(err) + } + } + } + getDefaultAdapter (): DbAdapter { return this.defaultAdapter } diff --git a/server/core/src/types.ts b/server/core/src/types.ts index e5fe6f50f7..70c213b739 100644 --- a/server/core/src/types.ts +++ b/server/core/src/types.ts @@ -138,6 +138,8 @@ export interface DBAdapterManager { registerHelper: (helper: DomainHelper) => Promise initAdapters: (ctx: MeasureContext) => Promise + + closeContext: (ctx: MeasureContext) => Promise } export interface PipelineContext { @@ -157,6 +159,8 @@ export interface PipelineContext { head?: Middleware broadcastEvent?: (ctx: MeasureContext, tx: Tx[]) => Promise + + endContext?: (ctx: MeasureContext) => Promise } /** * @public diff --git a/server/middleware/src/connectionMgr.ts b/server/middleware/src/connectionMgr.ts new file mode 100644 index 0000000000..fc6665dc75 --- /dev/null +++ b/server/middleware/src/connectionMgr.ts @@ -0,0 +1,32 @@ +import { generateId, type MeasureContext, type Tx } from '@hcengineering/core' +import { + BaseMiddleware, + type Middleware, + type PipelineContext, + type TxMiddlewareResult +} from '@hcengineering/server-core' + +/** + * Will support apply tx + * @public + */ +export class ConnectionMgrMiddleware extends BaseMiddleware implements Middleware { + static async create ( + ctx: MeasureContext, + context: PipelineContext, + next?: Middleware + ): Promise { + return new ConnectionMgrMiddleware(context, next) + } + + async tx (ctx: MeasureContext, tx: Tx[]): Promise { + if (ctx.id === undefined) { + ctx.id = generateId() + } + const result = await this.provideTx(ctx, tx) + this.context.endContext = async (_ctx: MeasureContext) => { + await this.context.adapterManager?.closeContext?.(ctx) + } + return result + } +} diff --git a/server/middleware/src/index.ts b/server/middleware/src/index.ts index 512a0cd383..67d4157877 100644 --- a/server/middleware/src/index.ts +++ b/server/middleware/src/index.ts @@ -16,6 +16,7 @@ export * from './applyTx' export * from './broadcast' export * from './configuration' +export * from './connectionMgr' export * from './contextName' export * from './dbAdapter' export * from './dbAdapterHelper' @@ -27,10 +28,10 @@ export * from './lookup' export * from './lowLevel' export * from './model' export * from './modified' +export * from './notifications' export * from './private' export * from './queryJoin' export * from './spacePermissions' export * from './spaceSecurity' export * from './triggers' export * from './txPush' -export * from './notifications' diff --git a/server/middleware/src/triggers.ts b/server/middleware/src/triggers.ts index 84ddef6ece..7a64d6a37f 100644 --- a/server/middleware/src/triggers.ts +++ b/server/middleware/src/triggers.ts @@ -229,6 +229,7 @@ export class TriggersMiddleware extends BaseMiddleware implements Middleware { // We need to send all to recipients await this.context.head?.handleBroadcast(ctx) } + await this.context.endContext?.(ctx) } private async processDerivedTxes (ctx: MeasureContext, derived: Tx[]): Promise { diff --git a/server/postgres/src/storage.ts b/server/postgres/src/storage.ts index 09c29a1c5d..8a44e4cf50 100644 --- a/server/postgres/src/storage.ts +++ b/server/postgres/src/storage.ts @@ -65,7 +65,7 @@ import { updateHashForDoc } from '@hcengineering/server-core' import { createHash } from 'crypto' -import { type Pool, type PoolClient } from 'pg' +import { Pool, type PoolClient } from 'pg' import { type ValueType } from './types' import { convertDoc, @@ -89,18 +89,20 @@ abstract class PostgresAdapterBase implements DbAdapter { protected readonly _helper: DBCollectionHelper protected readonly tableFields = new Map() protected readonly queue: ((client: PoolClient) => Promise)[] = [] - private readonly mutex = new Mutex() + protected readonly mutex = new Mutex() + protected readonly connections = new Map() - protected readonly retryTxn = async (fn: (client: PoolClient) => Promise): Promise => { + protected readonly retryTxn = async ( + connection: Pool | PoolClient, + fn: (client: Pool | PoolClient) => Promise + ): Promise => { await this.mutex.runExclusive(async () => { - await this.processOps(this.txConnection, fn) + await this.processOps(connection, fn) }) } constructor ( protected readonly client: Pool, - protected readonly connection: PoolClient, - protected readonly txConnection: PoolClient, protected readonly refClient: PostgresClientReference, protected readonly workspaceId: WorkspaceId, protected readonly hierarchy: Hierarchy, @@ -109,30 +111,56 @@ abstract class PostgresAdapterBase implements DbAdapter { this._helper = new DBCollectionHelper(this.client, this.workspaceId) } - private async processOps (client: PoolClient, operation: (client: PoolClient) => Promise): Promise { + async closeContext (ctx: MeasureContext): Promise { + if (ctx.id === undefined) return + const conn = this.connections.get(ctx.id) + if (conn !== undefined) { + conn.release() + this.connections.delete(ctx.id) + } + } + + protected async getConnection (ctx: MeasureContext): Promise { + if (ctx.id === undefined) return this.client + const conn = this.connections.get(ctx.id) + if (conn !== undefined) return conn + const client = await this.client.connect() + this.connections.set(ctx.id, client) + return client + } + + private async processOps (client: Pool | PoolClient, operation: (client: PoolClient) => Promise): Promise { const backoffInterval = 100 // millis const maxTries = 5 let tries = 0 - while (true) { - await client.query('BEGIN;') - tries++ - - try { - const result = await operation(client) - await client.query('COMMIT;') - return result - } catch (err: any) { - await client.query('ROLLBACK;') + const _client = client instanceof Pool ? await client.connect() : client - if (err.code !== '40001' || tries === maxTries) { - throw err - } else { - console.log('Transaction failed. Retrying.') - console.log(err.message) - await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval)) + try { + while (true) { + await _client.query('BEGIN;') + tries++ + + try { + const result = await operation(_client) + await _client.query('COMMIT;') + return result + } catch (err: any) { + await _client.query('ROLLBACK;') + + if (err.code !== '40001' || tries === maxTries) { + throw err + } else { + console.log('Transaction failed. Retrying.') + console.log(err.message) + await new Promise((resolve) => setTimeout(resolve, tries * backoffInterval)) + } } } + } finally { + if (client instanceof Pool) { + _client.release() + } } } @@ -198,8 +226,9 @@ abstract class PostgresAdapterBase implements DbAdapter { abstract init (): Promise async close (): Promise { - this.txConnection.release() - this.connection.release() + this.connections.forEach((c) => { + c.release() + }) this.refClient.close() } @@ -215,7 +244,7 @@ abstract class PostgresAdapterBase implements DbAdapter { sqlChunks.push(`LIMIT ${options.limit}`) } const finalSql: string = [select, ...sqlChunks].join(' ') - const result = await this.connection.query(finalSql) + const result = await this.client.query(finalSql) return result.rows.map((p) => parseDocWithProjection(p, options?.projection)) } @@ -261,7 +290,7 @@ abstract class PostgresAdapterBase implements DbAdapter { if ((operations as any)['%hash%'] === undefined) { ;(operations as any)['%hash%'] = null } - await this.retryTxn(async (client) => { + await this.retryTxn(this.client, async (client) => { const res = await client.query(`SELECT * FROM ${translateDomain(domain)} WHERE ${translatedQuery} FOR UPDATE`) const docs = res.rows.map(parseDoc) for (const doc of docs) { @@ -309,7 +338,7 @@ abstract class PostgresAdapterBase implements DbAdapter { const domain = translateDomain(options?.domain ?? this.hierarchy.getDomain(_class)) const sqlChunks: string[] = [] const joins = this.buildJoin(_class, options?.lookup) - const select = `SELECT ${this.getProjection(ctx, _class, domain, options?.projection, joins)} FROM ${domain}` + const select = `SELECT ${this.getProjection(_class, domain, options?.projection, joins)} FROM ${domain}` const secJoin = this.addSecurity(query, domain, ctx.contextData) if (secJoin !== undefined) { sqlChunks.push(secJoin) @@ -318,12 +347,13 @@ abstract class PostgresAdapterBase implements DbAdapter { sqlChunks.push(this.buildJoinString(joins)) } sqlChunks.push(`WHERE ${this.buildQuery(_class, domain, query, joins, options)}`) + const connection = await this.getConnection(ctx) let total = options?.total === true ? 0 : -1 if (options?.total === true) { const totalReq = `SELECT COUNT(${domain}._id) as count FROM ${domain}` const totalSql = [totalReq, ...sqlChunks].join(' ') - const totalResult = await this.connection.query(totalSql) + const totalResult = await connection.query(totalSql) const parsed = Number.parseInt(totalResult.rows[0]?.count ?? '') total = Number.isNaN(parsed) ? 0 : parsed } @@ -335,7 +365,7 @@ abstract class PostgresAdapterBase implements DbAdapter { } const finalSql: string = [select, ...sqlChunks].join(' ') - const result = await this.connection.query(finalSql) + const result = await connection.query(finalSql) if (options?.lookup === undefined) { return toFindResult( result.rows.map((p) => parseDocWithProjection(p, options?.projection)), @@ -938,7 +968,6 @@ abstract class PostgresAdapterBase implements DbAdapter { } private getProjection( - ctx: MeasureContext, _class: Ref>, baseDomain: string, projection: Projection | undefined, @@ -1029,7 +1058,7 @@ abstract class PostgresAdapterBase implements DbAdapter { next: async () => { if (!initialized) { if (recheck === true) { - await this.retryTxn(async (client) => { + await this.retryTxn(client, async (client) => { await client.query( `UPDATE ${translateDomain(domain)} SET jsonb_set(data, '{%hash%}', 'NULL', true) WHERE "workspaceId" = $1 AND data ->> '%hash%' IS NOT NULL`, [this.workspaceId.name] @@ -1100,7 +1129,8 @@ abstract class PostgresAdapterBase implements DbAdapter { if (docs.length === 0) { return [] } - const res = await this.connection.query( + const connection = await this.getConnection(ctx) + const res = await connection.query( `SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [docs, this.workspaceId.name] ) @@ -1109,60 +1139,65 @@ abstract class PostgresAdapterBase implements DbAdapter { } async upload (ctx: MeasureContext, domain: Domain, docs: Doc[]): Promise { - const arr = docs.concat() - const fields = getDocFieldsByDomains(domain) - const filedsWithData = [...fields, 'data'] - const insertFields: string[] = [] - const onConflict: string[] = [] - for (const field of filedsWithData) { - insertFields.push(`"${field}"`) - onConflict.push(`"${field}" = EXCLUDED."${field}"`) - } - const insertStr = insertFields.join(', ') - const onConflictStr = onConflict.join(', ') - while (arr.length > 0) { - const part = arr.splice(0, 500) - const values: any[] = [] - const vars: string[] = [] - let index = 1 - for (let i = 0; i < part.length; i++) { - const doc = part[i] - const variables: string[] = [] - const d = convertDoc(domain, doc, this.workspaceId.name) - values.push(d.workspaceId) - variables.push(`$${index++}`) - for (const field of fields) { - values.push(d[field]) + await ctx.with('upload', { domain }, async (ctx) => { + const arr = docs.concat() + const fields = getDocFieldsByDomains(domain) + const filedsWithData = [...fields, 'data'] + const insertFields: string[] = [] + const onConflict: string[] = [] + for (const field of filedsWithData) { + insertFields.push(`"${field}"`) + onConflict.push(`"${field}" = EXCLUDED."${field}"`) + } + const insertStr = insertFields.join(', ') + const onConflictStr = onConflict.join(', ') + const connection = await this.getConnection(ctx) + while (arr.length > 0) { + const part = arr.splice(0, 500) + const values: any[] = [] + const vars: string[] = [] + let index = 1 + for (let i = 0; i < part.length; i++) { + const doc = part[i] + const variables: string[] = [] + const d = convertDoc(domain, doc, this.workspaceId.name) + values.push(d.workspaceId) variables.push(`$${index++}`) + for (const field of fields) { + values.push(d[field]) + variables.push(`$${index++}`) + } + values.push(d.data) + variables.push(`$${index++}`) + vars.push(`(${variables.join(', ')})`) } - values.push(d.data) - variables.push(`$${index++}`) - vars.push(`(${variables.join(', ')})`) - } - const vals = vars.join(',') - await this.retryTxn(async (client) => { - await client.query( - `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals} - ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`, - values - ) - }) - } + const vals = vars.join(',') + await this.retryTxn(connection, async (client) => { + await client.query( + `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals} + ON CONFLICT ("workspaceId", _id) DO UPDATE SET ${onConflictStr};`, + values + ) + }) + } + }) } async clean (ctx: MeasureContext, domain: Domain, docs: Ref[]): Promise { - await this.connection.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [ + const connection = await this.getConnection(ctx) + await connection.query(`DELETE FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2`, [ docs, this.workspaceId.name ]) } async groupBy(ctx: MeasureContext, domain: Domain, field: string): Promise> { + const connection = await this.getConnection(ctx) const key = isDataField(domain, field) ? `data ->> '${field}'` : `"${field}"` const result = await ctx.with('groupBy', { domain }, async (ctx) => { try { - const result = await this.connection.query( + const result = await connection.query( `SELECT DISTINCT ${key} as ${field} FROM ${translateDomain(domain)} WHERE "workspaceId" = $1`, [this.workspaceId.name] ) @@ -1177,7 +1212,8 @@ abstract class PostgresAdapterBase implements DbAdapter { async update (ctx: MeasureContext, domain: Domain, operations: Map, DocumentUpdate>): Promise { const ids = Array.from(operations.keys()) - await this.retryTxn(async (client) => { + const connection = await this.getConnection(ctx) + await this.retryTxn(connection, async (client) => { try { const res = await client.query( `SELECT * FROM ${translateDomain(domain)} WHERE _id = ANY($1) AND "workspaceId" = $2 FOR UPDATE`, @@ -1219,7 +1255,7 @@ abstract class PostgresAdapterBase implements DbAdapter { }) } - async insert (domain: string, docs: Doc[]): Promise { + async insert (ctx: MeasureContext, domain: string, docs: Doc[]): Promise { const fields = getDocFieldsByDomains(domain) const filedsWithData = [...fields, 'data'] const insertFields: string[] = [] @@ -1247,7 +1283,8 @@ abstract class PostgresAdapterBase implements DbAdapter { vars.push(`(${variables.join(', ')})`) } const vals = vars.join(',') - await this.retryTxn(async (client) => { + const connection = await this.getConnection(ctx) + await this.retryTxn(connection, async (client) => { await client.query( `INSERT INTO ${translateDomain(domain)} ("workspaceId", ${insertStr}) VALUES ${vals}`, values @@ -1312,8 +1349,9 @@ class PostgresAdapter extends PostgresAdapterBase { } private async txMixin (ctx: MeasureContext, tx: TxMixin): Promise { - await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async () => { - await this.retryTxn(async (client) => { + await ctx.with('tx-mixin', { _class: tx.objectClass, mixin: tx.mixin }, async (ctx) => { + const connection = await this.getConnection(ctx) + await this.retryTxn(connection, async (client) => { const doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) if (doc === undefined) return TxProcessor.updateMixin4Doc(doc, tx) @@ -1365,21 +1403,22 @@ class PostgresAdapter extends PostgresAdapterBase { protected async txCreateDoc (ctx: MeasureContext, tx: TxCreateDoc): Promise { const doc = TxProcessor.createDoc2Doc(tx) - return await ctx.with('create-doc', { _class: doc._class }, async () => { - return await this.insert(translateDomain(this.hierarchy.getDomain(doc._class)), [doc]) + return await ctx.with('create-doc', { _class: doc._class }, async (_ctx) => { + return await this.insert(_ctx, translateDomain(this.hierarchy.getDomain(doc._class)), [doc]) }) } protected async txUpdateDoc (ctx: MeasureContext, tx: TxUpdateDoc): Promise { - return await ctx.with('tx-update-doc', { _class: tx.objectClass }, async () => { + return await ctx.with('tx-update-doc', { _class: tx.objectClass }, async (_ctx) => { if (isOperator(tx.operations)) { let doc: Doc | undefined const ops = { '%hash%': null, ...tx.operations } - return await ctx.with( + return await _ctx.with( 'update with operations', { operations: JSON.stringify(Object.keys(tx.operations)) }, - async () => { - await this.retryTxn(async (client) => { + async (ctx) => { + const connection = await this.getConnection(ctx) + await this.retryTxn(connection, async (client) => { doc = await this.findDoc(ctx, client, tx.objectClass, tx.objectId, true) if (doc === undefined) return {} TxProcessor.applyUpdate(doc, ops) @@ -1409,7 +1448,7 @@ class PostgresAdapter extends PostgresAdapterBase { } ) } else { - return await this.updateDoc(ctx, tx, tx.retrieve ?? false) + return await this.updateDoc(_ctx, tx, tx.retrieve ?? false) } }) } @@ -1419,7 +1458,7 @@ class PostgresAdapter extends PostgresAdapterBase { tx: TxUpdateDoc, retrieve: boolean ): Promise { - return await ctx.with('update jsonb_set', {}, async () => { + return await ctx.with('update jsonb_set', {}, async (_ctx) => { const updates: string[] = ['"modifiedBy" = $1', '"modifiedOn" = $2'] const params: any[] = [tx.modifiedBy, tx.modifiedOn, tx.objectId, this.workspaceId.name] let paramsIndex = 5 @@ -1446,14 +1485,15 @@ class PostgresAdapter extends PostgresAdapterBase { } try { - await this.retryTxn(async (client) => { + const connection = await this.getConnection(_ctx) + await this.retryTxn(connection, async (client) => { await client.query( `UPDATE ${translateDomain(this.hierarchy.getDomain(tx.objectClass))} SET ${updates.join(', ')} WHERE _id = $3 AND "workspaceId" = $4`, params ) }) if (retrieve) { - const object = await this.findDoc(ctx, this.connection, tx.objectClass, tx.objectId) + const object = await this.findDoc(_ctx, connection, tx.objectClass, tx.objectId) return { object } } } catch (err) { @@ -1465,7 +1505,7 @@ class PostgresAdapter extends PostgresAdapterBase { private async findDoc ( ctx: MeasureContext, - client: PoolClient, + client: Pool | PoolClient, _class: Ref>, _id: Ref, forUpdate: boolean = false @@ -1482,9 +1522,10 @@ class PostgresAdapter extends PostgresAdapterBase { } protected async txRemoveDoc (ctx: MeasureContext, tx: TxRemoveDoc): Promise { - await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async () => { + await ctx.with('tx-remove-doc', { _class: tx.objectClass }, async (_ctx) => { const domain = translateDomain(this.hierarchy.getDomain(tx.objectClass)) - await this.retryTxn(async (client) => { + const connection = await this.getConnection(_ctx) + await this.retryTxn(connection, async (client) => { await client.query(`DELETE FROM ${domain} WHERE _id = $1 AND "workspaceId" = $2`, [ tx.objectId, this.workspaceId.name @@ -1507,7 +1548,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter { return [] } try { - await this.insert(DOMAIN_TX, tx) + await this.insert(ctx, DOMAIN_TX, tx) } catch (err) { console.error(err) } @@ -1515,7 +1556,7 @@ class PostgresTxAdapter extends PostgresAdapterBase implements TxAdapter { } async getModel (ctx: MeasureContext): Promise { - const res = await this.connection.query( + const res = await this.client.query( `SELECT * FROM ${translateDomain(DOMAIN_TX)} WHERE "workspaceId" = '${this.workspaceId.name}' AND data->>'objectSpace' = '${core.space.Model}' ORDER BY _id ASC, "modifiedOn" ASC` ) const model = res.rows.map((p) => parseDoc(p)) @@ -1539,9 +1580,7 @@ export async function createPostgresAdapter ( ): Promise { const client = getDBClient(url) const pool = await client.getClient() - const mainConnection = await pool.connect() - const txConnection = await pool.connect() - const adapter = new PostgresAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb) + const adapter = new PostgresAdapter(pool, client, workspaceId, hierarchy, modelDb) return adapter } @@ -1557,9 +1596,7 @@ export async function createPostgresTxAdapter ( ): Promise { const client = getDBClient(url) const pool = await client.getClient() - const mainConnection = await pool.connect() - const txConnection = await pool.connect() - const adapter = new PostgresTxAdapter(pool, mainConnection, txConnection, client, workspaceId, hierarchy, modelDb) + const adapter = new PostgresTxAdapter(pool, client, workspaceId, hierarchy, modelDb) await adapter.init() return adapter } diff --git a/server/postgres/src/utils.ts b/server/postgres/src/utils.ts index 6640666531..b071556c79 100644 --- a/server/postgres/src/utils.ts +++ b/server/postgres/src/utils.ts @@ -206,14 +206,17 @@ export class ClientRef implements PostgresClientReference { * @public */ export function getDBClient (connectionString: string, database?: string): PostgresClientReference { - const key = `${connectionString}${process.env.postgree_OPTIONS ?? '{}'}` + const extraOptions = JSON.parse(process.env.POSTGRES_OPTIONS ?? '{}') + const key = `${connectionString}${extraOptions}` let existing = connections.get(key) if (existing === undefined) { const pool = new Pool({ connectionString, application_name: 'transactor', - database + database, + max: 10, + ...extraOptions }) existing = new PostgresClientReferenceImpl(pool, () => { diff --git a/server/server-pipeline/src/pipeline.ts b/server/server-pipeline/src/pipeline.ts index 38a74b9f4f..ee77c3da08 100644 --- a/server/server-pipeline/src/pipeline.ts +++ b/server/server-pipeline/src/pipeline.ts @@ -18,6 +18,7 @@ import { ApplyTxMiddleware, BroadcastMiddleware, ConfigurationMiddleware, + ConnectionMgrMiddleware, ContextNameMiddleware, DBAdapterInitMiddleware, DBAdapterMiddleware, @@ -129,6 +130,7 @@ export function createServerPipeline ( ConfigurationMiddleware.create, LowLevelMiddleware.create, ContextNameMiddleware.create, + ConnectionMgrMiddleware.create, MarkDerivedEntryMiddleware.create, ApplyTxMiddleware.create, // Extract apply TxMiddleware.create, // Store tx into transaction domain