add filter functions to keep 0x affiliate only
0xyijing committed Oct 19, 2023
1 parent 8531a81 · commit 628b191
Showing 2 changed files with 150 additions and 15 deletions.
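The heart of the change is a predicate that keeps only transactions carrying a 0x affiliate address. A minimal sketch of the idea in isolation — the `affiliateAddress` field comes from the repo's `Transaction` entity, while the `TxLike` type and sample data here are purely illustrative:

// Illustrative only: the predicate the new filter functions are built around.
type TxLike = { transactionHash: string; affiliateAddress: string | null };
const txs: TxLike[] = [
    { transactionHash: '0xaaa...', affiliateAddress: '0x0000000000000000000000000000000000000001' },
    { transactionHash: '0xbbb...', affiliateAddress: null },
];
const affiliateOnly = txs.filter((tx) => tx.affiliateAddress != null); // keeps only '0xaaa...'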
src/scripts/pull_filter_save_events_by_topic.ts (81 additions, 0 deletions)
@@ -0,0 +1,81 @@
import { Producer } from 'kafkajs';
import { web3Factory } from '@0x/dev-utils';
import { logger } from '../utils/logger';
import { Connection } from 'typeorm';
import { Web3Source } from '../data_sources/events/web3';

import { getParseFilterSaveTxAsync } from './utils/web3_utils';
import { PullAndSaveEventsByTopic } from './utils/event_abi_utils';

import { ETHEREUM_RPC_URL } from '../config';
import { eventScrperProps, EventScraperProps, CommonEventParams } from '../events';

import { SCRIPT_RUN_DURATION } from '../utils/metrics';

const provider = web3Factory.getRpcProvider({
    rpcUrl: ETHEREUM_RPC_URL,
});
const web3Source = new Web3Source(provider, ETHEREUM_RPC_URL);

const pullAndSaveEventsByTopic = new PullAndSaveEventsByTopic();

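// After all enabled event pullers run, the collected tx hashes are passed
// through getParseFilterSaveTxAsync, which keeps only transactions that
// carry a 0x affiliate address.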
export class EventsByTopicScraperCustom {
    public async getParseFilterSaveEventsAsync(connection: Connection, producer: Producer): Promise<void> {
        const startTime = new Date().getTime();
        logger.info(`Pulling Events by Topic`);

        const currentBlock = await web3Source.getCurrentBlockAsync();

        logger.info(`latest block: ${currentBlock.number}`);

        const promises: Promise<string[]>[] = [];

        const commonParams: CommonEventParams = {
            connection,
            producer,
            web3Source,
        };

        eventScrperProps.forEach((props: EventScraperProps) => {
            if (props.enabled) {
                promises.push(
                    pullAndSaveEventsByTopic.getParseSaveEventsByTopic(
                        commonParams.connection,
                        commonParams.producer,
                        commonParams.web3Source,
                        currentBlock,
                        props.name,
                        props.tType,
                        props.table,
                        props.topics,
                        props.contractAddress,
                        props.startBlock,
                        props.parser,
                        props.deleteOptions,
                        props.tokenMetadataMap,
                        props.callback,
                    ),
                );
            }
        });

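        // Each puller resolves to the tx hashes it processed; flatten the
        // per-event arrays and dedupe them through a Set before fetching details.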
        const txHashes = [
            ...new Set(
                (await Promise.all(promises)).reduce(
                    (accumulator: string[], value: string[]) => accumulator.concat(value),
                    [],
                ),
            ),
        ] as string[];

        if (txHashes.length) {
            await getParseFilterSaveTxAsync(connection, producer, web3Source, txHashes);
        }

        const endTime = new Date().getTime();
        const scriptDurationSeconds = (endTime - startTime) / 1000;
        SCRIPT_RUN_DURATION.set({ script: 'events-by-topic' }, scriptDurationSeconds);

        logger.info(`Finished pulling events by topic in ${scriptDurationSeconds} seconds`);
    }
}
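For context, a minimal sketch of how this class might be driven; the runner function and polling interval below are illustrative assumptions, not part of this commit:

// Hypothetical runner — the Connection and Producer would come from the
// repo's existing TypeORM and Kafka setup.
import { Connection } from 'typeorm';
import { Producer } from 'kafkajs';

const scraper = new EventsByTopicScraperCustom();

export async function runScraperLoopAsync(connection: Connection, producer: Producer): Promise<void> {
    while (true) {
        await scraper.getParseFilterSaveEventsAsync(connection, producer);
        await new Promise((resolve) => setTimeout(resolve, 30_000)); // illustrative pause between runs
    }
}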
src/scripts/utils/web3_utils.ts (69 additions, 15 deletions)
@@ -37,13 +37,13 @@ export type TokenMetadataMap = {
export type TxDetailsType = {
    parsedTxs: Transaction[];
    parsedReceipts: TransactionReceipt[];
-   // parsedTxLogs: TransactionLogs[];
+   parsedTxLogs: TransactionLogs[];
};

export class TxDetails implements TxDetailsType {
    parsedTxs = [];
    parsedReceipts = [];
-   // parsedTxLogs = [];
+   parsedTxLogs = [];
}

export const MISSING_TRANSACTIONS = new Gauge({
@@ -626,7 +626,7 @@ export async function getParseTxsAsync(

    const parsedTxs = foundTxs.map((rawTxn) => parseTransaction(rawTxn));
    const parsedReceipts = foundTxReceipts.map((rawTxReceipt) => parseTransactionReceipt(rawTxReceipt));
-   //const parsedTxLogs = foundTxReceipts.map((rawTxReceipt) => parseTransactionLogs(rawTxReceipt));
+   const parsedTxLogs = foundTxReceipts.map((rawTxReceipt) => parseTransactionLogs(rawTxReceipt));

    const foundHashes = foundTxReceipts.map((rawTxReceipt) => rawTxReceipt.transactionHash);
    const missingHashes = dedupedHashes.filter((hash) => !foundHashes.includes(hash));
@@ -638,11 +638,7 @@

    logger.debug(`got ${parsedReceipts.length} txs`);

-   return {
-       parsedTxs,
-       parsedReceipts,
-       //parsedTxLogs
-   };
+   return { parsedTxs, parsedReceipts, parsedTxLogs };
}

export async function getParseSaveTxAsync(
@@ -658,7 +654,7 @@ export async function getParseSaveTxAsync(
    const txHashList = txData.parsedTxs.map((tx) => `'${tx.transactionHash}'`).toString();
    const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
    const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
-   // const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
+   const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
    if (txData.parsedTxs.length) {
        // delete the transactions for the fetched events
        const queryRunner = connection.createQueryRunner();
@@ -667,17 +663,77 @@

        await queryRunner.manager.query(txDeleteQuery);
        await queryRunner.manager.query(txReceiptDeleteQuery);
-       //await queryRunner.manager.query(txLogsDeleteQuery);
+       await queryRunner.manager.query(txLogsDeleteQuery);

        for (const chunkItems of chunk(txData.parsedTxs, 300)) {
            await queryRunner.manager.insert(Transaction, chunkItems);
        }
        for (const chunkItems of chunk(txData.parsedReceipts, 300)) {
            await queryRunner.manager.insert(TransactionReceipt, chunkItems);
        }
-       // for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
-       //     await queryRunner.manager.insert(TransactionLogs, chunkItems);
-       // }
+       for (const chunkItems of chunk(txData.parsedTxLogs, 300)) {
+           await queryRunner.manager.insert(TransactionLogs, chunkItems);
+       }

        await queryRunner.commitTransaction();
        queryRunner.release();

        await kafkaSendRawAsync(
            producer,
            `event-scraper.${CHAIN_NAME_LOWER}.transactions.transactions.v0`,
            ['transactionHash'],
            txData.parsedTxs,
        );
        await kafkaSendRawAsync(
            producer,
            `event-scraper.${CHAIN_NAME_LOWER}.transactions.receipts.v0`,
            ['transactionHash'],
            txData.parsedReceipts,
        );
        await kafkaSendRawAsync(
            producer,
            `event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`,
            ['transactionHash'],
            txData.parsedTxLogs,
        );
    }
    logger.info(`Saved ${txData.parsedTxs.length} Transactions`);
}

export async function getParseFilterSaveTxAsync(
    connection: Connection,
    producer: Producer,
    web3Source: Web3Source,
    hashes: string[],
): Promise<void> {
    logger.info(`Searching for ${hashes.length} Transactions`);

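    // Fetch full details for every candidate hash, then narrow txs, receipts,
    // and logs to the hashes that carry an affiliate address, so the three
    // tables stay consistent with each other.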
    const txData = await getParseTxsAsync(connection, web3Source, hashes);
    const filteredTx = txData.parsedTxs.filter((tx: Transaction) => tx.affiliateAddress != null);
    const filteredHashes = new Set(filteredTx.map((tx) => tx.transactionHash));
    const filteredReceipts = txData.parsedReceipts.filter((receipt) => filteredHashes.has(receipt.transactionHash));
    const filteredTxLogs = txData.parsedTxLogs.filter((txLog) => filteredHashes.has(txLog.transactionHash));
    const txHashList = filteredTx.map((tx) => `'${tx.transactionHash}'`).toString();

    const txDeleteQuery = `DELETE FROM ${SCHEMA}.transactions WHERE transaction_hash IN (${txHashList})`;
    const txReceiptDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_receipts WHERE transaction_hash IN (${txHashList});`;
    const txLogsDeleteQuery = `DELETE FROM ${SCHEMA}.transaction_logs WHERE transaction_hash IN (${txHashList});`;
    if (filteredTx.length) {
        // delete the transactions for the fetched events
        const queryRunner = connection.createQueryRunner();
        await queryRunner.connect();
        await queryRunner.startTransaction();

        await queryRunner.manager.query(txDeleteQuery);
        await queryRunner.manager.query(txReceiptDeleteQuery);
        await queryRunner.manager.query(txLogsDeleteQuery);

        for (const chunkItems of chunk(filteredTx, 300)) {
            await queryRunner.manager.insert(Transaction, chunkItems);
        }
        for (const chunkItems of chunk(filteredReceipts, 300)) {
            await queryRunner.manager.insert(TransactionReceipt, chunkItems);
        }
        for (const chunkItems of chunk(filteredTxLogs, 300)) {
            await queryRunner.manager.insert(TransactionLogs, chunkItems);
        }

        await queryRunner.commitTransaction();
        queryRunner.release();
@@ -694,14 +750,12 @@ export async function getParseSaveTxAsync(
            ['transactionHash'],
            filteredReceipts,
        );
-       /*
        await kafkaSendRawAsync(
            producer,
            `event-scraper.${CHAIN_NAME_LOWER}.transactions.logs.v0`,
            ['transactionHash'],
            filteredTxLogs,
        );
-       */
    }
    logger.info(`Saved ${filteredTx.length} Transactions`);
}
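Taken together, the custom scraper in the first file feeds its deduped hashes into this function. A direct call would look like the sketch below; the hash is a placeholder and the surrounding setup objects are assumed to exist:

// Hypothetical direct invocation, reusing the objects the scraper already holds.
await getParseFilterSaveTxAsync(connection, producer, web3Source, [
    '0x0000000000000000000000000000000000000000000000000000000000000001', // placeholder hash
]);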
