Skip to content

Commit

Permalink
change design to filter at event level
Browse files Browse the repository at this point in the history
  • Loading branch information
0xyijing committed Oct 24, 2023
1 parent 628b191 commit 95ed7ba
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 173 deletions.
58 changes: 29 additions & 29 deletions docker-compose-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,35 @@ services:
MAX_TX_TO_PULL: 1000
BLOCK_FINALITY_THRESHOLD: 0
SECONDS_BETWEEN_RUNS: 1
FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true"
UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap"
UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104
FEAT_UNISWAP_V3_VIP_SWAP_EVENT: "true"
UNISWAP_V3_VIP_SWAP_START_BLOCK: 12553659
FEAT_UNISWAP_V3_SWAP_EVENT: "true"
UNISWAP_V3_SWAP_START_BLOCK: 16670838
FEAT_LIMIT_ORDERS: "true"
V4_NATIVE_FILL_START_BLOCK: "11591021"
FEAT_PLP_SWAP_EVENT: "true"
PLP_VIP_START_BLOCK: 11377457
FEAT_OTC_ORDERS: "true"
OTC_ORDERS_FEATURE_START_BLOCK: 13143075
FEAT_CANCEL_EVENTS: "true"
FEAT_STAKING: "true"
STAKING_DEPLOYMENT_BLOCK: 8952581
FEAT_RFQ_EVENT: "true"
FEAT_V3_NATIVE_FILL: "true"
FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true"
FLASHWALLET_ADDRESS: "0x22f9dcf4647084d6c31b2765f6910cd85c178c18"
FLASHWALLET_DEPLOYMENT_BLOCK: 12231666
FEAT_NFT: "true"
NFT_FEATURE_START_BLOCK: 14258205
FEAT_UNISWAP_V2_PAIR_CREATED_EVENT: "true"
UNISWAP_V2_PAIR_CREATED_PROTOCOL_CONTRACT_ADDRESSES_AND_START_BLOCKS: "UniswapV2:0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f:10000835,SushiSwap:0xc0aee478e3658e2610c5f7a4a2e1777ce9e4f2ac:10794229"
FEAT_UNISWAP_V2_SYNC_EVENT: "true"
UNISWAP_V2_SYNC_START_BLOCK: 10000835
FEAT_ONCHAIN_GOVERNANCE: "true"
ONCHAIN_GOVERNANCE_START_BLOCK: 16990159
# FEAT_UNISWAP_V2_VIP_SWAP_EVENT: "true"
# UNISWAP_V2_VIP_SWAP_SOURCES: "UniswapV2,SushiSwap"
# UNISWAP_V2_VIP_SWAP_START_BLOCK: 10917104
# FEAT_UNISWAP_V3_VIP_SWAP_EVENT: "true"
# UNISWAP_V3_VIP_SWAP_START_BLOCK: 12553659
# FEAT_UNISWAP_V3_SWAP_EVENT: "true"
# UNISWAP_V3_SWAP_START_BLOCK: 16670838
# FEAT_LIMIT_ORDERS: "true"
# V4_NATIVE_FILL_START_BLOCK: "11591021"
# FEAT_PLP_SWAP_EVENT: "true"
# PLP_VIP_START_BLOCK: 11377457
# FEAT_OTC_ORDERS: "true"
# OTC_ORDERS_FEATURE_START_BLOCK: 13143075
# FEAT_CANCEL_EVENTS: "true"
# FEAT_STAKING: "true"
# STAKING_DEPLOYMENT_BLOCK: 8952581
# FEAT_RFQ_EVENT: "true"
# FEAT_V3_NATIVE_FILL: "true"
# FEAT_ERC20_BRIDGE_TRANSFER_FLASHWALLET: "true"
# FLASHWALLET_ADDRESS: "0x22f9dcf4647084d6c31b2765f6910cd85c178c18"
# FLASHWALLET_DEPLOYMENT_BLOCK: 12231666
# FEAT_NFT: "true"
# NFT_FEATURE_START_BLOCK: 14258205
# FEAT_UNISWAP_V2_PAIR_CREATED_EVENT: "true"
# UNISWAP_V2_PAIR_CREATED_PROTOCOL_CONTRACT_ADDRESSES_AND_START_BLOCKS: "UniswapV2:0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f:10000835,SushiSwap:0xc0aee478e3658e2610c5f7a4a2e1777ce9e4f2ac:10794229"
# FEAT_UNISWAP_V2_SYNC_EVENT: "true"
# UNISWAP_V2_SYNC_START_BLOCK: 10000835
# FEAT_ONCHAIN_GOVERNANCE: "true"
# ONCHAIN_GOVERNANCE_START_BLOCK: 16990159
FEAT_WRAP_UNWRAP_NATIVE_EVENT: "true"
WRAP_UNWRAP_NATIVE_CONTRACT_ADDRESS: '0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'

Expand Down
81 changes: 0 additions & 81 deletions src/scripts/pull_filter_save_events_by_topic.ts

This file was deleted.

18 changes: 15 additions & 3 deletions src/scripts/utils/event_abi_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import { hexToUtf8 } from 'web3-utils';
import { BlockWithoutTransactionData } from 'ethereum-types';

import { ContractCallInfo, LogPullInfo, Web3Source } from '../../data_sources/events/web3';
import { Event } from '../../entities';
import { Event, Transaction } from '../../entities';
import { chunk, DeleteOptions, kafkaSendAsync, kafkaSendCommandAsync, logger } from '../../utils';
import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync } from './web3_utils';
import { TokenMetadataMap, extractTokensFromLogs, getParseSaveTokensAsync, getParseTxsAsync } from './web3_utils';

import { RawLogEntry } from 'ethereum-types';

Expand Down Expand Up @@ -192,11 +192,12 @@ export class PullAndSaveEventsByTopic {
const rawLogsArray = await web3Source.getBatchLogInfoForContractsAsync([logPullInfo]);

let txHashes: string[] = [];
let filteredTxHashes: string[] = [];
await Promise.all(
rawLogsArray.map(async (rawLogs) => {
const parsedLogsWithSkipped = rawLogs.logs.map((encodedLog: RawLogEntry) => parser(encodedLog));

const parsedLogs = parsedLogsWithSkipped.filter((log: Event) => log !== null);
let parsedLogs = parsedLogsWithSkipped.filter((log: Event) => log !== null);
SKIPPED_EVENTS.inc(
{ type: scrapingType, event: eventName },
parsedLogsWithSkipped.length - parsedLogs.length,
Expand Down Expand Up @@ -245,6 +246,17 @@ export class PullAndSaveEventsByTopic {
// Get list of tx hashes
txHashes = parsedLogs.map((log: Event) => log.transactionHash);

if (eventName.toLowerCase().includes('wrap') && parsedLogs.length > 0) {
const txData = await getParseTxsAsync(connection, web3Source, txHashes);
const filteredTxs = txData.parsedTxs.filter(
(tx: Transaction) => tx.affiliateAddress && tx.affiliateAddress.trim() !== '',
);
txHashes = filteredTxs.map((tx) => tx.transactionHash);

const validTxHashSet = new Set(txHashes);
parsedLogs = parsedLogs.filter((log: Event) => validTxHashSet.has(log.transactionHash));
}

// Get token metadata
const tokens = extractTokensFromLogs(parsedLogs, tokenMetadataMap);
await getParseSaveTokensAsync(connection, producer, web3Source, tokens);
Expand Down
60 changes: 0 additions & 60 deletions src/scripts/utils/web3_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -699,63 +699,3 @@ export async function getParseSaveTxAsync(
}
logger.info(`Saved ${txData.parsedTxs.length} Transactions`);
}

export async function getParseFilterSaveTxAsync(
connection: Connection,
producer: Producer,
web3Source: Web3Source,
hashes: string[],
): Promise<void> {
logger.info(`Searching for ${hashes.length} Transactions`);

const txData = await getParseTxsAsync(connection, web3Source, hashes);
const filteredTx = txData.parsedTxs.filter((tx: Transaction) => tx.affiliateAddress != null);
const txHashList = filteredTx.map((tx) => `'${tx.transactionHash}'`).toString();

const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
if (txData.parsedTxs.length) {
// delete the transactions for the fetched events
const queryRunner = connection.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();

await queryRunner.manager.query(txDeleteQuery);
await queryRunner.manager.query(txReceiptDeleteQuery);
await queryRunner.manager.query(txLogsDeleteQuery);

for (const chunkItems of chunk(txData.parsedTxs, 300)) {
await queryRunner.manager.insert(Transaction, chunkItems);
}
for (const chunkItems of chunk(txData.parsedReceipts, 300)) {
await queryRunner.manager.insert(TransactionReceipt, chunkItems);
}
for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
await queryRunner.manager.insert(TransactionLogs, chunkItems);
}

await queryRunner.commitTransaction();
queryRunner.release();

await kafkaSendRawAsync(
producer,
`event-scraper.${CHAIN_NAME_LOWER}.transactions.transactions.v0`,
['transactionHash'],
txData.parsedTxs,
);
await kafkaSendRawAsync(
producer,
`event-scraper.${CHAIN_NAME_LOWER}.transactions.receipts.v0`,
['transactionHash'],
txData.parsedReceipts,
);
await kafkaSendRawAsync(
producer,
`event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`,
['transactionHash'],
txData.parsedTxLogs,
);
}
logger.info(`Saved ${txData.parsedTxs.length} Transactions`);
}

0 comments on commit 95ed7ba

Please sign in to comment.