Skip to content

Commit

Permalink
feat: new indexing logic
Browse files Browse the repository at this point in the history
  • Loading branch information
eddort committed Dec 19, 2023
1 parent d5247d5 commit 2471e3f
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 47 deletions.
9 changes: 9 additions & 0 deletions src/common/execution-provider/execution-provider.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,13 @@ export class ExecutionProviderService {
const block = await this.provider.getBlock(blockHashOrBlockTag);
return { number: block.number, hash: block.hash, timestamp: block.timestamp };
}

/**
*
* Returns full block info
*/
public async getFullBlock(blockHashOrBlockTag: number | string) {
const block = await this.provider.getBlock(blockHashOrBlockTag);
return block;
}
}
6 changes: 4 additions & 2 deletions src/common/registry/fetch/operator.fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ export class RegistryOperatorFetchService {
overrides: CallOverrides = {},
): Promise<RegistryOperator> {
const fullInfo = true;
const operator = await this.getContract(moduleAddress).getNodeOperator(operatorIndex, fullInfo, overrides as any);
const finalizedOperator = await this.getContract(moduleAddress).getNodeOperator(operatorIndex, fullInfo, {
const contract = this.getContract(moduleAddress);

const operator = await contract.getNodeOperator(operatorIndex, fullInfo, overrides as any);
const finalizedOperator = await contract.getNodeOperator(operatorIndex, fullInfo, {
blockTag: 'finalized',
});

Expand Down
21 changes: 1 addition & 20 deletions src/common/registry/main/abstract-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,26 +87,7 @@ export abstract class AbstractRegistryService {
}
// check how big the difference between the blocks is, if it exceeds, we should update the state anyway
if (toBlockNumber - fromBlockNumber > BLOCKS_OVERLAP) return true;

return await this.operatorFetch.operatorsWereChanged(moduleAddress, fromBlockNumber, toBlockNumber);
}

/**
*
* @param moduleAddress contract address
* @returns Check if operators have been changed
*/
public async keysWereChanged(
moduleAddress: string,
fromBlockNumber: number,
toBlockNumber: number,
): Promise<boolean> {
if (fromBlockNumber > toBlockNumber) {
throw new Error(`invalid blocks range: ${fromBlockNumber} (fromBlockNumber) > ${toBlockNumber} (toBlockNumber)`);
}
// check how big the difference between the blocks is, if it exceeds, we should update the state anyway
if (toBlockNumber - fromBlockNumber > BLOCKS_OVERLAP) return true;

// rename
return await this.operatorFetch.operatorsWereChanged(moduleAddress, fromBlockNumber, toBlockNumber);
}

Expand Down
1 change: 1 addition & 0 deletions src/common/registry/main/constants.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export const REGISTRY_GLOBAL_OPTIONS_TOKEN = Symbol('registryGlobalOptions');
export const BLOCKS_OVERLAP = 120;
4 changes: 4 additions & 0 deletions src/common/registry/storage/operator.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export class RegistryOperator {
this.totalSigningKeys = operator.totalSigningKeys;
this.usedSigningKeys = operator.usedSigningKeys;
this.moduleAddress = operator.moduleAddress;
this.finalizedUsedSigningKeys = operator.finalizedUsedSigningKeys;
}

@PrimaryKey()
Expand Down Expand Up @@ -46,4 +47,7 @@ export class RegistryOperator {
@PrimaryKey()
@Property({ length: ADDRESS_LEN })
moduleAddress!: string;

@Property()
finalizedUsedSigningKeys!: number;
}
141 changes: 119 additions & 22 deletions src/jobs/keys-update/keys-update.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Inject, Injectable } from '@nestjs/common';
import { range } from '@lido-nestjs/utils';
import { LOGGER_PROVIDER, LoggerService } from 'common/logger';
import { ConfigService } from 'common/config';
import { JobService } from 'common/job';
Expand All @@ -15,6 +16,8 @@ import { PrometheusService } from 'common/prometheus';
import { SrModuleEntity } from 'storage/sr-module.entity';
import { StakingModule } from 'staking-router-modules/interfaces/staking-module.interface';

const MAX_BLOCKS_OVERLAP = 30;

class KeyOutdatedError extends Error {
lastBlock: number;

Expand Down Expand Up @@ -115,10 +118,17 @@ export class KeysUpdateService {
// read from database last execution layer data
const prevElMeta = await this.elMetaStorage.get();

// handle the situation when the node has fallen behind the service state
if (prevElMeta && prevElMeta?.blockNumber > currElMeta.number) {
this.logger.warn('Previous data is newer than current data', prevElMeta);
return;
}

if (prevElMeta?.blockHash && prevElMeta.blockHash === currElMeta.hash) {
this.logger.debug?.('same state, skip', { prevElMeta, currElMeta });
return;
}

// Get modules from storage
const storageModules = await this.srModulesStorage.findAll();
// Get staking modules from SR contract
Expand All @@ -133,51 +143,138 @@ export class KeysUpdateService {

await this.entityManager.transactional(
async () => {
// Update EL meta in db
await this.elMetaStorage.update(currElMeta);
const prevBlockHash = prevElMeta?.blockHash;
const currentBlockHash = currElMeta.hash;

let lastChangedBlockHash = prevBlockHash || currentBlockHash;

let isReorgDetected = false;

for (const contractModule of contractModules) {
const { stakingModuleAddress } = contractModule;

// Find implementation for staking module
const moduleInstance = this.stakingRouterService.getStakingRouterModuleImpl(contractModule.type);
// Read current nonce from contract
const currNonce = await moduleInstance.getCurrentNonce(contractModule.stakingModuleAddress, currElMeta.hash);
const currNonce = await moduleInstance.getCurrentNonce(stakingModuleAddress, currentBlockHash);
// Read module in storage
const moduleInStorage = await this.srModulesStorage.findOneById(contractModule.moduleId);
const prevNonce = moduleInStorage?.nonce;
// update staking module information
await this.srModulesStorage.upsert(contractModule, currNonce);

this.logger.log(`Nonce previous value: ${prevNonce}, nonce current value: ${currNonce}`);

if (prevNonce === currNonce) {
this.logger.log("Nonce wasn't changed, no need to update keys");
// case when prevELMeta is undefined but prevNonce === currNonce looks like invalid
// use here prevElMeta.blockNumber + 1 because operators were updated in database for prevElMeta.blockNumber block
if (
prevElMeta &&
prevElMeta.blockNumber < currElMeta.number &&
(await moduleInstance.operatorsWereChanged(
contractModule.stakingModuleAddress,
prevElMeta.blockNumber + 1,
currElMeta.number,
))
) {
this.logger.log('Update events happened, need to update operators');
await moduleInstance.updateOperators(contractModule.stakingModuleAddress, currElMeta.hash);
}
if (!prevElMeta) {
this.logger.log('No past state found, start indexing', { stakingModuleAddress, currentBlockHash });

await moduleInstance.update(stakingModuleAddress, currentBlockHash);
await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash);
lastChangedBlockHash = currentBlockHash;
continue;
}

if (prevNonce !== currNonce) {
this.logger.log('Nonce has been changed, start indexing', {
stakingModuleAddress,
currentBlockHash,
prevNonce,
currNonce,
});

await moduleInstance.update(stakingModuleAddress, currentBlockHash);
await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash);
lastChangedBlockHash = currentBlockHash;
continue;
}

if (this.isTooMuchDiffBetweenBlocks(prevElMeta.blockNumber, currElMeta.number)) {
this.logger.log('Too much difference between the blocks, start indexing', {
stakingModuleAddress,
currentBlockHash,
});

await moduleInstance.update(stakingModuleAddress, currentBlockHash);
await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash);
lastChangedBlockHash = currentBlockHash;
continue;
}

await moduleInstance.update(contractModule.stakingModuleAddress, currElMeta.hash);
// calculate once per iteration
// no need to recheck each module separately
isReorgDetected = isReorgDetected ? true : await this.isReorgDetected(prevElMeta.blockHash, currentBlockHash);

if (isReorgDetected) {
this.logger.log('Reorg detected, start indexing', { stakingModuleAddress, currentBlockHash });

await moduleInstance.update(stakingModuleAddress, currentBlockHash);
await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash);
lastChangedBlockHash = currentBlockHash;
continue;
}

if (
prevElMeta.blockNumber < currElMeta.number &&
(await moduleInstance.operatorsWereChanged(
contractModule.stakingModuleAddress,
prevElMeta.blockNumber + 1,
currElMeta.number,
))
) {
this.logger.log('Update operator events happened, need to update operators', {
stakingModuleAddress,
currentBlockHash,
});

await moduleInstance.updateOperators(stakingModuleAddress, currentBlockHash);
await this.srModulesStorage.upsert(contractModule, currNonce, currentBlockHash);
lastChangedBlockHash = currentBlockHash;
continue;
}

this.logger.log('No changes have been detected in the module, indexing is not required', {
stakingModuleAddress,
currentBlockHash,
});
}

// Update EL meta in db
await this.elMetaStorage.update({ ...currElMeta, lastChangedBlockHash });
},
{ isolationLevel: IsolationLevel.READ_COMMITTED },
);

return currElMeta;
}

public async isReorgDetected(prevBlockHash: string, currentBlockHash: string) {
const currentBlock = await this.executionProvider.getFullBlock(currentBlockHash);
const prevBlock = await this.executionProvider.getFullBlock(prevBlockHash);

if (currentBlock.parentHash === prevBlock.hash) return false;
// TODO: different hash but same number
if (currentBlock.number === prevBlock.number) return true;

const blocks = await Promise.all(
range(prevBlock.number, currentBlock.number).map(async (bNumber) => {
return await this.executionProvider.getFullBlock(bNumber);
}),
);

for (let i = 1; i < blocks.length; i++) {
const previousBlock = blocks[i - 1];
const currentBlock = blocks[i];

if (currentBlock.parentHash !== previousBlock.hash) {
return false;
}
}

return true;
}

public isTooMuchDiffBetweenBlocks(prevBlockNumber: number, currentBlockNumber: number) {
return currentBlockNumber - prevBlockNumber >= MAX_BLOCKS_OVERLAP;
}

/**
* Update prometheus metrics of staking modules
*/
Expand Down
4 changes: 4 additions & 0 deletions src/storage/el-meta.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class ElMetaEntity {
this.blockNumber = meta.blockNumber;
this.blockHash = meta.blockHash.toLocaleLowerCase();
this.timestamp = meta.timestamp;
this.lastChangedBlockHash = meta.lastChangedBlockHash;
}

@PrimaryKey()
Expand All @@ -22,4 +23,7 @@ export class ElMetaEntity {

@Property()
timestamp: number;

@Property({ length: BLOCK_HASH_LEN })
lastChangedBlockHash: string;
}
8 changes: 7 additions & 1 deletion src/storage/el-meta.storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ export class ElMetaStorageService {
return result[0] ?? null;
}

async update(currElMeta: { number: number; hash: string; timestamp: number }): Promise<void> {
async update(currElMeta: {
number: number;
hash: string;
timestamp: number;
lastChangedBlockHash: string;
}): Promise<void> {
await this.repository.nativeDelete({});
await this.repository.persist(
new ElMetaEntity({
blockHash: currElMeta.hash,
blockNumber: currElMeta.number,
timestamp: currElMeta.timestamp,
lastChangedBlockHash: currElMeta.lastChangedBlockHash,
}),
);
await this.repository.flush();
Expand Down
7 changes: 6 additions & 1 deletion src/storage/sr-module.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { SRModuleRepository } from './sr-module.repository';
export class SrModuleEntity implements StakingModule {
[EntityRepositoryType]?: SRModuleRepository;

constructor(srModule: StakingModule, nonce: number) {
constructor(srModule: StakingModule, nonce: number, lastChangedBlockHash: string) {
this.moduleId = srModule.moduleId;
this.stakingModuleAddress = srModule.stakingModuleAddress;
this.moduleFee = srModule.moduleFee;
Expand All @@ -21,6 +21,7 @@ export class SrModuleEntity implements StakingModule {
this.type = srModule.type;
this.active = srModule.active;
this.nonce = nonce;
this.lastChangedBlockHash = lastChangedBlockHash;
}

@PrimaryKey()
Expand Down Expand Up @@ -81,4 +82,8 @@ export class SrModuleEntity implements StakingModule {
// nonce value
@Property()
nonce: number;

// last changed block hash
@Property()
lastChangedBlockHash: string;
}
4 changes: 3 additions & 1 deletion src/storage/sr-module.storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export class SRModuleStorageService {
return await this.repository.findAll();
}

async upsert(srModule: StakingModule, nonce: number): Promise<void> {
async upsert(srModule: StakingModule, nonce: number, lastChangedBlockHash: string): Promise<void> {
// Try to find an existing entity by moduleId or stakingModuleAddress
let existingModule = await this.repository.findOne({
moduleId: srModule.moduleId,
Expand All @@ -32,6 +32,7 @@ export class SRModuleStorageService {
existingModule = new SrModuleEntity(
{ ...srModule, stakingModuleAddress: srModule.stakingModuleAddress.toLowerCase() },
nonce,
lastChangedBlockHash,
);
} else {
// If the entity exists, update its properties
Expand All @@ -45,6 +46,7 @@ export class SRModuleStorageService {
existingModule.exitedValidatorsCount = srModule.exitedValidatorsCount;
existingModule.active = srModule.active;
existingModule.nonce = nonce;
existingModule.lastChangedBlockHash = lastChangedBlockHash;
}

// Save the entity (either a new one or an updated one)
Expand Down

0 comments on commit 2471e3f

Please sign in to comment.