From 80c50219b2894713ab18433b19f5bad89e7e7511 Mon Sep 17 00:00:00 2001 From: Junkil Park Date: Thu, 12 Sep 2024 16:08:48 -0700 Subject: [PATCH] Event V2 Translation --- .../aptosdb/src/db/include/aptosdb_reader.rs | 11 +- storage/aptosdb/src/event_store/mod.rs | 3 +- storage/aptosdb/src/state_store/mod.rs | 2 +- storage/indexer/src/db_indexer.rs | 102 +++++++++++++++++- storage/indexer_schemas/src/schema/mod.rs | 14 +++ .../src/schema/translated_v1_event/mod.rs | 66 ++++++++++++ .../src/schema/translated_v1_event/test.rs | 20 ++++ types/src/account_config/events/deposit.rs | 21 +++- types/src/coin_deposit.rs | 54 ++++++++++ types/src/contract_event.rs | 1 + types/src/lib.rs | 1 + types/src/utility_coin.rs | 20 ++++ 12 files changed, 309 insertions(+), 6 deletions(-) create mode 100644 storage/indexer_schemas/src/schema/translated_v1_event/mod.rs create mode 100644 storage/indexer_schemas/src/schema/translated_v1_event/test.rs create mode 100644 types/src/coin_deposit.rs diff --git a/storage/aptosdb/src/db/include/aptosdb_reader.rs b/storage/aptosdb/src/db/include/aptosdb_reader.rs index 6770f114b379e..729488165f9dc 100644 --- a/storage/aptosdb/src/db/include/aptosdb_reader.rs +++ b/storage/aptosdb/src/db/include/aptosdb_reader.rs @@ -943,7 +943,16 @@ impl AptosDB { let mut events_with_version = event_indices .into_iter() .map(|(seq, ver, idx)| { - let event = self.event_store.get_event_by_version_and_index(ver, idx)?; + let event = match self.event_store.get_event_by_version_and_index(ver, idx)? { + event @ ContractEvent::V1(_) => event, + ContractEvent::V2(_) => ContractEvent::V1( + self.state_store + .internal_indexer_db + .as_ref() + .expect("Indexer not enabled") + .get_translated_v1_event_by_version_and_index(ver, idx)?, + ), + }; let v0 = match &event { ContractEvent::V1(event) => event, ContractEvent::V2(_) => bail!("Unexpected module event"), diff --git a/storage/aptosdb/src/event_store/mod.rs b/storage/aptosdb/src/event_store/mod.rs index 1ff4d31c330d4..e909ab782a4f7 100644 --- a/storage/aptosdb/src/event_store/mod.rs +++ b/storage/aptosdb/src/event_store/mod.rs @@ -19,13 +19,14 @@ use aptos_crypto::{ }; use aptos_db_indexer_schemas::schema::{ event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, + translated_v1_event::TranslatedV1EventSchema, }; use aptos_schemadb::{iterator::SchemaIterator, schema::ValueCodec, ReadOptions, SchemaBatch, DB}; use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result}; use aptos_types::{ account_address::AccountAddress, account_config::{new_block_event_key, NewBlockEvent}, - contract_event::ContractEvent, + contract_event::{ContractEvent, ContractEventV1}, event::EventKey, proof::position::Position, transaction::Version, diff --git a/storage/aptosdb/src/state_store/mod.rs b/storage/aptosdb/src/state_store/mod.rs index 1b30c6343eb89..8b155b4c94a5f 100644 --- a/storage/aptosdb/src/state_store/mod.rs +++ b/storage/aptosdb/src/state_store/mod.rs @@ -107,7 +107,7 @@ pub(crate) struct StateStore { buffered_state: Mutex, buffered_state_target_items: usize, smt_ancestors: Mutex>, - internal_indexer_db: Option, + pub internal_indexer_db: Option, } impl Deref for StateStore { diff --git a/storage/indexer/src/db_indexer.rs b/storage/indexer/src/db_indexer.rs index f21aade0905a5..636efb3bfe96f 100644 --- a/storage/indexer/src/db_indexer.rs +++ b/storage/indexer/src/db_indexer.rs @@ -9,6 +9,7 @@ use aptos_db_indexer_schemas::{ event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema, indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema, transaction_by_account::TransactionByAccountSchema, + translated_v1_event::TranslatedV1EventSchema, }, utils::{ error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter, @@ -17,22 +18,29 @@ use aptos_db_indexer_schemas::{ }; use aptos_schemadb::{SchemaBatch, DB}; use aptos_storage_interface::{ - db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result, + db_ensure as ensure, db_other_bail as bail, state_view::LatestDbStateCheckpointView, + AptosDbError, DbReader, Result, }; use aptos_types::{ account_address::AccountAddress, - contract_event::{ContractEvent, EventWithVersion}, + account_config::{CoinStoreResource, DepositEvent, DEPOSIT_EVENT_TYPE}, + coin_deposit::{CoinDeposit, COIN_DEPOSIT_TYPE_STR}, + contract_event::{ContractEvent, ContractEventV1, ContractEventV2, EventWithVersion}, event::EventKey, indexer::indexer_db_reader::Order, state_store::{ state_key::{prefix::StateKeyPrefix, StateKey}, state_value::StateValue, + TStateView, }, transaction::{AccountTransactionsWithProof, Transaction, Version}, write_set::{TransactionWrite, WriteSet}, + DummyCoinType, }; +use move_core_types::language_storage::StructTag; use std::{ cmp::min, + str::FromStr, sync::{ mpsc::{self, Receiver, Sender}, Arc, @@ -186,6 +194,16 @@ impl InternalIndexerDB { )) } + pub fn get_next_sequence_number(&self, event_key: &EventKey) -> Result { + let mut iter = self.db.iter::()?; + iter.seek_for_prev(&(*event_key, u64::max_value()))?; + + Ok(iter.next().transpose()?.map_or( + 0, + |((key, seq), _)| if &key == event_key { seq + 1 } else { 0 }, + )) + } + /// Given `event_key` and `start_seq_num`, returns events identified by transaction version and /// index among all events emitted by the same transaction. Result won't contain records with a /// transaction version > `ledger_version` and is in ascending order. @@ -273,6 +291,16 @@ impl InternalIndexerDB { .get::(key)? .map(|v| v.expect_version())) } + + pub fn get_translated_v1_event_by_version_and_index( + &self, + version: Version, + index: u64, + ) -> Result { + self.db + .get::(&(version, index))? + .ok_or_else(|| AptosDbError::NotFound(format!("Event {} of Txn {}", index, version))) + } } pub struct DBIndexer { @@ -392,6 +420,33 @@ impl DBIndexer { ) .expect("Failed to put events by version to a batch"); } + if let ContractEvent::V2(v2) = event { + if let Some(translated_v1_event) = self + .translate_event_v2_to_v1(v2) + .expect("Failure in translating event") + { + let key = *translated_v1_event.key(); + let sequence_number = translated_v1_event.sequence_number(); + batch + .put::( + &(key, sequence_number), + &(version, idx as u64), + ) + .expect("Failed to put events by key to a batch"); + batch + .put::( + &(key, version, sequence_number), + &(idx as u64), + ) + .expect("Failed to put events by version to a batch"); + batch + .put::( + &(version, idx as u64), + &translated_v1_event, + ) + .expect("Failed to put translated v1 events to a batch"); + } + } }); } @@ -436,6 +491,49 @@ impl DBIndexer { Ok(version) } + fn get_resource( + &self, + address: &AccountAddress, + struct_tag_str: &str, + ) -> Result> { + let state_view = self + .main_db_reader + .latest_state_checkpoint_view() + .expect("Failed to get state view"); + + let struct_tag = StructTag::from_str(struct_tag_str)?; + let state_key = StateKey::resource(address, &struct_tag)?; + let maybe_state_value = state_view.get_state_value(&state_key)?; + Ok(maybe_state_value) + } + + fn translate_event_v2_to_v1(&self, v2: &ContractEventV2) -> Result> { + match v2.type_tag().to_canonical_string().as_str() { + COIN_DEPOSIT_TYPE_STR => { + let coin_deposit = CoinDeposit::try_from_bytes(v2.event_data())?; + let struct_tag_str = format!("0x1::coin::CoinStore<{}>", coin_deposit.coin_type()); + // We can use `DummyCoinType` as it does not affect the correctness of deserialization. + let state_value = self + .get_resource(&AccountAddress::ONE, &struct_tag_str)? + .expect("Event handle resource not found"); + let coin_store_resource: CoinStoreResource = + bcs::from_bytes(state_value.bytes())?; + + let key = *coin_store_resource.deposit_events().key(); + let sequence_number = self.indexer_db.get_next_sequence_number(&key)?; + + let deposit_event = DepositEvent::new(coin_deposit.amount()); + Ok(Some(ContractEventV1::new( + key, + sequence_number, + DEPOSIT_EVENT_TYPE.clone(), + bcs::to_bytes(&deposit_event)?, + ))) + }, + _ => Ok(None), + } + } + pub fn get_account_transactions( &self, address: AccountAddress, diff --git a/storage/indexer_schemas/src/schema/mod.rs b/storage/indexer_schemas/src/schema/mod.rs index 0f4dfd4c7bcd6..ff9b8f50e4859 100644 --- a/storage/indexer_schemas/src/schema/mod.rs +++ b/storage/indexer_schemas/src/schema/mod.rs @@ -12,6 +12,9 @@ pub mod indexer_metadata; pub mod state_keys; pub mod table_info; pub mod transaction_by_account; +pub mod translated_v1_event; + +use anyhow::ensure; use aptos_schemadb::ColumnFamilyName; pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default"; @@ -22,6 +25,7 @@ pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key"; pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version"; pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account"; pub const STATE_KEYS_CF_NAME: ColumnFamilyName = "state_keys"; +pub const TRANSLATED_V1_EVENT_CF_NAME: ColumnFamilyName = "translated_v1_event"; pub fn column_families() -> Vec { vec![ @@ -41,3 +45,13 @@ pub fn internal_indexer_column_families() -> Vec { STATE_KEYS_CF_NAME, ] } + +fn ensure_slice_len_eq(data: &[u8], len: usize) -> anyhow::Result<()> { + ensure!( + data.len() == len, + "Unexpected data len {}, expected {}.", + data.len(), + len, + ); + Ok(()) +} diff --git a/storage/indexer_schemas/src/schema/translated_v1_event/mod.rs b/storage/indexer_schemas/src/schema/translated_v1_event/mod.rs new file mode 100644 index 0000000000000..9f196482389d8 --- /dev/null +++ b/storage/indexer_schemas/src/schema/translated_v1_event/mod.rs @@ -0,0 +1,66 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! This module defines physical storage schema for the contract events. +//! +//! A translated v1 event is keyed by the version of the transaction it belongs to and the index of +//! the original v2 event among all events yielded by the same transaction. +//! ```text +//! |<-------key----->|<---value--->| +//! | version | index | event bytes | +//! ``` + +use crate::schema::{ensure_slice_len_eq, TRANSLATED_V1_EVENT_CF_NAME}; +use anyhow::Result; +use aptos_schemadb::{ + define_pub_schema, + schema::{KeyCodec, ValueCodec}, +}; +use aptos_types::{contract_event::ContractEventV1, transaction::Version}; +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use std::mem::size_of; + +define_pub_schema!( + TranslatedV1EventSchema, + Key, + ContractEventV1, + TRANSLATED_V1_EVENT_CF_NAME +); + +type Index = u64; +type Key = (Version, Index); + +impl KeyCodec for Key { + fn encode_key(&self) -> Result> { + let (version, index) = *self; + + let mut encoded_key = Vec::with_capacity(size_of::() + size_of::()); + encoded_key.write_u64::(version)?; + encoded_key.write_u64::(index)?; + Ok(encoded_key) + } + + fn decode_key(data: &[u8]) -> Result { + ensure_slice_len_eq(data, size_of::())?; + + let version_size = size_of::(); + + let version = (&data[..version_size]).read_u64::()?; + let index = (&data[version_size..]).read_u64::()?; + Ok((version, index)) + } +} + +impl ValueCodec for ContractEventV1 { + fn encode_value(&self) -> Result> { + bcs::to_bytes(self).map_err(Into::into) + } + + fn decode_value(data: &[u8]) -> Result { + bcs::from_bytes(data).map_err(Into::into) + } +} + +#[cfg(test)] +mod test; diff --git a/storage/indexer_schemas/src/schema/translated_v1_event/test.rs b/storage/indexer_schemas/src/schema/translated_v1_event/test.rs new file mode 100644 index 0000000000000..d28c191121c04 --- /dev/null +++ b/storage/indexer_schemas/src/schema/translated_v1_event/test.rs @@ -0,0 +1,20 @@ +// Copyright © Aptos Foundation +// Parts of the project are originally copyright © Meta Platforms, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding}; +use proptest::prelude::*; + +proptest! { + #[test] + fn test_encode_decode( + version in any::(), + index in any::(), + event in any::(), + ) { + assert_encode_decode::(&(version, index), &event); + } +} + +test_no_panic_decoding!(TranslatedV1EventSchema); diff --git a/types/src/account_config/events/deposit.rs b/types/src/account_config/events/deposit.rs index 2c59bc4fc336f..54269f54cfe22 100644 --- a/types/src/account_config/events/deposit.rs +++ b/types/src/account_config/events/deposit.rs @@ -2,7 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use move_core_types::{ident_str, identifier::IdentStr, move_resource::MoveStructType}; +use move_core_types::{ + ident_str, + identifier::IdentStr, + language_storage::{StructTag, TypeTag, CORE_CODE_ADDRESS}, + move_resource::MoveStructType, +}; +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; /// Struct that represents a DepositPaymentEvent. @@ -12,6 +18,10 @@ pub struct DepositEvent { } impl DepositEvent { + pub fn new(amount: u64) -> Self { + Self { amount } + } + pub fn try_from_bytes(bytes: &[u8]) -> Result { bcs::from_bytes(bytes).map_err(Into::into) } @@ -26,3 +36,12 @@ impl MoveStructType for DepositEvent { const MODULE_NAME: &'static IdentStr = ident_str!("coin"); const STRUCT_NAME: &'static IdentStr = ident_str!("DepositEvent"); } + +pub static DEPOSIT_EVENT_TYPE: Lazy = Lazy::new(|| { + TypeTag::Struct(Box::new(StructTag { + address: CORE_CODE_ADDRESS, + module: ident_str!("coin").to_owned(), + name: ident_str!("DepositEvent").to_owned(), + type_args: vec![], + })) +}); diff --git a/types/src/coin_deposit.rs b/types/src/coin_deposit.rs new file mode 100644 index 0000000000000..691f6cc6bad89 --- /dev/null +++ b/types/src/coin_deposit.rs @@ -0,0 +1,54 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use move_core_types::{ + account_address::AccountAddress, + ident_str, + language_storage::{StructTag, TypeTag, CORE_CODE_ADDRESS}, +}; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub struct CoinDeposit { + coin_type: String, + account: AccountAddress, + amount: u64, +} + +impl CoinDeposit { + pub fn new(coin_type: String, account: AccountAddress, amount: u64) -> Self { + Self { + coin_type, + account, + amount, + } + } + + pub fn try_from_bytes(bytes: &[u8]) -> anyhow::Result { + bcs::from_bytes(bytes).map_err(Into::into) + } + + pub fn coin_type(&self) -> &str { + &self.coin_type + } + + pub fn account(&self) -> &AccountAddress { + &self.account + } + + pub fn amount(&self) -> u64 { + self.amount + } +} + +pub const COIN_DEPOSIT_TYPE_STR: &str = "00000000000000000000000000000001::coin::CoinDeposit"; + +pub static COIN_DEPOSIT_TYPE: Lazy = Lazy::new(|| { + TypeTag::Struct(Box::new(StructTag { + address: CORE_CODE_ADDRESS, + module: ident_str!("coin").to_owned(), + name: ident_str!("CoinDeposit").to_owned(), + type_args: vec![], + })) +}); diff --git a/types/src/contract_event.rs b/types/src/contract_event.rs index 59d9a27fae406..277c068ca57c0 100644 --- a/types/src/contract_event.rs +++ b/types/src/contract_event.rs @@ -169,6 +169,7 @@ impl ContractEvent { /// Entry produced via a call to the `emit_event` builtin. #[derive(Hash, Clone, Eq, PartialEq, Serialize, Deserialize, CryptoHasher)] +#[cfg_attr(any(test, feature = "fuzzing"), derive(Arbitrary))] pub struct ContractEventV1 { /// The unique key that the event was emitted to key: EventKey, diff --git a/types/src/lib.rs b/types/src/lib.rs index 9081b6c0f0a4d..c76d7d7507f12 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -11,6 +11,7 @@ pub mod block_info; pub mod block_metadata; pub mod block_metadata_ext; pub mod chain_id; +pub mod coin_deposit; pub mod contract_event; pub mod dkg; pub mod epoch_change; diff --git a/types/src/utility_coin.rs b/types/src/utility_coin.rs index 0074ac9fb7478..cdd3f4448c2ff 100644 --- a/types/src/utility_coin.rs +++ b/types/src/utility_coin.rs @@ -34,3 +34,23 @@ impl CoinType for AptosCoinType { AccountAddress::ONE } } + +pub static DUMMY_COIN_TYPE: Lazy = Lazy::new(|| { + TypeTag::Struct(Box::new(StructTag { + address: AccountAddress::ONE, + module: ident_str!("dummy_coin").to_owned(), + name: ident_str!("DummyCoin").to_owned(), + type_args: vec![], + })) +}); + +pub struct DummyCoinType; +impl CoinType for DummyCoinType { + fn type_tag() -> TypeTag { + DUMMY_COIN_TYPE.clone() + } + + fn coin_info_address() -> AccountAddress { + AccountAddress::ONE + } +}