From 2178d69d896ced8c16bdae055a224e7bac9c7644 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 17 Oct 2024 16:07:24 +0200 Subject: [PATCH] Throttle update notifications for web implementation. --- .../native_sqlite_connection_impl.dart | 5 +- .../web/worker/throttled_common_database.dart | 197 ++++++++++++++++++ .../lib/src/web/worker/worker_utils.dart | 5 +- packages/sqlite_async/test/watch_test.dart | 9 +- 4 files changed, 212 insertions(+), 4 deletions(-) create mode 100644 packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart diff --git a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart index 299339c..7df4ac8 100644 --- a/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart +++ b/packages/sqlite_async/lib/src/native/database/native_sqlite_connection_impl.dart @@ -299,9 +299,9 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, if (updatedTables.isNotEmpty && db.autocommit) { client.fire(UpdateNotification(updatedTables)); updatedTables.clear(); - updateDebouncer?.cancel(); - updateDebouncer = null; } + updateDebouncer?.cancel(); + updateDebouncer = null; } db.updates.listen((event) { @@ -316,6 +316,7 @@ Future _sqliteConnectionIsolateInner(_SqliteConnectionParams params, server.open((data) async { if (data is _SqliteIsolateClose) { + // This is a transaction close message if (txId != null) { if (!db.autocommit) { db.execute('ROLLBACK'); diff --git a/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart new file mode 100644 index 0000000..da69d12 --- /dev/null +++ b/packages/sqlite_async/lib/src/web/worker/throttled_common_database.dart @@ -0,0 +1,197 @@ +import 'dart:async'; + +import 'package:sqlite_async/sqlite3_common.dart'; + +/// Wrap a CommonDatabase to throttle its updates stream. +/// This is so that we can throttle the updates _within_ +/// the worker process, avoiding mass notifications over +/// the MessagePort. +class ThrottledCommonDatabase extends CommonDatabase { + final CommonDatabase _db; + final StreamController _transactionController = + StreamController.broadcast(); + + ThrottledCommonDatabase(this._db); + + @override + int get userVersion => _db.userVersion; + + @override + set userVersion(int userVersion) { + _db.userVersion = userVersion; + } + + @override + bool get autocommit => _db.autocommit; + + @override + DatabaseConfig get config => _db.config; + + @override + void createAggregateFunction( + {required String functionName, + required AggregateFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true}) { + _db.createAggregateFunction(functionName: functionName, function: function); + } + + @override + void createCollation( + {required String name, required CollatingFunction function}) { + _db.createCollation(name: name, function: function); + } + + @override + void createFunction( + {required String functionName, + required ScalarFunction function, + AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(), + bool deterministic = false, + bool directOnly = true}) { + _db.createFunction(functionName: functionName, function: function); + } + + @override + void dispose() { + _db.dispose(); + } + + @override + void execute(String sql, [List parameters = const []]) { + _db.execute(sql, parameters); + } + + @override + int getUpdatedRows() { + // ignore: deprecated_member_use + return _db.getUpdatedRows(); + } + + @override + int get lastInsertRowId => _db.lastInsertRowId; + + @override + CommonPreparedStatement prepare(String sql, + {bool persistent = false, bool vtab = true, bool checkNoTail = false}) { + return _db.prepare(sql, + persistent: persistent, vtab: vtab, checkNoTail: checkNoTail); + } + + @override + List prepareMultiple(String sql, + {bool persistent = false, bool vtab = true}) { + return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab); + } + + @override + ResultSet select(String sql, [List parameters = const []]) { + bool preAutocommit = _db.autocommit; + final result = _db.select(sql, parameters); + bool postAutocommit = _db.autocommit; + if (!preAutocommit && postAutocommit) { + _transactionController.add(true); + } + return result; + } + + @override + int get updatedRows => _db.updatedRows; + + @override + Stream get updates { + return throttledUpdates(_db, _transactionController.stream); + } +} + +/// This throttles the database update stream to: +/// 1. Trigger max once every 1ms. +/// 2. Only trigger _after_ transactions. +Stream throttledUpdates( + CommonDatabase source, Stream transactionStream) { + StreamController? controller; + Set insertedTables = {}; + Set updatedTables = {}; + Set deletedTables = {}; + var paused = false; + + Timer? updateDebouncer; + + void maybeFireUpdates() { + updateDebouncer?.cancel(); + updateDebouncer = null; + + if (paused) { + // Continue collecting updates, but don't fire any + return; + } + + if (!source.autocommit) { + // Inside a transaction - do not fire updates + return; + } + + if (updatedTables.isNotEmpty) { + for (var tableName in updatedTables) { + controller!.add(SqliteUpdate(SqliteUpdateKind.update, tableName, 0)); + } + + updatedTables.clear(); + } + + if (insertedTables.isNotEmpty) { + for (var tableName in insertedTables) { + controller!.add(SqliteUpdate(SqliteUpdateKind.insert, tableName, 0)); + } + + insertedTables.clear(); + } + + if (deletedTables.isNotEmpty) { + for (var tableName in deletedTables) { + controller!.add(SqliteUpdate(SqliteUpdateKind.delete, tableName, 0)); + } + + deletedTables.clear(); + } + } + + void collectUpdate(SqliteUpdate event) { + if (event.kind == SqliteUpdateKind.insert) { + insertedTables.add(event.tableName); + } else if (event.kind == SqliteUpdateKind.update) { + updatedTables.add(event.tableName); + } else if (event.kind == SqliteUpdateKind.delete) { + deletedTables.add(event.tableName); + } + + updateDebouncer ??= + Timer(const Duration(milliseconds: 1), maybeFireUpdates); + } + + StreamSubscription? txSubscription; + StreamSubscription? sourceSubscription; + + controller = StreamController(onListen: () { + txSubscription = transactionStream.listen((event) { + maybeFireUpdates(); + }, onError: (error) { + controller?.addError(error); + }); + + sourceSubscription = source.updates.listen(collectUpdate, onError: (error) { + controller?.addError(error); + }); + }, onPause: () { + paused = true; + }, onResume: () { + paused = false; + maybeFireUpdates(); + }, onCancel: () { + txSubscription?.cancel(); + sourceSubscription?.cancel(); + }); + + return controller.stream; +} diff --git a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart index b4657dd..1d8fb5c 100644 --- a/packages/sqlite_async/lib/src/web/worker/worker_utils.dart +++ b/packages/sqlite_async/lib/src/web/worker/worker_utils.dart @@ -4,6 +4,7 @@ import 'dart:js_util' as js_util; import 'package:mutex/mutex.dart'; import 'package:sqlite3/wasm.dart'; import 'package:sqlite3_web/sqlite3_web.dart'; +import 'throttled_common_database.dart'; import '../protocol.dart'; @@ -18,7 +19,9 @@ base class AsyncSqliteController extends DatabaseController { // Register any custom functions here if needed - return AsyncSqliteDatabase(database: db); + final throttled = ThrottledCommonDatabase(db); + + return AsyncSqliteDatabase(database: throttled); } @override diff --git a/packages/sqlite_async/test/watch_test.dart b/packages/sqlite_async/test/watch_test.dart index b179009..08a80cb 100644 --- a/packages/sqlite_async/test/watch_test.dart +++ b/packages/sqlite_async/test/watch_test.dart @@ -258,7 +258,7 @@ void main() { final db = await testUtils.setupDatabase(path: path); await createTables(db); - const baseTime = 20; + const baseTime = 10; const throttleDuration = Duration(milliseconds: baseTime); // delay must be bigger than throttleDuration, and bigger @@ -293,6 +293,13 @@ void main() { // one event after the transaction 2 ])); + + // Other observed results (failure scenarios): + // [0, 0, 0]: The watch is triggered during the transaction + // and executes concurrently with the transaction. + // [0, 2, 2]: The watch is triggered during the transaction, + // but executes after the transaction (single connection). + // [0]: No updates triggered. }); }); }