Skip to content

Commit

Permalink
[Fix] CRUD upload retries (#205)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevensJourney authored Jun 10, 2024
1 parent f0cef67 commit 1b66145
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 38 deletions.
5 changes: 5 additions & 0 deletions .changeset/famous-teachers-design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Fixed CRUD uploads which would not retry after failing until the connection status was updated. A failed CRUD operation should not change the status to `connected: false`.
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,18 @@ export abstract class AbstractStreamingSyncImplementation
}
} catch (ex) {
this.updateSyncStatus({
connected: false,
dataFlow: {
uploading: false
}
});
await this.delayRetry();
break;
if (!this.isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
}
this.logger.debug(
`Caught exception when uploading. Upload will retry after a delay. Exception: ${ex.message}`
);
} finally {
this.updateSyncStatus({
dataFlow: {
Expand Down
195 changes: 162 additions & 33 deletions packages/web/tests/stream.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import _ from 'lodash';
import Logger from 'js-logger';
import { beforeAll, describe, expect, it } from 'vitest';
import { beforeAll, describe, expect, it, vi } from 'vitest';
import { v4 as uuid } from 'uuid';
import { AbstractPowerSyncDatabase, Schema, SyncStatusOptions, TableV2, column } from '@powersync/common';
import { MockRemote, MockStreamOpenFactory, TestConnector } from './utils/MockStreamOpenFactory';

const UPLOAD_TIMEOUT_MS = 3000;

export async function waitForConnectionStatus(
db: AbstractPowerSyncDatabase,
statusCheck: SyncStatusOptions = { connected: true }
Expand All @@ -30,7 +32,9 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
* Required since we cannot extend multiple classes.
*/
const callbacks: Map<string, () => void> = new Map();
const remote = new MockRemote(new TestConnector(), () => callbacks.forEach((c) => c()));
const connector = new TestConnector();
const uploadSpy = vi.spyOn(connector, 'uploadData');
const remote = new MockRemote(connector, () => callbacks.forEach((c) => c()));

const users = new TableV2({
name: column.text
Expand All @@ -47,6 +51,8 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
enableMultiTabs: false,
useWebWorker
},
// Makes tests faster
crudUploadThrottleMs: 0,
schema
},
remote
Expand All @@ -62,69 +68,192 @@ export async function generateConnectedDatabase({ useWebWorker } = { useWebWorke
});
});

const streamOpened = waitForStream();
const connect = async () => {
const streamOpened = waitForStream();

const connectedPromise = powersync.connect(connector);

const connectedPromise = powersync.connect(new TestConnector());
await streamOpened;

await streamOpened;
remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n'));

remote.streamController?.enqueue(new TextEncoder().encode('{"token_expires_in":3426}\n'));
// Wait for connected to be true
await connectedPromise;
};

// Wait for connected to be true
await connectedPromise;
await connect();

return {
connector,
connect,
factory,
powersync,
remote,
uploadSpy,
waitForStream
};
}

describe('Stream test', () => {
describe('Streaming', () => {
/**
* Declares a test to be executed with different generated db functions
*/
const itWithGenerators = async (name: string, test: (func: () => any) => Promise<void>) => {
const itWithGenerators = async (
name: string,
test: (createConnectedDatabase: () => ReturnType<typeof generateConnectedDatabase>) => Promise<void>
) => {
const funcWithWebWorker = generateConnectedDatabase;
const funcWithoutWebWorker = () => generateConnectedDatabase({ useWebWorker: false });

it(`${name} - with web worker`, () => test(funcWithWebWorker));
it(`${name} - without web worker`, () => test(funcWithoutWebWorker));
};

describe('With Web Worker', () => {
beforeAll(() => Logger.useDefaults());
beforeAll(() => Logger.useDefaults());

itWithGenerators('PowerSync reconnect on closed stream', async (createConnectedDatabase) => {
const { powersync, waitForStream, remote } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

// Close the stream
const newStream = waitForStream();
remote.streamController?.close();

// A new stream should be requested
await newStream;

await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('PowerSync reconnect multiple connect calls', async (createConnectedDatabase) => {
// This initially performs a connect call
const { powersync, waitForStream } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

// Call connect again, a new stream should be requested
const newStream = waitForStream();
powersync.connect(new TestConnector());

itWithGenerators('PowerSync reconnect on closed stream', async (func) => {
const { powersync, waitForStream, remote } = await func();
expect(powersync.connected).toBe(true);
// A new stream should be requested
await newStream;

// Close the stream
const newStream = waitForStream();
remote.streamController?.close();
await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('Should trigger upload connector when connected', async (createConnectedDatabase) => {
const { powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

// do something which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);
// It should try and upload
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing the first check
expect(uploadSpy.mock.calls.length).equals(1);
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

// A new stream should be requested
await newStream;
await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('Should retry failed uploads when connected', async (createConnectedDatabase) => {
const { powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

await powersync.disconnectAndClear();
await powersync.close();
let uploadCounter = 0;
// This test will throw an exception a few times before uploading
const throwCounter = 2;
uploadSpy.mockImplementation(async (db) => {
if (uploadCounter++ < throwCounter) {
throw new Error('No uploads yet');
}
// Now actually do the upload
const tx = await db.getNextCrudTransaction();
await tx?.complete();
});

itWithGenerators('PowerSync reconnect multiple connect calls', async (func) => {
// This initially performs a connect call
const { powersync, waitForStream } = await func();
expect(powersync.connected).toBe(true);
// do something which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);

// Call connect again, a new stream should be requested
const newStream = waitForStream();
powersync.connect(new TestConnector());
// It should try and upload
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(uploadSpy.mock.calls.length).equals(throwCounter + 1);
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

// A new stream should be requested
await newStream;
await powersync.disconnectAndClear();
await powersync.close();
});

itWithGenerators('Should upload after reconnecting', async (createConnectedDatabase) => {
const { connect, powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

await powersync.disconnect();

// do something (offline) which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);

await connect();

// It should try and upload
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(uploadSpy.mock.calls.length).equals(1);
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

await powersync.disconnectAndClear();
await powersync.close();
});

await powersync.disconnectAndClear();
await powersync.close();
itWithGenerators('Should update status when uploading', async (createConnectedDatabase) => {
const { powersync, uploadSpy } = await createConnectedDatabase();
expect(powersync.connected).toBe(true);

let uploadStartedPromise = new Promise<void>((resolve) => {
uploadSpy.mockImplementation(async (db) => {
resolve();
// Now actually do the upload
const tx = await db.getNextCrudTransaction();
await tx?.complete();
});
});

// do something which should trigger an upload
await powersync.execute('INSERT INTO users (id, name) VALUES (uuid(), ?)', ['name']);

await uploadStartedPromise;
expect(powersync.currentStatus.dataFlowStatus.uploading).true;

// Status should update after uploads are completed
await vi.waitFor(
() => {
// to-have-been-called seems to not work after failing a check
expect(powersync.currentStatus.dataFlowStatus.uploading).false;
},
{
timeout: UPLOAD_TIMEOUT_MS
}
);

await powersync.disconnectAndClear();
await powersync.close();
});
});
15 changes: 12 additions & 3 deletions packages/web/tests/utils/MockStreamOpenFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ export class TestConnector implements PowerSyncBackendConnector {
};
}
async uploadData(database: AbstractPowerSyncDatabase): Promise<void> {
return;
const tx = await database.getNextCrudTransaction();
await tx?.complete();
}
}

Expand All @@ -49,8 +50,16 @@ export class MockRemote extends AbstractRemote {
post(path: string, data: any, headers?: Record<string, string> | undefined): Promise<any> {
throw new Error('Method not implemented.');
}
get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
throw new Error('Method not implemented.');
async get(path: string, headers?: Record<string, string> | undefined): Promise<any> {
// mock a response for write checkpoint API
if (path.includes('checkpoint')) {
return {
data: {
write_checkpoint: '1000'
}
};
}
throw new Error('Not implemented');
}
async postStreaming(
path: string,
Expand Down

0 comments on commit 1b66145

Please sign in to comment.