Skip to content

Commit

Permalink
Merge pull request #226 from tursodatabase/add-queue-to-execute
Browse files Browse the repository at this point in the history
Add concurrency limits
  • Loading branch information
penberg authored Jun 25, 2024
2 parents e46c530 + 63f54be commit 352024e
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 170 deletions.
8 changes: 7 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/libsql-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@
"@libsql/core": "^0.6.2",
"@libsql/hrana-client": "^0.6.0",
"js-base64": "^3.7.5",
"libsql": "^0.3.10"
"libsql": "^0.3.10",
"promise-limit": "^2.7.0"
},
"devDependencies": {
"@types/jest": "^29.2.5",
Expand Down
202 changes: 112 additions & 90 deletions packages/libsql-client/src/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
getIsSchemaDatabase,
waitForLastMigrationJobToFinish,
} from "./migrations.js";
import promiseLimit from "promise-limit";

export * from "@libsql/core/api";

Expand Down Expand Up @@ -61,7 +62,13 @@ export function _createClient(config: ExpandedConfig): Client {
}

const url = encodeBaseUrl(config.scheme, config.authority, config.path);
return new HttpClient(url, config.authToken, config.intMode, config.fetch);
return new HttpClient(
url,
config.authToken,
config.intMode,
config.fetch,
config.concurrency,
);
}

const sqlCacheCapacity = 30;
Expand All @@ -72,19 +79,22 @@ export class HttpClient implements Client {
#url: URL;
#authToken: string | undefined;
#isSchemaDatabase: Promise<boolean> | undefined;
#promiseLimitFunction: ReturnType<typeof promiseLimit<any>>;

/** @private */
constructor(
url: URL,
authToken: string | undefined,
intMode: IntMode,
customFetch: Function | undefined,
concurrency: number,
) {
this.#client = hrana.openHttp(url, authToken, customFetch);
this.#client.intMode = intMode;
this.protocol = "http";
this.#url = url;
this.#authToken = authToken;
this.#promiseLimitFunction = promiseLimit<any>(concurrency);
}

getIsSchemaDatabase(): Promise<boolean> {
Expand All @@ -98,116 +108,128 @@ export class HttpClient implements Client {
return this.#isSchemaDatabase;
}

private async limit<T>(fn: () => Promise<T>): Promise<T> {
return this.#promiseLimitFunction(fn);
}

async execute(stmt: InStatement): Promise<ResultSet> {
try {
const isSchemaDatabasePromise = this.getIsSchemaDatabase();
const hranaStmt = stmtToHrana(stmt);

// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and
// close the stream in a single HTTP request.
let rowsPromise: Promise<hrana.RowsResult>;
const stream = this.#client.openStream();
return this.limit<ResultSet>(async () => {
try {
rowsPromise = stream.query(hranaStmt);
} finally {
stream.closeGracefully();
}

const rowsResult = await rowsPromise;
const isSchemaDatabase = await isSchemaDatabasePromise;
if (isSchemaDatabase) {
await waitForLastMigrationJobToFinish({
authToken: this.#authToken,
baseUrl: this.#url.origin,
});
const isSchemaDatabasePromise = this.getIsSchemaDatabase();
const hranaStmt = stmtToHrana(stmt);

// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and
// close the stream in a single HTTP request.
let rowsPromise: Promise<hrana.RowsResult>;
const stream = this.#client.openStream();
try {
rowsPromise = stream.query(hranaStmt);
} finally {
stream.closeGracefully();
}

const rowsResult = await rowsPromise;
const isSchemaDatabase = await isSchemaDatabasePromise;
if (isSchemaDatabase) {
await waitForLastMigrationJobToFinish({
authToken: this.#authToken,
baseUrl: this.#url.origin,
});
}

return resultSetFromHrana(rowsResult);
} catch (e) {
throw mapHranaError(e);
}

return resultSetFromHrana(rowsResult);
} catch (e) {
throw mapHranaError(e);
}
});
}

async batch(
stmts: Array<InStatement>,
mode: TransactionMode = "deferred",
): Promise<Array<ResultSet>> {
try {
const isSchemaDatabasePromise = this.getIsSchemaDatabase();
const hranaStmts = stmts.map(stmtToHrana);
const version = await this.#client.getVersion();

// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and
// close the stream in a single HTTP request.
let resultsPromise: Promise<Array<ResultSet>>;
const stream = this.#client.openStream();
return this.limit<Array<ResultSet>>(async () => {
try {
// It makes sense to use a SQL cache even for a single batch, because it may contain the same
// statement repeated multiple times.
const sqlCache = new SqlCache(stream, sqlCacheCapacity);
sqlCache.apply(hranaStmts);

// TODO: we do not use a cursor here, because it would cause three roundtrips:
// 1. pipeline request to store SQL texts
// 2. cursor request
// 3. pipeline request to close the stream
const batch = stream.batch(false);
resultsPromise = executeHranaBatch(
mode,
version,
batch,
hranaStmts,
);
} finally {
stream.closeGracefully();
}

const results = await resultsPromise;
const isSchemaDatabase = await isSchemaDatabasePromise;
if (isSchemaDatabase) {
await waitForLastMigrationJobToFinish({
authToken: this.#authToken,
baseUrl: this.#url.origin,
});
const isSchemaDatabasePromise = this.getIsSchemaDatabase();
const hranaStmts = stmts.map(stmtToHrana);
const version = await this.#client.getVersion();

// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the batch and
// close the stream in a single HTTP request.
let resultsPromise: Promise<Array<ResultSet>>;
const stream = this.#client.openStream();
try {
// It makes sense to use a SQL cache even for a single batch, because it may contain the same
// statement repeated multiple times.
const sqlCache = new SqlCache(stream, sqlCacheCapacity);
sqlCache.apply(hranaStmts);

// TODO: we do not use a cursor here, because it would cause three roundtrips:
// 1. pipeline request to store SQL texts
// 2. cursor request
// 3. pipeline request to close the stream
const batch = stream.batch(false);
resultsPromise = executeHranaBatch(
mode,
version,
batch,
hranaStmts,
);
} finally {
stream.closeGracefully();
}

const results = await resultsPromise;
const isSchemaDatabase = await isSchemaDatabasePromise;
if (isSchemaDatabase) {
await waitForLastMigrationJobToFinish({
authToken: this.#authToken,
baseUrl: this.#url.origin,
});
}

return results;
} catch (e) {
throw mapHranaError(e);
}

return results;
} catch (e) {
throw mapHranaError(e);
}
});
}

async transaction(
mode: TransactionMode = "write",
): Promise<HttpTransaction> {
try {
const version = await this.#client.getVersion();
return new HttpTransaction(
this.#client.openStream(),
mode,
version,
);
} catch (e) {
throw mapHranaError(e);
}
return this.limit<HttpTransaction>(async () => {
try {
const version = await this.#client.getVersion();
return new HttpTransaction(
this.#client.openStream(),
mode,
version,
);
} catch (e) {
throw mapHranaError(e);
}
});
}

async executeMultiple(sql: string): Promise<void> {
try {
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and
// close the stream in a single HTTP request.
let promise: Promise<void>;
const stream = this.#client.openStream();
return this.limit<void>(async () => {
try {
promise = stream.sequence(sql);
} finally {
stream.closeGracefully();
// Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the sequence and
// close the stream in a single HTTP request.
let promise: Promise<void>;
const stream = this.#client.openStream();
try {
promise = stream.sequence(sql);
} finally {
stream.closeGracefully();
}

await promise;
} catch (e) {
throw mapHranaError(e);
}

await promise;
} catch (e) {
throw mapHranaError(e);
}
});
}

sync(): Promise<void> {
Expand Down
Loading

0 comments on commit 352024e

Please sign in to comment.