Skip to content

Commit

Permalink
Revert the magic (#9023)
Browse files Browse the repository at this point in the history
* Revert "Set concurrentDBOperationsLimit to 1 for Safari"

This reverts commit ebf4f35

* Revert "add clears to the retry/timeout queue (#8960)"

This reverts commit 38a3d42

* Revert "add clears to the retry/timeout queue"

This reverts commit 720c390

* Revert "Increase DB operation timeout (#8958)"

This reverts commit 501df4a

* Revert "Serialize writes (#8935)"

This reverts commit f1eead8
  • Loading branch information
justinbarry authored Aug 14, 2020
1 parent db5c9de commit 85b570f
Show file tree
Hide file tree
Showing 20 changed files with 59 additions and 256 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"fix": "gts fix"
},
"devDependencies": {
"@types/async": "3.2.3",
"@types/async": "3.0.1",
"@types/bn.js": "4.11.5",
"@types/chai": "4.2.3",
"@types/chai-as-promised": "7.1.2",
Expand Down
3 changes: 1 addition & 2 deletions packages/augur-artifacts/src/environments/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,5 @@
"trackMarketInvalidBids": true,
"fallbackProvider": "torus",
"primaryProvider" : "wallet"
},
"concurrentDBOperationsLimit": 10
}
}
5 changes: 4 additions & 1 deletion packages/augur-artifacts/src/environments/mainnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
"enabled": true,
"autoReport": false,
"createCheckpoints": true

},
"syncing": {
"enabled": true
Expand All @@ -131,6 +132,7 @@
"sdk": {
"enabled": false,
"ws": "ws://localhost:60557"

},
"server": {
"httpPort": 9003,
Expand All @@ -141,8 +143,9 @@
"startWS": true,
"wssPort": 9002,
"startWSS": true

},
"logLevel": 0,
"logLevel": 2,
"showReloadModal": false,
"averageBlocktime": 15000,
"ui": {
Expand Down
3 changes: 1 addition & 2 deletions packages/augur-artifacts/src/environments/v2.json
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,5 @@
"fallbackProvider": "torus",
"primaryProvider": "wallet"
},
"averageBlocktime": 2000,
"concurrentDBOperationsLimit": 10
"averageBlocktime": 2000
}
2 changes: 1 addition & 1 deletion packages/augur-sdk-lite/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"lz-string": "1.4.4"
},
"devDependencies": {
"@types/async": "3.2.3",
"@types/async": "3.0.1",
"@types/cors": "2.8.6",
"@types/node": "10.14.18",
"@types/node-fetch": "2.5.1",
Expand Down
4 changes: 2 additions & 2 deletions packages/augur-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
"comlink": "^4.3.0",
"cors": "2.8.5",
"cross-fetch": "3.0.4",
"dexie": "3.0.2",
"dexie": "3.0.0-rc.4",
"ethereum-types": "2.1.6",
"ethereumjs-blockstream": "7.0.0",
"ethereumjs-tx": "2.1.2",
Expand All @@ -77,7 +77,7 @@
"devDependencies": {
"@0x/mesh-browser": "9.3.0",
"@augurproject/types": "^2.1.1",
"@types/async": "3.2.3",
"@types/async": "3.0.1",
"@types/cors": "2.8.6",
"@types/dexie": "1.3.1",
"@types/express": "4.17.1",
Expand Down
6 changes: 1 addition & 5 deletions packages/augur-sdk/src/state/create-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,14 @@ export async function createServer(config: SDKConfiguration, client?: Augur): Pr
contractEvents.getEventTopics,
contractEvents.parseLogs,
);

const db = DB.createAndInitializeDB(
Number(config.networkId),
config.uploadBlockNumber,
logFilterAggregator,
client,
config.zeroX?.mesh?.enabled || config.zeroX?.rpc?.enabled,
client.config.concurrentDBOperationsLimit
config.zeroX?.mesh?.enabled || config.zeroX?.rpc?.enabled
);

await db;

if(config.warpSync?.createCheckpoints && config.warpSync?.autoReport) {
client.events.on(SubscriptionEventName.WarpSyncHashUpdated,
async ({ hash }) => {
Expand Down
119 changes: 7 additions & 112 deletions packages/augur-sdk/src/state/db/AbstractTable.ts
Original file line number Diff line number Diff line change
@@ -1,71 +1,23 @@
import { logger } from '@augurproject/utils';
import Dexie from 'dexie';
import * as _ from 'lodash';
import { AsyncQueue, queue, timeout, retry } from 'async';

export type PrimitiveID = string | number | Date;

export type ID = PrimitiveID | Array<PrimitiveID>;

export const ALL_DOCS_BATCH_SIZE = 200;

export enum WriteTaskType {
ADD,
PUT,
UPSERT,
CLEAR,
}

interface WriteQueueTask {
type: WriteTaskType;
documents?: BaseDocument[];
table: AbstractTable;
}

export interface BaseDocument {
[key: string]: any;
}

export const DEFAULT_CONCURRENCY = 10;
export abstract class AbstractTable {
table: Dexie.Table<any, any>;
protected networkId: number;
readonly dbName: string;
protected idFields: string[];
protected syncing: boolean;

static setConcurrency = (limit = DEFAULT_CONCURRENCY) => {
if(limit !== DEFAULT_CONCURRENCY) logger.info(`Setting concurrent DB write limit to ${limit}`);
AbstractTable.writeQueue = queue(
(task: WriteQueueTask, callback) => {
return retry(
{
times: 2,
interval: function (retryCount) {
return 100;
},
errorFilter: function (err) {
return err['code'] === "ETIMEDOUT";
}
}, timeout(async function WriteOperation() {
if (task.type === WriteTaskType.ADD) {
return task.table.bulkAddDocumentsInternal(task.documents);
} else if (task.type === WriteTaskType.PUT) {
return task.table.bulkPutDocumentsInternal(task.documents);
} else if (task.type === WriteTaskType.CLEAR) {
return task.table.clearInternal();
} else {
return task.table.bulkUpsertDocumentsInternal(task.documents);
}
}, 10000), callback);
},
limit
);
return AbstractTable.writeQueue;
}

private static writeQueue: AsyncQueue<WriteQueueTask> = AbstractTable.setConcurrency();

protected constructor(networkId: number, dbName: string, db: Dexie) {
this.networkId = networkId;
this.dbName = dbName;
Expand All @@ -76,14 +28,11 @@ export abstract class AbstractTable {
}

async clearDB(): Promise<void> {
console.log(`AbstractTable: clearDB request for ${this.dbName} started`);
await this.table.clear();
console.log(`AbstractTable: clearDB request for ${this.dbName} finished`);
}

// We pull all docs in batches to avoid maximum IPC message errors
async allDocs(): Promise<any[]> {
console.log(`AbstractTable: allDocs request for ${this.dbName} started`);
const results: any[] = [];
const documentCount = await this.getDocumentCount();
for (let batchIdx = 0; batchIdx * ALL_DOCS_BATCH_SIZE <= documentCount; batchIdx++) {
Expand All @@ -93,99 +42,45 @@ export abstract class AbstractTable {
.toArray();
results.push(...batchResults);
}
console.log(`AbstractTable: allDocs request for ${this.dbName} finished`);
return results;
}

async delete() {
return this.addWriteTask(WriteTaskType.CLEAR);
}

async clear() {
return this.addWriteTask(WriteTaskType.CLEAR);
await this.table.clear();
return Dexie.delete(this.dbName);
}

async getDocumentCount(): Promise<number> {
console.log(`AbstractTable: getDocumentCount request for ${this.dbName} started`);
const count = await this.table.count();
console.log(`AbstractTable: getDocumentCount request for ${this.dbName} finished`);
return count;
return this.table.count();
}

protected async getDocument<Document>(id: string): Promise<Document | undefined> {
return this.table.get(id);
}

private async addWriteTask(type: WriteTaskType, documents?: BaseDocument[]): Promise<void> {
return new Promise((resolve, reject) => {
AbstractTable.writeQueue.push({ table: this, documents, type }, (err) => {
if (err) {
console.log(`WriteOperationDied: ${JSON.stringify(err)}`);
reject(err);
}
resolve();
});
});
}

protected async bulkAddDocuments(documents: BaseDocument[]): Promise<void> {
return this.addWriteTask(WriteTaskType.ADD, documents);
}

protected async bulkPutDocuments(documents: BaseDocument[], documentIds?: any[]): Promise<void> {
return this.addWriteTask(WriteTaskType.PUT, documents);
}

protected async bulkUpsertDocuments(documents: BaseDocument[]): Promise<void> {
return this.addWriteTask(WriteTaskType.UPSERT, documents);
}

protected async clearInternal(): Promise<void> {
console.log(`AbstractTable: clear request for ${this.dbName} started`);
await this.table.clear();
console.log(`AbstractTable: clear request for ${this.dbName} finished`);
}

protected async bulkAddDocumentsInternal(documents: BaseDocument[]): Promise<void> {
console.log(`AbstractTable: bulkAddDocuments request for ${this.dbName} started. ${documents.length} docs`);
for (const document of documents) {
delete document.constructor;
}
try {
await this.table.bulkAdd(documents);
} catch(e) {
console.warn(`AbstractTable: bulkAddDocuments request for ${this.dbName} failed.`, e);
// We intentionally do not throw here so that retries on timeouts that actually succeed do not halt syncing
}
console.log(`AbstractTable: bulkAddDocuments request for ${this.dbName} finished. ${documents.length} docs`);
await this.table.bulkAdd(documents);
}

protected async bulkPutDocumentsInternal(documents: BaseDocument[], documentIds?: any[]): Promise<void> {
console.log(`AbstractTable: bulkPutDocuments request for ${this.dbName} started. ${documents.length} docs`);
protected async bulkPutDocuments(documents: BaseDocument[], documentIds?: any[]): Promise<void> {
for (const document of documents) {
delete document.constructor;
}
try {
await this.table.bulkPut(documents);
} catch(e) {
console.warn(`AbstractTable: bulkPutDocuments request for ${this.dbName} failed.`, e);
throw e;
}
await this.table.bulkPut(documents);
console.log(`AbstractTable: bulkPutDocuments request for ${this.dbName} finished. ${documents.length} docs`);
}

protected async bulkUpsertDocumentsInternal(documents: BaseDocument[]): Promise<void> {
console.log(`AbstractTable: bulkUpsertDocuments request for ${this.dbName} started. ${documents.length} docs`);
protected async bulkUpsertDocuments(documents: BaseDocument[]): Promise<void> {
const documentIds = _.map(documents, this.getIDValue.bind(this));
const existingDocuments = await this.table.bulkGet(documentIds);
let docIndex = 0;
for (const existingDocument of existingDocuments) {
existingDocuments[docIndex] = Object.assign({}, existingDocument || {}, documents[docIndex]);
docIndex++;
}
await this.bulkPutDocumentsInternal(existingDocuments, documentIds);
console.log(`AbstractTable: bulkUpsertDocuments request for ${this.dbName} finished. ${documents.length} docs`);
await this.bulkPutDocuments(existingDocuments, documentIds);
}

protected async saveDocuments(documents: BaseDocument[]): Promise<void> {
Expand Down
5 changes: 0 additions & 5 deletions packages/augur-sdk/src/state/db/BaseSyncableDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ export class BaseSyncableDB extends RollbackTable {
return super.delete();
}

async clear() {
this.db.unregisterEventListener(this.eventName, this.addNewBlock);
return super.clear();
}

protected async saveDocuments(documents: BaseDocument[]): Promise<void> {
return this.bulkAddDocuments(documents);
}
Expand Down
Loading

0 comments on commit 85b570f

Please sign in to comment.