From 95ed7bacef48af7d3ef2136927e0d00adfee4852 Mon Sep 17 00:00:00 2001 From: Yijing Date: Mon, 23 Oct 2023 22:09:45 -0500 Subject: [PATCH] change design to filter at event level --- docker-compose-dev.yml | 58 ++++++------- .../pull_filter_save_events_by_topic.ts | 81 ------------------- src/scripts/utils/event_abi_utils.ts | 18 ++++- src/scripts/utils/web3_utils.ts | 60 -------------- 4 files changed, 44 insertions(+), 173 deletions(-) delete mode 100644 src/scripts/pull_filter_save_events_by_topic.ts diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml index 6b28e327..b198be5c 100644 --- a/docker-compose-dev.yml +++ b/docker-compose-dev.yml @@ -40,35 +40,35 @@ services: MAX_TX_TO_PULL: 1000 BLOCK_FINALITY_THRESHOLD: 0 SECONDS_BETWEEN_RUNS: 1 - FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true" - UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap" - UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104 - FEAT_UNISWAP_V3_VIP_SWAP_EVENT: "true" - UNISWAP_V3_VIP_SWAP_START_BLOCK: 12553659 - FEAT_UNISWAP_V3_SWAP_EVENT: "true" - UNISWAP_V3_SWAP_START_BLOCK: 16670838 - FEAT_LIMIT_ORDERS: "true" - V4_NATIVE_FILL_START_BLOCK: "11591021" - FEAT_PLP_SWAP_EVENT: "true" - PLP_VIP_START_BLOCK: 11377457 - FEAT_OTC_ORDERS: "true" - OTC_ORDERS_FEATURE_START_BLOCK: 13143075 - FEAT_CANCEL_EVENTS: "true" - FEAT_STAKING: "true" - STAKING_DEPLOYMENT_BLOCK: 8952581 - FEAT_RFQ_EVENT: "true" - FEAT_V3_NATIVE_FILL: "true" - FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true" - FLASHWALLET_ADDRESS: "0x22f9dcf4647084d6c31b2765f6910cd85c178c18" - FLASHWALLET_DEPLOYMENT_BLOCK: 12231666 - FEAT_NFT: "true" - NFT_FEATURE_START_BLOCK: 14258205 - FEAT_UNISWAP_V2_PAIR_CREATED_EVENT: "true" - UNISWAP_V2_PAIR_CREATED_PROTOCOL_CONTRACT_ADDRESSES_AND_START_BLOCKS: "UniswapV2:0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f:10000835,SushiSwap:0xc0aee478e3658e2610c5f7a4a2e1777ce9e4f2ac:10794229" - FEAT_UNISWAP_V2_SYNC_EVENT: "true" - UNISWAP_V2_SYNC_START_BLOCK: 10000835 - FEAT_ONCHAIN_GOVERNANCE: "true" - ONCHAIN_GOVERNANCE_START_BLOCK: 16990159 + # FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true" + # UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap" + # UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104 + # FEAT_UNISWAP_V3_VIP_SWAP_EVENT: "true" + # UNISWAP_V3_VIP_SWAP_START_BLOCK: 12553659 + # FEAT_UNISWAP_V3_SWAP_EVENT: "true" + # UNISWAP_V3_SWAP_START_BLOCK: 16670838 + # FEAT_LIMIT_ORDERS: "true" + # V4_NATIVE_FILL_START_BLOCK: "11591021" + # FEAT_PLP_SWAP_EVENT: "true" + # PLP_VIP_START_BLOCK: 11377457 + # FEAT_OTC_ORDERS: "true" + # OTC_ORDERS_FEATURE_START_BLOCK: 13143075 + # FEAT_CANCEL_EVENTS: "true" + # FEAT_STAKING: "true" + # STAKING_DEPLOYMENT_BLOCK: 8952581 + # FEAT_RFQ_EVENT: "true" + # FEAT_V3_NATIVE_FILL: "true" + # FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true" + # FLASHWALLET_ADDRESS: "0x22f9dcf4647084d6c31b2765f6910cd85c178c18" + # FLASHWALLET_DEPLOYMENT_BLOCK: 12231666 + # FEAT_NFT: "true" + # NFT_FEATURE_START_BLOCK: 14258205 + # FEAT_UNISWAP_V2_PAIR_CREATED_EVENT: "true" + # UNISWAP_V2_PAIR_CREATED_PROTOCOL_CONTRACT_ADDRESSES_AND_START_BLOCKS: "UniswapV2:0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f:10000835,SushiSwap:0xc0aee478e3658e2610c5f7a4a2e1777ce9e4f2ac:10794229" + # FEAT_UNISWAP_V2_SYNC_EVENT: "true" + # UNISWAP_V2_SYNC_START_BLOCK: 10000835 + # FEAT_ONCHAIN_GOVERNANCE: "true" + # ONCHAIN_GOVERNANCE_START_BLOCK: 16990159 FEAT_WRAP_UNWRAP_NATIVE_EVENT: "true" WRAP_UNWRAP_NATIVE_CONTRACT_ADDRESS: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2' diff --git a/src/scripts/pull_filter_save_events_by_topic.ts b/src/scripts/pull_filter_save_events_by_topic.ts deleted file mode 100644 index 25a525ac..00000000 --- a/src/scripts/pull_filter_save_events_by_topic.ts +++ /dev/null @@ -1,81 +0,0 @@ -import { Producer } from 'kafkajs'; -import { web3Factory } from '@0x/dev-utils'; -import { logger } from '../utils/logger'; -import { Connection } from 'typeorm'; -import { Web3Source } from '../data_sources/events/web3'; - -import { getParseFilterSaveTxAsync } from './utils/web3_utils'; -import { PullAndSaveEventsByTopic } from './utils/event_abi_utils'; - -import { ETHEREUM_RPC_URL } from '../config'; -import { eventScrperProps, EventScraperProps, CommonEventParams } from '../events'; - -import { SCRIPT_RUN_DURATION } from '../utils/metrics'; - -const provider = web3Factory.getRpcProvider({ - rpcUrl: ETHEREUM_RPC_URL, -}); -const web3Source = new Web3Source(provider, ETHEREUM_RPC_URL); - -const pullAndSaveEventsByTopic = new PullAndSaveEventsByTopic(); - -export class EventsByTopicScraperCustom { - public async getParseFilterSaveEventsAsync(connection: Connection, producer: Producer): Promise { - const startTime = new Date().getTime(); - logger.info(`Pulling Events by Topic`); - - const currentBlock = await web3Source.getCurrentBlockAsync(); - - logger.info(`latest block: ${currentBlock.number}`); - - const promises: Promise[] = []; - - const commonParams: CommonEventParams = { - connection, - producer, - web3Source, - }; - - eventScrperProps.forEach((props: EventScraperProps) => { - if (props.enabled) { - promises.push( - pullAndSaveEventsByTopic.getParseSaveEventsByTopic( - commonParams.connection, - commonParams.producer, - commonParams.web3Source, - currentBlock, - props.name, - props.tType, - props.table, - props.topics, - props.contractAddress, - props.startBlock, - props.parser, - props.deleteOptions, - props.tokenMetadataMap, - props.callback, - ), - ); - } - }); - - const txHashes = [ - ...new Set( - (await Promise.all(promises)).reduce( - (accumulator: string[], value: string[]) => accumulator.concat(value), - [], - ), - ), - ] as string[]; - - if (txHashes.length) { - await getParseFilterSaveTxAsync(connection, producer, web3Source, txHashes); - } - - const endTime = new Date().getTime(); - const scriptDurationSeconds = (endTime - startTime) / 1000; - SCRIPT_RUN_DURATION.set({ script: 'events-by-topic' }, scriptDurationSeconds); - - logger.info(`Finished pulling events by topic in ${scriptDurationSeconds}`); - } -} \ No newline at end of file diff --git a/src/scripts/utils/event_abi_utils.ts b/src/scripts/utils/event_abi_utils.ts index 8c446db0..923e610e 100644 --- a/src/scripts/utils/event_abi_utils.ts +++ b/src/scripts/utils/event_abi_utils.ts @@ -4,9 +4,9 @@ import { hexToUtf8 } from 'web3-utils'; import { BlockWithoutTransactionData } from 'ethereum-types'; import { ContractCallInfo, LogPullInfo, Web3Source } from '../../data_sources/events/web3'; -import { Event } from '../../entities'; +import { Event, Transaction } from '../../entities'; import { chunk, DeleteOptions, kafkaSendAsync, kafkaSendCommandAsync, logger } from '../../utils'; -import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync } from './web3_utils'; +import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync, getParseTxsAsync } from './web3_utils'; import { RawLogEntry } from 'ethereum-types'; @@ -192,11 +192,12 @@ export class PullAndSaveEventsByTopic { const rawLogsArray = await web3Source.getBatchLogInfoForContractsAsync([logPullInfo]); let txHashes: string[] = []; + let filteredTxHashes: string[] = []; await Promise.all( rawLogsArray.map(async (rawLogs) => { const parsedLogsWithSkipped = rawLogs.logs.map((encodedLog: RawLogEntry) => parser(encodedLog)); - const parsedLogs = parsedLogsWithSkipped.filter((log: Event) => log !== null); + let parsedLogs = parsedLogsWithSkipped.filter((log: Event) => log !== null); SKIPPED_EVENTS.inc( { type: scrapingType, event: eventName }, parsedLogsWithSkipped.length - parsedLogs.length, @@ -245,6 +246,17 @@ export class PullAndSaveEventsByTopic { // Get list of tx hashes txHashes = parsedLogs.map((log: Event) => log.transactionHash); + if (eventName.toLowerCase().includes('wrap') && parsedLogs.length > 0) { + const txData = await getParseTxsAsync(connection, web3Source, txHashes); + const filteredTxs = txData.parsedTxs.filter( + (tx: Transaction) => tx.affiliateAddress && tx.affiliateAddress.trim() !== '', + ); + txHashes = filteredTxs.map((tx) => tx.transactionHash); + + const validTxHashSet = new Set(txHashes); + parsedLogs = parsedLogs.filter((log: Event) => validTxHashSet.has(log.transactionHash)); + } + // Get token metadata const tokens = extractTokensFromLogs(parsedLogs, tokenMetadataMap); await getParseSaveTokensAsync(connection, producer, web3Source, tokens); diff --git a/src/scripts/utils/web3_utils.ts b/src/scripts/utils/web3_utils.ts index 5b68c7db..26d9dead 100644 --- a/src/scripts/utils/web3_utils.ts +++ b/src/scripts/utils/web3_utils.ts @@ -699,63 +699,3 @@ export async function getParseSaveTxAsync( } logger.info(`Saved ${txData.parsedTxs.length} Transactions`); } - -export async function getParseFilterSaveTxAsync( - connection: Connection, - producer: Producer, - web3Source: Web3Source, - hashes: string[], -): Promise { - logger.info(`Searching for ${hashes.length} Transactions`); - - const txData = await getParseTxsAsync(connection, web3Source, hashes); - const filteredTx = txData.parsedTxs.filter((tx: Transaction) => tx.affiliateAddress != null); - const txHashList = filteredTx.map((tx) => `'${tx.transactionHash}'`).toString(); - - const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`; - const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`; - const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`; - if (txData.parsedTxs.length) { - // delete the transactions for the fetched events - const queryRunner = connection.createQueryRunner(); - await queryRunner.connect(); - await queryRunner.startTransaction(); - - await queryRunner.manager.query(txDeleteQuery); - await queryRunner.manager.query(txReceiptDeleteQuery); - await queryRunner.manager.query(txLogsDeleteQuery); - - for (const chunkItems of chunk(txData.parsedTxs, 300)) { - await queryRunner.manager.insert(Transaction, chunkItems); - } - for (const chunkItems of chunk(txData.parsedReceipts, 300)) { - await queryRunner.manager.insert(TransactionReceipt, chunkItems); - } - for (const chunkItems of chunk(txData.parsedTxLogs, 300)) { - await queryRunner.manager.insert(TransactionLogs, chunkItems); - } - - await queryRunner.commitTransaction(); - queryRunner.release(); - - await kafkaSendRawAsync( - producer, - `event-scraper.${CHAIN_NAME_LOWER}.transactions.transactions.v0`, - ['transactionHash'], - txData.parsedTxs, - ); - await kafkaSendRawAsync( - producer, - `event-scraper.${CHAIN_NAME_LOWER}.transactions.receipts.v0`, - ['transactionHash'], - txData.parsedReceipts, - ); - await kafkaSendRawAsync( - producer, - `event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`, - ['transactionHash'], - txData.parsedTxLogs, - ); - } - logger.info(`Saved ${txData.parsedTxs.length} Transactions`); -}