From 7d5841d5a06d7ff5e7f3132bdf52ad70016a7946 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halvard=20M=C3=B8rstad?= Date: Tue, 16 Apr 2024 00:52:42 +0200 Subject: [PATCH] added pool support --- lib/client.ts | 7 +- lib/connection.test.ts | 129 +++++++++ lib/connection.ts | 11 +- lib/deferred.ts | 71 ----- lib/pool.ts | 383 +++++++++++++++++++------- lib/{client.test.ts => sqlx.test.ts} | 4 +- lib/utils/deferred.ts | 51 ++++ lib/utils/events.ts | 71 +++++ testold.ts | 394 --------------------------- 9 files changed, 544 insertions(+), 577 deletions(-) delete mode 100644 lib/deferred.ts rename lib/{client.test.ts => sqlx.test.ts} (89%) create mode 100644 lib/utils/deferred.ts create mode 100644 lib/utils/events.ts delete mode 100644 testold.ts diff --git a/lib/client.ts b/lib/client.ts index 6779730..13df578 100644 --- a/lib/client.ts +++ b/lib/client.ts @@ -16,10 +16,7 @@ import { } from "@halvardm/sqlx"; import { MysqlConnection, type MysqlConnectionOptions } from "./connection.ts"; import { buildQuery } from "./packets/builders/query.ts"; -import { - getRowObject, - type MysqlParameterType, -} from "./packets/parsers/result.ts"; +import type { MysqlParameterType } from "./packets/parsers/result.ts"; export interface MysqlTransactionOptions extends SqlxTransactionOptions { beginTransactionOptions: { @@ -346,7 +343,7 @@ export class MysqlClient extends MysqlTransactionable implements > { readonly connectionUrl: string; readonly connectionOptions: MysqlConnectionOptions; - readonly eventTarget: EventTarget; + eventTarget: EventTarget; get connected(): boolean { throw new Error("Method not implemented."); } diff --git a/lib/connection.test.ts b/lib/connection.test.ts index bf15dc9..98408bb 100644 --- a/lib/connection.test.ts +++ b/lib/connection.test.ts @@ -197,6 +197,135 @@ Deno.test("Connection", async (t) => { } }); + await t.step("can parse time", async () => { + const data = buildQuery(`SELECT CAST("09:04:10" AS time) as time`); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: ["09:04:10"], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 128, + fieldLen: 10, + fieldType: 11, + name: "time", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse date", async () => { + const data = buildQuery( + `SELECT CAST("2024-04-15 09:04:10" AS date) as date`, + ); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [new Date("2024-04-15T00:00:00.000Z")], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 128, + fieldLen: 10, + fieldType: 10, + name: "date", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse bigint", async () => { + const data = buildQuery(`SELECT 9223372036854775807 as result`); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [9223372036854775807n], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 129, + fieldLen: 20, + fieldType: 8, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse decimal", async () => { + const data = buildQuery( + `SELECT 0.012345678901234567890123456789 as result`, + ); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: ["0.012345678901234567890123456789"], + fields: [ + { + catalog: "def", + decimals: 30, + defaultVal: "", + encoding: 63, + fieldFlag: 129, + fieldLen: 33, + fieldType: 246, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse empty string", async () => { + const data = buildQuery(`SELECT '' as result`); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [""], + fields: [ + { + catalog: "def", + decimals: 31, + defaultVal: "", + encoding: 33, + fieldFlag: 1, + fieldLen: 0, + fieldType: 253, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + await t.step("can drop and create table", async () => { const dropTableSql = buildQuery("DROP TABLE IF EXISTS test;"); const dropTableReturned = connection.sendData(dropTableSql); diff --git a/lib/connection.ts b/lib/connection.ts index 8c361e9..4795f36 100644 --- a/lib/connection.ts +++ b/lib/connection.ts @@ -1,6 +1,5 @@ import { MysqlConnectionError, - MysqlError, MysqlProtocolError, MysqlReadError, MysqlResponseTimeoutError, @@ -14,7 +13,7 @@ import { parseHandshake, } from "./packets/parsers/handshake.ts"; import { - ConvertTypeOptions, + type ConvertTypeOptions, type FieldInfo, getRowObject, type MysqlParameterType, @@ -38,7 +37,6 @@ import { VERSION } from "./utils/meta.ts"; import { resolve } from "@std/path"; import { toCamelCase } from "@std/text"; import { AuthPluginName } from "./auth_plugins/mod.ts"; -import type { MysqlQueryOptions } from "./client.ts"; /** * Connection state @@ -164,7 +162,9 @@ export class MysqlConnection set conn(conn: Deno.Conn | null) { this._conn = conn; } - + get connected(): boolean { + return this.state === ConnectionState.CONNECTED; + } constructor( connectionUrl: string | URL, connectionOptions: MysqlConnectionOptions = {}, @@ -176,9 +176,6 @@ export class MysqlConnection connectionOptions, ); } - get connected(): boolean { - return this.state === ConnectionState.CONNECTED; - } async connect(): Promise { // TODO: implement connect timeout diff --git a/lib/deferred.ts b/lib/deferred.ts deleted file mode 100644 index cd9fcc2..0000000 --- a/lib/deferred.ts +++ /dev/null @@ -1,71 +0,0 @@ -/** @ignore */ -export class DeferredStack { - private _queue: PromiseWithResolvers[] = []; - private _size = 0; - - constructor( - readonly _maxSize: number, - private _array: T[] = [], - private readonly creator: () => Promise, - ) { - this._size = _array.length; - } - - get size(): number { - return this._size; - } - - get maxSize(): number { - return this._maxSize; - } - - get available(): number { - return this._array.length; - } - - async pop(): Promise { - if (this._array.length) { - return this._array.pop()!; - } else if (this._size < this._maxSize) { - this._size++; - let item: T; - try { - item = await this.creator(); - } catch (err) { - this._size--; - throw err; - } - return item; - } - const defer = Promise.withResolvers(); - this._queue.push(defer); - return await defer.promise; - } - - /** Returns false if the item is consumed by a deferred pop */ - push(item: T): boolean { - if (this._queue.length) { - this._queue.shift()!.resolve(item); - return false; - } else { - this._array.push(item); - return true; - } - } - - tryPopAvailable() { - return this._array.pop(); - } - - remove(item: T): boolean { - const index = this._array.indexOf(item); - if (index < 0) return false; - this._array.splice(index, 1); - this._size--; - return true; - } - - reduceSize() { - this._size--; - } -} diff --git a/lib/pool.ts b/lib/pool.ts index c4a38e6..987b3d2 100644 --- a/lib/pool.ts +++ b/lib/pool.ts @@ -1,129 +1,314 @@ -import { DeferredStack } from "./deferred.ts"; -import { Connection } from "./connection.ts"; +import { + type SqlxConnectionPool, + type SqlxConnectionPoolOptions, + SqlxError, + type SqlxPoolConnection, + type SqlxPoolConnectionEventType, + VERSION, +} from "@halvardm/sqlx"; +import { + MysqlClient, + type MysqlPrepared, + type MysqlQueryOptions, + type MySqlTransaction, + type MysqlTransactionOptions, +} from "./client.ts"; +import type { MysqlConnectionOptions } from "./connection.ts"; +import type { MysqlParameterType } from "./packets/parsers/result.ts"; +import { DeferredStack } from "./utils/deferred.ts"; +import type { ArrayRow, Row } from "../../deno-sqlx/lib/interfaces.ts"; +import { + MysqlPoolConnectionAcquireEvent, + MysqlPoolConnectionDestroyEvent, + MysqlPoolConnectionReleaseEvent, +} from "./utils/events.ts"; import { logger } from "./utils/logger.ts"; import { MysqlError } from "./utils/errors.ts"; -/** @ignore */ -export class PoolConnection extends Connection { - _pool?: ConnectionPool = undefined; - - private _idleTimer?: number = undefined; - private _idle = false; +export interface MysqlClientPoolOptions + extends MysqlConnectionOptions, SqlxConnectionPoolOptions { +} - /** - * Should be called by the pool. - */ - enterIdle() { - this._idle = true; - if (this.config.idleTimeout) { - this._idleTimer = setTimeout(() => { - logger().info("connection idle timeout"); - this._pool!.remove(this); - try { - this.close(); - } catch (error) { - logger().warn(`error closing idle connection`, error); - } - }, this.config.idleTimeout); - try { - // Don't block the event loop from finishing - Deno.unrefTimer(this._idleTimer); - } catch (_error) { - // unrefTimer() is unstable API in older version of Deno - } - } +export class MysqlPoolClient extends MysqlClient implements + SqlxPoolConnection< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction + > { + release(): Promise { + throw new Error("Method not implemented."); } +} - /** - * Should be called by the pool. - */ - exitIdle() { - this._idle = false; - if (this._idleTimer !== undefined) { - clearTimeout(this._idleTimer); - } +export class MysqlClientPool implements + SqlxConnectionPool< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + SqlxPoolConnectionEventType, + MysqlClientPoolOptions, + MysqlPoolClient, + DeferredStack + > { + readonly sqlxVersion: string = VERSION; + readonly connectionUrl: string; + readonly connectionOptions: MysqlClientPoolOptions; + readonly queryOptions: MysqlQueryOptions; + readonly eventTarget: EventTarget; + readonly deferredStack: DeferredStack; + get connected(): boolean { + throw new Error("Method not implemented."); } - /** - * Remove the connection from the pool permanently, when the connection is not usable. - */ - removeFromPool() { - this._pool!.reduceSize(); - this._pool = undefined; + constructor( + connectionUrl: string | URL, + connectionOptions: MysqlClientPoolOptions = {}, + ) { + this.connectionUrl = connectionUrl.toString(); + this.connectionOptions = connectionOptions; + this.queryOptions = connectionOptions; + this.eventTarget = new EventTarget(); + this.deferredStack = new DeferredStack(connectionOptions); } - returnToPool() { - this._pool?.push(this); + async execute( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + const conn = await this.acquire(); + let res: number | undefined = undefined; + let err: Error | undefined = undefined; + try { + res = await conn.execute(sql, params, options); + } catch (e) { + err = e; + } + await this.release(conn); + if (err) { + throw err; + } + return res; } -} - -/** @ignore */ -export class ConnectionPool { - _deferred: DeferredStack; - _connections: PoolConnection[] = []; - _closed: boolean = false; - - constructor(maxSize: number, creator: () => Promise) { - this._deferred = new DeferredStack(maxSize, this._connections, async () => { - const conn = await creator(); - conn._pool = this; - return conn; - }); + query< + T extends Row = Row< + MysqlParameterType + >, + >( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queryWrapper((conn) => conn.query(sql, params, options)); } - - get info() { - return { - size: this._deferred.size, - maxSize: this._deferred.maxSize, - available: this._deferred.available, - }; + queryOne< + T extends Row = Row< + MysqlParameterType + >, + >( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queryWrapper((conn) => conn.queryOne(sql, params, options)); } - - push(conn: PoolConnection) { - if (this._closed) { - conn.close(); - this.reduceSize(); + async *queryMany< + T extends Row = Row< + MysqlParameterType + >, + >( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): AsyncGenerator { + const conn = await this.acquire(); + let err: Error | undefined = undefined; + try { + for await (const row of conn.queryMany(sql, params, options)) { + yield row; + } + } catch (e) { + err = e; } - if (this._deferred.push(conn)) { - conn.enterIdle(); + await this.release(conn); + if (err) { + throw err; } } + queryArray< + T extends ArrayRow = ArrayRow< + MysqlParameterType + >, + >( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queryWrapper((conn) => + conn.queryArray(sql, params, options) + ); + } + queryOneArray< + T extends ArrayRow = ArrayRow< + MysqlParameterType + >, + >( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queryWrapper((conn) => + conn.queryOneArray(sql, params, options) + ); + } + async *queryManyArray< + T extends ArrayRow = ArrayRow< + MysqlParameterType + >, + >( + sql: string, + params?: (MysqlParameterType)[] | undefined, + options?: MysqlQueryOptions | undefined, + ): AsyncGenerator { + const conn = await this.acquire(); + let err: Error | undefined = undefined; + try { + for await (const row of conn.queryManyArray(sql, params, options)) { + yield row; + } + } catch (e) { + err = e; + } + await this.release(conn); + if (err) { + throw err; + } + } + sql< + T extends Row = Row< + MysqlParameterType + >, + >( + strings: TemplateStringsArray, + ...parameters: (MysqlParameterType)[] + ): Promise { + return this.#queryWrapper((conn) => conn.sql(strings, ...parameters)); + } + sqlArray< + T extends ArrayRow = ArrayRow< + MysqlParameterType + >, + >( + strings: TemplateStringsArray, + ...parameters: (MysqlParameterType)[] + ): Promise { + return this.#queryWrapper((conn) => + conn.sqlArray(strings, ...parameters) + ); + } - async pop(): Promise { - if (this._closed) { - throw new MysqlError("Connection pool is closed"); + beginTransaction( + options?: { + withConsistentSnapshot?: boolean | undefined; + readWrite?: "READ WRITE" | "READ ONLY" | undefined; + } | undefined, + ): Promise { + return this.#queryWrapper((conn) => conn.beginTransaction(options)); + } + transaction(fn: (t: MySqlTransaction) => Promise): Promise { + return this.#queryWrapper((conn) => conn.transaction(fn)); + } + async connect(): Promise { + for (let i = 0; i < this.deferredStack.maxSize; i++) { + const client = new MysqlPoolClient( + this.connectionUrl, + this.connectionOptions, + ); + client.release = () => this.release(client); + client.eventTarget = this.eventTarget; + if (!this.connectionOptions.lazyInitialization) { + await client.connect(); + } + this.deferredStack.push(client); } - let conn = this._deferred.tryPopAvailable(); - if (conn) { - conn.exitIdle(); - } else { - conn = await this._deferred.pop(); + } + async close(): Promise { + for (const client of this.deferredStack.stack) { + await client.close(); } + } + addEventListener( + type: SqlxPoolConnectionEventType, + listener: EventListenerOrEventListenerObject | null, + options?: boolean | AddEventListenerOptions | undefined, + ): void { + return this.eventTarget.addEventListener(type, listener, options); + } + removeEventListener( + type: SqlxPoolConnectionEventType, + callback: EventListenerOrEventListenerObject | null, + options?: boolean | EventListenerOptions | undefined, + ): void { + return this.eventTarget.removeEventListener(type, callback, options); + } + + dispatchEvent(event: Event): boolean { + return this.eventTarget.dispatchEvent(event); + } + + async acquire(): Promise { + const conn = await this.deferredStack.pop(); + dispatchEvent(new MysqlPoolConnectionAcquireEvent({ connection: conn })); return conn; } - remove(conn: PoolConnection) { - return this._deferred.remove(conn); + async release(connection: MysqlPoolClient): Promise { + dispatchEvent( + new MysqlPoolConnectionReleaseEvent({ connection: connection }), + ); + try { + this.deferredStack.push(connection); + } catch (e) { + if (e instanceof SqlxError && e.message === "Max pool size reached") { + logger().debug(e.message); + await connection.close(); + } else { + throw e; + } + } } - /** - * Close the pool and all connections in the pool. - * - * After closing, pop() will throw an error, - * push() will close the connection immediately. - */ - close() { - this._closed = true; + async destroy(connection: MysqlPoolClient): Promise { + dispatchEvent( + new MysqlPoolConnectionDestroyEvent({ connection: connection }), + ); + await connection.close(); + } - let conn: PoolConnection | undefined; - while (conn = this._deferred.tryPopAvailable()) { - conn.exitIdle(); - conn.close(); - this.reduceSize(); + async #queryWrapper(fn: (connection: MysqlClient) => Promise) { + const conn = await this.acquire(); + let res: T | undefined = undefined; + let err: Error | undefined = undefined; + try { + res = await fn(conn); + } catch (e) { + err = e; + } + await this.release(conn); + if (err) { + throw err; + } + if (!res) { + throw new MysqlError("No result"); } + return res; } - reduceSize() { - this._deferred.reduceSize(); + async [Symbol.asyncDispose](): Promise { + await this.close(); } } diff --git a/lib/client.test.ts b/lib/sqlx.test.ts similarity index 89% rename from lib/client.test.ts rename to lib/sqlx.test.ts index 2508467..04397cf 100644 --- a/lib/client.test.ts +++ b/lib/sqlx.test.ts @@ -1,11 +1,13 @@ import { MysqlClient } from "./client.ts"; +import { MysqlClientPool } from "./pool.ts"; import { URL_TEST_CONNECTION } from "./utils/testing.ts"; import { implementationTest } from "@halvardm/sqlx/testing"; -Deno.test("MysqlClient", async (t) => { +Deno.test("MySQL SQLx", async (t) => { await implementationTest({ t, Client: MysqlClient, + PoolClient: MysqlClientPool as any, connectionUrl: URL_TEST_CONNECTION, connectionOptions: {}, queries: { diff --git a/lib/utils/deferred.ts b/lib/utils/deferred.ts new file mode 100644 index 0000000..02aae1f --- /dev/null +++ b/lib/utils/deferred.ts @@ -0,0 +1,51 @@ +import { + type SqlxConnectionPoolOptions, + type SqlxDeferredStack, + SqlxError, +} from "@halvardm/sqlx"; +import type { MysqlPoolClient } from "../pool.ts"; + +export type DeferredStackOptions = SqlxConnectionPoolOptions; + +export class DeferredStack implements SqlxDeferredStack { + readonly maxSize: number; + stack: Array; + queue: Array>; + + get availableCount(): number { + return this.stack.length; + } + get queuedCount(): number { + return this.queue.length; + } + constructor(options: DeferredStackOptions) { + this.maxSize = options.poolSize ?? 10; + this.stack = []; + this.queue = []; + } + + push(client: MysqlPoolClient): void { + if (this.queue.length) { + const p = this.queue.shift()!; + p.resolve(client); + } else if (this.queue.length >= this.maxSize) { + throw new SqlxError("Max pool size reached"); + } else { + this.stack.push(client); + } + } + + async pop(): Promise { + const res = this.stack.pop(); + + if (res) { + await res.connect(); + return res; + } + + const p = Promise.withResolvers(); + this.queue.push(p); + + return p.promise; + } +} diff --git a/lib/utils/events.ts b/lib/utils/events.ts new file mode 100644 index 0000000..5c5df33 --- /dev/null +++ b/lib/utils/events.ts @@ -0,0 +1,71 @@ +import { + type SqlxConnectionEventInit, + SqlxPoolConnectionAcquireEvent, + SqlxPoolConnectionDestroyEvent, + SqlxPoolConnectionReleaseEvent, +} from "@halvardm/sqlx"; +import type { MysqlParameterType } from "../packets/parsers/result.ts"; +import type { + MysqlPrepared, + MysqlQueryOptions, + MySqlTransaction, + MysqlTransactionOptions, +} from "../client.ts"; +import type { MysqlPoolClient } from "../pool.ts"; + +export class MysqlPoolConnectionAcquireEvent + extends SqlxPoolConnectionAcquireEvent< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient, + SqlxConnectionEventInit< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient + > + > { +} + +export class MysqlPoolConnectionReleaseEvent + extends SqlxPoolConnectionReleaseEvent< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient, + SqlxConnectionEventInit< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient + > + > { +} + +export class MysqlPoolConnectionDestroyEvent + extends SqlxPoolConnectionDestroyEvent< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient, + SqlxConnectionEventInit< + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient + > + > { +} diff --git a/testold.ts b/testold.ts deleted file mode 100644 index 9463348..0000000 --- a/testold.ts +++ /dev/null @@ -1,394 +0,0 @@ -import { assertEquals, assertRejects } from "@std/assert"; -import { lessThan, parse } from "@std/semver"; -import { - MysqlConnectionError, - MysqlResponseTimeoutError, -} from "./lib/utils/errors.ts"; -import { - createTestDB, - delay, - isMariaDB, - registerTests, - testWithClient, -} from "./lib/utils/testing.ts"; -import * as stdlog from "@std/log"; -import { configLogger } from "./mod.ts"; -import { logger } from "./lib/logger.ts"; - -testWithClient(async function testCreateDb(client) { - await client.query(`CREATE DATABASE IF NOT EXISTS enok`); -}); - -testWithClient(async function testCreateTable(client) { - await client.query(`DROP TABLE IF EXISTS users`); - await client.query(` - CREATE TABLE users ( - id int(11) NOT NULL AUTO_INCREMENT, - name varchar(100) NOT NULL, - is_top tinyint(1) default 0, - created_at timestamp not null default current_timestamp, - PRIMARY KEY (id) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - `); -}); - -testWithClient(async function testInsert(client) { - let result = await client.execute(`INSERT INTO users(name) values(?)`, [ - "manyuanrong", - ]); - assertEquals(result, { affectedRows: 1, lastInsertId: 1 }); - result = await client.execute(`INSERT INTO users ?? values ?`, [ - ["id", "name"], - [2, "MySQL"], - ]); - assertEquals(result, { affectedRows: 1, lastInsertId: 2 }); -}); - -testWithClient(async function testUpdate(client) { - let result = await client.execute( - `update users set ?? = ?, ?? = ? WHERE id = ?`, - ["name", "MYR🦕", "created_at", new Date(), 1], - ); - assertEquals(result, { affectedRows: 1, lastInsertId: 0 }); -}); - -testWithClient(async function testQuery(client) { - let result = await client.query( - "select ??,`is_top`,`name` from ?? where id = ?", - ["id", "users", 1], - ); - assertEquals(result, [{ id: 1, name: "MYR🦕", is_top: 0 }]); -}); - -testWithClient(async function testQueryErrorOccurred(client) { - assertEquals(client.pool, { - size: 0, - maxSize: client.config.poolSize, - available: 0, - }); - await assertRejects( - () => client.query("select unknownfield from `users`"), - Error, - ); - await client.query("select 1"); - assertEquals(client.pool, { - size: 1, - maxSize: client.config.poolSize, - available: 1, - }); -}); - -testWithClient(async function testQueryList(client) { - const sql = "select ??,?? from ??"; - let result = await client.query(sql, ["id", "name", "users"]); - assertEquals(result, [ - { id: 1, name: "MYR🦕" }, - { id: 2, name: "MySQL" }, - ]); -}); - -testWithClient(async function testQueryTime(client) { - const sql = `SELECT CAST("09:04:10" AS time) as time`; - let result = await client.query(sql); - assertEquals(result, [{ time: "09:04:10" }]); -}); - -testWithClient(async function testQueryBigint(client) { - await client.query(`DROP TABLE IF EXISTS test_bigint`); - await client.query(`CREATE TABLE test_bigint ( - id int(11) NOT NULL AUTO_INCREMENT, - bigint_column bigint NOT NULL, - PRIMARY KEY (id) - ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); - - const value = "9223372036854775807"; - await client.execute( - "INSERT INTO test_bigint(bigint_column) VALUES (?)", - [value], - ); - - const result = await client.query("SELECT bigint_column FROM test_bigint"); - assertEquals(result, [{ bigint_column: BigInt(value) }]); -}); - -testWithClient(async function testQueryDecimal(client) { - await client.query(`DROP TABLE IF EXISTS test_decimal`); - await client.query(`CREATE TABLE test_decimal ( - id int(11) NOT NULL AUTO_INCREMENT, - decimal_column decimal(65,30) NOT NULL, - PRIMARY KEY (id) - ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); - - const value = "0.012345678901234567890123456789"; - await client.execute( - "INSERT INTO test_decimal(decimal_column) VALUES (?)", - [value], - ); - - const result = await client.query("SELECT decimal_column FROM test_decimal"); - assertEquals(result, [{ decimal_column: value }]); -}); - -testWithClient(async function testQueryDatetime(client) { - await client.useConnection(async (connection) => { - if ( - isMariaDB(connection) || - lessThan(parse(connection.serverVersion), parse("5.6.0")) - ) { - return; - } - - await client.query(`DROP TABLE IF EXISTS test_datetime`); - await client.query(`CREATE TABLE test_datetime ( - id int(11) NOT NULL AUTO_INCREMENT, - datetime datetime(6) NOT NULL, - PRIMARY KEY (id) - ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); - const datetime = new Date(); - await client.execute( - ` - INSERT INTO test_datetime (datetime) - VALUES (?)`, - [datetime], - ); - - const [row] = await client.query("SELECT datetime FROM test_datetime"); - assertEquals(row.datetime.toISOString(), datetime.toISOString()); // See https://github.com/denoland/deno/issues/6643 - }); -}); - -testWithClient(async function testDelete(client) { - let result = await client.execute(`delete from users where ?? = ?`, [ - "id", - 1, - ]); - assertEquals(result, { affectedRows: 1, lastInsertId: 0 }); -}); - -testWithClient(async function testPool(client) { - assertEquals(client.pool, { - maxSize: client.config.poolSize, - available: 0, - size: 0, - }); - const expect = new Array(10).fill([{ "1": 1 }]); - const result = await Promise.all(expect.map(() => client.query(`select 1`))); - - assertEquals(client.pool, { - maxSize: client.config.poolSize, - available: 3, - size: 3, - }); - assertEquals(result, expect); -}); - -testWithClient(async function testQueryOnClosed(client) { - for (const i of [0, 0, 0]) { - await assertRejects(async () => { - await client.transaction(async (conn) => { - conn.close(); - await conn.query("SELECT 1"); - }); - }, MysqlConnectionError); - } - assertEquals(client.pool?.size, 0); - await client.query("select 1"); -}); - -testWithClient(async function testTransactionSuccess(client) { - const success = await client.transaction(async (connection) => { - await connection.execute("insert into users(name) values(?)", [ - "transaction1", - ]); - await connection.execute("delete from users where id = ?", [2]); - return true; - }); - assertEquals(true, success); - const result = await client.query("select name,id from users"); - assertEquals([{ name: "transaction1", id: 3 }], result); -}); - -testWithClient(async function testTransactionRollback(client) { - let success; - await assertRejects(async () => { - success = await client.transaction(async (connection) => { - // Insert an existing id - await connection.execute("insert into users(name,id) values(?,?)", [ - "transaction2", - 3, - ]); - return true; - }); - }); - assertEquals(undefined, success); - const result = await client.query("select name from users"); - assertEquals([{ name: "transaction1" }], result); -}); - -testWithClient(async function testIdleTimeout(client) { - assertEquals(client.pool, { - maxSize: 3, - available: 0, - size: 0, - }); - await Promise.all(new Array(10).fill(0).map(() => client.query("select 1"))); - assertEquals(client.pool, { - maxSize: 3, - available: 3, - size: 3, - }); - await delay(500); - assertEquals(client.pool, { - maxSize: 3, - available: 3, - size: 3, - }); - await client.query("select 1"); - await delay(500); - assertEquals(client.pool, { - maxSize: 3, - available: 1, - size: 1, - }); - await delay(500); - assertEquals(client.pool, { - maxSize: 3, - available: 0, - size: 0, - }); -}, { - idleTimeout: 750, -}); - -testWithClient(async function testReadTimeout(client) { - await client.execute("select sleep(0.3)"); - - await assertRejects(async () => { - await client.execute("select sleep(0.7)"); - }, MysqlResponseTimeoutError); - - assertEquals(client.pool, { - maxSize: 3, - available: 0, - size: 0, - }); -}, { - timeout: 500, -}); - -testWithClient(async function testLargeQueryAndResponse(client) { - function buildLargeString(len: number) { - let str = ""; - for (let i = 0; i < len; i++) { - str += i % 10; - } - return str; - } - const largeString = buildLargeString(512 * 1024); - assertEquals( - await client.query(`select "${largeString}" as str`), - [{ str: largeString }], - ); -}); - -testWithClient(async function testExecuteIterator(client) { - await client.useConnection(async (conn) => { - await conn.execute(`DROP TABLE IF EXISTS numbers`); - await conn.execute(`CREATE TABLE numbers (num INT NOT NULL)`); - await conn.execute( - `INSERT INTO numbers (num) VALUES ${ - new Array(64).fill(0).map((v, idx) => `(${idx})`).join(",") - }`, - ); - const r = await conn.execute(`SELECT num FROM numbers`, [], true); - let count = 0; - for await (const row of r.iterator) { - assertEquals(row.num, count); - count++; - } - assertEquals(count, 64); - }); -}); - -// For MySQL 8, the default auth plugin is `caching_sha2_password`. Create user -// using `mysql_native_password` to test Authentication Method Mismatch. -testWithClient(async function testCreateUserWithMysqlNativePassword(client) { - const { version } = (await client.query(`SELECT VERSION() as version`))[0]; - if (version.startsWith("8.")) { - // MySQL 8 does not have `PASSWORD()` function - await client.execute( - `CREATE USER 'testuser'@'%' IDENTIFIED WITH mysql_native_password BY 'testpassword'`, - ); - } else { - await client.execute( - `CREATE USER 'testuser'@'%' IDENTIFIED WITH mysql_native_password`, - ); - await client.execute( - `SET PASSWORD FOR 'testuser'@'%' = PASSWORD('testpassword')`, - ); - } - await client.execute(`GRANT ALL ON test.* TO 'testuser'@'%'`); -}); - -testWithClient(async function testConnectWithMysqlNativePassword(client) { - assertEquals( - await client.query(`SELECT CURRENT_USER() AS user`), - [{ user: "testuser@%" }], - ); -}, { username: "testuser", password: "testpassword" }); - -testWithClient(async function testDropUserWithMysqlNativePassword(client) { - await client.execute(`DROP USER 'testuser'@'%'`); -}); - -testWithClient(async function testSelectEmptyString(client) { - assertEquals( - await client.query(`SELECT '' AS a`), - [{ a: "" }], - ); - assertEquals( - await client.query(`SELECT '' AS a, '' AS b, '' AS c`), - [{ a: "", b: "", c: "" }], - ); - assertEquals( - await client.query(`SELECT '' AS a, 'b' AS b, '' AS c`), - [{ a: "", b: "b", c: "" }], - ); -}); - -registerTests(); - -Deno.test("configLogger()", async () => { - let logCount = 0; - const fakeHandler = new class extends stdlog.BaseHandler { - constructor() { - super("INFO"); - } - log(msg: string) { - logCount++; - } - }(); - - await stdlog.setup({ - handlers: { - fake: fakeHandler, - }, - loggers: { - mysql: { - handlers: ["fake"], - }, - }, - }); - await configLogger({ logger: stdlog.getLogger("mysql") }); - logger().info("Test log"); - assertEquals(logCount, 1); - - await configLogger({ enable: false }); - logger().info("Test log"); - assertEquals(logCount, 1); -}); - -await createTestDB(); - -await new Promise((r) => setTimeout(r, 0)); -// Workaround to https://github.com/denoland/deno/issues/7844