feat: Store as much as possible from every range (#4660)
### Description

Currently, if the Scraper cannot fetch and parse a block or a transaction,
it fails the whole chunk of logs. This means that a transaction which can
be fetched and parsed, but which happens to share a chunk with an
unparseable transaction, is never inserted into the database.

This PR fixes that: the Scraper now does its best to insert as many blocks
and transactions from each chunk into the database as possible.
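The gist of the change, as a minimal runnable sketch (the types and the `fetch` function below are hypothetical stand-ins for the scraper's provider calls, not its actual API):

```rust
// Hypothetical stand-ins for the scraper's hash and transaction types.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct TxnHash(u64);

#[derive(Debug, PartialEq)]
struct Txn(TxnHash);

// Pretend that odd hashes cannot be fetched and parsed.
fn fetch(hash: TxnHash) -> Result<Txn, String> {
    if hash.0 % 2 == 1 {
        Err(format!("cannot parse txn {hash:?}"))
    } else {
        Ok(Txn(hash))
    }
}

fn main() {
    let chunk = [TxnHash(1), TxnHash(2), TxnHash(3), TxnHash(4)];

    // Before: collecting into `Result` fails the whole chunk on the first bad txn.
    let all: Result<Vec<Txn>, String> = chunk.iter().copied().map(fetch).collect();
    assert!(all.is_err());

    // After: warn about and skip the bad ones, keep everything that parses.
    let good: Vec<Txn> = chunk
        .iter()
        .copied()
        .filter_map(|hash| match fetch(hash) {
            Ok(txn) => Some(txn),
            Err(e) => {
                eprintln!("warn: {e}; skipping");
                None
            }
        })
        .collect();
    assert_eq!(good, vec![Txn(TxnHash(2)), Txn(TxnHash(4))]);
}
```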

### Related issues

#4632

### Backward compatibility

Yes

### Testing

Local run of E2E tests.
Local run of the Scraper.

---------

Co-authored-by: Danil Nemirovsky <[email protected]>
ameten authored Oct 10, 2024
1 parent e89f9e3 commit bb82b1c
Showing 3 changed files with 117 additions and 89 deletions.
157 changes: 89 additions & 68 deletions rust/main/agents/scraper/src/chain_scraper/mod.rs

@@ -14,7 +14,7 @@ use hyperlane_core::{
     HyperlaneWatermarkedLogStore, Indexed, InterchainGasPayment, LogMeta, H256,
 };
 use itertools::Itertools;
-use tracing::trace;
+use tracing::{trace, warn};
 
 use crate::db::{
     BasicBlock, BlockCursor, ScraperDb, StorableDelivery, StorableMessage, StorablePayment,
@@ -98,18 +98,12 @@ impl HyperlaneSqlDb {
             .collect();
         trace!(?blocks, "Ensured blocks");
 
-        // all txns we care about
-        let txns_with_ids =
-            self.ensure_txns(block_hash_by_txn_hash.into_iter().map(
-                move |(txn_hash, block_hash)| {
-                    let block_info = *blocks.get(&block_hash).as_ref().unwrap();
-                    TxnWithBlockId {
-                        txn_hash,
-                        block_id: block_info.id,
-                    }
-                },
-            ))
-            .await?;
+        // We ensure transactions only from blocks which are inserted into the database
+        let txn_hash_with_block_ids = block_hash_by_txn_hash
+            .into_iter()
+            .filter_map(move |(txn, block)| blocks.get(&block).map(|b| (txn, b.id)))
+            .map(|(txn_hash, block_id)| TxnWithBlockId { txn_hash, block_id });
+        let txns_with_ids = self.ensure_txns(txn_hash_with_block_ids).await?;
 
         Ok(txns_with_ids.map(move |TxnWithId { hash, id: txn_id }| TxnWithId { hash, id: txn_id }))
     }
@@ -118,8 +112,10 @@ impl HyperlaneSqlDb {
     /// in. if it is in the database already:
     ///     Fetches its associated database id
     /// if it is not in the database already:
-    ///     Looks up its data with ethers and then returns the database id after
+    ///     Looks up its data with the chain and then returns the database id after
    ///     inserting it into the database.
+    /// if it cannot fetch and parse a transaction, the transaction will be skipped and not
+    /// returned from this method.
     async fn ensure_txns(
         &self,
         txns: impl Iterator<Item = TxnWithBlockId>,
@@ -153,43 +149,55 @@ impl HyperlaneSqlDb {
 
         for mut chunk in as_chunks::<(&H256, &mut (Option<i64>, i64))>(txns_to_fetch, CHUNK_SIZE) {
             for (hash, (_, block_id)) in chunk.iter() {
-                let info = self.provider.get_txn_by_hash(hash).await?;
+                let info = match self.provider.get_txn_by_hash(hash).await {
+                    Ok(info) => info,
+                    Err(e) => {
+                        warn!(?hash, ?e, "error fetching and parsing transaction");
+                        continue;
+                    }
+                };
                 hashes_to_insert.push(*hash);
                 txns_to_insert.push(StorableTxn {
                     info,
                     block_id: *block_id,
                 });
             }
 
+            // If we have no transactions to insert, we don't need to store them or update
+            // database transaction ids.
+            if txns_to_insert.is_empty() {
+                continue;
+            }
+
             self.db.store_txns(txns_to_insert.drain(..)).await?;
             let ids = self.db.get_txn_ids(hashes_to_insert.drain(..)).await?;
 
             for (hash, (txn_id, _block_id)) in chunk.iter_mut() {
-                let _ = txn_id.insert(ids[hash]);
+                *txn_id = ids.get(hash).copied();
             }
         }
 
-        Ok(txns
+        let ensured_txns = txns
             .into_iter()
-            .map(|(hash, (txn_id, _block_id))| TxnWithId {
-                hash,
-                id: txn_id.unwrap(),
-            }))
+            .filter_map(|(hash, (txn_id, _))| txn_id.map(|id| (hash, id)))
+            .map(|(hash, id)| TxnWithId { hash, id });
+
+        Ok(ensured_txns)
     }
 
     /// Takes a list of block hashes for each block
     /// if it is in the database already:
     ///     Fetches its associated database id
     /// if it is not in the database already:
-    ///     Looks up its data with ethers and then returns the database id after
+    ///     Looks up its data with the chain and then returns the database id after
     ///     inserting it into the database.
+    /// if it cannot fetch and parse a block, the block will be skipped and not returned from
+    /// this method.
     async fn ensure_blocks(
         &self,
         block_hashes: impl Iterator<Item = H256>,
     ) -> Result<impl Iterator<Item = BasicBlock>> {
-        // mapping of block hash to the database id and block timestamp. Optionals are
-        // in place because we will find the timestamp first if the block was not
-        // already in the db.
+        // Mapping of block hash to `BasicBlock`, which contains the database block id and block hash.
         let mut blocks: HashMap<H256, Option<BasicBlock>> =
             block_hashes.map(|b| (b, None)).collect();
 
@@ -222,7 +230,13 @@ impl HyperlaneSqlDb {
         for chunk in as_chunks(blocks_to_fetch, CHUNK_SIZE) {
             debug_assert!(!chunk.is_empty());
             for (hash, block_info) in chunk {
-                let info = self.provider.get_block_by_hash(hash).await?;
+                let info = match self.provider.get_block_by_hash(hash).await {
+                    Ok(info) => info,
+                    Err(e) => {
+                        warn!(?hash, ?e, "error fetching and parsing block");
+                        continue;
+                    }
+                };
                 let basic_info_ref = block_info.insert(BasicBlock {
                     id: -1,
                     hash: *hash,
@@ -231,6 +245,12 @@ impl HyperlaneSqlDb {
                 hashes_to_insert.push(hash);
             }
 
+            // If we have no blocks to insert, we don't store them and we don't update
+            // database block ids.
+            if blocks_to_insert.is_empty() {
+                continue;
+            }
+
             self.db
                 .store_blocks(
                     self.domain().id(),
@@ -249,28 +269,25 @@ impl HyperlaneSqlDb {
                 .collect::<HashMap<_, _>>();
 
             for (block_ref, _) in blocks_to_insert.drain(..) {
-                block_ref.id = hashes[&block_ref.hash];
+                if let Some(id) = hashes.get(&block_ref.hash) {
+                    block_ref.id = *id;
+                }
             }
         }
 
-        // ensure we have updated all the block ids and that we have info for all of
-        // them.
-        #[cfg(debug_assertions)]
-        for (hash, block) in blocks.iter() {
-            let block = block.as_ref().unwrap();
-            assert_eq!(hash, &block.hash);
-            assert!(block.id > 0);
-        }
-
-        Ok(blocks
+        let ensured_blocks = blocks
             .into_iter()
-            .map(|(hash, block_info)| block_info.unwrap()))
+            .filter_map(|(hash, block_info)| block_info.filter(|b| b.id != -1));
+
+        Ok(ensured_blocks)
     }
 }
 
 #[async_trait]
 impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
-    /// Store messages from the origin mailbox into the database.
+    /// Store dispatched messages from the origin mailbox into the database.
+    /// We store only messages from blocks and transactions which we could successfully insert
+    /// into the database.
     async fn store_logs(&self, messages: &[(Indexed<HyperlaneMessage>, LogMeta)]) -> Result<u32> {
         if messages.is_empty() {
             return Ok(0);
@@ -280,20 +297,18 @@ impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
             .await?
             .map(|t| (t.hash, t))
             .collect();
-        let storable = messages.iter().map(|m| {
-            let txn = txns
-                .get(
-                    &m.1.transaction_id
+        let storable = messages
+            .iter()
+            .filter_map(|(message, meta)| {
+                txns.get(
+                    &meta
+                        .transaction_id
                         .try_into()
                         .expect("256-bit transaction ids are the maximum supported at this time"),
                 )
-                .unwrap();
-            StorableMessage {
-                msg: m.0.inner().clone(),
-                meta: &m.1,
-                txn_id: txn.id,
-            }
-        });
+                .map(|t| (message.inner().clone(), meta, t.id))
+            })
+            .map(|(msg, meta, txn_id)| StorableMessage { msg, meta, txn_id });
         let stored = self
             .db
             .store_dispatched_messages(self.domain().id(), &self.mailbox_address, storable)
@@ -304,6 +319,9 @@ impl HyperlaneLogStore<HyperlaneMessage> for HyperlaneSqlDb {
 
 #[async_trait]
 impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
+    /// Store delivered message ids from the destination mailbox into the database.
+    /// We store only delivered message ids from blocks and transactions which we could
+    /// successfully insert into the database.
     async fn store_logs(&self, deliveries: &[(Indexed<Delivery>, LogMeta)]) -> Result<u32> {
         if deliveries.is_empty() {
             return Ok(0);
@@ -313,22 +331,22 @@ impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
             .await?
             .map(|t| (t.hash, t))
             .collect();
-        let storable = deliveries.iter().map(|(message_id, meta)| {
-            let txn_id = txns
-                .get(
+        let storable = deliveries
+            .iter()
+            .filter_map(|(message_id, meta)| {
+                txns.get(
                     &meta
                         .transaction_id
                         .try_into()
                         .expect("256-bit transaction ids are the maximum supported at this time"),
                 )
-                .unwrap()
-                .id;
-            StorableDelivery {
-                message_id: *message_id.inner(),
+                .map(|txn| (*message_id.inner(), meta, txn.id))
+            })
+            .map(|(message_id, meta, txn_id)| StorableDelivery {
+                message_id,
                 meta,
                 txn_id,
-            }
-        });
+            });
 
         let stored = self
             .db
@@ -340,6 +358,9 @@ impl HyperlaneLogStore<Delivery> for HyperlaneSqlDb {
 
 #[async_trait]
 impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb {
+    /// Store interchain gas payments into the database.
+    /// We store only interchain gas payments from blocks and transactions which we could
+    /// successfully insert into the database.
     async fn store_logs(
         &self,
         payments: &[(Indexed<InterchainGasPayment>, LogMeta)],
@@ -352,22 +373,22 @@ impl HyperlaneLogStore<InterchainGasPayment> for HyperlaneSqlDb {
             .await?
             .map(|t| (t.hash, t))
             .collect();
-        let storable = payments.iter().map(|(payment, meta)| {
-            let txn_id = txns
-                .get(
+        let storable = payments
+            .iter()
+            .filter_map(|(payment, meta)| {
+                txns.get(
                     &meta
                         .transaction_id
                         .try_into()
                         .expect("256-bit transaction ids are the maximum supported at this time"),
                 )
-                .unwrap()
-                .id;
-            StorablePayment {
-                payment: payment.inner(),
+                .map(|txn| (payment.inner(), meta, txn.id))
+            })
+            .map(|(payment, meta, txn_id)| StorablePayment {
+                payment,
                 meta,
                 txn_id,
-            }
-        });
+            });
 
         let stored = self.db.store_payments(self.domain().id(), storable).await?;
         Ok(stored as u32)
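A pattern recurs in `ensure_txns` above: every item starts with an unresolved id, ids are filled in only for the items that actually made it into the database, and unresolved items are filtered out instead of `unwrap()`ed. A self-contained sketch of that shape (the `resolve_ids` helper and string keys are hypothetical, not the scraper's API):

```rust
use std::collections::HashMap;

// Resolve database ids for the items we managed to store; items that were
// skipped (fetch or parse failure) never receive an id and are dropped.
fn resolve_ids(
    mut pending: HashMap<&'static str, Option<i64>>,
    stored_ids: &HashMap<&'static str, i64>,
) -> Vec<(&'static str, i64)> {
    for (hash, id_slot) in pending.iter_mut() {
        // Stays `None` when the item was never inserted.
        *id_slot = stored_ids.get(hash).copied();
    }
    pending
        .into_iter()
        .filter_map(|(hash, id)| id.map(|id| (hash, id)))
        .collect()
}

fn main() {
    let pending = HashMap::from([("txn_a", None), ("txn_b", None)]);
    let stored_ids = HashMap::from([("txn_a", 42_i64)]); // "txn_b" failed to fetch
    assert_eq!(resolve_ids(pending, &stored_ids), vec![("txn_a", 42)]);
}
```

`ensure_blocks` gets the same effect with a `-1` sentinel id instead of an `Option`, filtering with `b.id != -1` before returning.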
33 changes: 19 additions & 14 deletions rust/main/agents/scraper/src/db/message.rs

@@ -13,6 +13,7 @@ use crate::date_time;
 use crate::db::ScraperDb;
 
 use super::generated::{delivered_message, message};
+
 #[derive(Debug, Clone)]
 pub struct StorableDelivery<'a> {
     pub message_id: H256,
@@ -178,9 +179,13 @@ impl ScraperDb {
             })
             .collect_vec();
 
-        debug_assert!(!models.is_empty());
         trace!(?models, "Writing delivered messages to database");
 
+        if models.is_empty() {
+            debug!("Wrote zero new delivered messages to database");
+            return Ok(0);
+        }
+
         Insert::many(models)
             .on_conflict(
                 OnConflict::columns([delivered_message::Column::MsgId])
@@ -197,12 +202,10 @@ impl ScraperDb {
             .deliveries_count_since_id(domain, destination_mailbox, latest_id_before)
             .await?;
 
-        if new_deliveries_count > 0 {
-            debug!(
-                messages = new_deliveries_count,
-                "Wrote new delivered messages to database"
-            );
-        }
+        debug!(
+            messages = new_deliveries_count,
+            "Wrote new delivered messages to database"
+        );
         Ok(new_deliveries_count)
     }
 
@@ -272,9 +275,13 @@ impl ScraperDb {
             })
             .collect_vec();
 
-        debug_assert!(!models.is_empty());
         trace!(?models, "Writing messages to database");
 
+        if models.is_empty() {
+            debug!("Wrote zero new messages to database");
+            return Ok(0);
+        }
+
         Insert::many(models)
             .on_conflict(
                 OnConflict::columns([
@@ -299,12 +306,10 @@ impl ScraperDb {
             .dispatch_count_since_id(domain, origin_mailbox, latest_id_before)
             .await?;
 
-        if new_dispatch_count > 0 {
-            debug!(
-                messages = new_dispatch_count,
-                "Wrote new messages to database"
-            );
-        }
+        debug!(
+            messages = new_dispatch_count,
+            "Wrote new messages to database"
+        );
         Ok(new_dispatch_count)
     }
 }
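With failed items skipped upstream, a batch can now legitimately contain zero rows, so the hard `debug_assert!(!models.is_empty())` becomes an explicit early return. A small sketch of the guard, assuming the underlying insert (SeaORM's `Insert::many` in the real code) should never be issued with an empty row set; `insert_many` and `store_rows` here are hypothetical stand-ins:

```rust
// Hypothetical stand-in for the real database insert.
fn insert_many(rows: &[&str]) -> Result<(), String> {
    if rows.is_empty() {
        return Err("cannot build an INSERT with no rows".to_string());
    }
    println!("INSERT ... {} rows", rows.len());
    Ok(())
}

fn store_rows(rows: Vec<&str>) -> Result<u32, String> {
    if rows.is_empty() {
        // A fully skipped chunk is now a normal outcome, not a bug:
        // report zero stored rows instead of asserting or inserting nothing.
        return Ok(0);
    }
    insert_many(&rows)?;
    Ok(rows.len() as u32)
}

fn main() {
    assert_eq!(store_rows(vec![]), Ok(0));
    assert_eq!(store_rows(vec!["delivered_message"]), Ok(1));
}
```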
16 changes: 9 additions & 7 deletions rust/main/agents/scraper/src/db/payment.rs

@@ -42,9 +42,13 @@ impl ScraperDb {
             })
             .collect_vec();
 
-        debug_assert!(!models.is_empty());
         trace!(?models, "Writing gas payments to database");
 
+        if models.is_empty() {
+            debug!("Wrote zero new gas payments to database");
+            return Ok(0);
+        }
+
         Insert::many(models)
             .on_conflict(
                 OnConflict::columns([
@@ -67,12 +71,10 @@ impl ScraperDb {
             .payments_count_since_id(domain, latest_id_before)
             .await?;
 
-        if new_payments_count > 0 {
-            debug!(
-                payments = new_payments_count,
-                "Wrote new gas payments to database"
-            );
-        }
+        debug!(
+            payments = new_payments_count,
+            "Wrote new gas payments to database"
+        );
         Ok(new_payments_count)
     }
 
