Skip to content

Commit

Permalink
ft_watcher: worker to invoke ft
Browse files Browse the repository at this point in the history
test: initial swap monitor testing

ft_swap: parse redeem params to get fill vaa

add id for redeem event

ft_watcher: parse swap event and input

ft_watcher: plug swap layer into ft watcher

Signed-off-by: bingyuyap <[email protected]>
  • Loading branch information
bingyuyap committed Aug 7, 2024
1 parent de8f1f8 commit 63aaa73
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 25 deletions.
12 changes: 12 additions & 0 deletions database/fast-transfer-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ DROP TABLE IF EXISTS fast_transfer_executions;
DROP TABLE IF EXISTS fast_transfer_settlements;
DROP TABLE IF EXISTS auction_logs;
DROP TABLE IF EXISTS auction_history_mapping;
DROP TABLE IF EXISTS redeem_swaps;

DROP TYPE IF EXISTS FastTransferStatus;
DROP TYPE IF EXISTS FastTransferProtocol;
Expand Down Expand Up @@ -92,3 +93,14 @@ CREATE TABLE auction_history_mapping (
auction_pubkey VARCHAR(255) PRIMARY KEY,
index INT NOT NULL
);

-- Redeem Swaps table to track the final swap before funds reach the user's account
CREATE TABLE redeem_swaps (
fill_vaa_id VARCHAR(255) PRIMARY KEY,
tx_hash VARCHAR(255) NOT NULL,
recipient VARCHAR(255) NOT NULL,
output_token VARCHAR(255) NOT NULL,
output_amount BIGINT NOT NULL,
relaying_fee BIGINT NOT NULL,
timestamp TIMESTAMP NOT NULL
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"to": "0xdA11B3bc8705D84BEae4a796035bDcCc9b59d1ee",
"from": "0x3E5e9111Ae8eB78Fe1CC3bb8915d5D461F3Ef9A9",
"contractAddress": null,
"transactionIndex": 0,
"logsBloom": "0x0200000000400100008000010000000100000000000000000800010000020000000200000000000001000200000000000000000000000008000000000080000000000000000000400800000800000000000000000000000000000000000000000002000002000002800000000001380000000000010100000000001000840000000000000000040000020000000000020000010001000020000000000000040000000000000020000200000000000000000000000000000000000000000000000000001200000040000000000000000000000000000000000000000000002000400002000000000000000018000000000c080000000800000000000000000000",
"blockHash": "0x823e2da477c5e8f8aff4bec177b11adca3ad16550d65faf38a3895e4c23d503b",
"transactionHash": "0x8e61395ff443d67697fdafad62403dca90f2ffb0181e141b0c1ed52090873d13",
"logs": [
{
"transactionIndex": 0,
"blockNumber": 20034952,
"transactionHash": "0x8e61395ff443d67697fdafad62403dca90f2ffb0181e141b0c1ed52090873d13",
"address": "0xdA11B3bc8705D84BEae4a796035bDcCc9b59d1ee",
"topics": [
"0x5cdf07ad0fc222442720b108e3ed4c4640f0fadc2ab2253e66f259a0fea83480",
"0x00000000000000000000000095ced938f7991cd0dfcb48f0a06a40fa1af46ebc"
],
"data": "0x000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb4800000000000000000000000000000000000000000000000000000004a817c80000000000000000000000000000000000000000000000000000000000009f6a8c",
"logIndex": 7,
"blockHash": "0x823e2da477c5e8f8aff4bec177b11adca3ad16550d65faf38a3895e4c23d503b"
}
],
"blockNumber": 20034952,
"confirmations": 1,
"status": 1,
"type": 2,
"byzantium": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"hash": "0x8e61395ff443d67697fdafad62403dca90f2ffb0181e141b0c1ed52090873d13",
"type": 2,
"accessList": [],
"blockHash": "0x823e2da477c5e8f8aff4bec177b11adca3ad16550d65faf38a3895e4c23d503b",
"blockNumber": 20034952,
"transactionIndex": 0,
"confirmations": 1,
"from": "0x3E5e9111Ae8eB78Fe1CC3bb8915d5D461F3Ef9A9",
"to": "0xdA11B3bc8705D84BEae4a796035bDcCc9b59d1ee",
"nonce": 19,
"data": "0x604009a900000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000060000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000600000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000032000000000000000000000000000000000000000000000000000000000000001800100000000010026ef88e25acfa030a19569d2e6bf14c4c79a74d8b13d290c070f29a61082f4306b33c82d9a3159a1f97e94ed5b06a06aa343f39ac63a773093417b10d1ccdfe60066b24580000000000001cb0406e59555bf0371b7c4fff1812a11a8d92dad02ad422062971d61dcce2cd000000000000000022001c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d6100000000000000000000000000000000000000000000000000000004a8b7328c00000005000000000000000000000027aa2c95ecc476da8c735e951c5308579c8a1cb5b068dc7427a85554c8e0dc562300000000000000000000000092e813b6baf1d17618586118c1a3cfffe2d283dc00720100019fb9f4f1d72bf9e5bd12c996bb162a33ceed72d2e24e985bca01b09c21663b1e000000000000000000000000da11b3bc8705d84beae4a796035bdccc9b59d1ee002d0100000000000000000000000095ced938f7991cd0dfcb48f0a06a40fa1af46ebc02000f42400000009f6a8c0000000000000000000000000000000000000000000000000000000000000000f80000000000000005000000000000000000000027a65fc943419a5ad590042fd67c9791fd015acf53a54cc823edb8ff81b9ed722e000000000000000000000000bd3fa81b58ba92a82136038b25adec7066af315500000000000000000000000092e813b6baf1d17618586118c1a3cfffe2d283dc00000000c6fa7af3bedbad3a3d65f36aabc97431b1bbe4c2d2f6e0e47ca60203452f5d6100000000000000000000000092e813b6baf1d17618586118c1a3cfffe2d283dc00000000000000000000000000000000000000000000000000000004a8b7328ccb0406e59555bf0371b7c4fff1812a11a8d92dad02ad422062971d61dcce2cd000000000000000000000000000000000000000000000000000000000000000000000000000000041cb279020da108b02ca3efc668b320ed18b3daf092b86c39a748ee523a9142e214d4c3191d296681b4efd8c1399da400f2963bf2e8a4b0c869b2b4084596da3291c00000000000000000000000000000000000000000000000000000000000000",
"r": "0x700b0cf7513be7506ac000bfe48c87ff69f15cff40ddf457477f8a722d0238f9",
"s": "0x212632d962cdadfb8edf9a2fd54d99db96bce957d03f7b077b64db3613218279",
"v": 1,
"creates": null,
"chainId": 1
}
5 changes: 4 additions & 1 deletion watcher/src/fastTransfer/consts.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Chain, Network } from '@wormhole-foundation/sdk-base';
import { Network } from '@wormhole-foundation/sdk-base';

export type FastTransferContracts = 'MatchingEngine' | 'TokenRouter' | 'USDCMint';

Expand All @@ -16,6 +16,8 @@ export interface SolanaContractAddresses {
export interface EthereumContractAddresses {
TokenRouter: string;
CircleBridge?: string;
// Devnet has no swap layer as they need the mainnet quotes from Uniswap
SwapLayer?: string;
}

export type ContractAddresses = SolanaContractAddresses | EthereumContractAddresses;
Expand All @@ -24,6 +26,7 @@ export type FastTransferContractAddresses = {
[key in Network]?: {
Solana?: SolanaContractAddresses;
ArbitrumSepolia?: EthereumContractAddresses;
Ethereum?: EthereumContractAddresses;
};
};

Expand Down
106 changes: 106 additions & 0 deletions watcher/src/fastTransfer/swapLayer/parser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { ethers } from 'ethers';
import { RedeemSwap } from '../types';
import { parseVaa } from '@wormhole-foundation/wormhole-monitor-common';

class SwapLayerParser {
private provider: ethers.providers.JsonRpcProvider;
private swapLayerAddress: string;
private swapLayerInterface: ethers.utils.Interface;

constructor(provider: ethers.providers.JsonRpcProvider, swapLayerAddress: string) {
this.provider = provider;
this.swapLayerAddress = swapLayerAddress;
this.swapLayerInterface = new ethers.utils.Interface([
'event Redeemed(address indexed recipient, address outputToken, uint256 outputAmount, uint256 relayingFee)',
]);
}

async parseSwapLayerTransaction(txHash: string, blockTime: number): Promise<RedeemSwap | null> {
const receipt = await this.provider.getTransactionReceipt(txHash);

const tx = await this.provider.getTransaction(txHash);
if (!receipt || !tx) return null;

// Remove the function selector (first 4 bytes)
const inputData = '0x' + tx.data.slice(10);

// Use AbiCoder to decode the raw input data
let fillVaaId: string = '';
const abiCoder = new ethers.utils.AbiCoder();
try {
const decodedInput = abiCoder.decode(['bytes', 'tuple(bytes, bytes, bytes)'], inputData);

const encodedWormholeMessage = decodedInput[1][0];
if (encodedWormholeMessage && encodedWormholeMessage.length >= 8) {
const vaaBytes = Buffer.from(encodedWormholeMessage.slice(2), 'hex'); // Remove leading '0x'
const parsedVaa = parseVaa(vaaBytes);

fillVaaId = `${parsedVaa.emitterChain}/${parsedVaa.emitterAddress.toString('hex')}/${
parsedVaa.sequence
}`;
}
} catch (error) {
console.error('Error decoding input data:', error);
}

const swapEvent = receipt.logs
.filter((log) => log.address.toLowerCase() === this.swapLayerAddress.toLowerCase())
.map((log) => {
try {
return this.swapLayerInterface.parseLog(log);
} catch (e) {
return null;
}
})
.find((event) => event && event.name === 'Redeemed');

if (!swapEvent) return null;

return {
tx_hash: txHash,
recipient: swapEvent.args.recipient,
output_amount: BigInt(swapEvent.args.outputAmount.toString()),
output_token: swapEvent.args.outputToken,
timestamp: new Date(blockTime * 1000),
relaying_fee: BigInt(swapEvent.args.relayingFee.toString()),
fill_vaa_id: fillVaaId,
};
}

async getFTSwapInRange(fromBlock: number, toBlock: number): Promise<RedeemSwap[]> {
const filter = {
address: this.swapLayerAddress,
fromBlock,
toBlock,
topics: [this.swapLayerInterface.getEventTopic('Redeemed')],
};

const logs = await this.provider.getLogs(filter);

const blocks: Map<number, ethers.providers.Block> = new Map();

const results = await Promise.all(
logs.map(async (log) => {
const blockTime = await this.fetchBlockTime(blocks, log.blockNumber);
const txHash = log.transactionHash;
return this.parseSwapLayerTransaction(txHash, blockTime);
})
);

return results.filter((result): result is RedeemSwap => result !== null);
}

private async fetchBlockTime(
blocks: Map<number, ethers.providers.Block>,
blockNumber: number
): Promise<number> {
let block = blocks.get(blockNumber);
if (!block) {
block = await this.provider.getBlock(blockNumber);
blocks.set(blockNumber, block);
}
return block.timestamp;
}
}

export default SwapLayerParser;
10 changes: 10 additions & 0 deletions watcher/src/fastTransfer/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,13 @@ export type AuctionUpdatedEvent = {
name: 'AuctionUpdated';
data: AuctionUpdated;
};

export type RedeemSwap = {
tx_hash: string;
recipient: string;
output_token: string;
output_amount: bigint;
relaying_fee: bigint;
timestamp: Date;
fill_vaa_id: string;
};
77 changes: 54 additions & 23 deletions watcher/src/watchers/FTEVMWatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,25 @@ import { ethers } from 'ethers';
import { AXIOS_CONFIG_JSON, RPCS_BY_CHAIN } from '../consts';
import { makeBlockKey } from '../databases/utils';
import TokenRouterParser from '../fastTransfer/tokenRouter/parser';
import { MarketOrder } from '../fastTransfer/types';
import SwapLayerParser from '../fastTransfer/swapLayer/parser';
import { MarketOrder, RedeemSwap } from '../fastTransfer/types';
import { Block } from './EVMWatcher';
import { BigNumber } from 'ethers';
import axios from 'axios';
import { sleep } from '@wormhole-foundation/wormhole-monitor-common';

export type BlockTag = 'finalized' | 'safe' | 'latest';

export class FTEVMWatcher extends Watcher {
finalizedBlockTag: BlockTag;
lastTimestamp: number;
latestFinalizedBlockNumber: number;
tokenRouterAddress: string;
swapLayerAddress: string | undefined;
rpc: string;
provider: ethers.providers.JsonRpcProvider;
parser: TokenRouterParser;
tokenRouterParser: TokenRouterParser;
swapLayerParser: SwapLayerParser | null;
pg: Knex | null = null;

constructor(
Expand All @@ -35,9 +39,13 @@ export class FTEVMWatcher extends Watcher {
this.latestFinalizedBlockNumber = 0;
this.finalizedBlockTag = finalizedBlockTag;
this.tokenRouterAddress = FAST_TRANSFER_CONTRACTS[network]?.[chain]?.TokenRouter!;
this.swapLayerAddress = FAST_TRANSFER_CONTRACTS[network]?.[chain]?.SwapLayer;
this.provider = new ethers.providers.JsonRpcProvider(RPCS_BY_CHAIN[network][chain]);
this.rpc = RPCS_BY_CHAIN[this.network][this.chain]!;
this.parser = new TokenRouterParser(this.network, chain, this.provider);
this.tokenRouterParser = new TokenRouterParser(this.network, chain, this.provider);
this.swapLayerParser = this.swapLayerAddress
? new SwapLayerParser(this.provider, this.swapLayerAddress)
: null;
this.logger.debug('FTWatcher', network, chain, finalizedBlockTag);
// hacky way to not connect to the db in tests
// this is to allow ci to run without a db
Expand Down Expand Up @@ -124,61 +132,84 @@ export class FTEVMWatcher extends Watcher {
}

async getFtMessagesForBlocks(fromBlock: number, toBlock: number): Promise<string> {
const { results, lastBlockTime } = await this.parser.getFTResultsInRange(fromBlock, toBlock);
const tokenRouterPromise = this.tokenRouterParser.getFTResultsInRange(fromBlock, toBlock);
const swapLayerPromise = this.swapLayerParser?.getFTSwapInRange(fromBlock, toBlock) || [];

const [tokenRouterResults, swapLayerResults] = await Promise.all([
tokenRouterPromise,
swapLayerPromise,
]);

if (tokenRouterResults.results.length) {
await this.saveBatch(
tokenRouterResults.results,
'market_orders',
'fast_vaa_id',
fromBlock,
toBlock
);
}

if (results.length) {
await this.saveFastTransfers(results, fromBlock, toBlock);
if (swapLayerResults.length) {
await this.saveBatch(swapLayerResults, 'redeem_swaps', 'fill_vaa_id', fromBlock, toBlock);
}

// we do not need to compare the lastBlockTime from tokenRouter and swapLayer as they both use toBlock
const lastBlockTime = tokenRouterResults.lastBlockTime;
return makeBlockKey(toBlock.toString(), lastBlockTime.toString());
}

// saves fast transfers in smaller batches to reduce the impact in any case anything fails
// saves items in smaller batches to reduce the impact in any case anything fails
// retry with exponential backoff is used here
async saveFastTransfers(
fastTransfers: MarketOrder[],
fromBlock: number,
toBlock: number
private async saveBatch<T>(
items: T[],
tableName: string,
conflictColumn: string,
fromBlock?: number,
toBlock?: number
): Promise<void> {
if (!this.pg) {
return;
}

const batchSize = 50;
const maxRetries = 3;
const totalBatches = Math.ceil(fastTransfers.length / batchSize);
const totalBatches = Math.ceil(items.length / batchSize);

this.logger.debug(
`Attempting to save ${fastTransfers.length} fast transfers in batches of ${batchSize}`
);
this.logger.debug(`Attempting to save ${items.length} ${tableName} in batches of ${batchSize}`);

for (let batchIndex = 0; batchIndex < fastTransfers.length; batchIndex += batchSize) {
const batch = fastTransfers.slice(batchIndex, batchIndex + batchSize);
for (let batchIndex = 0; batchIndex < items.length; batchIndex += batchSize) {
const batch = items.slice(batchIndex, batchIndex + batchSize);
const batchNumber = Math.floor(batchIndex / batchSize) + 1;

for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
await this.pg('market_orders').insert(batch).onConflict('fast_vaa_id').merge();
await this.pg(tableName).insert(batch).onConflict(conflictColumn).merge();
this.logger.info(
`Successfully saved batch ${batchNumber}/${totalBatches} (${batch.length} transfers)`
`Successfully saved batch ${batchNumber}/${totalBatches} (${batch.length} ${tableName})`
);
break;
} catch (e) {
if (attempt === maxRetries) {
const errorMessage = `Failed to save batch ${batchNumber}/${totalBatches} of ${tableName} after ${maxRetries} attempts`;
this.logger.error(
`Failed to save batch ${batchNumber}/${totalBatches} from block ${fromBlock} - ${toBlock} after ${maxRetries} attempts`,
fromBlock && toBlock
? `${errorMessage} from block ${fromBlock} - ${toBlock}`
: errorMessage,
e
);
} else {
// Wait before retrying (exponential backoff)
this.logger.warn(
`Attempt ${attempt} failed for batch ${batchNumber}/${totalBatches}. Retrying...`
`Attempt ${attempt} failed for batch ${batchNumber}/${totalBatches} of ${tableName}. Retrying...`
);
await sleep(1000 * Math.pow(2, attempt - 1));
}
}
}
}
this.logger.info(`Completed saving fast transfers from block ${fromBlock} - ${toBlock}`);
this.logger.info(
`Completed saving ${items.length} ${tableName} from ${fromBlock} to ${toBlock}`
);
}
}

Expand Down
Loading

0 comments on commit 63aaa73

Please sign in to comment.