diff --git a/.changeset/swift-pandas-laugh.md b/.changeset/swift-pandas-laugh.md new file mode 100644 index 00000000..114723f6 --- /dev/null +++ b/.changeset/swift-pandas-laugh.md @@ -0,0 +1,7 @@ +--- +"@journeyapps/powersync-sdk-react-native": minor +"@journeyapps/powersync-sdk-common": minor +"@journeyapps/powersync-sdk-web": minor +--- + +Added batch execution functionality to the web and react-native SDKs. This feature allows a SQL statement with multiple parameters to be executed in a single transaction, improving performance and consistency. diff --git a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts index 77a3ae16..e1bf1931 100644 --- a/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/powersync-sdk-common/src/client/AbstractPowerSyncDatabase.ts @@ -475,6 +475,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver, DBGetUtils { close: () => void; execute: (query: string, params?: any[]) => Promise; + executeBatch: (query: string, params?: any[][]) => Promise; name: string; readLock: (fn: (tx: LockContext) => Promise, options?: DBLockOptions) => Promise; readTransaction: (fn: (tx: Transaction) => Promise, options?: DBLockOptions) => Promise; diff --git a/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts b/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts index 9633526c..479815ef 100644 --- a/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts +++ b/packages/powersync-sdk-react-native/src/db/adapters/react-native-quick-sqlite/RNQSDBAdapter.ts @@ -62,6 +62,19 @@ export class RNQSDBAdapter extends BaseObserver implements DB return this.baseDB.execute(query, params); } + async executeBatch(query: string, params: any[][] = []): Promise { + const commands: any[] = []; + + for (let i = 0; i < params.length; i++) { + commands.push([query, params[i]]); + } + + const result = await this.baseDB.executeBatch(commands); + return { + rowsAffected: result.rowsAffected ? result.rowsAffected : 0 + }; + } + /** * This provides a top-level read only execute method which is executed inside a read-lock. * This is necessary since the high level `execute` method uses a write-lock under diff --git a/packages/powersync-sdk-web/src/db/adapters/SSRDBAdapter.ts b/packages/powersync-sdk-web/src/db/adapters/SSRDBAdapter.ts index 760e34d9..2cd96382 100644 --- a/packages/powersync-sdk-web/src/db/adapters/SSRDBAdapter.ts +++ b/packages/powersync-sdk-web/src/db/adapters/SSRDBAdapter.ts @@ -53,6 +53,10 @@ export class SSRDBAdapter extends BaseObserver implements DBA return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE); } + async executeBatch(query: string, params?: any[][]): Promise { + return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE); + } + async getAll(sql: string, parameters?: any[]): Promise { return []; } diff --git a/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts b/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts index 862c0d65..e7895fe1 100644 --- a/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts +++ b/packages/powersync-sdk-web/src/db/adapters/wa-sqlite/WASQLiteDBAdapter.ts @@ -11,7 +11,7 @@ import { } from '@journeyapps/powersync-sdk-common'; import * as Comlink from 'comlink'; import Logger, { ILogger } from 'js-logger'; -import type { DBWorkerInterface, OpenDB } from '../../../worker/db/open-db'; +import type { DBWorkerInterface, OpenDB, SQLBatchTuple } from '../../../worker/db/open-db'; import { getWorkerDatabaseOpener } from '../../../worker/db/open-worker-database'; export type WASQLiteFlags = { @@ -78,6 +78,10 @@ export class WASQLiteDBAdapter extends BaseObserver implement return this.writeLock((ctx) => ctx.execute(query, params)); } + async executeBatch(query: string, params?: any[][]): Promise { + return this.writeLock((ctx) => this._executeBatch(query, params)); + } + /** * Wraps the worker execute function, awaiting for it to be available */ @@ -93,6 +97,18 @@ export class WASQLiteDBAdapter extends BaseObserver implement }; }; + /** + * Wraps the worker executeBatch function, awaiting for it to be available + */ + private _executeBatch = async (query: string, params?: any[]): Promise => { + await this.initialized; + const result = await this.workerMethods!.executeBatch!(query, params); + return { + ...result, + rows: undefined, + }; + }; + /** * Attempts to close the connection. * Shared workers might not actually close the connection if other diff --git a/packages/powersync-sdk-web/src/worker/db/open-db.ts b/packages/powersync-sdk-web/src/worker/db/open-db.ts index 269ad748..3c654caf 100644 --- a/packages/powersync-sdk-web/src/worker/db/open-db.ts +++ b/packages/powersync-sdk-web/src/worker/db/open-db.ts @@ -15,14 +15,17 @@ export type DBWorkerInterface = { // Close is only exposed when used in a single non shared webworker close?: () => void; execute: WASQLiteExecuteMethod; + executeBatch: WASQLiteExecuteBatchMethod; registerOnTableChange: (callback: OnTableChangeCallback) => void; }; export type WASQLiteExecuteMethod = (sql: string, params?: any[]) => Promise; - +export type WASQLiteExecuteBatchMethod = (sql: string, params?: any[]) => Promise; export type OnTableChangeCallback = (opType: number, tableName: string, rowId: number) => void; export type OpenDB = (dbFileName: string) => DBWorkerInterface; +export type SQLBatchTuple = [string] | [string, Array | Array>]; + export async function _openDB(dbFileName: string): Promise { const { default: moduleFactory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs'); const module = await moduleFactory(); @@ -52,64 +55,142 @@ export async function _openDB(dbFileName: string): Promise { }; /** - * This executes SQL statements. + * This executes single SQL statements inside a requested lock. */ const execute = async (sql: string | TemplateStringsArray, bindings?: any[]): Promise => { // Running multiple statements on the same connection concurrently should not be allowed - return navigator.locks.request(`db-execute-${dbFileName}`, async () => { - const results = []; - for await (const stmt of sqlite3.statements(db, sql as string)) { - let columns; - const wrappedBindings = bindings ? [bindings] : [[]]; - for (const binding of wrappedBindings) { - // TODO not sure why this is needed currently, but booleans break - binding.forEach((b, index, arr) => { - if (typeof b == 'boolean') { - arr[index] = b ? 1 : 0; - } - }); + return _acquireExecuteLock(async () => { + return executeSingleStatement(sql, bindings); + }); + }; - sqlite3.reset(stmt); - if (bindings) { - sqlite3.bind_collection(stmt, binding); - } + /** + * This requests a lock for executing statements. + * Should only be used interanlly. + */ + const _acquireExecuteLock = (callback: () => Promise): Promise => { + return navigator.locks.request(`db-execute-${dbFileName}`, callback); + }; - const rows = []; - while ((await sqlite3.step(stmt)) === SQLite.SQLITE_ROW) { - const row = sqlite3.row(stmt); - rows.push(row); + /** + * This executes a single statement using SQLite3. + */ + const executeSingleStatement = async ( + sql: string | TemplateStringsArray, + bindings?: any[] + ): Promise => { + const results = []; + for await (const stmt of sqlite3.statements(db, sql as string)) { + let columns; + const wrappedBindings = bindings ? [bindings] : [[]]; + for (const binding of wrappedBindings) { + // TODO not sure why this is needed currently, but booleans break + binding.forEach((b, index, arr) => { + if (typeof b == 'boolean') { + arr[index] = b ? 1 : 0; } + }); - columns = columns ?? sqlite3.column_names(stmt); - if (columns.length) { - results.push({ columns, rows }); - } + sqlite3.reset(stmt); + if (bindings) { + sqlite3.bind_collection(stmt, binding); } - // When binding parameters, only a single statement is executed. - if (bindings) { - break; + const rows = []; + while ((await sqlite3.step(stmt)) === SQLite.SQLITE_ROW) { + const row = sqlite3.row(stmt); + rows.push(row); } - } - let rows: Record[] = []; - for (let resultset of results) { - for (let row of resultset.rows) { - let outRow: Record = {}; - resultset.columns.forEach((key, index) => { - outRow[key] = row[index]; - }); - rows.push(outRow); + columns = columns ?? sqlite3.column_names(stmt); + if (columns.length) { + results.push({ columns, rows }); } } - const result = { - insertId: sqlite3.last_insert_id(db), - rowsAffected: sqlite3.changes(db), - rows: { - _array: rows, - length: rows.length + // When binding parameters, only a single statement is executed. + if (bindings) { + break; + } + } + + let rows: Record[] = []; + for (let resultset of results) { + for (let row of resultset.rows) { + let outRow: Record = {}; + resultset.columns.forEach((key, index) => { + outRow[key] = row[index]; + }); + rows.push(outRow); + } + } + + const result = { + insertId: sqlite3.last_insert_id(db), + rowsAffected: sqlite3.changes(db), + rows: { + _array: rows, + length: rows.length + } + }; + + return result; + }; + + /** + * This executes SQL statements in a batch. + */ + const executeBatch = async (sql: string, bindings?: any[][]): Promise => { + return _acquireExecuteLock(async () => { + let affectedRows = 0; + + const str = sqlite3.str_new(db, sql); + const query = sqlite3.str_value(str); + try { + await executeSingleStatement('BEGIN TRANSACTION'); + + //Prepare statement once + let prepared = await sqlite3.prepare_v2(db, query); + if (prepared === null) { + return { + rowsAffected: 0 + }; + } + const wrappedBindings = bindings ? bindings : []; + for (const binding of wrappedBindings) { + // TODO not sure why this is needed currently, but booleans break + for (let i = 0; i < binding.length; i++) { + let b = binding[i]; + if (typeof b == 'boolean') { + binding[i] = b ? 1 : 0; + } + } + + //Reset bindings + sqlite3.reset(prepared.stmt); + if (bindings) { + sqlite3.bind_collection(prepared.stmt, binding); + } + + let result = await sqlite3.step(prepared.stmt); + if (result === SQLite.SQLITE_DONE) { + //The value returned by sqlite3_changes() immediately after an INSERT, UPDATE or DELETE statement run on a view is always zero. + affectedRows += sqlite3.changes(db); + } } + //Finalize prepared statement + await sqlite3.finalize(prepared.stmt); + await executeSingleStatement('COMMIT'); + } catch (err) { + await executeSingleStatement('ROLLBACK'); + return { + rowsAffected: 0 + }; + } finally { + sqlite3.str_finish(str); + } + const result = { + rowsAffected: affectedRows }; return result; @@ -118,6 +199,7 @@ export async function _openDB(dbFileName: string): Promise { return { execute: Comlink.proxy(execute), + executeBatch: Comlink.proxy(executeBatch), registerOnTableChange: Comlink.proxy(registerOnTableChange), close: Comlink.proxy(() => { sqlite3.close(db); diff --git a/packages/powersync-sdk-web/tests/crud.test.ts b/packages/powersync-sdk-web/tests/crud.test.ts index 95687a9b..0636c8eb 100644 --- a/packages/powersync-sdk-web/tests/crud.test.ts +++ b/packages/powersync-sdk-web/tests/crud.test.ts @@ -54,6 +54,32 @@ describe('CRUD Tests', () => { expect(tx.crud[0].equals(expectedCrudEntry)).true; }); + it('BATCH INSERT', async () => { + expect(await powersync.getAll('SELECT * FROM ps_crud')).empty; + + const query = `INSERT INTO assets(id, description) VALUES(?, ?)`; + await powersync.executeBatch(query, [ + [testId, 'test'], + ['mockId', 'test1'] + ]); + + expect(await powersync.getAll('SELECT data FROM ps_crud ORDER BY id')).deep.equals([ + { + data: `{"op":"PUT","type":"assets","id":"${testId}","data":{"description":"test"}}` + }, + { + data: `{"op":"PUT","type":"assets","id":"mockId","data":{"description":"test1"}}` + } + ]); + + const crudBatch = (await powersync.getCrudBatch(2))!; + expect(crudBatch.crud.length).equals(2); + const expectedCrudEntry = new CrudEntry(1, UpdateType.PUT, 'assets', testId, 1, { description: 'test' }); + const expectedCrudEntry2 = new CrudEntry(2, UpdateType.PUT, 'assets', 'mockId', 1, { description: 'test1' }); + expect(crudBatch.crud[0].equals(expectedCrudEntry)).true; + expect(crudBatch.crud[1].equals(expectedCrudEntry2)).true; + }); + it('INSERT OR REPLACE', async () => { await powersync.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']); await powersync.execute('DELETE FROM ps_crud WHERE 1'); @@ -96,6 +122,38 @@ describe('CRUD Tests', () => { expect(tx.crud[0].equals(expectedCrudEntry)).true; }); + it('BATCH UPDATE', async () => { + await powersync.executeBatch('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [ + [testId, 'test', 'test'], + ['mockId', 'test', 'test'] + ]); + await powersync.execute('DELETE FROM ps_crud WHERE 1'); + + await powersync.executeBatch('UPDATE assets SET description = ?, make = ?', [['test2', 'make2']]); + + expect(await powersync.getAll('SELECT data FROM ps_crud ORDER BY id')).deep.equals([ + { + data: `{"op":"PATCH","type":"assets","id":"${testId}","data":{"description":"test2","make":"make2"}}` + }, + { + data: `{"op":"PATCH","type":"assets","id":"mockId","data":{"description":"test2","make":"make2"}}` + } + ]); + + const crudBatch = (await powersync.getCrudBatch(2))!; + expect(crudBatch.crud.length).equals(2); + const expectedCrudEntry = new CrudEntry(3, UpdateType.PATCH, 'assets', testId, 2, { + description: 'test2', + make: 'make2' + }); + const expectedCrudEntry2 = new CrudEntry(4, UpdateType.PATCH, 'assets', 'mockId', 2, { + description: 'test2', + make: 'make2' + }); + expect(crudBatch.crud[0].equals(expectedCrudEntry)).true; + expect(crudBatch.crud[1].equals(expectedCrudEntry2)).true; + }); + it('DELETE', async () => { await powersync.execute('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [testId, 'test', 'test']); await powersync.execute('DELETE FROM ps_crud WHERE 1'); diff --git a/packages/powersync-sdk-web/tests/main.test.ts b/packages/powersync-sdk-web/tests/main.test.ts index 4f52e068..411dc5c3 100644 --- a/packages/powersync-sdk-web/tests/main.test.ts +++ b/packages/powersync-sdk-web/tests/main.test.ts @@ -47,4 +47,26 @@ describe('Basic', () => { expect(result.name).equals(testName); }); }); + + describe('executeBatchQuery', () => { + it('should execute a select query using getAll', async () => { + const result = await db.getAll('SELECT * FROM users'); + expect(result.length).toEqual(0); + }); + + it('should allow batch inserts', async () => { + const testName = 'Mugi'; + await db.executeBatch('INSERT INTO users (id, name) VALUES(?, ?)', [ + [uuid(), testName], + [uuid(), 'Steven'], + [uuid(), 'Chris'] + ]); + const result = await db.getAll('SELECT * FROM users'); + + expect(result.length).equals(3); + expect(result[0].name).equals(testName); + expect(result[1].name).equals('Steven'); + expect(result[2].name).equals('Chris'); + }); + }); });