diff --git a/packages/libsql-client/src/__tests__/client.test.ts b/packages/libsql-client/src/__tests__/client.test.ts index 0bd7536..f069068 100644 --- a/packages/libsql-client/src/__tests__/client.test.ts +++ b/packages/libsql-client/src/__tests__/client.test.ts @@ -9,11 +9,6 @@ import "./helpers.js"; import type * as libsql from "../node.js"; import { createClient } from "../node.js"; -import * as migrations from "../migrations"; - -jest.spyOn(migrations, "getIsSchemaDatabase").mockImplementation( - (_params) => new Promise((resolve) => resolve(false)), -); const config = { url: process.env.URL ?? "ws://localhost:8080", diff --git a/packages/libsql-client/src/__tests__/migrations.test.ts b/packages/libsql-client/src/__tests__/migrations.test.ts deleted file mode 100644 index 52c1637..0000000 --- a/packages/libsql-client/src/__tests__/migrations.test.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { waitForLastMigrationJobToFinish } from "../migrations"; -import { server } from "./mocks/node"; - -beforeAll(() => server.listen()); -afterEach(() => server.resetHandlers()); -afterAll(() => server.close()); - -describe("waitForLastMigrationJobToFinish()", () => { - test("waits until the last job is completed", async () => { - await waitForLastMigrationJobToFinish({ - authToken: "fake-auth-token", - baseUrl: "http://fake-base-url.example.com", - }); - }); -}); diff --git a/packages/libsql-client/src/http.ts b/packages/libsql-client/src/http.ts index c6cc908..958d854 100644 --- a/packages/libsql-client/src/http.ts +++ b/packages/libsql-client/src/http.ts @@ -20,10 +20,6 @@ import { import { SqlCache } from "./sql_cache.js"; import { encodeBaseUrl } from "@libsql/core/uri"; import { supportedUrlLink } from "@libsql/core/util"; -import { - getIsSchemaDatabase, - waitForLastMigrationJobToFinish, -} from "./migrations.js"; import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; @@ -76,9 +72,7 @@ const sqlCacheCapacity = 30; export class HttpClient implements Client { #client: hrana.HttpClient; protocol: "http"; - #url: URL; #authToken: string | undefined; - #isSchemaDatabase: Promise | undefined; #promiseLimitFunction: ReturnType>; /** @private */ @@ -92,22 +86,10 @@ export class HttpClient implements Client { this.#client = hrana.openHttp(url, authToken, customFetch); this.#client.intMode = intMode; this.protocol = "http"; - this.#url = url; this.#authToken = authToken; this.#promiseLimitFunction = promiseLimit(concurrency); } - getIsSchemaDatabase(): Promise { - if (this.#isSchemaDatabase === undefined) { - this.#isSchemaDatabase = getIsSchemaDatabase({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } - - return this.#isSchemaDatabase; - } - private async limit(fn: () => Promise): Promise { return this.#promiseLimitFunction(fn); } @@ -115,7 +97,6 @@ export class HttpClient implements Client { async execute(stmt: InStatement): Promise { return this.limit(async () => { try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmt = stmtToHrana(stmt); // Pipeline all operations, so `hrana.HttpClient` can open the stream, execute the statement and @@ -129,13 +110,6 @@ export class HttpClient implements Client { } const rowsResult = await rowsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } return resultSetFromHrana(rowsResult); } catch (e) { @@ -150,7 +124,6 @@ export class HttpClient implements Client { ): Promise> { return this.limit>(async () => { try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmts = stmts.map(stmtToHrana); const version = await this.#client.getVersion(); @@ -180,13 +153,6 @@ export class HttpClient implements Client { } const results = await resultsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } return results; } catch (e) { diff --git a/packages/libsql-client/src/migrations.ts b/packages/libsql-client/src/migrations.ts deleted file mode 100644 index 3bfe473..0000000 --- a/packages/libsql-client/src/migrations.ts +++ /dev/null @@ -1,193 +0,0 @@ -type MigrationJobType = { - job_id: number; - status: string; -}; - -type ExtendedMigrationJobType = MigrationJobType & { - progress: Array<{ - namespace: string; - status: string; - error: string | null; - }>; -}; - -type MigrationResult = { - schema_version: number; - migrations: Array; -}; - -const SCHEMA_MIGRATION_SLEEP_TIME_IN_MS = 1000; -const SCHEMA_MIGRATION_MAX_RETRIES = 30; - -async function sleep(ms: number) { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); -} - -type isMigrationJobFinishedProps = { - authToken: string | undefined; - baseUrl: string; - jobId: number; -}; - -async function isMigrationJobFinished({ - authToken, - baseUrl, - jobId, -}: isMigrationJobFinishedProps): Promise { - const url = normalizeURLScheme(baseUrl + `/v1/jobs/${jobId}`); - const result = await fetch(url, { - method: "GET", - headers: { - Authorization: `Bearer ${authToken}`, - }, - }); - const json = (await result.json()) as ExtendedMigrationJobType; - const job = json as { status: string }; - if (result.status !== 200) { - throw new Error( - `Unexpected status code while fetching job status for migration with id ${jobId}: ${result.status}`, - ); - } - - if (job.status == "RunFailure") { - throw new Error("Migration job failed"); - } - - return job.status == "RunSuccess"; -} - -type getLastMigrationJobProps = { - authToken: string | undefined; - baseUrl: string; -}; - -function normalizeURLScheme(url: string) { - if (url.startsWith("ws://")) { - return url.replace("ws://", "http://"); - } - if (url.startsWith("wss://")) { - return url.replace("wss://", "https://"); - } - - return url; -} - -export async function getIsSchemaDatabase({ - authToken, - baseUrl, -}: { - authToken: string | undefined; - baseUrl: string; -}) { - let responseStatusCode; - try { - if (baseUrl.startsWith("http://127.0.0.1")) { - return false; - } - const url = normalizeURLScheme(baseUrl + "/v1/jobs"); - - const result = await fetch(url, { - method: "GET", - headers: { - Authorization: `Bearer ${authToken}`, - }, - }); - if (result.status === 404 || result.status === 500) { - return false; - } - - const json = (await result.json()) as { error: string }; - - const isChildDatabase = - result.status === 400 && json.error === "Invalid namespace"; - return !isChildDatabase; - } catch (e) { - console.error( - [ - `There has been an error while retrieving the database type.`, - `Debug information:`, - `- URL: ${baseUrl}`, - `- Response Status Code: ${ - responseStatusCode ? responseStatusCode : "N/A" - }`, - ].join("\n"), - ); - throw e; - } -} - -async function getLastMigrationJob({ - authToken, - baseUrl, -}: getLastMigrationJobProps): Promise { - const url = normalizeURLScheme(baseUrl + "/v1/jobs"); - const result = await fetch(url, { - method: "GET", - headers: { - Authorization: `Bearer ${authToken}`, - }, - }); - if (result.status !== 200) { - throw new Error( - "Unexpected status code while fetching migration jobs: " + - result.status, - ); - } - - const json = (await result.json()) as MigrationResult; - if (!json.migrations || json.migrations.length === 0) { - return null; - } - - const migrations = json.migrations || []; - let lastJob: MigrationJobType | undefined; - for (const migration of migrations) { - if (migration.job_id > (lastJob?.job_id || 0)) { - lastJob = migration; - } - } - if (!lastJob) { - throw new Error("No migration job found"); - } - if (lastJob?.status === "RunFailure") { - throw new Error("Last migration job failed"); - } - - return lastJob; -} - -type waitForLastMigrationJobToFinishProps = { - authToken: string | undefined; - baseUrl: string; -}; - -export async function waitForLastMigrationJobToFinish({ - authToken, - baseUrl, -}: getLastMigrationJobProps) { - const lastMigrationJob = await getLastMigrationJob({ - authToken: authToken, - baseUrl, - }); - if (!lastMigrationJob) { - return; - } - if (lastMigrationJob.status !== "RunSuccess") { - let i = 0; - while (i < SCHEMA_MIGRATION_MAX_RETRIES) { - i++; - const isLastMigrationJobFinished = await isMigrationJobFinished({ - authToken: authToken, - baseUrl, - jobId: lastMigrationJob.job_id, - }); - if (isLastMigrationJobFinished) { - break; - } - - await sleep(SCHEMA_MIGRATION_SLEEP_TIME_IN_MS); - } - } -} diff --git a/packages/libsql-client/src/ws.ts b/packages/libsql-client/src/ws.ts index 4d3c183..fbef981 100644 --- a/packages/libsql-client/src/ws.ts +++ b/packages/libsql-client/src/ws.ts @@ -21,10 +21,6 @@ import { import { SqlCache } from "./sql_cache.js"; import { encodeBaseUrl } from "@libsql/core/uri"; import { supportedUrlLink } from "@libsql/core/util"; -import { - getIsSchemaDatabase, - waitForLastMigrationJobToFinish, -} from "./migrations.js"; import promiseLimit from "promise-limit"; export * from "@libsql/core/api"; @@ -152,17 +148,6 @@ export class WsClient implements Client { this.#promiseLimitFunction = promiseLimit(concurrency); } - getIsSchemaDatabase(): Promise { - if (this.#isSchemaDatabase === undefined) { - this.#isSchemaDatabase = getIsSchemaDatabase({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } - - return this.#isSchemaDatabase; - } - private async limit(fn: () => Promise): Promise { return this.#promiseLimitFunction(fn); } @@ -171,7 +156,6 @@ export class WsClient implements Client { return this.limit(async () => { const streamState = await this.#openStream(); try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmt = stmtToHrana(stmt); // Schedule all operations synchronously, so they will be pipelined and executed in a single @@ -181,13 +165,6 @@ export class WsClient implements Client { streamState.stream.closeGracefully(); const hranaRowsResult = await hranaRowsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } return resultSetFromHrana(hranaRowsResult); } catch (e) { @@ -205,7 +182,6 @@ export class WsClient implements Client { return this.limit>(async () => { const streamState = await this.#openStream(); try { - const isSchemaDatabasePromise = this.getIsSchemaDatabase(); const hranaStmts = stmts.map(stmtToHrana); const version = await streamState.conn.client.getVersion(); @@ -221,13 +197,6 @@ export class WsClient implements Client { ); const results = await resultsPromise; - const isSchemaDatabase = await isSchemaDatabasePromise; - if (isSchemaDatabase) { - await waitForLastMigrationJobToFinish({ - authToken: this.#authToken, - baseUrl: this.#url.origin, - }); - } return results; } catch (e) {