diff --git a/src/scripts/pull_filter_save_events_by_topic.ts b/src/scripts/pull_filter_save_events_by_topic.ts
new file mode 100644
index 00000000..25a525ac
--- /dev/null
+++ b/src/scripts/pull_filter_save_events_by_topic.ts
@@ -0,0 +1,81 @@
+import { Producer } from 'kafkajs';
+import { web3Factory } from '@0x/dev-utils';
+import { logger } from '../utils/logger';
+import { Connection } from 'typeorm';
+import { Web3Source } from '../data_sources/events/web3';
+
+import { getParseFilterSaveTxAsync } from './utils/web3_utils';
+import { PullAndSaveEventsByTopic } from './utils/event_abi_utils';
+
+import { ETHEREUM_RPC_URL } from '../config';
+import { eventScrperProps, EventScraperProps, CommonEventParams } from '../events';
+
+import { SCRIPT_RUN_DURATION } from '../utils/metrics';
+
+const provider = web3Factory.getRpcProvider({
+    rpcUrl: ETHEREUM_RPC_URL,
+});
+const web3Source = new Web3Source(provider, ETHEREUM_RPC_URL);
+
+const pullAndSaveEventsByTopic = new PullAndSaveEventsByTopic();
+
+export class EventsByTopicScraperCustom {
+    public async getParseFilterSaveEventsAsync(connection: Connection, producer: Producer): Promise<void> {
+        const startTime = new Date().getTime();
+        logger.info(`Pulling Events by Topic`);
+
+        const currentBlock = await web3Source.getCurrentBlockAsync();
+
+        logger.info(`latest block: ${currentBlock.number}`);
+
+        const promises: Promise<string[]>[] = [];
+
+        const commonParams: CommonEventParams = {
+            connection,
+            producer,
+            web3Source,
+        };
+
+        eventScrperProps.forEach((props: EventScraperProps) => {
+            if (props.enabled) {
+                promises.push(
+                    pullAndSaveEventsByTopic.getParseSaveEventsByTopic(
+                        commonParams.connection,
+                        commonParams.producer,
+                        commonParams.web3Source,
+                        currentBlock,
+                        props.name,
+                        props.tType,
+                        props.table,
+                        props.topics,
+                        props.contractAddress,
+                        props.startBlock,
+                        props.parser,
+                        props.deleteOptions,
+                        props.tokenMetadataMap,
+                        props.callback,
+                    ),
+                );
+            }
+        });
+
+        const txHashes = [
+            ...new Set(
+                (await Promise.all(promises)).reduce(
+                    (accumulator: string[], value: string[]) => accumulator.concat(value),
+                    [],
+                ),
+            ),
+        ] as string[];
+
+        if (txHashes.length) {
+            await getParseFilterSaveTxAsync(connection, producer, web3Source, txHashes);
+        }
+
+        const endTime = new Date().getTime();
+        const scriptDurationSeconds = (endTime - startTime) / 1000;
+        SCRIPT_RUN_DURATION.set({ script: 'events-by-topic' }, scriptDurationSeconds);
+
+        logger.info(`Finished pulling events by topic in ${scriptDurationSeconds} seconds`);
+    }
+}
\ No newline at end of file
diff --git a/src/scripts/utils/web3_utils.ts b/src/scripts/utils/web3_utils.ts
index 776bd9f1..5b68c7db 100644
--- a/src/scripts/utils/web3_utils.ts
+++ b/src/scripts/utils/web3_utils.ts
@@ -37,13 +37,13 @@ export type TokenMetadataMap = {
 export type TxDetailsType = {
     parsedTxs: Transaction[];
     parsedReceipts: TransactionReceipt[];
-    // parsedTxLogs: TransactionLogs[];
+    parsedTxLogs: TransactionLogs[];
 };
 
 export class TxDetails implements TxDetailsType {
     parsedTxs = [];
     parsedReceipts = [];
-    // parsedTxLogs = [];
+    parsedTxLogs = [];
 }
 
 export const MISSING_TRANSACTIONS = new Gauge({
@@ -626,7 +626,7 @@ export async function getParseTxsAsync(
     const parsedTxs = foundTxs.map((rawTxn) => parseTransaction(rawTxn));
     const parsedReceipts = foundTxReceipts.map((rawTxReceipt) => parseTransactionReceipt(rawTxReceipt));
-    //const parsedTxLogs = foundTxReceipts.map((rawTxReceipt) => parseTransactionLogs(rawTxReceipt));
+    const parsedTxLogs = foundTxReceipts.map((rawTxReceipt) => parseTransactionLogs(rawTxReceipt));
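+    // parsedTxLogs is derived from the same receipt objects as parsedReceipts,
+    // so re-enabling the logs pipeline adds no extra RPC calls.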
 
     const foundHashes = foundTxReceipts.map((rawTxReceipt) => rawTxReceipt.transactionHash);
     const missingHashes = dedupedHashes.filter((hash) => !foundHashes.includes(hash));
@@ -638,11 +638,7 @@ export async function getParseTxsAsync(
 
     logger.debug(`got ${parsedReceipts.length} txs`);
 
-    return {
-        parsedTxs,
-        parsedReceipts,
-        //parsedTxLogs
-    };
+    return { parsedTxs, parsedReceipts, parsedTxLogs };
 }
 
 export async function getParseSaveTxAsync(
@@ -658,7 +654,7 @@ export async function getParseSaveTxAsync(
     const txHashList = txData.parsedTxs.map((tx) => `'${tx.transactionHash}'`).toString();
     const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
     const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
-    // const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
+    const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
     if (txData.parsedTxs.length) {
         // delete the transactions for the fetched events
         const queryRunner = connection.createQueryRunner();
@@ -667,7 +663,7 @@ export async function getParseSaveTxAsync(
 
         await queryRunner.manager.query(txDeleteQuery);
         await queryRunner.manager.query(txReceiptDeleteQuery);
-        //await queryRunner.manager.query(txLogsDeleteQuery);
+        await queryRunner.manager.query(txLogsDeleteQuery);
 
         for (const chunkItems of chunk(txData.parsedTxs, 300)) {
             await queryRunner.manager.insert(Transaction, chunkItems);
@@ -675,9 +671,69 @@ export async function getParseSaveTxAsync(
         }
         for (const chunkItems of chunk(txData.parsedReceipts, 300)) {
             await queryRunner.manager.insert(TransactionReceipt, chunkItems);
         }
-        // for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
-        //     await queryRunner.manager.insert(TransactionLogs, chunkItems);
-        // }
+        for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
+            await queryRunner.manager.insert(TransactionLogs, chunkItems);
+        }
+
+        await queryRunner.commitTransaction();
+        queryRunner.release();
+
+        await kafkaSendRawAsync(
+            producer,
+            `event-scraper.${CHAIN_NAME_LOWER}.transactions.transactions.v0`,
+            ['transactionHash'],
+            txData.parsedTxs,
+        );
+        await kafkaSendRawAsync(
+            producer,
+            `event-scraper.${CHAIN_NAME_LOWER}.transactions.receipts.v0`,
+            ['transactionHash'],
+            txData.parsedReceipts,
+        );
+        await kafkaSendRawAsync(
+            producer,
+            `event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`,
+            ['transactionHash'],
+            txData.parsedTxLogs,
+        );
+    }
+    logger.info(`Saved ${txData.parsedTxs.length} Transactions`);
+}
+
+export async function getParseFilterSaveTxAsync(
+    connection: Connection,
+    producer: Producer,
+    web3Source: Web3Source,
+    hashes: string[],
+): Promise<void> {
+    logger.info(`Searching for ${hashes.length} Transactions`);
+
+    const txData = await getParseTxsAsync(connection, web3Source, hashes);
+    // Keep only txs that carry an affiliate address, and drop the matching
+    // receipts and logs too, so every collection in txData stays consistent
+    // with the delete queries and inserts below.
+    txData.parsedTxs = txData.parsedTxs.filter((tx: Transaction) => tx.affiliateAddress != null);
+    const filteredHashes = new Set(txData.parsedTxs.map((tx) => tx.transactionHash));
+    txData.parsedReceipts = txData.parsedReceipts.filter((receipt) => filteredHashes.has(receipt.transactionHash));
+    txData.parsedTxLogs = txData.parsedTxLogs.filter((txLogs) => filteredHashes.has(txLogs.transactionHash));
+    const txHashList = txData.parsedTxs.map((tx) => `'${tx.transactionHash}'`).toString();
+
+    const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
+    const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
+    const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
+    if (txData.parsedTxs.length) {
+        // delete the transactions for the fetched events
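+        // Delete-then-insert inside a single DB transaction keeps re-runs
+        // idempotent: hashes scraped on an earlier pass are replaced rather
+        // than causing duplicate-key failures.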
+        const queryRunner = connection.createQueryRunner();
+        await queryRunner.connect();
+        await queryRunner.startTransaction();
+
+        await queryRunner.manager.query(txDeleteQuery);
+        await queryRunner.manager.query(txReceiptDeleteQuery);
+        await queryRunner.manager.query(txLogsDeleteQuery);
+
+        for (const chunkItems of chunk(txData.parsedTxs, 300)) {
+            await queryRunner.manager.insert(Transaction, chunkItems);
+        }
+        for (const chunkItems of chunk(txData.parsedReceipts, 300)) {
+            await queryRunner.manager.insert(TransactionReceipt, chunkItems);
+        }
+        for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
+            await queryRunner.manager.insert(TransactionLogs, chunkItems);
+        }
 
         await queryRunner.commitTransaction();
         queryRunner.release();
@@ -694,14 +750,12 @@ export async function getParseSaveTxAsync(
             ['transactionHash'],
             txData.parsedReceipts,
         );
-        /*
         await kafkaSendRawAsync(
            producer,
            `event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`,
            ['transactionHash'],
            txData.parsedTxLogs,
        );
-        */
     }
     logger.info(`Saved ${txData.parsedTxs.length} Transactions`);
 }
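Note: the uncommented code paths assume `TransactionLogs` and `parseTransactionLogs` are already imported in `web3_utils.ts`; the diff does not touch the import block. For context, a minimal sketch of how the new scraper class might be driven follows. Everything in it is illustrative, not taken from this diff: the loop function name and the 30-second poll interval are assumptions, and the `Connection`/`Producer` instances come from however the surrounding app already creates them.

import { Producer } from 'kafkajs';
import { Connection } from 'typeorm';

import { EventsByTopicScraperCustom } from './scripts/pull_filter_save_events_by_topic';

// Hypothetical runner: repeatedly pull, filter, and save events by topic.
export async function runEventsByTopicLoop(connection: Connection, producer: Producer): Promise<void> {
    const scraper = new EventsByTopicScraperCustom();
    while (true) {
        await scraper.getParseFilterSaveEventsAsync(connection, producer);
        // Assumed 30s poll interval; tune to the chain's block time.
        await new Promise((resolve) => setTimeout(resolve, 30000));
    }
}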