From ca293a94852c59cb531f3615058178fd66aba18a Mon Sep 17 00:00:00 2001 From: Giovanni Date: Wed, 19 Jun 2024 12:32:33 -0400 Subject: [PATCH 1/7] Add queue to the execute function --- package-lock.json | 28 ++++++++++++++- packages/libsql-client/package.json | 3 +- packages/libsql-client/src/http.ts | 55 ++++++++++++++++------------- 3 files changed, 59 insertions(+), 27 deletions(-) diff --git a/package-lock.json b/package-lock.json index d143eb6..5f99de4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4712,7 +4712,8 @@ "@libsql/core": "^0.6.2", "@libsql/hrana-client": "^0.6.0", "js-base64": "^3.7.5", - "libsql": "^0.3.10" + "libsql": "^0.3.10", + "p-limit": "^5.0.0" }, "devDependencies": { "@types/jest": "^29.2.5", @@ -4747,6 +4748,31 @@ "typescript": "^4.9.4" } }, + "packages/libsql-client/node_modules/p-limit": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-5.0.0.tgz", + "integrity": "sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==", + "dependencies": { + "yocto-queue": "^1.0.0" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "packages/libsql-client/node_modules/yocto-queue": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.0.0.tgz", + "integrity": "sha512-9bnSc/HEW2uRy67wc+T8UwauLuPJVn28jb+GtJY16iiKWyvmYJRXVT4UamsAEGQfPohgr2q4Tq0sQbQlxTfi1g==", + "engines": { + "node": ">=12.20" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "packages/libsql-core": { "name": "@libsql/core", "version": "0.6.2", diff --git a/packages/libsql-client/package.json b/packages/libsql-client/package.json index daef4f8..4d1ab16 100644 --- a/packages/libsql-client/package.json +++ b/packages/libsql-client/package.json @@ -105,7 +105,8 @@ "@libsql/core": "^0.6.2", "@libsql/hrana-client": "^0.6.0", "js-base64": "^3.7.5", - "libsql": "^0.3.10" + "libsql": "^0.3.10", + "p-limit": "^5.0.0" }, "devDependencies": { "@types/jest": "^29.2.5", diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index d6177e8..7f8fa86 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -24,9 +24,12 @@ import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; +import pLimit from "p-limit"; export * from "@libsql/core/api"; +const limit = pLimit(1); + export function createClient(config: Config): Client { return _createClient(expandConfig(config, true)); } @@ -99,33 +102,35 @@ export class HttpClient implements Client { } async execute(stmt: InStatement): Promise { - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmt = stmtToHrana(stmt); - - // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and - // close the stream in a single HTTP request. - let rowsPromise: Promise; - const stream = this.#client.openStream(); + return limit(async () => { try { - rowsPromise = stream.query(hranaStmt); - } finally { - stream.closeGracefully(); - } - - const rowsResult = await rowsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmt = stmtToHrana(stmt); + + // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and + // close the stream in a single HTTP request. + let rowsPromise: Promise; + const stream = this.#client.openStream(); + try { + rowsPromise = stream.query(hranaStmt); + } finally { + stream.closeGracefully(); + } + + const rowsResult = await rowsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } + + return resultSetFromHrana(rowsResult); + } catch (e) { + throw mapHranaError(e); } - - return resultSetFromHrana(rowsResult); - } catch (e) { - throw mapHranaError(e); - } + }); } async batch( From 27d4fba563f39bc142ccb92fa79bbfbaa7416ad1 Mon Sep 17 00:00:00 2001 From: Giovanni Date: Thu, 20 Jun 2024 07:14:16 -0400 Subject: [PATCH 2/7] Replace p-limit with promise-limit --- package-lock.json | 32 ++++++----------------------- packages/libsql-client/package.json | 2 +- packages/libsql-client/src/http.ts | 4 ++-- 3 files changed, 9 insertions(+), 29 deletions(-) diff --git a/package-lock.json b/package-lock.json index 5f99de4..6e87be0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3996,6 +3996,11 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/promise-limit": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/promise-limit/-/promise-limit-2.7.0.tgz", + "integrity": "sha512-7nJ6v5lnJsXwGprnGXga4wx6d1POjvi5Qmf1ivTRxTjH4Z/9Czja/UCMLVmB9N93GeWOU93XaFaEt6jbuoagNw==" + }, "node_modules/prompts": { "version": "2.4.2", "dev": true, @@ -4713,7 +4718,7 @@ "@libsql/hrana-client": "^0.6.0", "js-base64": "^3.7.5", "libsql": "^0.3.10", - "p-limit": "^5.0.0" + "promise-limit": "^2.7.0" }, "devDependencies": { "@types/jest": "^29.2.5", @@ -4748,31 +4753,6 @@ "typescript": "^4.9.4" } }, - "packages/libsql-client/node_modules/p-limit": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-5.0.0.tgz", - "integrity": "sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ==", - "dependencies": { - "yocto-queue": "^1.0.0" - }, - "engines": { - "node": ">=18" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, - "packages/libsql-client/node_modules/yocto-queue": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-1.0.0.tgz", - "integrity": "sha512-9bnSc/HEW2uRy67wc+T8UwauLuPJVn28jb+GtJY16iiKWyvmYJRXVT4UamsAEGQfPohgr2q4Tq0sQbQlxTfi1g==", - "engines": { - "node": ">=12.20" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "packages/libsql-core": { "name": "@libsql/core", "version": "0.6.2", diff --git a/packages/libsql-client/package.json b/packages/libsql-client/package.json index 4d1ab16..50a42f2 100644 --- a/packages/libsql-client/package.json +++ b/packages/libsql-client/package.json @@ -106,7 +106,7 @@ "@libsql/hrana-client": "^0.6.0", "js-base64": "^3.7.5", "libsql": "^0.3.10", - "p-limit": "^5.0.0" + "promise-limit": "^2.7.0" }, "devDependencies": { "@types/jest": "^29.2.5", diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 7f8fa86..364e5fc 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -24,11 +24,11 @@ import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; -import pLimit from "p-limit"; +import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; -const limit = pLimit(1); +const limit = promiseLimit(1); export function createClient(config: Config): Client { return _createClient(expandConfig(config, true)); From 496d1c03fec6c4428694662a2584f179fd3cd46a Mon Sep 17 00:00:00 2001 From: Giovanni Date: Fri, 21 Jun 2024 06:28:01 -0400 Subject: [PATCH 3/7] Add concurrency option to createClient --- packages/libsql-client/src/http.ts | 15 ++++++-- packages/libsql-client/src/ws.ts | 62 ++++++++++++++++++------------ packages/libsql-core/src/api.ts | 8 ++++ packages/libsql-core/src/config.ts | 4 ++ 4 files changed, 60 insertions(+), 29 deletions(-) diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 364e5fc..168bb46 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -28,8 +28,6 @@ import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; -const limit = promiseLimit(1); - export function createClient(config: Config): Client { return _createClient(expandConfig(config, true)); } @@ -64,7 +62,13 @@ export function _createClient(config: ExpandedConfig): Client { } const url = encodeBaseUrl(config.scheme, config.authority, config.path); - return new HttpClient(url, config.authToken, config.intMode, config.fetch); + return new HttpClient( + url, + config.authToken, + config.intMode, + config.fetch, + config.concurrency, + ); } const sqlCacheCapacity = 30; @@ -75,6 +79,7 @@ export class HttpClient implements Client { #url: URL; #authToken: string | undefined; #isSchemaDatabase: boolean | undefined; + #limit: ReturnType>; /** @private */ constructor( @@ -82,12 +87,14 @@ export class HttpClient implements Client { authToken: string | undefined, intMode: IntMode, customFetch: Function | undefined, + concurrency: number | undefined, ) { this.#client = hrana.openHttp(url, authToken, customFetch); this.#client.intMode = intMode; this.protocol = "http"; this.#url = url; this.#authToken = authToken; + this.#limit = promiseLimit(concurrency); } async getIsSchemaDatabase(): Promise { @@ -102,7 +109,7 @@ export class HttpClient implements Client { } async execute(stmt: InStatement): Promise { - return limit(async () => { + return this.#limit(async () => { try { const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmt = stmtToHrana(stmt); diff --git a/packages/libsql-client/src/ws.ts b/packages/libsql-client/src/ws.ts index d628a59..e467d70 100644 --- a/packages/libsql-client/src/ws.ts +++ b/packages/libsql-client/src/ws.ts @@ -25,6 +25,7 @@ import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; +import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; @@ -84,7 +85,13 @@ export function _createClient(config: ExpandedConfig): WsClient { throw mapHranaError(e); } - return new WsClient(client, url, config.authToken, config.intMode); + return new WsClient( + client, + url, + config.authToken, + config.intMode, + config.concurrency, + ); } // This object maintains state for a single WebSocket connection. @@ -125,6 +132,7 @@ export class WsClient implements Client { closed: boolean; protocol: "ws"; #isSchemaDatabase: boolean | undefined; + #limit: ReturnType>; /** @private */ constructor( @@ -132,6 +140,7 @@ export class WsClient implements Client { url: URL, authToken: string | undefined, intMode: IntMode, + concurrency: number | undefined, ) { this.#url = url; this.#authToken = authToken; @@ -140,6 +149,7 @@ export class WsClient implements Client { this.#futureConnState = undefined; this.closed = false; this.protocol = "ws"; + this.#limit = promiseLimit(concurrency); } async getIsSchemaDatabase(): Promise { @@ -154,32 +164,34 @@ export class WsClient implements Client { } async execute(stmt: InStatement): Promise { - const streamState = await this.#openStream(); - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmt = stmtToHrana(stmt); - - // Schedule all operations synchronously, so they will be pipelined and executed in a single - // network roundtrip. - streamState.conn.sqlCache.apply([hranaStmt]); - const hranaRowsPromise = streamState.stream.query(hranaStmt); - streamState.stream.closeGracefully(); + return this.#limit(async () => { + const streamState = await this.#openStream(); + try { + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmt = stmtToHrana(stmt); + + // Schedule all operations synchronously, so they will be pipelined and executed in a single + // network roundtrip. + streamState.conn.sqlCache.apply([hranaStmt]); + const hranaRowsPromise = streamState.stream.query(hranaStmt); + streamState.stream.closeGracefully(); + + const hranaRowsResult = await hranaRowsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } - const hranaRowsResult = await hranaRowsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); + return resultSetFromHrana(hranaRowsResult); + } catch (e) { + throw mapHranaError(e); + } finally { + this._closeStream(streamState); } - - return resultSetFromHrana(hranaRowsResult); - } catch (e) { - throw mapHranaError(e); - } finally { - this._closeStream(streamState); - } + }); } async batch( diff --git a/packages/libsql-core/src/api.ts b/packages/libsql-core/src/api.ts index 6278fc0..3860e4d 100644 --- a/packages/libsql-core/src/api.ts +++ b/packages/libsql-core/src/api.ts @@ -46,6 +46,14 @@ export interface Config { * with the Web `Response`. */ fetch?: Function; + + /** Concurrency limit. + * + * By default, the client performs up to 20 concurrent requests. You can set this option to a higher + * number to increase the concurrency limit. You can also set this option to `undefined` to disable concurrency + * completely. + */ + concurrency?: number | undefined; } /** Representation of integers from database as JavaScript values. See {@link Config.intMode}. */ diff --git a/packages/libsql-core/src/config.ts b/packages/libsql-core/src/config.ts index 2eb4267..23000a6 100644 --- a/packages/libsql-core/src/config.ts +++ b/packages/libsql-core/src/config.ts @@ -15,6 +15,7 @@ export interface ExpandedConfig { syncInterval: number | undefined; intMode: IntMode; fetch: Function | undefined; + concurrency: number | undefined; } export type ExpandedScheme = "wss" | "ws" | "https" | "http" | "file"; @@ -31,6 +32,7 @@ export function expandConfig( ); } + const concurrency = "concurrency" in config ? config.concurrency : 20; let tls: boolean | undefined = config.tls; let authToken = config.authToken; let encryptionKey = config.encryptionKey; @@ -56,6 +58,7 @@ export function expandConfig( authToken: undefined, encryptionKey: undefined, authority: undefined, + concurrency, }; } @@ -133,5 +136,6 @@ export function expandConfig( syncInterval, intMode, fetch: config.fetch, + concurrency, }; } From c599e38652cd6dfc617caadbe28b87d77569a706 Mon Sep 17 00:00:00 2001 From: Giovanni Date: Fri, 21 Jun 2024 07:30:34 -0400 Subject: [PATCH 4/7] Add concurrency limit to batch, transaction, and executeMultiple --- packages/libsql-client/src/http.ts | 136 +++++++++++++++-------------- packages/libsql-client/src/ws.ts | 114 ++++++++++++------------ 2 files changed, 131 insertions(+), 119 deletions(-) diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 168bb46..0148736 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -79,7 +79,7 @@ export class HttpClient implements Client { #url: URL; #authToken: string | undefined; #isSchemaDatabase: boolean | undefined; - #limit: ReturnType>; + #limit: ReturnType>; /** @private */ constructor( @@ -94,7 +94,7 @@ export class HttpClient implements Client { this.protocol = "http"; this.#url = url; this.#authToken = authToken; - this.#limit = promiseLimit(concurrency); + this.#limit = promiseLimit(concurrency); } async getIsSchemaDatabase(): Promise { @@ -144,82 +144,88 @@ export class HttpClient implements Client { stmts: Array, mode: TransactionMode = "deferred", ): Promise> { - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmts = stmts.map(stmtToHrana); - const version = await this.#client.getVersion(); - - // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and - // close the stream in a single HTTP request. - let resultsPromise: Promise>; - const stream = this.#client.openStream(); + return this.#limit(async () => { try { - // It makes sense to use a SQL cache even for a single batch, because it may contain the same - // statement repeated multiple times. - const sqlCache = new SqlCache(stream, sqlCacheCapacity); - sqlCache.apply(hranaStmts); - - // TODO: we do not use a cursor here, because it would cause three roundtrips: - // 1. pipeline request to store SQL texts - // 2. cursor request - // 3. pipeline request to close the stream - const batch = stream.batch(false); - resultsPromise = executeHranaBatch( - mode, - version, - batch, - hranaStmts, - ); - } finally { - stream.closeGracefully(); - } + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmts = stmts.map(stmtToHrana); + const version = await this.#client.getVersion(); - const results = await resultsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } + // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and + // close the stream in a single HTTP request. + let resultsPromise: Promise>; + const stream = this.#client.openStream(); + try { + // It makes sense to use a SQL cache even for a single batch, because it may contain the same + // statement repeated multiple times. + const sqlCache = new SqlCache(stream, sqlCacheCapacity); + sqlCache.apply(hranaStmts); + + // TODO: we do not use a cursor here, because it would cause three roundtrips: + // 1. pipeline request to store SQL texts + // 2. cursor request + // 3. pipeline request to close the stream + const batch = stream.batch(false); + resultsPromise = executeHranaBatch( + mode, + version, + batch, + hranaStmts, + ); + } finally { + stream.closeGracefully(); + } - return results; - } catch (e) { - throw mapHranaError(e); - } + const results = await resultsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } + + return results; + } catch (e) { + throw mapHranaError(e); + } + }); } async transaction( mode: TransactionMode = "write", ): Promise { - try { - const version = await this.#client.getVersion(); - return new HttpTransaction( - this.#client.openStream(), - mode, - version, - ); - } catch (e) { - throw mapHranaError(e); - } + return this.#limit(async () => { + try { + const version = await this.#client.getVersion(); + return new HttpTransaction( + this.#client.openStream(), + mode, + version, + ); + } catch (e) { + throw mapHranaError(e); + } + }); } async executeMultiple(sql: string): Promise { - try { - // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and - // close the stream in a single HTTP request. - let promise: Promise; - const stream = this.#client.openStream(); + this.#limit(async () => { try { - promise = stream.sequence(sql); - } finally { - stream.closeGracefully(); - } + // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and + // close the stream in a single HTTP request. + let promise: Promise; + const stream = this.#client.openStream(); + try { + promise = stream.sequence(sql); + } finally { + stream.closeGracefully(); + } - await promise; - } catch (e) { - throw mapHranaError(e); - } + await promise; + } catch (e) { + throw mapHranaError(e); + } + }); } sync(): Promise { diff --git a/packages/libsql-client/src/ws.ts b/packages/libsql-client/src/ws.ts index e467d70..e180024 100644 --- a/packages/libsql-client/src/ws.ts +++ b/packages/libsql-client/src/ws.ts @@ -132,7 +132,7 @@ export class WsClient implements Client { closed: boolean; protocol: "ws"; #isSchemaDatabase: boolean | undefined; - #limit: ReturnType>; + #limit: ReturnType>; /** @private */ constructor( @@ -149,7 +149,7 @@ export class WsClient implements Client { this.#futureConnState = undefined; this.closed = false; this.protocol = "ws"; - this.#limit = promiseLimit(concurrency); + this.#limit = promiseLimit(concurrency); } async getIsSchemaDatabase(): Promise { @@ -198,67 +198,73 @@ export class WsClient implements Client { stmts: Array, mode: TransactionMode = "deferred", ): Promise> { - const streamState = await this.#openStream(); - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmts = stmts.map(stmtToHrana); - const version = await streamState.conn.client.getVersion(); - - // Schedule all operations synchronously, so they will be pipelined and executed in a single - // network roundtrip. - streamState.conn.sqlCache.apply(hranaStmts); - const batch = streamState.stream.batch(version >= 3); - const resultsPromise = executeHranaBatch( - mode, - version, - batch, - hranaStmts, - ); + return this.#limit(async () => { + const streamState = await this.#openStream(); + try { + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmts = stmts.map(stmtToHrana); + const version = await streamState.conn.client.getVersion(); - const results = await resultsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } + // Schedule all operations synchronously, so they will be pipelined and executed in a single + // network roundtrip. + streamState.conn.sqlCache.apply(hranaStmts); + const batch = streamState.stream.batch(version >= 3); + const resultsPromise = executeHranaBatch( + mode, + version, + batch, + hranaStmts, + ); + + const results = await resultsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } - return results; - } catch (e) { - throw mapHranaError(e); - } finally { - this._closeStream(streamState); - } + return results; + } catch (e) { + throw mapHranaError(e); + } finally { + this._closeStream(streamState); + } + }); } async transaction(mode: TransactionMode = "write"): Promise { - const streamState = await this.#openStream(); - try { - const version = await streamState.conn.client.getVersion(); - // the BEGIN statement will be batched with the first statement on the transaction to save a - // network roundtrip - return new WsTransaction(this, streamState, mode, version); - } catch (e) { - this._closeStream(streamState); - throw mapHranaError(e); - } + return this.#limit(async () => { + const streamState = await this.#openStream(); + try { + const version = await streamState.conn.client.getVersion(); + // the BEGIN statement will be batched with the first statement on the transaction to save a + // network roundtrip + return new WsTransaction(this, streamState, mode, version); + } catch (e) { + this._closeStream(streamState); + throw mapHranaError(e); + } + }); } async executeMultiple(sql: string): Promise { - const streamState = await this.#openStream(); - try { - // Schedule all operations synchronously, so they will be pipelined and executed in a single - // network roundtrip. - const promise = streamState.stream.sequence(sql); - streamState.stream.closeGracefully(); + this.#limit(async () => { + const streamState = await this.#openStream(); + try { + // Schedule all operations synchronously, so they will be pipelined and executed in a single + // network roundtrip. + const promise = streamState.stream.sequence(sql); + streamState.stream.closeGracefully(); - await promise; - } catch (e) { - throw mapHranaError(e); - } finally { - this._closeStream(streamState); - } + await promise; + } catch (e) { + throw mapHranaError(e); + } finally { + this._closeStream(streamState); + } + }); } sync(): Promise { From fab870eebb40527e125e502d00462c8566b406d9 Mon Sep 17 00:00:00 2001 From: Giovanni Date: Fri, 21 Jun 2024 08:02:34 -0400 Subject: [PATCH 5/7] Add type safety to the limit funcion --- packages/libsql-client/src/http.ts | 16 ++++++++++------ packages/libsql-client/src/ws.ts | 16 ++++++++++------ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 20e7e11..5aba350 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -79,7 +79,7 @@ export class HttpClient implements Client { #url: URL; #authToken: string | undefined; #isSchemaDatabase: Promise | undefined; - #limit: ReturnType>; + #promiseLimitFunction: ReturnType>; /** @private */ constructor( @@ -94,7 +94,7 @@ export class HttpClient implements Client { this.protocol = "http"; this.#url = url; this.#authToken = authToken; - this.#limit = promiseLimit(concurrency); + this.#promiseLimitFunction = promiseLimit(concurrency); } getIsSchemaDatabase(): Promise { @@ -108,8 +108,12 @@ export class HttpClient implements Client { return this.#isSchemaDatabase; } + private async limit(fn: () => Promise): Promise { + return this.#promiseLimitFunction(fn); + } + async execute(stmt: InStatement): Promise { - return this.#limit(async () => { + return this.limit(async () => { try { const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmt = stmtToHrana(stmt); @@ -144,7 +148,7 @@ export class HttpClient implements Client { stmts: Array, mode: TransactionMode = "deferred", ): Promise> { - return this.#limit(async () => { + return this.limit>(async () => { try { const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmts = stmts.map(stmtToHrana); @@ -194,7 +198,7 @@ export class HttpClient implements Client { async transaction( mode: TransactionMode = "write", ): Promise { - return this.#limit(async () => { + return this.limit(async () => { try { const version = await this.#client.getVersion(); return new HttpTransaction( @@ -209,7 +213,7 @@ export class HttpClient implements Client { } async executeMultiple(sql: string): Promise { - this.#limit(async () => { + this.limit(async () => { try { // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and // close the stream in a single HTTP request. diff --git a/packages/libsql-client/src/ws.ts b/packages/libsql-client/src/ws.ts index 6d41d62..ef7726d 100644 --- a/packages/libsql-client/src/ws.ts +++ b/packages/libsql-client/src/ws.ts @@ -132,7 +132,7 @@ export class WsClient implements Client { closed: boolean; protocol: "ws"; #isSchemaDatabase: Promise | undefined; - #limit: ReturnType>; + #promiseLimitFunction: ReturnType>; /** @private */ constructor( @@ -149,7 +149,7 @@ export class WsClient implements Client { this.#futureConnState = undefined; this.closed = false; this.protocol = "ws"; - this.#limit = promiseLimit(concurrency); + this.#promiseLimitFunction = promiseLimit(concurrency); } getIsSchemaDatabase(): Promise { @@ -163,8 +163,12 @@ export class WsClient implements Client { return this.#isSchemaDatabase; } + private async limit(fn: () => Promise): Promise { + return this.#promiseLimitFunction(fn); + } + async execute(stmt: InStatement): Promise { - return this.#limit(async () => { + return this.limit(async () => { const streamState = await this.#openStream(); try { const isSchemaDatabasePromise = this.getIsSchemaDatabase(); @@ -198,7 +202,7 @@ export class WsClient implements Client { stmts: Array, mode: TransactionMode = "deferred", ): Promise> { - return this.#limit(async () => { + return this.limit>(async () => { const streamState = await this.#openStream(); try { const isSchemaDatabasePromise = this.getIsSchemaDatabase(); @@ -235,7 +239,7 @@ export class WsClient implements Client { } async transaction(mode: TransactionMode = "write"): Promise { - return this.#limit(async () => { + return this.limit(async () => { const streamState = await this.#openStream(); try { const version = await streamState.conn.client.getVersion(); @@ -250,7 +254,7 @@ export class WsClient implements Client { } async executeMultiple(sql: string): Promise { - this.#limit(async () => { + this.limit(async () => { const streamState = await this.#openStream(); try { // Schedule all operations synchronously, so they will be pipelined and executed in a single From 350fb3c980d2d0e794f68b2b9bb4de166f9a8a10 Mon Sep 17 00:00:00 2001 From: Giovanni Date: Fri, 21 Jun 2024 10:12:53 -0400 Subject: [PATCH 6/7] Fix tests by returning a value from the executeMultiple method --- packages/libsql-client/src/http.ts | 2 +- packages/libsql-client/src/ws.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 5aba350..63b5f00 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -213,7 +213,7 @@ export class HttpClient implements Client { } async executeMultiple(sql: string): Promise { - this.limit(async () => { + return this.limit(async () => { try { // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and // close the stream in a single HTTP request. diff --git a/packages/libsql-client/src/ws.ts b/packages/libsql-client/src/ws.ts index ef7726d..4d3c183 100644 --- a/packages/libsql-client/src/ws.ts +++ b/packages/libsql-client/src/ws.ts @@ -254,7 +254,7 @@ export class WsClient implements Client { } async executeMultiple(sql: string): Promise { - this.limit(async () => { + return this.limit(async () => { const streamState = await this.#openStream(); try { // Schedule all operations synchronously, so they will be pipelined and executed in a single From 63f54be0f23b43c33789a3868919464f258bda18 Mon Sep 17 00:00:00 2001 From: Giovanni Date: Tue, 25 Jun 2024 08:16:26 -0400 Subject: [PATCH 7/7] Update default values for the concurrency parameter --- packages/libsql-client/src/http.ts | 2 +- packages/libsql-core/src/config.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 63b5f00..c6cc908 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -87,7 +87,7 @@ export class HttpClient implements Client { authToken: string | undefined, intMode: IntMode, customFetch: Function | undefined, - concurrency: number | undefined, + concurrency: number, ) { this.#client = hrana.openHttp(url, authToken, customFetch); this.#client.intMode = intMode; diff --git a/packages/libsql-core/src/config.ts b/packages/libsql-core/src/config.ts index 23000a6..13bfb1d 100644 --- a/packages/libsql-core/src/config.ts +++ b/packages/libsql-core/src/config.ts @@ -15,7 +15,7 @@ export interface ExpandedConfig { syncInterval: number | undefined; intMode: IntMode; fetch: Function | undefined; - concurrency: number | undefined; + concurrency: number; } export type ExpandedScheme = "wss" | "ws" | "https" | "http" | "file"; @@ -32,7 +32,7 @@ export function expandConfig( ); } - const concurrency = "concurrency" in config ? config.concurrency : 20; + const concurrency = Math.max(0, config.concurrency || 20); let tls: boolean | undefined = config.tls; let authToken = config.authToken; let encryptionKey = config.encryptionKey;