Skip to content

Commit

Permalink
Event V2 Translation
Browse files Browse the repository at this point in the history
  • Loading branch information
junkil-park committed Oct 3, 2024
1 parent 6140801 commit 123fbfe
Show file tree
Hide file tree
Showing 17 changed files with 401 additions and 28 deletions.
64 changes: 48 additions & 16 deletions api/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use aptos_types::{
account_address::AccountAddress,
account_config::{AccountResource, NewBlockEvent},
chain_id::ChainId,
contract_event::EventWithVersion,
contract_event::{ContractEvent, EventWithVersion},
event::EventKey,
indexer::indexer_db_reader::IndexerReader,
ledger_info::LedgerInfoWithSignatures,
Expand Down Expand Up @@ -818,12 +818,15 @@ impl Context {
.into_iter()
.zip(infos)
.enumerate()
.map(|(i, ((txn, txn_output), info))| {
let version = start_version + i as u64;
let (write_set, events, _, _, _) = txn_output.unpack();
self.get_accumulator_root_hash(version)
.map(|h| (version, txn, info, events, h, write_set).into())
})
.map(
|(i, ((txn, txn_output), info))| -> Result<TransactionOnChainData> {
let version = start_version + i as u64;
let (write_set, mut events, _, _, _) = txn_output.unpack();
self.translate_v2_to_v1_events_for_version(version, &mut events)?;
let h = self.get_accumulator_root_hash(version)?;
Ok((version, txn, info, events, h, write_set).into())
},
)
.collect()
}

Expand Down Expand Up @@ -878,7 +881,11 @@ impl Context {
})?;
txns.into_inner()
.into_iter()
.map(|t| self.convert_into_transaction_on_chain_data(t))
.map(|t| -> Result<TransactionOnChainData> {
let mut txn = self.convert_into_transaction_on_chain_data(t)?;
self.translate_v2_to_v1_events_for_version(txn.version, &mut txn.events)?;
Ok(txn)
})
.collect::<Result<Vec<_>>>()
.context("Failed to parse account transactions")
.map_err(|err| E::internal_with_code(err, AptosErrorCode::InternalError, ledger_info))
Expand All @@ -889,10 +896,16 @@ impl Context {
hash: HashValue,
ledger_version: u64,
) -> Result<Option<TransactionOnChainData>> {
self.db
if let Some(t) = self
.db
.get_transaction_by_hash(hash, ledger_version, true)?
.map(|t| self.convert_into_transaction_on_chain_data(t))
.transpose()
{
let mut txn: TransactionOnChainData = self.convert_into_transaction_on_chain_data(t)?;
self.translate_v2_to_v1_events_for_version(txn.version, &mut txn.events)?;
Ok(Some(txn))
} else {
Ok(None)
}
}

pub async fn get_pending_transaction_by_hash(
Expand All @@ -915,11 +928,30 @@ impl Context {
version: u64,
ledger_version: u64,
) -> Result<TransactionOnChainData> {
self.convert_into_transaction_on_chain_data(self.db.get_transaction_by_version(
version,
ledger_version,
true,
)?)
let mut txn = self.convert_into_transaction_on_chain_data(
self.db
.get_transaction_by_version(version, ledger_version, true)?,
)?;
self.translate_v2_to_v1_events_for_version(version, &mut txn.events)?;
Ok(txn)
}

fn translate_v2_to_v1_events_for_version(
&self,
version: u64,
events: &mut [ContractEvent],
) -> Result<()> {
for (idx, event) in events.iter_mut().enumerate() {
let translated_event = self
.indexer_reader
.as_ref()
.ok_or(anyhow!("Internal indexer reader doesn't exist"))?
.get_translated_v1_event_by_version_and_index(version, idx as u64);
if let Ok(translated_event) = translated_event {
*event = ContractEvent::V1(translated_event);
}
}
Ok(())
}

pub fn get_accumulator_root_hash(&self, version: u64) -> Result<HashValue> {
Expand Down
4 changes: 2 additions & 2 deletions api/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn new_test_context_with_config(test_name: String, node_config: NodeConfig) -> T
fn new_test_context_with_db_sharding_and_internal_indexer(test_name: String) -> TestContext {
let mut node_config = NodeConfig::default();
node_config.storage.rocksdb_configs.enable_storage_sharding = true;
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 10);
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, true, 10);
super_new_test_context(test_name, node_config, false, None)
}

Expand All @@ -45,6 +45,6 @@ fn new_test_context_with_sharding_and_delayed_internal_indexer(
) -> TestContext {
let mut node_config = NodeConfig::default();
node_config.storage.rocksdb_configs.enable_storage_sharding = true;
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, 1);
node_config.indexer_db_config = InternalIndexerDBConfig::new(true, true, true, true, 1);
super_new_test_context(test_name, node_config, false, end_version)
}
8 changes: 8 additions & 0 deletions config/src/config/internal_indexer_db_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
pub struct InternalIndexerDBConfig {
pub enable_transaction: bool,
pub enable_event: bool,
pub enable_event_translation: bool,
pub enable_statekeys: bool,
pub batch_size: usize,
}
Expand All @@ -20,12 +21,14 @@ impl InternalIndexerDBConfig {
pub fn new(
enable_transaction: bool,
enable_event: bool,
enable_event_translation: bool,
enable_statekeys: bool,
batch_size: usize,
) -> Self {
Self {
enable_transaction,
enable_event,
enable_event_translation,
enable_statekeys,
batch_size,
}
Expand All @@ -39,6 +42,10 @@ impl InternalIndexerDBConfig {
self.enable_event
}

pub fn enable_event_translation(&self) -> bool {
self.enable_event_translation
}

pub fn enable_statekeys(&self) -> bool {
self.enable_statekeys
}
Expand All @@ -57,6 +64,7 @@ impl Default for InternalIndexerDBConfig {
Self {
enable_transaction: false,
enable_event: false,
enable_event_translation: false,
enable_statekeys: false,
batch_size: 10_000,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ impl InternalIndexerDBService {
.expect("Failed to open internal indexer db"),
);

let internal_indexer_db_config = InternalIndexerDBConfig::new(false, false, true, 10_000);
let internal_indexer_db_config =
InternalIndexerDBConfig::new(false, false, false, true, 10_000);
Some(InternalIndexerDB::new(arc_db, internal_indexer_db_config))
}

Expand Down
3 changes: 2 additions & 1 deletion storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use aptos_crypto::{
};
use aptos_db_indexer_schemas::schema::{
event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
translated_v1_event::TranslatedV1EventSchema,
};
use aptos_schemadb::{iterator::SchemaIterator, schema::ValueCodec, ReadOptions, SchemaBatch, DB};
use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result};
use aptos_types::{
account_address::AccountAddress,
account_config::{new_block_event_key, NewBlockEvent},
contract_event::ContractEvent,
contract_event::{ContractEvent, ContractEventV1},
event::EventKey,
proof::position::Position,
transaction::Version,
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub(crate) struct StateStore {
buffered_state: Mutex<BufferedState>,
buffered_state_target_items: usize,
smt_ancestors: Mutex<SmtAncestors<StateValue>>,
internal_indexer_db: Option<InternalIndexerDB>,
pub internal_indexer_db: Option<InternalIndexerDB>,
}

impl Deref for StateStore {
Expand Down
120 changes: 116 additions & 4 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use aptos_db_indexer_schemas::{
event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema,
transaction_by_account::TransactionByAccountSchema,
translated_v1_event::TranslatedV1EventSchema,
},
utils::{
error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter,
Expand All @@ -17,22 +18,29 @@ use aptos_db_indexer_schemas::{
};
use aptos_schemadb::{SchemaBatch, DB};
use aptos_storage_interface::{
db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result,
db_ensure as ensure, db_other_bail as bail, state_view::LatestDbStateCheckpointView,
AptosDbError, DbReader, Result,
};
use aptos_types::{
account_address::AccountAddress,
contract_event::{ContractEvent, EventWithVersion},
account_config::{CoinStoreResource, DepositEvent, DEPOSIT_EVENT_TYPE},
coin_deposit::{CoinDeposit, COIN_DEPOSIT_TYPE_STR},
contract_event::{ContractEvent, ContractEventV1, ContractEventV2, EventWithVersion},
event::EventKey,
indexer::indexer_db_reader::Order,
state_store::{
state_key::{prefix::StateKeyPrefix, StateKey},
state_value::StateValue,
TStateView,
},
transaction::{AccountTransactionsWithProof, Transaction, Version},
write_set::{TransactionWrite, WriteSet},
DummyCoinType,
};
use move_core_types::language_storage::StructTag;
use std::{
cmp::min,
str::FromStr,
sync::{
mpsc::{self, Receiver, Sender},
Arc,
Expand Down Expand Up @@ -118,6 +126,10 @@ impl InternalIndexerDB {
self.config.enable_event
}

pub fn event_translation_enabled(&self) -> bool {
self.config.enable_event_translation
}

pub fn transaction_enabled(&self) -> bool {
self.config.enable_transaction
}
Expand Down Expand Up @@ -186,6 +198,16 @@ impl InternalIndexerDB {
))
}

pub fn get_next_sequence_number(&self, event_key: &EventKey) -> Result<u64> {
let mut iter = self.db.iter::<EventByKeySchema>()?;
iter.seek_for_prev(&(*event_key, u64::max_value()))?;

Ok(iter.next().transpose()?.map_or(
0,
|((key, seq), _)| if &key == event_key { seq + 1 } else { 0 },
))
}

/// Given `event_key` and `start_seq_num`, returns events identified by transaction version and
/// index among all events emitted by the same transaction. Result won't contain records with a
/// transaction version > `ledger_version` and is in ascending order.
Expand Down Expand Up @@ -273,6 +295,16 @@ impl InternalIndexerDB {
.get::<InternalIndexerMetadataSchema>(key)?
.map(|v| v.expect_version()))
}

pub fn get_translated_v1_event_by_version_and_index(
&self,
version: Version,
index: u64,
) -> Result<ContractEventV1> {
self.db
.get::<TranslatedV1EventSchema>(&(version, index))?
.ok_or_else(|| AptosDbError::NotFound(format!("Event {} of Txn {}", index, version)))
}
}

pub struct DBIndexer {
Expand Down Expand Up @@ -394,6 +426,35 @@ impl DBIndexer {
)
.expect("Failed to put events by version to a batch");
}
if self.indexer_db.event_translation_enabled() {
if let ContractEvent::V2(v2) = event {
if let Some(translated_v1_event) = self
.translate_event_v2_to_v1(v2)
.expect("Failure in translating event")
{
let key = *translated_v1_event.key();
let sequence_number = translated_v1_event.sequence_number();
batch
.put::<EventByKeySchema>(
&(key, sequence_number),
&(version, idx as u64),
)
.expect("Failed to put events by key to a batch");
batch
.put::<EventByVersionSchema>(
&(key, version, sequence_number),
&(idx as u64),
)
.expect("Failed to put events by version to a batch");
batch
.put::<TranslatedV1EventSchema>(
&(version, idx as u64),
&translated_v1_event,
)
.expect("Failed to put translated v1 events to a batch");
}
}
}
});
}

Expand Down Expand Up @@ -441,6 +502,49 @@ impl DBIndexer {
Ok(version)
}

fn get_resource(
&self,
address: &AccountAddress,
struct_tag_str: &str,
) -> Result<Option<StateValue>> {
let state_view = self
.main_db_reader
.latest_state_checkpoint_view()
.expect("Failed to get state view");

let struct_tag = StructTag::from_str(struct_tag_str)?;
let state_key = StateKey::resource(address, &struct_tag)?;
let maybe_state_value = state_view.get_state_value(&state_key)?;
Ok(maybe_state_value)
}

fn translate_event_v2_to_v1(&self, v2: &ContractEventV2) -> Result<Option<ContractEventV1>> {
match v2.type_tag().to_canonical_string().as_str() {
COIN_DEPOSIT_TYPE_STR => {
let coin_deposit = CoinDeposit::try_from_bytes(v2.event_data())?;
let struct_tag_str = format!("0x1::coin::CoinStore<{}>", coin_deposit.coin_type());
// We can use `DummyCoinType` as it does not affect the correctness of deserialization.
let state_value = self
.get_resource(coin_deposit.account(), &struct_tag_str)?
.expect("Event handle resource not found");
let coin_store_resource: CoinStoreResource<DummyCoinType> =
bcs::from_bytes(state_value.bytes())?;

let key = *coin_store_resource.deposit_events().key();
let sequence_number = self.indexer_db.get_next_sequence_number(&key)?;

let deposit_event = DepositEvent::new(coin_deposit.amount());
Ok(Some(ContractEventV1::new(
key,
sequence_number,
DEPOSIT_EVENT_TYPE.clone(),
bcs::to_bytes(&deposit_event)?,
)))
},
_ => Ok(None),
}
}

pub fn get_account_transactions(
&self,
address: AccountAddress,
Expand Down Expand Up @@ -550,9 +654,16 @@ impl DBIndexer {
let mut events_with_version = event_indices
.into_iter()
.map(|(seq, ver, idx)| {
let event = self
let event = match self
.main_db_reader
.get_event_by_version_and_index(ver, idx)?;
.get_event_by_version_and_index(ver, idx)?
{
event @ ContractEvent::V1(_) => event,
ContractEvent::V2(_) => ContractEvent::V1(
self.indexer_db
.get_translated_v1_event_by_version_and_index(ver, idx)?,
),
};
let v0 = match &event {
ContractEvent::V1(event) => event,
ContractEvent::V2(_) => bail!("Unexpected module event"),
Expand All @@ -563,6 +674,7 @@ impl DBIndexer {
seq,
v0.sequence_number()
);

Ok(EventWithVersion::new(ver, event))
})
.collect::<Result<Vec<_>>>()?;
Expand Down
Loading

0 comments on commit 123fbfe

Please sign in to comment.