From 2471e3f185c3a90db7d6589cc0e004532e72e741 Mon Sep 17 00:00:00 2001 From: Eddort Date: Tue, 19 Dec 2023 13:54:34 +0100 Subject: [PATCH] feat: new indexing logic --- .../execution-provider.service.ts | 9 ++ src/common/registry/fetch/operator.fetch.ts | 6 +- src/common/registry/main/abstract-registry.ts | 21 +-- src/common/registry/main/constants.ts | 1 + .../registry/storage/operator.entity.ts | 4 + src/jobs/keys-update/keys-update.service.ts | 141 +++++++++++++++--- src/storage/el-meta.entity.ts | 4 + src/storage/el-meta.storage.ts | 8 +- src/storage/sr-module.entity.ts | 7 +- src/storage/sr-module.storage.ts | 4 +- 10 files changed, 158 insertions(+), 47 deletions(-) diff --git a/src/common/execution-provider/execution-provider.service.ts b/src/common/execution-provider/execution-provider.service.ts index 45b1e074..e84eafaa 100644 --- a/src/common/execution-provider/execution-provider.service.ts +++ b/src/common/execution-provider/execution-provider.service.ts @@ -46,4 +46,13 @@ export class ExecutionProviderService { const block = await this.provider.getBlock(blockHashOrBlockTag); return { number: block.number, hash: block.hash, timestamp: block.timestamp }; } + + /** + * + * Returns full block info + */ + public async getFullBlock(blockHashOrBlockTag: number | string) { + const block = await this.provider.getBlock(blockHashOrBlockTag); + return block; + } } diff --git a/src/common/registry/fetch/operator.fetch.ts b/src/common/registry/fetch/operator.fetch.ts index 673d82f4..223e7090 100644 --- a/src/common/registry/fetch/operator.fetch.ts +++ b/src/common/registry/fetch/operator.fetch.ts @@ -72,8 +72,10 @@ export class RegistryOperatorFetchService { overrides: CallOverrides = {}, ): Promise { const fullInfo = true; - const operator = await this.getContract(moduleAddress).getNodeOperator(operatorIndex, fullInfo, overrides as any); - const finalizedOperator = await this.getContract(moduleAddress).getNodeOperator(operatorIndex, fullInfo, { + const contract = this.getContract(moduleAddress); + + const operator = await contract.getNodeOperator(operatorIndex, fullInfo, overrides as any); + const finalizedOperator = await contract.getNodeOperator(operatorIndex, fullInfo, { blockTag: 'finalized', }); diff --git a/src/common/registry/main/abstract-registry.ts b/src/common/registry/main/abstract-registry.ts index d5abac36..e0c3fc2b 100644 --- a/src/common/registry/main/abstract-registry.ts +++ b/src/common/registry/main/abstract-registry.ts @@ -87,26 +87,7 @@ export abstract class AbstractRegistryService { } // check how big the difference between the blocks is, if it exceeds, we should update the state anyway if (toBlockNumber - fromBlockNumber > BLOCKS_OVERLAP) return true; - - return await this.operatorFetch.operatorsWereChanged(moduleAddress, fromBlockNumber, toBlockNumber); - } - - /** - * - * @param moduleAddress contract address - * @returns Check if operators have been changed - */ - public async keysWereChanged( - moduleAddress: string, - fromBlockNumber: number, - toBlockNumber: number, - ): Promise { - if (fromBlockNumber > toBlockNumber) { - throw new Error(`invalid blocks range: ${fromBlockNumber} (fromBlockNumber) > ${toBlockNumber} (toBlockNumber)`); - } - // check how big the difference between the blocks is, if it exceeds, we should update the state anyway - if (toBlockNumber - fromBlockNumber > BLOCKS_OVERLAP) return true; - + // rename return await this.operatorFetch.operatorsWereChanged(moduleAddress, fromBlockNumber, toBlockNumber); } diff --git a/src/common/registry/main/constants.ts b/src/common/registry/main/constants.ts index 17883f7b..8a6fcc14 100644 --- a/src/common/registry/main/constants.ts +++ b/src/common/registry/main/constants.ts @@ -1 +1,2 @@ export const REGISTRY_GLOBAL_OPTIONS_TOKEN = Symbol('registryGlobalOptions'); +export const BLOCKS_OVERLAP = 120; diff --git a/src/common/registry/storage/operator.entity.ts b/src/common/registry/storage/operator.entity.ts index 2538a588..1074a491 100644 --- a/src/common/registry/storage/operator.entity.ts +++ b/src/common/registry/storage/operator.entity.ts @@ -17,6 +17,7 @@ export class RegistryOperator { this.totalSigningKeys = operator.totalSigningKeys; this.usedSigningKeys = operator.usedSigningKeys; this.moduleAddress = operator.moduleAddress; + this.finalizedUsedSigningKeys = operator.finalizedUsedSigningKeys; } @PrimaryKey() @@ -46,4 +47,7 @@ export class RegistryOperator { @PrimaryKey() @Property({ length: ADDRESS_LEN }) moduleAddress!: string; + + @Property() + finalizedUsedSigningKeys!: number; } diff --git a/src/jobs/keys-update/keys-update.service.ts b/src/jobs/keys-update/keys-update.service.ts index 2f5075b4..33388dba 100644 --- a/src/jobs/keys-update/keys-update.service.ts +++ b/src/jobs/keys-update/keys-update.service.ts @@ -1,4 +1,5 @@ import { Inject, Injectable } from '@nestjs/common'; +import { range } from '@lido-nestjs/utils'; import { LOGGER_PROVIDER, LoggerService } from 'common/logger'; import { ConfigService } from 'common/config'; import { JobService } from 'common/job'; @@ -15,6 +16,8 @@ import { PrometheusService } from 'common/prometheus'; import { SrModuleEntity } from 'storage/sr-module.entity'; import { StakingModule } from 'staking-router-modules/interfaces/staking-module.interface'; +const MAX_BLOCKS_OVERLAP = 30; + class KeyOutdatedError extends Error { lastBlock: number; @@ -115,10 +118,17 @@ export class KeysUpdateService { // read from database last execution layer data const prevElMeta = await this.elMetaStorage.get(); + // handle the situation when the node has fallen behind the service state if (prevElMeta && prevElMeta?.blockNumber > currElMeta.number) { this.logger.warn('Previous data is newer than current data', prevElMeta); return; } + + if (prevElMeta?.blockHash && prevElMeta.blockHash === currElMeta.hash) { + this.logger.debug?.('same state, skip', { prevElMeta, currElMeta }); + return; + } + // Get modules from storage const storageModules = await this.srModulesStorage.findAll(); // Get staking modules from SR contract @@ -133,44 +143,101 @@ export class KeysUpdateService { await this.entityManager.transactional( async () => { - // Update EL meta in db - await this.elMetaStorage.update(currElMeta); + const prevBlockHash = prevElMeta?.blockHash; + const currentBlockHash = currElMeta.hash; + + let lastChangedBlockHash = prevBlockHash || currentBlockHash; + + let isReorgDetected = false; for (const contractModule of contractModules) { + const { stakingModuleAddress } = contractModule; + // Find implementation for staking module const moduleInstance = this.stakingRouterService.getStakingRouterModuleImpl(contractModule.type); // Read current nonce from contract - const currNonce = await moduleInstance.getCurrentNonce(contractModule.stakingModuleAddress, currElMeta.hash); + const currNonce = await moduleInstance.getCurrentNonce(stakingModuleAddress, currentBlockHash); // Read module in storage const moduleInStorage = await this.srModulesStorage.findOneById(contractModule.moduleId); const prevNonce = moduleInStorage?.nonce; - // update staking module information - await this.srModulesStorage.upsert(contractModule, currNonce); this.logger.log(`Nonce previous value: ${prevNonce}, nonce current value: ${currNonce}`); - if (prevNonce === currNonce) { - this.logger.log("Nonce wasn't changed, no need to update keys"); - // case when prevELMeta is undefined but prevNonce === currNonce looks like invalid - // use here prevElMeta.blockNumber + 1 because operators were updated in database for prevElMeta.blockNumber block - if ( - prevElMeta && - prevElMeta.blockNumber < currElMeta.number && - (await moduleInstance.operatorsWereChanged( - contractModule.stakingModuleAddress, - prevElMeta.blockNumber + 1, - currElMeta.number, - )) - ) { - this.logger.log('Update events happened, need to update operators'); - await moduleInstance.updateOperators(contractModule.stakingModuleAddress, currElMeta.hash); - } + if (!prevElMeta) { + this.logger.log('No past state found, start indexing', { stakingModuleAddress, currentBlockHash }); + + await moduleInstance.update(stakingModuleAddress, currentBlockHash); + await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash); + lastChangedBlockHash = currentBlockHash; + continue; + } + + if (prevNonce !== currNonce) { + this.logger.log('Nonce has been changed, start indexing', { + stakingModuleAddress, + currentBlockHash, + prevNonce, + currNonce, + }); + + await moduleInstance.update(stakingModuleAddress, currentBlockHash); + await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash); + lastChangedBlockHash = currentBlockHash; + continue; + } + if (this.isTooMuchDiffBetweenBlocks(prevElMeta.blockNumber, currElMeta.number)) { + this.logger.log('Too much difference between the blocks, start indexing', { + stakingModuleAddress, + currentBlockHash, + }); + + await moduleInstance.update(stakingModuleAddress, currentBlockHash); + await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash); + lastChangedBlockHash = currentBlockHash; continue; } - await moduleInstance.update(contractModule.stakingModuleAddress, currElMeta.hash); + // calculate once per iteration + // no need to recheck each module separately + isReorgDetected = isReorgDetected ? true : await this.isReorgDetected(prevElMeta.blockHash, currentBlockHash); + + if (isReorgDetected) { + this.logger.log('Reorg detected, start indexing', { stakingModuleAddress, currentBlockHash }); + + await moduleInstance.update(stakingModuleAddress, currentBlockHash); + await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash); + lastChangedBlockHash = currentBlockHash; + continue; + } + + if ( + prevElMeta.blockNumber < currElMeta.number && + (await moduleInstance.operatorsWereChanged( + contractModule.stakingModuleAddress, + prevElMeta.blockNumber + 1, + currElMeta.number, + )) + ) { + this.logger.log('Update operator events happened, need to update operators', { + stakingModuleAddress, + currentBlockHash, + }); + + await moduleInstance.updateOperators(stakingModuleAddress, currentBlockHash); + await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash); + lastChangedBlockHash = currentBlockHash; + continue; + } + + this.logger.log('No changes have been detected in the module, indexing is not required', { + stakingModuleAddress, + currentBlockHash, + }); } + + // Update EL meta in db + await this.elMetaStorage.update({ ...currElMeta, lastChangedBlockHash }); }, { isolationLevel: IsolationLevel.READ_COMMITTED }, ); @@ -178,6 +245,36 @@ export class KeysUpdateService { return currElMeta; } + public async isReorgDetected(prevBlockHash: string, currentBlockHash: string) { + const currentBlock = await this.executionProvider.getFullBlock(currentBlockHash); + const prevBlock = await this.executionProvider.getFullBlock(prevBlockHash); + + if (currentBlock.parentHash === prevBlock.hash) return false; + // TODO: different hash but same number + if (currentBlock.number === prevBlock.number) return true; + + const blocks = await Promise.all( + range(prevBlock.number, currentBlock.number).map(async (bNumber) => { + return await this.executionProvider.getFullBlock(bNumber); + }), + ); + + for (let i = 1; i < blocks.length; i++) { + const previousBlock = blocks[i - 1]; + const currentBlock = blocks[i]; + + if (currentBlock.parentHash !== previousBlock.hash) { + return false; + } + } + + return true; + } + + public isTooMuchDiffBetweenBlocks(prevBlockNumber: number, currentBlockNumber: number) { + return currentBlockNumber - prevBlockNumber >= MAX_BLOCKS_OVERLAP; + } + /** * Update prometheus metrics of staking modules */ diff --git a/src/storage/el-meta.entity.ts b/src/storage/el-meta.entity.ts index 693e1c5c..49c82f21 100644 --- a/src/storage/el-meta.entity.ts +++ b/src/storage/el-meta.entity.ts @@ -11,6 +11,7 @@ export class ElMetaEntity { this.blockNumber = meta.blockNumber; this.blockHash = meta.blockHash.toLocaleLowerCase(); this.timestamp = meta.timestamp; + this.lastChangedBlockHash = meta.lastChangedBlockHash; } @PrimaryKey() @@ -22,4 +23,7 @@ export class ElMetaEntity { @Property() timestamp: number; + + @Property({ length: BLOCK_HASH_LEN }) + lastChangedBlockHash: string; } diff --git a/src/storage/el-meta.storage.ts b/src/storage/el-meta.storage.ts index 4407ae41..1a022bdf 100644 --- a/src/storage/el-meta.storage.ts +++ b/src/storage/el-meta.storage.ts @@ -12,13 +12,19 @@ export class ElMetaStorageService { return result[0] ?? null; } - async update(currElMeta: { number: number; hash: string; timestamp: number }): Promise { + async update(currElMeta: { + number: number; + hash: string; + timestamp: number; + lastChangedBlockHash: string; + }): Promise { await this.repository.nativeDelete({}); await this.repository.persist( new ElMetaEntity({ blockHash: currElMeta.hash, blockNumber: currElMeta.number, timestamp: currElMeta.timestamp, + lastChangedBlockHash: currElMeta.lastChangedBlockHash, }), ); await this.repository.flush(); diff --git a/src/storage/sr-module.entity.ts b/src/storage/sr-module.entity.ts index 2e23bc24..888bd4b8 100644 --- a/src/storage/sr-module.entity.ts +++ b/src/storage/sr-module.entity.ts @@ -7,7 +7,7 @@ import { SRModuleRepository } from './sr-module.repository'; export class SrModuleEntity implements StakingModule { [EntityRepositoryType]?: SRModuleRepository; - constructor(srModule: StakingModule, nonce: number) { + constructor(srModule: StakingModule, nonce: number, lastChangedBlockHash: string) { this.moduleId = srModule.moduleId; this.stakingModuleAddress = srModule.stakingModuleAddress; this.moduleFee = srModule.moduleFee; @@ -21,6 +21,7 @@ export class SrModuleEntity implements StakingModule { this.type = srModule.type; this.active = srModule.active; this.nonce = nonce; + this.lastChangedBlockHash = lastChangedBlockHash; } @PrimaryKey() @@ -81,4 +82,8 @@ export class SrModuleEntity implements StakingModule { // nonce value @Property() nonce: number; + + // last changed block hash + @Property() + lastChangedBlockHash: string; } diff --git a/src/storage/sr-module.storage.ts b/src/storage/sr-module.storage.ts index 0bbecd61..4a4f9661 100644 --- a/src/storage/sr-module.storage.ts +++ b/src/storage/sr-module.storage.ts @@ -21,7 +21,7 @@ export class SRModuleStorageService { return await this.repository.findAll(); } - async upsert(srModule: StakingModule, nonce: number): Promise { + async upsert(srModule: StakingModule, nonce: number, lastChangedBlockHash: string): Promise { // Try to find an existing entity by moduleId or stakingModuleAddress let existingModule = await this.repository.findOne({ moduleId: srModule.moduleId, @@ -32,6 +32,7 @@ export class SRModuleStorageService { existingModule = new SrModuleEntity( { ...srModule, stakingModuleAddress: srModule.stakingModuleAddress.toLowerCase() }, nonce, + lastChangedBlockHash, ); } else { // If the entity exists, update its properties @@ -45,6 +46,7 @@ export class SRModuleStorageService { existingModule.exitedValidatorsCount = srModule.exitedValidatorsCount; existingModule.active = srModule.active; existingModule.nonce = nonce; + existingModule.lastChangedBlockHash = lastChangedBlockHash; } // Save the entity (either a new one or an updated one)