Skip to content

Commit

Permalink
avro streams
Browse files Browse the repository at this point in the history
pipelines

clean code

restore

cr
  • Loading branch information
AlexandrMov committed Sep 18, 2023
1 parent 49a330e commit 9c158bc
Show file tree
Hide file tree
Showing 17 changed files with 245 additions and 25 deletions.
20 changes: 20 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Debug Nest Framework",
"runtimeExecutable": "yarn",
"runtimeArgs": [
"start:debug",
],
"cwd": "${workspaceFolder}/src",
"autoAttachChildProcesses": true,
"restart": true,
"sourceMaps": true,
"stopOnEntry": false,
"console": "integratedTerminal",
}
]
}
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"ethers": "^5.5.4",
"fastify-swagger": "^4.13.1",
"jsonstream": "^1.0.3",
"pg-query-stream": "^4.5.3",
"prom-client": "^14.0.1",
"reflect-metadata": "^0.1.13",
"rimraf": "^3.0.2",
Expand All @@ -81,6 +82,7 @@
"@nestjs/testing": "^8.2.5",
"@types/cache-manager": "^3.4.2",
"@types/jest": "^27.4.0",
"@types/jsonstream": "^0.8.31",
"@types/node": "^17.0.9",
"@types/supertest": "^2.0.11",
"@typescript-eslint/eslint-plugin": "^5.10.0",
Expand Down
1 change: 1 addition & 0 deletions src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import { StakingRouterModule } from 'staking-router-modules';
autoLoadEntities: false,
cache: { enabled: false },
debug: false,
safe: true,
registerRequestContext: true,
allowGlobalContext: false,
};
Expand Down
9 changes: 9 additions & 0 deletions src/common/registry/storage/key.storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ export class RegistryKeyStorageService {
return await this.repository.find(where, options);
}

findStream(where: FilterQuery<RegistryKey>, fields?: string[]): AsyncIterable<RegistryKey> {
const knex = this.repository.getKnex();
return knex
.select(fields || '*')
.from<RegistryKey>('registry_key')
.where(where)
.stream();
}

/** find all keys */
async findAll(moduleAddress: string): Promise<RegistryKey[]> {
return await this.repository.find(
Expand Down
9 changes: 9 additions & 0 deletions src/common/registry/storage/operator.storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ export class RegistryOperatorStorageService {
return await this.repository.find(where, options);
}

findStream(where: FilterQuery<RegistryOperator>, fields?: string[]): AsyncIterable<RegistryOperator> {
const knex = this.repository.getKnex();
return knex
.select(fields || '*')
.from<RegistryOperator>('registry_operator')
.where(where)
.stream();
}

/** find all operators */
async findAll(moduleAddress: string): Promise<RegistryOperator[]> {
return await this.repository.find(
Expand Down
1 change: 1 addition & 0 deletions src/common/streams/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './streamify';
30 changes: 30 additions & 0 deletions src/common/streams/streamify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Readable, ReadableOptions } from 'stream';

class GeneratorToStream<GeneratorResultType> extends Readable {
constructor(options: ReadableOptions, protected readonly generator: AsyncGenerator<GeneratorResultType>) {
super(options);
}

_read() {
try {
this.generator
.next()
.then((result) => {
if (!result.done) {
this.push(result.value);
} else {
this.push(null);
}
})
.catch((e) => {
this.emit('error', e);
});
} catch (e) {
this.emit('error', e);
}
}
}

export function streamify<GeneratorResultType>(generator: AsyncGenerator<GeneratorResultType>) {
return new GeneratorToStream<GeneratorResultType>({ objectMode: true }, generator);
}
2 changes: 1 addition & 1 deletion src/http/keys/keys.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export class KeysController {
try {
for (const keysGenerator of keysGenerators) {
for await (const keysBatch of keysGenerator) {
jsonStream.write(keysBatch);
jsonStream.write(JSON.stringify(keysBatch));
}
}
} finally {
Expand Down
2 changes: 1 addition & 1 deletion src/http/sr-modules-keys/sr-modules-keys.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class SRModulesKeysController {
reply.type('application/json').send(jsonStream);

for await (const keysBatch of keysGenerator) {
jsonStream.write(keysBatch);
jsonStream.write(JSON.stringify(keysBatch));
}

jsonStream.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,32 @@ export class SRModuleOperatorsKeysResponse {
})
meta!: ELMeta;
}

export class SRModulesOperatorsKeysStreamResponse {
@ApiProperty({
type: 'array',
items: { oneOf: [{ $ref: getSchemaPath(CuratedOperator) }] },
description: 'Operators of staking router module',
})
operators?: SRModuleOperator[];

@ApiProperty({
type: 'array',
items: { oneOf: [{ $ref: getSchemaPath(CuratedKey) }] },
description: 'Keys of staking router module',
})
keys?: SRModuleKey[];

@ApiProperty({
type: 'array',
items: { oneOf: [{ $ref: getSchemaPath(SRModule) }] },
description: 'List of Staking Router',
})
modules?: SRModule[];

@ApiProperty({
nullable: true,
type: () => ELMeta,
})
meta?: ELMeta;
}
1 change: 1 addition & 0 deletions src/http/sr-modules-operators-keys/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './sr-modules-operators-keys.module';
export * from './sr-modules-operators-keys.controller';
export * from './sr-modules-operators-keys.service';
export * from './sr-modules-operators-keys.types';
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { pipeline } from 'node:stream/promises';
import { IsolationLevel } from '@mikro-orm/core';
import { Controller, Get, Version, Param, Query, NotFoundException, HttpStatus, Res } from '@nestjs/common';
import { ApiOperation, ApiResponse, ApiTags, ApiParam, ApiNotFoundResponse } from '@nestjs/swagger';
import { SRModuleOperatorsKeysResponse } from './entities';
import { SRModuleOperatorsKeysResponse, SRModulesOperatorsKeysStreamResponse } from './entities';
import { ModuleId, KeyQuery } from 'http/common/entities/';
import { SRModulesOperatorsKeysService } from './sr-modules-operators-keys.service';
import { TooEarlyResponse } from 'http/common/entities/http-exceptions';
import { EntityManager } from '@mikro-orm/knex';
import * as JSONStream from 'jsonstream';
import type { FastifyReply } from 'fastify';
import { IsolationLevel } from '@mikro-orm/core';
import { streamify } from 'common/streams';

@Controller('/modules')
@ApiTags('operators-keys')
Expand Down Expand Up @@ -63,12 +65,36 @@ export class SRModulesOperatorsKeysController {
reply.type('application/json').send(jsonStream);

for await (const keysBatch of keysGenerator) {
jsonStream.write(keysBatch);
jsonStream.write(JSON.stringify(keysBatch));
}

jsonStream.end();
},
{ isolationLevel: IsolationLevel.REPEATABLE_READ },
);
}

@Version('2')
@ApiOperation({ summary: 'Comprehensive stream for staking router modules, operators and their keys' })
@ApiResponse({
status: 200,
description: 'Stream of all SR modules, operators and keys',
type: SRModulesOperatorsKeysStreamResponse,
})
@ApiResponse({
status: 425,
description: 'Meta has not exist yet, maybe data was not written in db yet',
type: TooEarlyResponse,
})
@Get('operators/keys')
async getModulesOperatorsKeysStream(@Res() reply: FastifyReply) {
const jsonStream = JSONStream.stringify();

reply.type('application/json').send(jsonStream);

await this.entityManager.transactional(
() => pipeline([streamify(this.srModulesOperatorsKeys.getModulesOperatorsKeysGenerator()), jsonStream]),
{ isolationLevel: IsolationLevel.REPEATABLE_READ },
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import { LOGGER_PROVIDER } from '@lido-nestjs/logger';
import { StakingRouterService } from 'staking-router-modules/staking-router.service';
import { KeyEntity, OperatorEntity } from 'staking-router-modules/interfaces/staking-module.interface';
import { EntityManager } from '@mikro-orm/knex';
import { MetaStreamRecord, ModulesOperatorsKeysRecord } from './sr-modules-operators-keys.types';

@Injectable()
export class SRModulesOperatorsKeysService {
constructor(
@Inject(LOGGER_PROVIDER) protected readonly logger: LoggerService,
protected readonly configService: ConfigService,
protected stakingRouterService: StakingRouterService,
protected readonly entityManager: EntityManager,
protected readonly stakingRouterService: StakingRouterService,
readonly entityManager: EntityManager,
) {}

public async get(
Expand All @@ -29,13 +30,49 @@ export class SRModulesOperatorsKeysService {

const moduleInstance = this.stakingRouterService.getStakingRouterModuleImpl(module.type);

const keysGenerator: AsyncGenerator<KeyEntity> = await moduleInstance.getKeysStream(
module.stakingModuleAddress,
filters,
);
const keysGenerator: AsyncGenerator<KeyEntity> = moduleInstance.getKeysStream(module.stakingModuleAddress, filters);
const operatorsFilter = filters.operatorIndex ? { index: filters.operatorIndex } : {};
const operators: OperatorEntity[] = await moduleInstance.getOperators(module.stakingModuleAddress, operatorsFilter);

return { operators, keysGenerator, module, meta: { elBlockSnapshot } };
}

public async *getModulesOperatorsKeysGenerator(): AsyncGenerator<ModulesOperatorsKeysRecord> {
const { stakingModules, elBlockSnapshot } = await this.stakingRouterService.getStakingModulesAndMeta();

const meta: MetaStreamRecord = { elBlockSnapshot };
for (const stakingModule of stakingModules) {
const moduleInstance = this.stakingRouterService.getStakingRouterModuleImpl(stakingModule.type);

const keysGenerator = moduleInstance.getKeysStream(stakingModule.stakingModuleAddress, {});
const operatorsGenerator = moduleInstance.getOperatorsStream(stakingModule.stakingModuleAddress, {});

let nextKey = await keysGenerator.next();
let nextOperator = await operatorsGenerator.next();

yield {
stakingModule,
meta,
key: nextKey.value || null,
operator: nextOperator.value || null,
};

do {
if (!nextKey.done) {
nextKey = await keysGenerator.next();
}

if (!nextOperator.done) {
nextOperator = await operatorsGenerator.next();
}

yield {
stakingModule: null,
meta: null,
key: nextKey.value || null,
operator: nextOperator.value || null,
};
} while (!nextKey.done || !nextOperator.done);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { ELBlockSnapshot } from 'http/common/entities';
import { KeyEntity, OperatorEntity } from 'staking-router-modules/interfaces/staking-module.interface';
import { SrModuleEntity } from 'storage/sr-module.entity';

export type MetaStreamRecord = { elBlockSnapshot: ELBlockSnapshot } | null;

export type ModulesOperatorsKeysRecord = {
stakingModule: SrModuleEntity | null;
key: KeyEntity | null;
operator: OperatorEntity | null;
meta: MetaStreamRecord;
};
50 changes: 36 additions & 14 deletions src/staking-router-modules/curated-module.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,29 +56,51 @@ export class CuratedModuleService implements StakingModuleInterface {
public async *getKeysStream(contractAddress: string, filters: KeysFilter): AsyncGenerator<RegistryKey> {
const where = {};
if (filters.operatorIndex != undefined) {
where['operatorIndex'] = filters.operatorIndex;
where['operator_index'] = filters.operatorIndex;
}

if (filters.used != undefined) {
where['used'] = filters.used;
}

where['moduleAddress'] = contractAddress;

const batchSize = 10000;
let offset = 0;
where['module_address'] = contractAddress;

while (true) {
const chunk = await this.keyStorageService.find(where, { limit: batchSize, offset });
if (chunk.length === 0) {
break;
}
const keyStream = this.keyStorageService.findStream(where, [
'index',
'operator_index as operatorIndex',
'key',
'deposit_signature as depositSignature',
'used',
'module_address as moduleAddress',
]);

offset += batchSize;
for await (const record of keyStream) {
yield record;
}
}

for (const record of chunk) {
yield record;
}
public async *getOperatorsStream(moduleAddress: string, filters?: OperatorsFilter): AsyncGenerator<RegistryOperator> {
const where = {};
if (filters?.index != undefined) {
where['index'] = filters.index;
}
// we store operators of modules with the same impl at the same table
where['module_address'] = moduleAddress;

const operatorStream = this.operatorStorageService.findStream(where, [
'index',
'active',
'name',
'reward_address as rewardAddress',
'staking_limit as stakingLimit',
'stopped_validators as stoppedValidators',
'total_signing_keys as totalSigningKeys',
'used_signing_keys as usedSigningKeys',
'module_address as moduleAddress',
]);

for await (const record of operatorStream) {
yield record;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ export interface StakingModuleInterface {

getOperators(moduleAddress: string, filters?: OperatorsFilter): Promise<OperatorEntity[]>;

getOperatorsStream(moduleAddress: string, filters?: OperatorsFilter): AsyncGenerator<OperatorEntity>;

getOperator(moduleAddress: string, index: number): Promise<OperatorEntity | null>;

getCurrentNonce(moduleAddress: string, blockHash: string): Promise<number>;
Expand Down
Loading

0 comments on commit 9c158bc

Please sign in to comment.