diff --git a/package-lock.json b/package-lock.json index d143eb6..6e87be0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3996,6 +3996,11 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/promise-limit": { + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/promise-limit/-/promise-limit-2.7.0.tgz", + "integrity": "sha512-7nJ6v5lnJsXwGprnGXga4wx6d1POjvi5Qmf1ivTRxTjH4Z/9Czja/UCMLVmB9N93GeWOU93XaFaEt6jbuoagNw==" + }, "node_modules/prompts": { "version": "2.4.2", "dev": true, @@ -4712,7 +4717,8 @@ "@libsql/core": "^0.6.2", "@libsql/hrana-client": "^0.6.0", "js-base64": "^3.7.5", - "libsql": "^0.3.10" + "libsql": "^0.3.10", + "promise-limit": "^2.7.0" }, "devDependencies": { "@types/jest": "^29.2.5", diff --git a/packages/libsql-client/package.json b/packages/libsql-client/package.json index daef4f8..50a42f2 100644 --- a/packages/libsql-client/package.json +++ b/packages/libsql-client/package.json @@ -105,7 +105,8 @@ "@libsql/core": "^0.6.2", "@libsql/hrana-client": "^0.6.0", "js-base64": "^3.7.5", - "libsql": "^0.3.10" + "libsql": "^0.3.10", + "promise-limit": "^2.7.0" }, "devDependencies": { "@types/jest": "^29.2.5", diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index 1ec283b..c6cc908 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -24,6 +24,7 @@ import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; +import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; @@ -61,7 +62,13 @@ export function _createClient(config: ExpandedConfig): Client { } const url = encodeBaseUrl(config.scheme, config.authority, config.path); - return new HttpClient(url, config.authToken, config.intMode, config.fetch); + return new HttpClient( + url, + config.authToken, + config.intMode, + config.fetch, + config.concurrency, + ); } const sqlCacheCapacity = 30; @@ -72,6 +79,7 @@ export class HttpClient implements Client { #url: URL; #authToken: string | undefined; #isSchemaDatabase: Promise | undefined; + #promiseLimitFunction: ReturnType>; /** @private */ constructor( @@ -79,12 +87,14 @@ export class HttpClient implements Client { authToken: string | undefined, intMode: IntMode, customFetch: Function | undefined, + concurrency: number, ) { this.#client = hrana.openHttp(url, authToken, customFetch); this.#client.intMode = intMode; this.protocol = "http"; this.#url = url; this.#authToken = authToken; + this.#promiseLimitFunction = promiseLimit(concurrency); } getIsSchemaDatabase(): Promise { @@ -98,116 +108,128 @@ export class HttpClient implements Client { return this.#isSchemaDatabase; } + private async limit(fn: () => Promise): Promise { + return this.#promiseLimitFunction(fn); + } + async execute(stmt: InStatement): Promise { - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmt = stmtToHrana(stmt); - - // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and - // close the stream in a single HTTP request. - let rowsPromise: Promise; - const stream = this.#client.openStream(); + return this.limit(async () => { try { - rowsPromise = stream.query(hranaStmt); - } finally { - stream.closeGracefully(); - } - - const rowsResult = await rowsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmt = stmtToHrana(stmt); + + // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and + // close the stream in a single HTTP request. + let rowsPromise: Promise; + const stream = this.#client.openStream(); + try { + rowsPromise = stream.query(hranaStmt); + } finally { + stream.closeGracefully(); + } + + const rowsResult = await rowsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } + + return resultSetFromHrana(rowsResult); + } catch (e) { + throw mapHranaError(e); } - - return resultSetFromHrana(rowsResult); - } catch (e) { - throw mapHranaError(e); - } + }); } async batch( stmts: Array, mode: TransactionMode = "deferred", ): Promise> { - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmts = stmts.map(stmtToHrana); - const version = await this.#client.getVersion(); - - // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and - // close the stream in a single HTTP request. - let resultsPromise: Promise>; - const stream = this.#client.openStream(); + return this.limit>(async () => { try { - // It makes sense to use a SQL cache even for a single batch, because it may contain the same - // statement repeated multiple times. - const sqlCache = new SqlCache(stream, sqlCacheCapacity); - sqlCache.apply(hranaStmts); - - // TODO: we do not use a cursor here, because it would cause three roundtrips: - // 1. pipeline request to store SQL texts - // 2. cursor request - // 3. pipeline request to close the stream - const batch = stream.batch(false); - resultsPromise = executeHranaBatch( - mode, - version, - batch, - hranaStmts, - ); - } finally { - stream.closeGracefully(); - } - - const results = await resultsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmts = stmts.map(stmtToHrana); + const version = await this.#client.getVersion(); + + // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and + // close the stream in a single HTTP request. + let resultsPromise: Promise>; + const stream = this.#client.openStream(); + try { + // It makes sense to use a SQL cache even for a single batch, because it may contain the same + // statement repeated multiple times. + const sqlCache = new SqlCache(stream, sqlCacheCapacity); + sqlCache.apply(hranaStmts); + + // TODO: we do not use a cursor here, because it would cause three roundtrips: + // 1. pipeline request to store SQL texts + // 2. cursor request + // 3. pipeline request to close the stream + const batch = stream.batch(false); + resultsPromise = executeHranaBatch( + mode, + version, + batch, + hranaStmts, + ); + } finally { + stream.closeGracefully(); + } + + const results = await resultsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } + + return results; + } catch (e) { + throw mapHranaError(e); } - - return results; - } catch (e) { - throw mapHranaError(e); - } + }); } async transaction( mode: TransactionMode = "write", ): Promise { - try { - const version = await this.#client.getVersion(); - return new HttpTransaction( - this.#client.openStream(), - mode, - version, - ); - } catch (e) { - throw mapHranaError(e); - } + return this.limit(async () => { + try { + const version = await this.#client.getVersion(); + return new HttpTransaction( + this.#client.openStream(), + mode, + version, + ); + } catch (e) { + throw mapHranaError(e); + } + }); } async executeMultiple(sql: string): Promise { - try { - // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and - // close the stream in a single HTTP request. - let promise: Promise; - const stream = this.#client.openStream(); + return this.limit(async () => { try { - promise = stream.sequence(sql); - } finally { - stream.closeGracefully(); + // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and + // close the stream in a single HTTP request. + let promise: Promise; + const stream = this.#client.openStream(); + try { + promise = stream.sequence(sql); + } finally { + stream.closeGracefully(); + } + + await promise; + } catch (e) { + throw mapHranaError(e); } - - await promise; - } catch (e) { - throw mapHranaError(e); - } + }); } sync(): Promise { diff --git a/packages/libsql-client/src/ws.ts b/packages/libsql-client/src/ws.ts index 91088d8..4d3c183 100644 --- a/packages/libsql-client/src/ws.ts +++ b/packages/libsql-client/src/ws.ts @@ -25,6 +25,7 @@ import { getIsSchemaDatabase, waitForLastMigrationJobToFinish, } from "./migrations.js"; +import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; @@ -84,7 +85,13 @@ export function _createClient(config: ExpandedConfig): WsClient { throw mapHranaError(e); } - return new WsClient(client, url, config.authToken, config.intMode); + return new WsClient( + client, + url, + config.authToken, + config.intMode, + config.concurrency, + ); } // This object maintains state for a single WebSocket connection. @@ -125,6 +132,7 @@ export class WsClient implements Client { closed: boolean; protocol: "ws"; #isSchemaDatabase: Promise | undefined; + #promiseLimitFunction: ReturnType>; /** @private */ constructor( @@ -132,6 +140,7 @@ export class WsClient implements Client { url: URL, authToken: string | undefined, intMode: IntMode, + concurrency: number | undefined, ) { this.#url = url; this.#authToken = authToken; @@ -140,6 +149,7 @@ export class WsClient implements Client { this.#futureConnState = undefined; this.closed = false; this.protocol = "ws"; + this.#promiseLimitFunction = promiseLimit(concurrency); } getIsSchemaDatabase(): Promise { @@ -153,100 +163,112 @@ export class WsClient implements Client { return this.#isSchemaDatabase; } + private async limit(fn: () => Promise): Promise { + return this.#promiseLimitFunction(fn); + } + async execute(stmt: InStatement): Promise { - const streamState = await this.#openStream(); - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmt = stmtToHrana(stmt); - - // Schedule all operations synchronously, so they will be pipelined and executed in a single - // network roundtrip. - streamState.conn.sqlCache.apply([hranaStmt]); - const hranaRowsPromise = streamState.stream.query(hranaStmt); - streamState.stream.closeGracefully(); - - const hranaRowsResult = await hranaRowsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } + return this.limit(async () => { + const streamState = await this.#openStream(); + try { + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmt = stmtToHrana(stmt); + + // Schedule all operations synchronously, so they will be pipelined and executed in a single + // network roundtrip. + streamState.conn.sqlCache.apply([hranaStmt]); + const hranaRowsPromise = streamState.stream.query(hranaStmt); + streamState.stream.closeGracefully(); + + const hranaRowsResult = await hranaRowsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } - return resultSetFromHrana(hranaRowsResult); - } catch (e) { - throw mapHranaError(e); - } finally { - this._closeStream(streamState); - } + return resultSetFromHrana(hranaRowsResult); + } catch (e) { + throw mapHranaError(e); + } finally { + this._closeStream(streamState); + } + }); } async batch( stmts: Array, mode: TransactionMode = "deferred", ): Promise> { - const streamState = await this.#openStream(); - try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); - const hranaStmts = stmts.map(stmtToHrana); - const version = await streamState.conn.client.getVersion(); - - // Schedule all operations synchronously, so they will be pipelined and executed in a single - // network roundtrip. - streamState.conn.sqlCache.apply(hranaStmts); - const batch = streamState.stream.batch(version >= 3); - const resultsPromise = executeHranaBatch( - mode, - version, - batch, - hranaStmts, - ); + return this.limit>(async () => { + const streamState = await this.#openStream(); + try { + const isSchemaDatabasePromise = this.getIsSchemaDatabase(); + const hranaStmts = stmts.map(stmtToHrana); + const version = await streamState.conn.client.getVersion(); + + // Schedule all operations synchronously, so they will be pipelined and executed in a single + // network roundtrip. + streamState.conn.sqlCache.apply(hranaStmts); + const batch = streamState.stream.batch(version >= 3); + const resultsPromise = executeHranaBatch( + mode, + version, + batch, + hranaStmts, + ); + + const results = await resultsPromise; + const isSchemaDatabase = await isSchemaDatabasePromise; + if (isSchemaDatabase) { + await waitForLastMigrationJobToFinish({ + authToken: this.#authToken, + baseUrl: this.#url.origin, + }); + } - const results = await resultsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); + return results; + } catch (e) { + throw mapHranaError(e); + } finally { + this._closeStream(streamState); } - - return results; - } catch (e) { - throw mapHranaError(e); - } finally { - this._closeStream(streamState); - } + }); } async transaction(mode: TransactionMode = "write"): Promise { - const streamState = await this.#openStream(); - try { - const version = await streamState.conn.client.getVersion(); - // the BEGIN statement will be batched with the first statement on the transaction to save a - // network roundtrip - return new WsTransaction(this, streamState, mode, version); - } catch (e) { - this._closeStream(streamState); - throw mapHranaError(e); - } + return this.limit(async () => { + const streamState = await this.#openStream(); + try { + const version = await streamState.conn.client.getVersion(); + // the BEGIN statement will be batched with the first statement on the transaction to save a + // network roundtrip + return new WsTransaction(this, streamState, mode, version); + } catch (e) { + this._closeStream(streamState); + throw mapHranaError(e); + } + }); } async executeMultiple(sql: string): Promise { - const streamState = await this.#openStream(); - try { - // Schedule all operations synchronously, so they will be pipelined and executed in a single - // network roundtrip. - const promise = streamState.stream.sequence(sql); - streamState.stream.closeGracefully(); + return this.limit(async () => { + const streamState = await this.#openStream(); + try { + // Schedule all operations synchronously, so they will be pipelined and executed in a single + // network roundtrip. + const promise = streamState.stream.sequence(sql); + streamState.stream.closeGracefully(); - await promise; - } catch (e) { - throw mapHranaError(e); - } finally { - this._closeStream(streamState); - } + await promise; + } catch (e) { + throw mapHranaError(e); + } finally { + this._closeStream(streamState); + } + }); } sync(): Promise { diff --git a/packages/libsql-core/src/api.ts b/packages/libsql-core/src/api.ts index 6278fc0..3860e4d 100644 --- a/packages/libsql-core/src/api.ts +++ b/packages/libsql-core/src/api.ts @@ -46,6 +46,14 @@ export interface Config { * with the Web `Response`. */ fetch?: Function; + + /** Concurrency limit. + * + * By default, the client performs up to 20 concurrent requests. You can set this option to a higher + * number to increase the concurrency limit. You can also set this option to `undefined` to disable concurrency + * completely. + */ + concurrency?: number | undefined; } /** Representation of integers from database as JavaScript values. See {@link Config.intMode}. */ diff --git a/packages/libsql-core/src/config.ts b/packages/libsql-core/src/config.ts index 2eb4267..13bfb1d 100644 --- a/packages/libsql-core/src/config.ts +++ b/packages/libsql-core/src/config.ts @@ -15,6 +15,7 @@ export interface ExpandedConfig { syncInterval: number | undefined; intMode: IntMode; fetch: Function | undefined; + concurrency: number; } export type ExpandedScheme = "wss" | "ws" | "https" | "http" | "file"; @@ -31,6 +32,7 @@ export function expandConfig( ); } + const concurrency = Math.max(0, config.concurrency || 20); let tls: boolean | undefined = config.tls; let authToken = config.authToken; let encryptionKey = config.encryptionKey; @@ -56,6 +58,7 @@ export function expandConfig( authToken: undefined, encryptionKey: undefined, authority: undefined, + concurrency, }; } @@ -133,5 +136,6 @@ export function expandConfig( syncInterval, intMode, fetch: config.fetch, + concurrency, }; }