Skip to content

Commit

Permalink
Throttle update notifications for web implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Oct 17, 2024
1 parent 64b349e commit 2178d69
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,9 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,
if (updatedTables.isNotEmpty && db.autocommit) {
client.fire(UpdateNotification(updatedTables));
updatedTables.clear();
updateDebouncer?.cancel();
updateDebouncer = null;
}
updateDebouncer?.cancel();
updateDebouncer = null;
}

db.updates.listen((event) {
Expand All @@ -316,6 +316,7 @@ Future<void> _sqliteConnectionIsolateInner(_SqliteConnectionParams params,

server.open((data) async {
if (data is _SqliteIsolateClose) {
// This is a transaction close message
if (txId != null) {
if (!db.autocommit) {
db.execute('ROLLBACK');
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import 'dart:async';

import 'package:sqlite_async/sqlite3_common.dart';

/// Wrap a CommonDatabase to throttle its updates stream.
/// This is so that we can throttle the updates _within_
/// the worker process, avoiding mass notifications over
/// the MessagePort.
class ThrottledCommonDatabase extends CommonDatabase {
final CommonDatabase _db;
final StreamController<bool> _transactionController =
StreamController.broadcast();

ThrottledCommonDatabase(this._db);

@override
int get userVersion => _db.userVersion;

@override
set userVersion(int userVersion) {
_db.userVersion = userVersion;
}

@override
bool get autocommit => _db.autocommit;

@override
DatabaseConfig get config => _db.config;

@override
void createAggregateFunction<V>(
{required String functionName,
required AggregateFunction<V> function,
AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(),
bool deterministic = false,
bool directOnly = true}) {
_db.createAggregateFunction(functionName: functionName, function: function);
}

@override
void createCollation(
{required String name, required CollatingFunction function}) {
_db.createCollation(name: name, function: function);
}

@override
void createFunction(
{required String functionName,
required ScalarFunction function,
AllowedArgumentCount argumentCount = const AllowedArgumentCount.any(),
bool deterministic = false,
bool directOnly = true}) {
_db.createFunction(functionName: functionName, function: function);
}

@override
void dispose() {
_db.dispose();
}

@override
void execute(String sql, [List<Object?> parameters = const []]) {
_db.execute(sql, parameters);
}

@override
int getUpdatedRows() {
// ignore: deprecated_member_use
return _db.getUpdatedRows();
}

@override
int get lastInsertRowId => _db.lastInsertRowId;

@override
CommonPreparedStatement prepare(String sql,
{bool persistent = false, bool vtab = true, bool checkNoTail = false}) {
return _db.prepare(sql,
persistent: persistent, vtab: vtab, checkNoTail: checkNoTail);
}

@override
List<CommonPreparedStatement> prepareMultiple(String sql,
{bool persistent = false, bool vtab = true}) {
return _db.prepareMultiple(sql, persistent: persistent, vtab: vtab);
}

@override
ResultSet select(String sql, [List<Object?> parameters = const []]) {
bool preAutocommit = _db.autocommit;
final result = _db.select(sql, parameters);
bool postAutocommit = _db.autocommit;
if (!preAutocommit && postAutocommit) {
_transactionController.add(true);
}
return result;
}

@override
int get updatedRows => _db.updatedRows;

@override
Stream<SqliteUpdate> get updates {
return throttledUpdates(_db, _transactionController.stream);
}
}

/// This throttles the database update stream to:
/// 1. Trigger max once every 1ms.
/// 2. Only trigger _after_ transactions.
Stream<SqliteUpdate> throttledUpdates(
CommonDatabase source, Stream transactionStream) {
StreamController<SqliteUpdate>? controller;
Set<String> insertedTables = {};
Set<String> updatedTables = {};
Set<String> deletedTables = {};
var paused = false;

Timer? updateDebouncer;

void maybeFireUpdates() {
updateDebouncer?.cancel();
updateDebouncer = null;

if (paused) {
// Continue collecting updates, but don't fire any
return;
}

if (!source.autocommit) {
// Inside a transaction - do not fire updates
return;
}

if (updatedTables.isNotEmpty) {
for (var tableName in updatedTables) {
controller!.add(SqliteUpdate(SqliteUpdateKind.update, tableName, 0));
}

updatedTables.clear();
}

if (insertedTables.isNotEmpty) {
for (var tableName in insertedTables) {
controller!.add(SqliteUpdate(SqliteUpdateKind.insert, tableName, 0));
}

insertedTables.clear();
}

if (deletedTables.isNotEmpty) {
for (var tableName in deletedTables) {
controller!.add(SqliteUpdate(SqliteUpdateKind.delete, tableName, 0));
}

deletedTables.clear();
}
}

void collectUpdate(SqliteUpdate event) {
if (event.kind == SqliteUpdateKind.insert) {
insertedTables.add(event.tableName);
} else if (event.kind == SqliteUpdateKind.update) {
updatedTables.add(event.tableName);
} else if (event.kind == SqliteUpdateKind.delete) {
deletedTables.add(event.tableName);
}

updateDebouncer ??=
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
}

StreamSubscription? txSubscription;
StreamSubscription? sourceSubscription;

controller = StreamController(onListen: () {
txSubscription = transactionStream.listen((event) {
maybeFireUpdates();
}, onError: (error) {
controller?.addError(error);
});

sourceSubscription = source.updates.listen(collectUpdate, onError: (error) {
controller?.addError(error);
});
}, onPause: () {
paused = true;
}, onResume: () {
paused = false;
maybeFireUpdates();
}, onCancel: () {
txSubscription?.cancel();
sourceSubscription?.cancel();
});

return controller.stream;
}
5 changes: 4 additions & 1 deletion packages/sqlite_async/lib/src/web/worker/worker_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:js_util' as js_util;
import 'package:mutex/mutex.dart';
import 'package:sqlite3/wasm.dart';
import 'package:sqlite3_web/sqlite3_web.dart';
import 'throttled_common_database.dart';

import '../protocol.dart';

Expand All @@ -18,7 +19,9 @@ base class AsyncSqliteController extends DatabaseController {

// Register any custom functions here if needed

return AsyncSqliteDatabase(database: db);
final throttled = ThrottledCommonDatabase(db);

return AsyncSqliteDatabase(database: throttled);
}

@override
Expand Down
9 changes: 8 additions & 1 deletion packages/sqlite_async/test/watch_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ void main() {
final db = await testUtils.setupDatabase(path: path);
await createTables(db);

const baseTime = 20;
const baseTime = 10;

const throttleDuration = Duration(milliseconds: baseTime);
// delay must be bigger than throttleDuration, and bigger
Expand Down Expand Up @@ -293,6 +293,13 @@ void main() {
// one event after the transaction
2
]));

// Other observed results (failure scenarios):
// [0, 0, 0]: The watch is triggered during the transaction
// and executes concurrently with the transaction.
// [0, 2, 2]: The watch is triggered during the transaction,
// but executes after the transaction (single connection).
// [0]: No updates triggered.
});
});
}

0 comments on commit 2178d69

Please sign in to comment.