From bb82b1cc01fbf3a3909b391dc55d7be0c119622f Mon Sep 17 00:00:00 2001
From: Danil Nemirovsky
Date: Thu, 10 Oct 2024 14:35:32 +0100
Subject: [PATCH] feat: Store as much as possible from every range (#4660)

### Description

Currently, if the Scraper cannot fetch and parse a block or a transaction, it
fails the whole chunk of logs. This means that a transaction which can be
fetched and parsed, but which happens to share a chunk with an unparseable
transaction, won't be inserted into the database. This PR fixes that: the
Scraper now does its best to insert as many blocks and transactions from each
chunk into the database as possible.

### Related issues

https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/4632

### Backward compatibility

Yes

### Testing

Local run of E2E tests. Local run of the Scraper.

---------

Co-authored-by: Danil Nemirovsky <4614623+ameten@users.noreply.github.com>
---
 .../agents/scraper/src/chain_scraper/mod.rs | 157 ++++++++++--------
 rust/main/agents/scraper/src/db/message.rs  |  33 ++--
 rust/main/agents/scraper/src/db/payment.rs  |  16 +-
 3 files changed, 117 insertions(+), 89 deletions(-)

diff --git a/rust/main/agents/scraper/src/chain_scraper/mod.rs b/rust/main/agents/scraper/src/chain_scraper/mod.rs
index 23997766c7..71ddd84efb 100644
--- a/rust/main/agents/scraper/src/chain_scraper/mod.rs
+++ b/rust/main/agents/scraper/src/chain_scraper/mod.rs
@@ -14,7 +14,7 @@ use hyperlane_core::{
     HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256,
 };
 use itertools::Itertools;
-use tracing::trace;
+use tracing::{trace, warn};
 
 use crate::db::{
     BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment,
@@ -98,18 +98,12 @@ impl HyperlaneSqlDb {
             .collect();
         trace!(?blocks, "Ensured blocks");
 
-        // all txns we care about
-        let txns_with_ids =
-            self.ensure_txns(block_hash_by_txn_hash.into_iter().map(
-                move |(txn_hash, block_hash)| {
-                    let block_info = *blocks.get(&block_hash).as_ref().unwrap();
-                    TxnWithBlockId {
-                        txn_hash,
-                        block_id: block_info.id,
-                    }
-                },
-            ))
-            .await?;
+        // We ensure transactions only from blocks which were inserted into the database
+        let txn_hash_with_block_ids = block_hash_by_txn_hash
+            .into_iter()
+            .filter_map(move |(txn, block)| blocks.get(&block).map(|b| (txn, b.id)))
+            .map(|(txn_hash, block_id)| TxnWithBlockId { txn_hash, block_id });
+        let txns_with_ids = self.ensure_txns(txn_hash_with_block_ids).await?;
 
         Ok(txns_with_ids.map(move |TxnWithId { hash, id: txn_id }| TxnWithId { hash, id: txn_id }))
     }
@@ -118,8 +112,10 @@ impl HyperlaneSqlDb {
     /// in. if it is in the database already:
     ///     Fetches its associated database id
     /// if it is not in the database already:
-    ///     Looks up its data with ethers and then returns the database id after
+    ///     Looks up its data with the chain and then returns the database id after
     ///     inserting it into the database.
+    /// If a transaction cannot be fetched and parsed, it is skipped and not returned
+    /// from this method.
     async fn ensure_txns(
         &self,
         txns: impl Iterator<Item = TxnWithBlockId>,
     ) -> Result<impl Iterator<Item = TxnWithId>> {
@@ -153,7 +149,13 @@ impl HyperlaneSqlDb {
         for mut chunk in as_chunks::<(&H256, &mut (Option<i64>, i64))>(txns_to_fetch, CHUNK_SIZE) {
             for (hash, (_, block_id)) in chunk.iter() {
-                let info = self.provider.get_txn_by_hash(hash).await?;
+                let info = match self.provider.get_txn_by_hash(hash).await {
+                    Ok(info) => info,
+                    Err(e) => {
+                        warn!(?hash, ?e, "error fetching and parsing transaction");
+                        continue;
+                    }
+                };
                 hashes_to_insert.push(*hash);
                 txns_to_insert.push(StorableTxn {
                     info,
@@ -161,35 +163,41 @@ impl HyperlaneSqlDb {
                 });
             }
 
+            // If we have no transactions to insert, we don't need to store anything or update
+            // database transaction ids.
+            if txns_to_insert.is_empty() {
+                continue;
+            }
+
             self.db.store_txns(txns_to_insert.drain(..)).await?;
             let ids = self.db.get_txn_ids(hashes_to_insert.drain(..)).await?;
             for (hash, (txn_id, _block_id)) in chunk.iter_mut() {
-                let _ = txn_id.insert(ids[hash]);
+                *txn_id = ids.get(hash).copied();
             }
         }
 
-        Ok(txns
+        let ensured_txns = txns
             .into_iter()
-            .map(|(hash, (txn_id, _block_id))| TxnWithId {
-                hash,
-                id: txn_id.unwrap(),
-            }))
+            .filter_map(|(hash, (txn_id, _))| txn_id.map(|id| (hash, id)))
+            .map(|(hash, id)| TxnWithId { hash, id });
+
+        Ok(ensured_txns)
     }
 
     /// Takes a list of block hashes for each block
     /// if it is in the database already:
     ///     Fetches its associated database id
     /// if it is not in the database already:
-    ///     Looks up its data with ethers and then returns the database id after
+    ///     Looks up its data with the chain and then returns the database id after
    ///     inserting it into the database.
+    /// If a block cannot be fetched and parsed, it is skipped and not returned from
+    /// this method.
     async fn ensure_blocks(
         &self,
         block_hashes: impl Iterator<Item = H256>,
     ) -> Result<impl Iterator<Item = BasicBlock>> {
-        // mapping of block hash to the database id and block timestamp. Optionals are
-        // in place because we will find the timestamp first if the block was not
-        // already in the db.
+        // Mapping of block hash to `BasicBlock`, which contains the database block id and block hash.
         let mut blocks: HashMap<H256, Option<BasicBlock>> =
             block_hashes.map(|b| (b, None)).collect();
 
@@ -222,7 +230,13 @@ impl HyperlaneSqlDb {
         for chunk in as_chunks(blocks_to_fetch, CHUNK_SIZE) {
             debug_assert!(!chunk.is_empty());
             for (hash, block_info) in chunk {
-                let info = self.provider.get_block_by_hash(hash).await?;
+                let info = match self.provider.get_block_by_hash(hash).await {
+                    Ok(info) => info,
+                    Err(e) => {
+                        warn!(?hash, ?e, "error fetching and parsing block");
+                        continue;
+                    }
+                };
                 let basic_info_ref = block_info.insert(BasicBlock {
                     id: -1,
                     hash: *hash,
@@ -231,6 +245,12 @@ impl HyperlaneSqlDb {
                 hashes_to_insert.push(hash);
             }
 
+            // If we have no blocks to insert, we don't store anything and we don't update
+            // database block ids.
+            if blocks_to_insert.is_empty() {
+                continue;
+            }
+
             self.db
                 .store_blocks(
                     self.domain().id(),
@@ -249,28 +269,25 @@ impl HyperlaneSqlDb {
                 .collect::<HashMap<_, _>>();
 
             for (block_ref, _) in blocks_to_insert.drain(..) {
-                block_ref.id = hashes[&block_ref.hash];
+                if let Some(id) = hashes.get(&block_ref.hash) {
+                    block_ref.id = *id;
+                }
             }
         }
 
-        // ensure we have updated all the block ids and that we have info for all of
-        // them.
-        #[cfg(debug_assertions)]
-        for (hash, block) in blocks.iter() {
-            let block = block.as_ref().unwrap();
-            assert_eq!(hash, &block.hash);
-            assert!(block.id > 0);
-        }
-
-        Ok(blocks
+        let ensured_blocks = blocks
             .into_iter()
-            .map(|(hash, block_info)| block_info.unwrap()))
+            .filter_map(|(hash, block_info)| block_info.filter(|b| b.id != -1));
+
+        Ok(ensured_blocks)
     }
 }
 
 #[async_trait]
 impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
-    /// Store messages from the origin mailbox into the database.
+    /// Store dispatched messages from the origin mailbox into the database.
+    /// We store only messages from blocks and transactions which we could successfully insert
+    /// into the database.
     async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> {
         if messages.is_empty() {
             return Ok(0);
         }
@@ -280,20 +297,18 @@ impl HyperlaneLogStore for HyperlaneSqlDb {
             .await?
             .map(|t| (t.hash, t))
             .collect();
-        let storable = messages.iter().map(|m| {
-            let txn = txns
-                .get(
-                    &m.1.transaction_id
+        let storable = messages
+            .iter()
+            .filter_map(|(message, meta)| {
+                txns.get(
+                    &meta
+                        .transaction_id
                         .try_into()
                         .expect("256-bit transaction ids are the maximum supported at this time"),
                 )
-                .unwrap();
-            StorableMessage {
-                msg: m.0.inner().clone(),
-                meta: &m.1,
-                txn_id: txn.id,
-            }
-        });
+                .map(|t| (message.inner().clone(), meta, t.id))
+            })
+            .map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id });
         let stored = self
             .db
             .store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable)
@@ -304,6 +319,9 @@ impl HyperlaneLogStore for HyperlaneSqlDb {
+    /// Store delivered message ids from the destination mailbox into the database.
+    /// We store only delivered message ids from blocks and transactions which we could
+    /// successfully insert into the database.
     async fn store_logs(&self, deliveries: &[(Indexed<H256>, LogMeta)]) -> Result<u32> {
         if deliveries.is_empty() {
             return Ok(0);
         }
@@ -313,22 +331,22 @@ impl HyperlaneLogStore for HyperlaneSqlDb {
             .await?
             .map(|t| (t.hash, t))
             .collect();
-        let storable = deliveries.iter().map(|(message_id, meta)| {
-            let txn_id = txns
-                .get(
+        let storable = deliveries
+            .iter()
+            .filter_map(|(message_id, meta)| {
+                txns.get(
                     &meta
                         .transaction_id
                         .try_into()
                         .expect("256-bit transaction ids are the maximum supported at this time"),
                 )
-                .unwrap()
-                .id;
-            StorableDelivery {
-                message_id: *message_id.inner(),
+                .map(|txn| (*message_id.inner(), meta, txn.id))
+            })
+            .map(|(message_id, meta, txn_id)| StorableDelivery {
+                message_id,
                 meta,
                 txn_id,
-            }
-        });
+            });
 
         let stored = self
             .db
@@ -340,6 +358,9 @@ impl HyperlaneLogStore for HyperlaneSqlDb {
+    /// Store interchain gas payments into the database.
+    /// We store only interchain gas payments from blocks and transactions which we could
+    /// successfully insert into the database.
     async fn store_logs(
         &self,
         payments: &[(Indexed<InterchainGasPayment>, LogMeta)],
@@ -352,22 +373,22 @@ impl HyperlaneLogStore for HyperlaneSqlDb {
             .await?
             .map(|t| (t.hash, t))
             .collect();
-        let storable = payments.iter().map(|(payment, meta)| {
-            let txn_id = txns
-                .get(
+        let storable = payments
+            .iter()
+            .filter_map(|(payment, meta)| {
+                txns.get(
                     &meta
                         .transaction_id
                         .try_into()
                         .expect("256-bit transaction ids are the maximum supported at this time"),
                 )
-                .unwrap()
-                .id;
-            StorablePayment {
-                payment: payment.inner(),
+                .map(|txn| (payment.inner(), meta, txn.id))
+            })
+            .map(|(payment, meta, txn_id)| StorablePayment {
+                payment,
                 meta,
                 txn_id,
-            }
-        });
+            });
 
         let stored = self.db.store_payments(self.domain().id(), storable).await?;
         Ok(stored as u32)
diff --git a/rust/main/agents/scraper/src/db/message.rs b/rust/main/agents/scraper/src/db/message.rs
index 1f62c6fa00..f8b99fec50 100644
--- a/rust/main/agents/scraper/src/db/message.rs
+++ b/rust/main/agents/scraper/src/db/message.rs
@@ -13,6 +13,7 @@ use crate::date_time;
 use crate::db::ScraperDb;
 
 use super::generated::{delivered_message, message};
+
 #[derive(Debug, Clone)]
 pub struct StorableDelivery<'a> {
     pub message_id: H256,
@@ -178,9 +179,13 @@ impl ScraperDb {
             })
             .collect_vec();
 
-        debug_assert!(!models.is_empty());
         trace!(?models, "Writing delivered messages to database");
 
+        if models.is_empty() {
+            debug!("Wrote zero new delivered messages to database");
+            return Ok(0);
+        }
+
         Insert::many(models)
             .on_conflict(
                 OnConflict::columns([delivered_message::Column::MsgId])
@@ -197,12 +202,10 @@ impl ScraperDb {
             .deliveries_count_since_id(domain, destination_mailbox, latest_id_before)
             .await?;
 
-        if new_deliveries_count > 0 {
-            debug!(
-                messages = new_deliveries_count,
-                "Wrote new delivered messages to database"
-            );
-        }
+        debug!(
+            messages = new_deliveries_count,
+            "Wrote new delivered messages to database"
+        );
 
         Ok(new_deliveries_count)
     }
@@ -272,9 +275,13 @@ impl ScraperDb {
             })
             .collect_vec();
 
-        debug_assert!(!models.is_empty());
         trace!(?models, "Writing messages to database");
 
+        if models.is_empty() {
+            debug!("Wrote zero new messages to database");
+            return Ok(0);
+        }
+
         Insert::many(models)
             .on_conflict(
                 OnConflict::columns([
@@ -299,12 +306,10 @@ impl ScraperDb {
             .dispatch_count_since_id(domain, origin_mailbox, latest_id_before)
             .await?;
 
-        if new_dispatch_count > 0 {
-            debug!(
-                messages = new_dispatch_count,
-                "Wrote new messages to database"
-            );
-        }
+        debug!(
+            messages = new_dispatch_count,
+            "Wrote new messages to database"
+        );
         Ok(new_dispatch_count)
     }
 }
diff --git a/rust/main/agents/scraper/src/db/payment.rs b/rust/main/agents/scraper/src/db/payment.rs
index 57c63b3e33..250f4f3c6f 100644
--- a/rust/main/agents/scraper/src/db/payment.rs
+++ b/rust/main/agents/scraper/src/db/payment.rs
@@ -42,9 +42,13 @@ impl ScraperDb {
             })
             .collect_vec();
 
-        debug_assert!(!models.is_empty());
         trace!(?models, "Writing gas payments to database");
 
+        if models.is_empty() {
+            debug!("Wrote zero new gas payments to database");
+            return Ok(0);
+        }
+
         Insert::many(models)
             .on_conflict(
                 OnConflict::columns([
@@ -67,12 +71,10 @@ impl ScraperDb {
             .payments_count_since_id(domain, latest_id_before)
             .await?;
 
-        if new_payments_count > 0 {
-            debug!(
-                payments = new_payments_count,
-                "Wrote new gas payments to database"
-            );
-        }
+        debug!(
+            payments = new_payments_count,
+            "Wrote new gas payments to database"
+        );
 
         Ok(new_payments_count)
     }
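
For readers who want the shape of the change without walking through the diff: below is a minimal, self-contained sketch of the "skip what fails, keep what succeeds" pattern that the patch applies to each chunk of blocks and transactions. It is not code from the repository; the `Record` type and `fetch` function are hypothetical stand-ins for the scraper's provider calls and storable types.

```rust
// Hypothetical stand-ins for the scraper's provider call and storable type.
#[derive(Debug)]
struct Record {
    hash: u64,
    payload: String,
}

// Pretend that every third item cannot be fetched or parsed.
fn fetch(hash: u64) -> Result<Record, String> {
    if hash % 3 == 0 {
        Err(format!("could not fetch or parse {hash}"))
    } else {
        Ok(Record {
            hash,
            payload: format!("data-{hash}"),
        })
    }
}

/// Before the patch: one bad item fails the whole chunk.
fn process_chunk_strict(hashes: &[u64]) -> Result<Vec<Record>, String> {
    hashes.iter().map(|h| fetch(*h)).collect()
}

/// After the patch: log and skip bad items, keep everything else.
fn process_chunk_lenient(hashes: &[u64]) -> Vec<Record> {
    hashes
        .iter()
        .filter_map(|h| match fetch(*h) {
            Ok(record) => Some(record),
            Err(e) => {
                eprintln!("warning: skipping item: {e}");
                None
            }
        })
        .collect()
}

fn main() {
    let chunk = [1_u64, 2, 3, 4, 5, 6];

    // The strict version aborts on the first failure in the chunk...
    assert!(process_chunk_strict(&chunk).is_err());

    // ...while the lenient version still keeps the four good records.
    let kept = process_chunk_lenient(&chunk);
    println!("kept {} of {} records: {kept:?}", kept.len(), chunk.len());
    assert_eq!(kept.len(), 4);
}
```

The same idea shows up twice in the patch: fetch failures are logged with `warn!` and skipped inside each chunk, and downstream stores use `filter_map` so that only items whose blocks and transactions received database ids are written.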