Skip to content

Commit

Permalink
[Fix] Unhandled exception on reconnect (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney authored Apr 5, 2024
1 parent 5b4b560 commit 6c43ec6
Show file tree
Hide file tree
Showing 18 changed files with 428 additions and 149 deletions.
5 changes: 5 additions & 0 deletions .changeset/clean-moles-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-web': patch
---

Fixed shared sync broadcast logger sanitization and error handling. Added ability to disabled broadcast logging in `WebPowerSyncFlags`.
5 changes: 5 additions & 0 deletions .changeset/warm-bottles-hammer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/powersync-sdk-common': patch
---

Fixed potential unhandled exception when aborting stream fetch request for `/sync/stream` endpoint
42 changes: 21 additions & 21 deletions demos/react-native-supabase-todolist/ios/Podfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1148,18 +1148,18 @@ DEPENDENCIES:
- boost (from `../node_modules/react-native/third-party-podspecs/boost.podspec`)
- DoubleConversion (from `../node_modules/react-native/third-party-podspecs/DoubleConversion.podspec`)
- EASClient (from `../../../node_modules/expo-eas-client/ios`)
- EXConstants (from `../../../node_modules/expo-constants/ios`)
- EXConstants (from `../node_modules/expo-constants/ios`)
- EXFont (from `../../../node_modules/expo-font/ios`)
- EXJSONUtils (from `../../../node_modules/expo-json-utils/ios`)
- EXManifests (from `../../../node_modules/expo-manifests/ios`)
- Expo (from `../../../node_modules/expo`)
- Expo (from `../node_modules/expo`)
- ExpoCamera (from `../../../node_modules/expo-camera/ios`)
- ExpoFileSystem (from `../../../node_modules/expo-file-system/ios`)
- ExpoFileSystem (from `../node_modules/expo-file-system/ios`)
- ExpoHead (from `../node_modules/expo-router/ios`)
- ExpoKeepAwake (from `../../../node_modules/expo-keep-awake/ios`)
- ExpoModulesCore (from `../../../node_modules/expo-modules-core`)
- ExpoSecureStore (from `../../../node_modules/expo-secure-store/ios`)
- EXSplashScreen (from `../../../node_modules/expo-splash-screen/ios`)
- EXSplashScreen (from `../node_modules/expo-splash-screen/ios`)
- EXStructuredHeaders (from `../../../node_modules/expo-structured-headers/ios`)
- EXUpdates (from `../../../node_modules/expo-updates/ios`)
- EXUpdatesInterface (from `../../../node_modules/expo-updates-interface/ios`)
Expand Down Expand Up @@ -1192,9 +1192,9 @@ DEPENDENCIES:
- React-logger (from `../node_modules/react-native/ReactCommon/logger`)
- React-Mapbuffer (from `../node_modules/react-native/ReactCommon`)
- react-native-encrypted-storage (from `../../../node_modules/react-native-encrypted-storage`)
- react-native-get-random-values (from `../../../node_modules/react-native-get-random-values`)
- "react-native-quick-sqlite (from `../../../node_modules/@journeyapps/react-native-quick-sqlite`)"
- react-native-safe-area-context (from `../../../node_modules/react-native-safe-area-context`)
- react-native-get-random-values (from `../node_modules/react-native-get-random-values`)
- "react-native-quick-sqlite (from `../node_modules/@journeyapps/react-native-quick-sqlite`)"
- react-native-safe-area-context (from `../node_modules/react-native-safe-area-context`)
- React-nativeconfig (from `../node_modules/react-native/ReactCommon`)
- React-NativeModulesApple (from `../node_modules/react-native/ReactCommon/react/nativemodule/core/platform/ios`)
- React-perflogger (from `../node_modules/react-native/ReactCommon/reactperflogger`)
Expand All @@ -1216,9 +1216,9 @@ DEPENDENCIES:
- React-utils (from `../node_modules/react-native/ReactCommon/react/utils`)
- ReactCommon/turbomodule/core (from `../node_modules/react-native/ReactCommon`)
- "RNCMaskedView (from `../../../node_modules/@react-native-community/masked-view`)"
- RNGestureHandler (from `../../../node_modules/react-native-gesture-handler`)
- RNReanimated (from `../../../node_modules/react-native-reanimated`)
- RNScreens (from `../../../node_modules/react-native-screens`)
- RNGestureHandler (from `../node_modules/react-native-gesture-handler`)
- RNReanimated (from `../node_modules/react-native-reanimated`)
- RNScreens (from `../node_modules/react-native-screens`)
- RNVectorIcons (from `../../../node_modules/react-native-vector-icons`)
- Yoga (from `../node_modules/react-native/ReactCommon/yoga`)

Expand All @@ -1240,19 +1240,19 @@ EXTERNAL SOURCES:
EASClient:
:path: "../../../node_modules/expo-eas-client/ios"
EXConstants:
:path: "../../../node_modules/expo-constants/ios"
:path: "../node_modules/expo-constants/ios"
EXFont:
:path: "../../../node_modules/expo-font/ios"
EXJSONUtils:
:path: "../../../node_modules/expo-json-utils/ios"
EXManifests:
:path: "../../../node_modules/expo-manifests/ios"
Expo:
:path: "../../../node_modules/expo"
:path: "../node_modules/expo"
ExpoCamera:
:path: "../../../node_modules/expo-camera/ios"
ExpoFileSystem:
:path: "../../../node_modules/expo-file-system/ios"
:path: "../node_modules/expo-file-system/ios"
ExpoHead:
:path: "../node_modules/expo-router/ios"
ExpoKeepAwake:
Expand All @@ -1262,7 +1262,7 @@ EXTERNAL SOURCES:
ExpoSecureStore:
:path: "../../../node_modules/expo-secure-store/ios"
EXSplashScreen:
:path: "../../../node_modules/expo-splash-screen/ios"
:path: "../node_modules/expo-splash-screen/ios"
EXStructuredHeaders:
:path: "../../../node_modules/expo-structured-headers/ios"
EXUpdates:
Expand Down Expand Up @@ -1323,11 +1323,11 @@ EXTERNAL SOURCES:
react-native-encrypted-storage:
:path: "../../../node_modules/react-native-encrypted-storage"
react-native-get-random-values:
:path: "../../../node_modules/react-native-get-random-values"
:path: "../node_modules/react-native-get-random-values"
react-native-quick-sqlite:
:path: "../../../node_modules/@journeyapps/react-native-quick-sqlite"
:path: "../node_modules/@journeyapps/react-native-quick-sqlite"
react-native-safe-area-context:
:path: "../../../node_modules/react-native-safe-area-context"
:path: "../node_modules/react-native-safe-area-context"
React-nativeconfig:
:path: "../node_modules/react-native/ReactCommon"
React-NativeModulesApple:
Expand Down Expand Up @@ -1371,11 +1371,11 @@ EXTERNAL SOURCES:
RNCMaskedView:
:path: "../../../node_modules/@react-native-community/masked-view"
RNGestureHandler:
:path: "../../../node_modules/react-native-gesture-handler"
:path: "../node_modules/react-native-gesture-handler"
RNReanimated:
:path: "../../../node_modules/react-native-reanimated"
:path: "../node_modules/react-native-reanimated"
RNScreens:
:path: "../../../node_modules/react-native-screens"
:path: "../node_modules/react-native-screens"
RNVectorIcons:
:path: "../../../node_modules/react-native-vector-icons"
Yoga:
Expand Down Expand Up @@ -1465,4 +1465,4 @@ SPEC CHECKSUMS:

PODFILE CHECKSUM: 91f1b09fe73837e9fdaecdd06e4916926352d556

COCOAPODS: 1.15.2
COCOAPODS: 1.13.0
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB

await this.syncStreamImplementation.waitForReady();
this.syncStreamImplementation.triggerCrudUpload();
this.syncStreamImplementation.connect();
await this.syncStreamImplementation.connect();
}

/**
Expand All @@ -277,6 +277,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
* Use {@link connect} to connect again.
*/
async disconnect() {
await this.waitForReady();
await this.syncStreamImplementation?.disconnect();
this.syncStatusListenerDisposer?.();
await this.syncStreamImplementation?.dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,21 @@ import Logger, { ILogger } from 'js-logger';

import {
BucketRequest,
StreamingSyncLine,
StreamingSyncRequest,
isStreamingKeepalive,
isStreamingSyncCheckpoint,
isStreamingSyncCheckpointComplete,
isStreamingSyncCheckpointDiff,
isStreamingSyncData,
StreamingSyncLine,
StreamingSyncRequest
isStreamingSyncData
} from './streaming-sync-types';
import { AbstractRemote } from './AbstractRemote';
import ndjsonStream from 'can-ndjson-stream';
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
import { SyncDataBucket } from '../bucket/SyncDataBucket';
import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver';
import { AbortOperation } from '../../../utils/AbortOperation';

export enum LockType {
CRUD = 'crud',
Expand Down Expand Up @@ -47,6 +48,14 @@ export interface AbstractStreamingSyncImplementationOptions {
}

export interface StreamingSyncImplementationListener extends BaseListener {
/**
* Triggered whenever a status update has been attempted to be made or
* refreshed.
*/
statusUpdated?: ((statusUpdate: SyncStatusOptions) => void) | undefined;
/**
* Triggers whenever the status' members have changed in value
*/
statusChanged?: ((status: SyncStatus) => void) | undefined;
}

Expand Down Expand Up @@ -86,6 +95,7 @@ export abstract class AbstractStreamingSyncImplementation
protected options: AbstractStreamingSyncImplementationOptions;
protected abortController: AbortController | null;
protected crudUpdateListener?: () => void;
protected streamingSyncPromise?: Promise<void>;

syncStatus: SyncStatus;
triggerCrudUpload: () => void;
Expand Down Expand Up @@ -224,16 +234,53 @@ export abstract class AbstractStreamingSyncImplementation
if (this.abortController) {
await this.disconnect();
}

this.abortController = new AbortController();
this.streamingSync(this.abortController.signal);
return this.waitForStatus({ connected: true });
this.streamingSyncPromise = this.streamingSync(this.abortController.signal);

// Return a promise that resolves when the connection status is updated
return new Promise<void>((resolve) => {
const l = this.registerListener({
statusUpdated: (update) => {
// This is triggered as soon as a connection is read from
if (typeof update.connected == 'undefined') {
// only concern with connection updates
return;
}

if (update.connected == false) {
/**
* This function does not reject if initial connect attempt failed
*/
this.logger.warn('Initial connect attempt did not successfully connect to server');
}

resolve();
l();
}
});
});
}

async disconnect(): Promise<void> {
if (!this.abortController) {
throw new Error('Disconnect not possible');
return;
}

// This might be called multiple times
if (!this.abortController.signal.aborted) {
this.abortController.abort(new AbortOperation('Disconnect has been requested'));
}
this.abortController.abort('Disconnected');

// Await any pending operations before completing the disconnect operation
try {
await this.streamingSyncPromise;
} catch (ex) {
// The operation might have failed, all we care about is if it has completed
this.logger.warn(ex);
}
this.streamingSyncPromise = undefined;

this.abortController = null;
this.updateSyncStatus({ connected: false });
}
Expand Down Expand Up @@ -261,7 +308,11 @@ export abstract class AbstractStreamingSyncImplementation
let nestedAbortController = new AbortController();

signal.addEventListener('abort', () => {
nestedAbortController.abort();
/**
* A request for disconnect was received upstream. Relay the request
* to the nested abort controller.
*/
nestedAbortController.abort(signal?.reason ?? new AbortOperation('Received command to disconnect from upstream'));
this.crudUpdateListener?.();
this.crudUpdateListener = undefined;
this.updateSyncStatus({
Expand All @@ -272,34 +323,58 @@ export abstract class AbstractStreamingSyncImplementation
});
});

/**
* This loops runs until [retry] is false or the abort signal is set to aborted.
* Aborting the nestedAbortController will:
* - Abort any pending fetch requests
* - Close any sync stream ReadableStreams (which will also close any established network requests)
*/
while (true) {
try {
if (signal?.aborted) {
break;
}
const { retry } = await this.streamingSyncIteration(nestedAbortController.signal);
if (!retry) {
/**
* A sync error ocurred that we cannot recover from here.
* This loop must terminate.
* The nestedAbortController will close any open network requests and streams below.
*/
break;
}
// Continue immediately
} catch (ex) {
this.logger.error(ex);
/**
* Either:
* - A network request failed with a failed connection or not OKAY response code.
* - There was a sync processing error.
* This loop will retry.
* The nested abort controller will cleanup any open network requests and streams.
* The WebRemote should only abort pending fetch requests or close active Readable streams.
*/
if (ex instanceof AbortOperation) {
this.logger.warn(ex);
} else {
this.logger.error(ex);
}
await this.delayRetry();
} finally {
if (!signal.aborted) {
nestedAbortController.abort(new AbortOperation('Closing sync stream network requests before retry.'));
nestedAbortController = new AbortController();
}

this.updateSyncStatus({
connected: false
});

// On error, wait a little before retrying
await this.delayRetry();
} finally {
// Abort any open network requests. Create a new nested controller for retry.
nestedAbortController.abort();
nestedAbortController = new AbortController();
}
}

// Mark as disconnected if here
if (this.abortController) {
await this.disconnect();
}
this.updateSyncStatus({ connected: false });
}

protected async streamingSyncIteration(signal: AbortSignal, progress?: () => void): Promise<{ retry?: boolean }> {
Expand Down Expand Up @@ -336,15 +411,6 @@ export abstract class AbstractStreamingSyncImplementation
},
signal
)) {
// A connection is active and messages are being received
if (!this.syncStatus.connected) {
// There is a connection now
Promise.resolve().then(() => this.triggerCrudUpload());
this.updateSyncStatus({
connected: true
});
}

if (isStreamingSyncCheckpoint(line)) {
targetCheckpoint = line.checkpoint;
const bucketsToDelete = new Set<string>(bucketSet);
Expand Down Expand Up @@ -476,6 +542,14 @@ export abstract class AbstractStreamingSyncImplementation
signal?: AbortSignal
): AsyncGenerator<StreamingSyncLine> {
const body = await this.options.remote.postStreaming('/sync/stream', req, {}, signal);

// A connection is active
// There is a connection now
Promise.resolve().then(() => this.triggerCrudUpload());
this.updateSyncStatus({
connected: true
});

const stream = ndjsonStream(body);
const reader = stream.getReader();

Expand Down Expand Up @@ -505,8 +579,12 @@ export abstract class AbstractStreamingSyncImplementation

if (!this.syncStatus.isEqual(updatedStatus)) {
this.syncStatus = updatedStatus;
// Only trigger this is there was a change
this.iterateListeners((cb) => cb.statusChanged?.(updatedStatus));
}

// trigger this for all updates
this.iterateListeners((cb) => cb.statusUpdated?.(options));
}

private async delayRetry() {
Expand Down
1 change: 1 addition & 0 deletions packages/powersync-sdk-common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ export * from './db/DBAdapter';
export * from './db/Column';
export * from './db/schema/TableV2';

export * from './utils/AbortOperation';
export * from './utils/BaseObserver';
export * from './utils/strings';
Loading

0 comments on commit 6c43ec6

Please sign in to comment.