Skip to content

Commit

Permalink
Merge pull request #116 from powersync-ja/feat/execute-batch-web
Browse files Browse the repository at this point in the history
Add executeBatch to React Native and Web
  • Loading branch information
mugikhan authored Apr 8, 2024
2 parents 6c43ec6 + 8f7caa5 commit c546134
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 46 deletions.
7 changes: 7 additions & 0 deletions .changeset/swift-pandas-laugh.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@journeyapps/powersync-sdk-react-native": minor
"@journeyapps/powersync-sdk-common": minor
"@journeyapps/powersync-sdk-web": minor
---

Added batch execution functionality to the web and react-native SDKs. This feature allows a SQL statement with multiple parameters to be executed in a single transaction, improving performance and consistency.
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,15 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
return this.database.execute(sql, parameters);
}

/**
* Execute a batch write (INSERT/UPDATE/DELETE) query with multiple sets of parameters
* and optionally return results.
*/
async executeBatch(query: string, params?: any[][]) {
await this.waitForReady();
return this.database.executeBatch(query, params);
}

/**
* Execute a read-only query and return results.
*/
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync-sdk-common/src/db/DBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ export interface TableUpdateOperation {
opType: RowUpdateType;
rowId: number;
}

/**
* Notification of an update to one or more tables, for the purpose of realtime change notifications.
*/
Expand Down Expand Up @@ -96,6 +95,7 @@ export interface DBLockOptions {
export interface DBAdapter extends BaseObserverInterface<DBAdapterListener>, DBGetUtils {
close: () => void;
execute: (query: string, params?: any[]) => Promise<QueryResult>;
executeBatch: (query: string, params?: any[][]) => Promise<QueryResult>;
name: string;
readLock: <T>(fn: (tx: LockContext) => Promise<T>, options?: DBLockOptions) => Promise<T>;
readTransaction: <T>(fn: (tx: Transaction) => Promise<T>, options?: DBLockOptions) => Promise<T>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ export class RNQSDBAdapter extends BaseObserver<DBAdapterListener> implements DB
return this.baseDB.execute(query, params);
}

async executeBatch(query: string, params: any[][] = []): Promise<QueryResult> {
const commands: any[] = [];

for (let i = 0; i < params.length; i++) {
commands.push([query, params[i]]);
}

const result = await this.baseDB.executeBatch(commands);
return {
rowsAffected: result.rowsAffected ? result.rowsAffected : 0
};
}

/**
* This provides a top-level read only execute method which is executed inside a read-lock.
* This is necessary since the high level `execute` method uses a write-lock under
Expand Down
4 changes: 4 additions & 0 deletions packages/powersync-sdk-web/src/db/adapters/SSRDBAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ export class SSRDBAdapter extends BaseObserver<DBAdapterListener> implements DBA
return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE);
}

async executeBatch(query: string, params?: any[][]): Promise<QueryResult> {
return this.writeMutex.runExclusive(async () => MOCK_QUERY_RESPONSE);
}

async getAll<T>(sql: string, parameters?: any[]): Promise<T[]> {
return [];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
} from '@journeyapps/powersync-sdk-common';
import * as Comlink from 'comlink';
import Logger, { ILogger } from 'js-logger';
import type { DBWorkerInterface, OpenDB } from '../../../worker/db/open-db';
import type { DBWorkerInterface, OpenDB, SQLBatchTuple } from '../../../worker/db/open-db';
import { getWorkerDatabaseOpener } from '../../../worker/db/open-worker-database';

export type WASQLiteFlags = {
Expand Down Expand Up @@ -78,6 +78,10 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
return this.writeLock((ctx) => ctx.execute(query, params));
}

async executeBatch(query: string, params?: any[][]): Promise<QueryResult> {
return this.writeLock((ctx) => this._executeBatch(query, params));
}

/**
* Wraps the worker execute function, awaiting for it to be available
*/
Expand All @@ -93,6 +97,18 @@ export class WASQLiteDBAdapter extends BaseObserver<DBAdapterListener> implement
};
};

/**
* Wraps the worker executeBatch function, awaiting for it to be available
*/
private _executeBatch = async (query: string, params?: any[]): Promise<QueryResult> => {
await this.initialized;
const result = await this.workerMethods!.executeBatch!(query, params);
return {
...result,
rows: undefined,
};
};

/**
* Attempts to close the connection.
* Shared workers might not actually close the connection if other
Expand Down
170 changes: 126 additions & 44 deletions packages/powersync-sdk-web/src/worker/db/open-db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ export type DBWorkerInterface = {
// Close is only exposed when used in a single non shared webworker
close?: () => void;
execute: WASQLiteExecuteMethod;
executeBatch: WASQLiteExecuteBatchMethod;
registerOnTableChange: (callback: OnTableChangeCallback) => void;
};

export type WASQLiteExecuteMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;

export type WASQLiteExecuteBatchMethod = (sql: string, params?: any[]) => Promise<WASQLExecuteResult>;
export type OnTableChangeCallback = (opType: number, tableName: string, rowId: number) => void;
export type OpenDB = (dbFileName: string) => DBWorkerInterface;

export type SQLBatchTuple = [string] | [string, Array<any> | Array<Array<any>>];

export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
const { default: moduleFactory } = await import('@journeyapps/wa-sqlite/dist/wa-sqlite-async.mjs');
const module = await moduleFactory();
Expand Down Expand Up @@ -52,64 +55,142 @@ export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {
};

/**
* This executes SQL statements.
* This executes single SQL statements inside a requested lock.
*/
const execute = async (sql: string | TemplateStringsArray, bindings?: any[]): Promise<WASQLExecuteResult> => {
// Running multiple statements on the same connection concurrently should not be allowed
return navigator.locks.request(`db-execute-${dbFileName}`, async () => {
const results = [];
for await (const stmt of sqlite3.statements(db, sql as string)) {
let columns;
const wrappedBindings = bindings ? [bindings] : [[]];
for (const binding of wrappedBindings) {
// TODO not sure why this is needed currently, but booleans break
binding.forEach((b, index, arr) => {
if (typeof b == 'boolean') {
arr[index] = b ? 1 : 0;
}
});
return _acquireExecuteLock(async () => {
return executeSingleStatement(sql, bindings);
});
};

sqlite3.reset(stmt);
if (bindings) {
sqlite3.bind_collection(stmt, binding);
}
/**
* This requests a lock for executing statements.
* Should only be used interanlly.
*/
const _acquireExecuteLock = (callback: () => Promise<any>): Promise<any> => {
return navigator.locks.request(`db-execute-${dbFileName}`, callback);
};

const rows = [];
while ((await sqlite3.step(stmt)) === SQLite.SQLITE_ROW) {
const row = sqlite3.row(stmt);
rows.push(row);
/**
* This executes a single statement using SQLite3.
*/
const executeSingleStatement = async (
sql: string | TemplateStringsArray,
bindings?: any[]
): Promise<WASQLExecuteResult> => {
const results = [];
for await (const stmt of sqlite3.statements(db, sql as string)) {
let columns;
const wrappedBindings = bindings ? [bindings] : [[]];
for (const binding of wrappedBindings) {
// TODO not sure why this is needed currently, but booleans break
binding.forEach((b, index, arr) => {
if (typeof b == 'boolean') {
arr[index] = b ? 1 : 0;
}
});

columns = columns ?? sqlite3.column_names(stmt);
if (columns.length) {
results.push({ columns, rows });
}
sqlite3.reset(stmt);
if (bindings) {
sqlite3.bind_collection(stmt, binding);
}

// When binding parameters, only a single statement is executed.
if (bindings) {
break;
const rows = [];
while ((await sqlite3.step(stmt)) === SQLite.SQLITE_ROW) {
const row = sqlite3.row(stmt);
rows.push(row);
}
}

let rows: Record<string, any>[] = [];
for (let resultset of results) {
for (let row of resultset.rows) {
let outRow: Record<string, any> = {};
resultset.columns.forEach((key, index) => {
outRow[key] = row[index];
});
rows.push(outRow);
columns = columns ?? sqlite3.column_names(stmt);
if (columns.length) {
results.push({ columns, rows });
}
}

const result = {
insertId: sqlite3.last_insert_id(db),
rowsAffected: sqlite3.changes(db),
rows: {
_array: rows,
length: rows.length
// When binding parameters, only a single statement is executed.
if (bindings) {
break;
}
}

let rows: Record<string, any>[] = [];
for (let resultset of results) {
for (let row of resultset.rows) {
let outRow: Record<string, any> = {};
resultset.columns.forEach((key, index) => {
outRow[key] = row[index];
});
rows.push(outRow);
}
}

const result = {
insertId: sqlite3.last_insert_id(db),
rowsAffected: sqlite3.changes(db),
rows: {
_array: rows,
length: rows.length
}
};

return result;
};

/**
* This executes SQL statements in a batch.
*/
const executeBatch = async (sql: string, bindings?: any[][]): Promise<WASQLExecuteResult> => {
return _acquireExecuteLock(async () => {
let affectedRows = 0;

const str = sqlite3.str_new(db, sql);
const query = sqlite3.str_value(str);
try {
await executeSingleStatement('BEGIN TRANSACTION');

//Prepare statement once
let prepared = await sqlite3.prepare_v2(db, query);
if (prepared === null) {
return {
rowsAffected: 0
};
}
const wrappedBindings = bindings ? bindings : [];
for (const binding of wrappedBindings) {
// TODO not sure why this is needed currently, but booleans break
for (let i = 0; i < binding.length; i++) {
let b = binding[i];
if (typeof b == 'boolean') {
binding[i] = b ? 1 : 0;
}
}

//Reset bindings
sqlite3.reset(prepared.stmt);
if (bindings) {
sqlite3.bind_collection(prepared.stmt, binding);
}

let result = await sqlite3.step(prepared.stmt);
if (result === SQLite.SQLITE_DONE) {
//The value returned by sqlite3_changes() immediately after an INSERT, UPDATE or DELETE statement run on a view is always zero.
affectedRows += sqlite3.changes(db);
}
}
//Finalize prepared statement
await sqlite3.finalize(prepared.stmt);
await executeSingleStatement('COMMIT');
} catch (err) {
await executeSingleStatement('ROLLBACK');
return {
rowsAffected: 0
};
} finally {
sqlite3.str_finish(str);
}
const result = {
rowsAffected: affectedRows
};

return result;
Expand All @@ -118,6 +199,7 @@ export async function _openDB(dbFileName: string): Promise<DBWorkerInterface> {

return {
execute: Comlink.proxy(execute),
executeBatch: Comlink.proxy(executeBatch),
registerOnTableChange: Comlink.proxy(registerOnTableChange),
close: Comlink.proxy(() => {
sqlite3.close(db);
Expand Down
58 changes: 58 additions & 0 deletions packages/powersync-sdk-web/tests/crud.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,32 @@ describe('CRUD Tests', () => {
expect(tx.crud[0].equals(expectedCrudEntry)).true;
});

it('BATCH INSERT', async () => {
expect(await powersync.getAll('SELECT * FROM ps_crud')).empty;

const query = `INSERT INTO assets(id, description) VALUES(?, ?)`;
await powersync.executeBatch(query, [
[testId, 'test'],
['mockId', 'test1']
]);

expect(await powersync.getAll('SELECT data FROM ps_crud ORDER BY id')).deep.equals([
{
data: `{"op":"PUT","type":"assets","id":"${testId}","data":{"description":"test"}}`
},
{
data: `{"op":"PUT","type":"assets","id":"mockId","data":{"description":"test1"}}`
}
]);

const crudBatch = (await powersync.getCrudBatch(2))!;
expect(crudBatch.crud.length).equals(2);
const expectedCrudEntry = new CrudEntry(1, UpdateType.PUT, 'assets', testId, 1, { description: 'test' });
const expectedCrudEntry2 = new CrudEntry(2, UpdateType.PUT, 'assets', 'mockId', 1, { description: 'test1' });
expect(crudBatch.crud[0].equals(expectedCrudEntry)).true;
expect(crudBatch.crud[1].equals(expectedCrudEntry2)).true;
});

it('INSERT OR REPLACE', async () => {
await powersync.execute('INSERT INTO assets(id, description) VALUES(?, ?)', [testId, 'test']);
await powersync.execute('DELETE FROM ps_crud WHERE 1');
Expand Down Expand Up @@ -96,6 +122,38 @@ describe('CRUD Tests', () => {
expect(tx.crud[0].equals(expectedCrudEntry)).true;
});

it('BATCH UPDATE', async () => {
await powersync.executeBatch('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [
[testId, 'test', 'test'],
['mockId', 'test', 'test']
]);
await powersync.execute('DELETE FROM ps_crud WHERE 1');

await powersync.executeBatch('UPDATE assets SET description = ?, make = ?', [['test2', 'make2']]);

expect(await powersync.getAll('SELECT data FROM ps_crud ORDER BY id')).deep.equals([
{
data: `{"op":"PATCH","type":"assets","id":"${testId}","data":{"description":"test2","make":"make2"}}`
},
{
data: `{"op":"PATCH","type":"assets","id":"mockId","data":{"description":"test2","make":"make2"}}`
}
]);

const crudBatch = (await powersync.getCrudBatch(2))!;
expect(crudBatch.crud.length).equals(2);
const expectedCrudEntry = new CrudEntry(3, UpdateType.PATCH, 'assets', testId, 2, {
description: 'test2',
make: 'make2'
});
const expectedCrudEntry2 = new CrudEntry(4, UpdateType.PATCH, 'assets', 'mockId', 2, {
description: 'test2',
make: 'make2'
});
expect(crudBatch.crud[0].equals(expectedCrudEntry)).true;
expect(crudBatch.crud[1].equals(expectedCrudEntry2)).true;
});

it('DELETE', async () => {
await powersync.execute('INSERT INTO assets(id, description, make) VALUES(?, ?, ?)', [testId, 'test', 'test']);
await powersync.execute('DELETE FROM ps_crud WHERE 1');
Expand Down
Loading

0 comments on commit c546134

Please sign in to comment.