diff --git a/.changeset/famous-teachers-design.md b/.changeset/famous-teachers-design.md new file mode 100644 index 00000000..922ba1a3 --- /dev/null +++ b/.changeset/famous-teachers-design.md @@ -0,0 +1,5 @@ +--- +'@powersync/common': patch +--- + +Fixed CRUD uploads which would not retry after failing until the connection status was updated. A failed CRUD operation should not change the status to `connected: false`. diff --git a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts index 606938a1..2adfc6b5 100644 --- a/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts +++ b/packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts @@ -221,13 +221,18 @@ export abstract class AbstractStreamingSyncImplementation } } catch (ex) { this.updateSyncStatus({ - connected: false, dataFlow: { uploading: false } }); await this.delayRetry(); - break; + if (!this.isConnected) { + // Exit the upload loop if the sync stream is no longer connected + break; + } + this.logger.debug( + `Caught exception when uploading. Upload will retry after a delay. Exception: ${ex.message}` + ); } finally { this.updateSyncStatus({ dataFlow: { diff --git a/packages/web/tests/stream.test.ts b/packages/web/tests/stream.test.ts index 46530c9f..e5041f76 100644 --- a/packages/web/tests/stream.test.ts +++ b/packages/web/tests/stream.test.ts @@ -1,10 +1,12 @@ import _ from 'lodash'; import Logger from 'js-logger'; -import { beforeAll, describe, expect, it } from 'vitest'; +import { beforeAll, describe, expect, it, vi } from 'vitest'; import { v4 as uuid } from 'uuid'; import { AbstractPowerSyncDatabase, Schema, SyncStatusOptions, TableV2, column } from '@powersync/common'; import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory'; +const UPLOAD_TIMEOUT_MS = 3000; + export async function waitForConnectionStatus( db: AbstractPowerSyncDatabase, statusCheck: SyncStatusOptions = { connected: true } @@ -30,7 +32,9 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke * Required since we cannot extend multiple classes. */ const callbacks: Map void> = new Map(); - const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c())); + const connector = new TestConnector(); + const uploadSpy = vi.spyOn(connector, 'uploadData'); + const remote = new MockRemote(connector, () => callbacks.forEach((c) => c())); const users = new TableV2({ name: column.text @@ -47,6 +51,8 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke enableMultiTabs: false, useWebWorker }, + // Makes tests faster + crudUploadThrottleMs: 0, schema }, remote @@ -62,30 +68,40 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke }); }); - const streamOpened = waitForStream(); + const connect = async () => { + const streamOpened = waitForStream(); + + const connectedPromise = powersync.connect(connector); - const connectedPromise = powersync.connect(new TestConnector()); + await streamOpened; - await streamOpened; + remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n')); - remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n')); + // Wait for connected to be true + await connectedPromise; + }; - // Wait for connected to be true - await connectedPromise; + await connect(); return { + connector, + connect, factory, powersync, remote, + uploadSpy, waitForStream }; } -describe('Stream test', () => { +describe('Streaming', () => { /** * Declares a test to be executed with different generated db functions */ - const itWithGenerators = async (name: string, test: (func: () => any) => Promise) => { + const itWithGenerators = async ( + name: string, + test: (createConnectedDatabase: () => ReturnType) => Promise + ) => { const funcWithWebWorker = generateConnectedDatabase; const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false }); @@ -93,38 +109,151 @@ describe('Stream test', () => { it(`${name} - without web worker`, () => test(funcWithoutWebWorker)); }; - describe('With Web Worker', () => { - beforeAll(() => Logger.useDefaults()); + beforeAll(() => Logger.useDefaults()); + + itWithGenerators('PowerSync reconnect on closed stream', async (createConnectedDatabase) => { + const { powersync, waitForStream, remote } = await createConnectedDatabase(); + expect(powersync.connected).toBe(true); + + // Close the stream + const newStream = waitForStream(); + remote.streamController?.close(); + + // A new stream should be requested + await newStream; + + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + itWithGenerators('PowerSync reconnect multiple connect calls', async (createConnectedDatabase) => { + // This initially performs a connect call + const { powersync, waitForStream } = await createConnectedDatabase(); + expect(powersync.connected).toBe(true); + + // Call connect again, a new stream should be requested + const newStream = waitForStream(); + powersync.connect(new TestConnector()); - itWithGenerators('PowerSync reconnect on closed stream', async (func) => { - const { powersync, waitForStream, remote } = await func(); - expect(powersync.connected).toBe(true); + // A new stream should be requested + await newStream; - // Close the stream - const newStream = waitForStream(); - remote.streamController?.close(); + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + itWithGenerators('Should trigger upload connector when connected', async (createConnectedDatabase) => { + const { powersync, uploadSpy } = await createConnectedDatabase(); + expect(powersync.connected).toBe(true); + + // do something which should trigger an upload + await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']); + // It should try and upload + await vi.waitFor( + () => { + // to-have-been-called seems to not work after failing the first check + expect(uploadSpy.mock.calls.length).equals(1); + }, + { + timeout: UPLOAD_TIMEOUT_MS + } + ); - // A new stream should be requested - await newStream; + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + itWithGenerators('Should retry failed uploads when connected', async (createConnectedDatabase) => { + const { powersync, uploadSpy } = await createConnectedDatabase(); + expect(powersync.connected).toBe(true); - await powersync.disconnectAndClear(); - await powersync.close(); + let uploadCounter = 0; + // This test will throw an exception a few times before uploading + const throwCounter = 2; + uploadSpy.mockImplementation(async (db) => { + if (uploadCounter++ < throwCounter) { + throw new Error('No uploads yet'); + } + // Now actually do the upload + const tx = await db.getNextCrudTransaction(); + await tx?.complete(); }); - itWithGenerators('PowerSync reconnect multiple connect calls', async (func) => { - // This initially performs a connect call - const { powersync, waitForStream } = await func(); - expect(powersync.connected).toBe(true); + // do something which should trigger an upload + await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']); - // Call connect again, a new stream should be requested - const newStream = waitForStream(); - powersync.connect(new TestConnector()); + // It should try and upload + await vi.waitFor( + () => { + // to-have-been-called seems to not work after failing a check + expect(uploadSpy.mock.calls.length).equals(throwCounter + 1); + }, + { + timeout: UPLOAD_TIMEOUT_MS + } + ); - // A new stream should be requested - await newStream; + await powersync.disconnectAndClear(); + await powersync.close(); + }); + + itWithGenerators('Should upload after reconnecting', async (createConnectedDatabase) => { + const { connect, powersync, uploadSpy } = await createConnectedDatabase(); + expect(powersync.connected).toBe(true); + + await powersync.disconnect(); + + // do something (offline) which should trigger an upload + await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']); + + await connect(); + + // It should try and upload + await vi.waitFor( + () => { + // to-have-been-called seems to not work after failing a check + expect(uploadSpy.mock.calls.length).equals(1); + }, + { + timeout: UPLOAD_TIMEOUT_MS + } + ); + + await powersync.disconnectAndClear(); + await powersync.close(); + }); - await powersync.disconnectAndClear(); - await powersync.close(); + itWithGenerators('Should update status when uploading', async (createConnectedDatabase) => { + const { powersync, uploadSpy } = await createConnectedDatabase(); + expect(powersync.connected).toBe(true); + + let uploadStartedPromise = new Promise((resolve) => { + uploadSpy.mockImplementation(async (db) => { + resolve(); + // Now actually do the upload + const tx = await db.getNextCrudTransaction(); + await tx?.complete(); + }); }); + + // do something which should trigger an upload + await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']); + + await uploadStartedPromise; + expect(powersync.currentStatus.dataFlowStatus.uploading).true; + + // Status should update after uploads are completed + await vi.waitFor( + () => { + // to-have-been-called seems to not work after failing a check + expect(powersync.currentStatus.dataFlowStatus.uploading).false; + }, + { + timeout: UPLOAD_TIMEOUT_MS + } + ); + + await powersync.disconnectAndClear(); + await powersync.close(); }); }); diff --git a/packages/web/tests/utils/MockStreamOpenFactory.ts b/packages/web/tests/utils/MockStreamOpenFactory.ts index 42660b33..b3eab3e2 100644 --- a/packages/web/tests/utils/MockStreamOpenFactory.ts +++ b/packages/web/tests/utils/MockStreamOpenFactory.ts @@ -27,7 +27,8 @@ export class TestConnector implements PowerSyncBackendConnector { }; } async uploadData(database: AbstractPowerSyncDatabase): Promise { - return; + const tx = await database.getNextCrudTransaction(); + await tx?.complete(); } } @@ -49,8 +50,16 @@ export class MockRemote extends AbstractRemote { post(path: string, data: any, headers?: Record | undefined): Promise { throw new Error('Method not implemented.'); } - get(path: string, headers?: Record | undefined): Promise { - throw new Error('Method not implemented.'); + async get(path: string, headers?: Record | undefined): Promise { + // mock a response for write checkpoint API + if (path.includes('checkpoint')) { + return { + data: { + write_checkpoint: '1000' + } + }; + } + throw new Error('Not implemented'); } async postStreaming( path: string,