Skip to content

Commit

Permalink
Event V2 Translation
Browse files Browse the repository at this point in the history
  • Loading branch information
junkil-park committed Sep 18, 2024
1 parent 09ce976 commit 80c5021
Show file tree
Hide file tree
Showing 12 changed files with 309 additions and 6 deletions.
11 changes: 10 additions & 1 deletion storage/aptosdb/src/db/include/aptosdb_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,16 @@ impl AptosDB {
let mut events_with_version = event_indices
.into_iter()
.map(|(seq, ver, idx)| {
let event = self.event_store.get_event_by_version_and_index(ver, idx)?;
let event = match self.event_store.get_event_by_version_and_index(ver, idx)? {
event @ ContractEvent::V1(_) => event,
ContractEvent::V2(_) => ContractEvent::V1(
self.state_store
.internal_indexer_db
.as_ref()
.expect("Indexer not enabled")
.get_translated_v1_event_by_version_and_index(ver, idx)?,
),
};
let v0 = match &event {
ContractEvent::V1(event) => event,
ContractEvent::V2(_) => bail!("Unexpected module event"),
Expand Down
3 changes: 2 additions & 1 deletion storage/aptosdb/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use aptos_crypto::{
};
use aptos_db_indexer_schemas::schema::{
event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
translated_v1_event::TranslatedV1EventSchema,
};
use aptos_schemadb::{iterator::SchemaIterator, schema::ValueCodec, ReadOptions, SchemaBatch, DB};
use aptos_storage_interface::{db_ensure as ensure, db_other_bail, AptosDbError, Result};
use aptos_types::{
account_address::AccountAddress,
account_config::{new_block_event_key, NewBlockEvent},
contract_event::ContractEvent,
contract_event::{ContractEvent, ContractEventV1},
event::EventKey,
proof::position::Position,
transaction::Version,
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub(crate) struct StateStore {
buffered_state: Mutex<BufferedState>,
buffered_state_target_items: usize,
smt_ancestors: Mutex<SmtAncestors<StateValue>>,
internal_indexer_db: Option<InternalIndexerDB>,
pub internal_indexer_db: Option<InternalIndexerDB>,
}

impl Deref for StateStore {
Expand Down
102 changes: 100 additions & 2 deletions storage/indexer/src/db_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use aptos_db_indexer_schemas::{
event_by_key::EventByKeySchema, event_by_version::EventByVersionSchema,
indexer_metadata::InternalIndexerMetadataSchema, state_keys::StateKeysSchema,
transaction_by_account::TransactionByAccountSchema,
translated_v1_event::TranslatedV1EventSchema,
},
utils::{
error_if_too_many_requested, get_first_seq_num_and_limit, AccountTransactionVersionIter,
Expand All @@ -17,22 +18,29 @@ use aptos_db_indexer_schemas::{
};
use aptos_schemadb::{SchemaBatch, DB};
use aptos_storage_interface::{
db_ensure as ensure, db_other_bail as bail, AptosDbError, DbReader, Result,
db_ensure as ensure, db_other_bail as bail, state_view::LatestDbStateCheckpointView,
AptosDbError, DbReader, Result,
};
use aptos_types::{
account_address::AccountAddress,
contract_event::{ContractEvent, EventWithVersion},
account_config::{CoinStoreResource, DepositEvent, DEPOSIT_EVENT_TYPE},
coin_deposit::{CoinDeposit, COIN_DEPOSIT_TYPE_STR},
contract_event::{ContractEvent, ContractEventV1, ContractEventV2, EventWithVersion},
event::EventKey,
indexer::indexer_db_reader::Order,
state_store::{
state_key::{prefix::StateKeyPrefix, StateKey},
state_value::StateValue,
TStateView,
},
transaction::{AccountTransactionsWithProof, Transaction, Version},
write_set::{TransactionWrite, WriteSet},
DummyCoinType,
};
use move_core_types::language_storage::StructTag;
use std::{
cmp::min,
str::FromStr,
sync::{
mpsc::{self, Receiver, Sender},
Arc,
Expand Down Expand Up @@ -186,6 +194,16 @@ impl InternalIndexerDB {
))
}

pub fn get_next_sequence_number(&self, event_key: &EventKey) -> Result<u64> {
let mut iter = self.db.iter::<EventByKeySchema>()?;
iter.seek_for_prev(&(*event_key, u64::max_value()))?;

Ok(iter.next().transpose()?.map_or(
0,
|((key, seq), _)| if &key == event_key { seq + 1 } else { 0 },
))
}

/// Given `event_key` and `start_seq_num`, returns events identified by transaction version and
/// index among all events emitted by the same transaction. Result won't contain records with a
/// transaction version > `ledger_version` and is in ascending order.
Expand Down Expand Up @@ -273,6 +291,16 @@ impl InternalIndexerDB {
.get::<InternalIndexerMetadataSchema>(key)?
.map(|v| v.expect_version()))
}

pub fn get_translated_v1_event_by_version_and_index(
&self,
version: Version,
index: u64,
) -> Result<ContractEventV1> {
self.db
.get::<TranslatedV1EventSchema>(&(version, index))?
.ok_or_else(|| AptosDbError::NotFound(format!("Event {} of Txn {}", index, version)))
}
}

pub struct DBIndexer {
Expand Down Expand Up @@ -392,6 +420,33 @@ impl DBIndexer {
)
.expect("Failed to put events by version to a batch");
}
if let ContractEvent::V2(v2) = event {
if let Some(translated_v1_event) = self
.translate_event_v2_to_v1(v2)
.expect("Failure in translating event")
{
let key = *translated_v1_event.key();
let sequence_number = translated_v1_event.sequence_number();
batch
.put::<EventByKeySchema>(
&(key, sequence_number),
&(version, idx as u64),
)
.expect("Failed to put events by key to a batch");
batch
.put::<EventByVersionSchema>(
&(key, version, sequence_number),
&(idx as u64),
)
.expect("Failed to put events by version to a batch");
batch
.put::<TranslatedV1EventSchema>(
&(version, idx as u64),
&translated_v1_event,
)
.expect("Failed to put translated v1 events to a batch");
}
}
});
}

Expand Down Expand Up @@ -436,6 +491,49 @@ impl DBIndexer {
Ok(version)
}

fn get_resource(
&self,
address: &AccountAddress,
struct_tag_str: &str,
) -> Result<Option<StateValue>> {
let state_view = self
.main_db_reader
.latest_state_checkpoint_view()
.expect("Failed to get state view");

let struct_tag = StructTag::from_str(struct_tag_str)?;
let state_key = StateKey::resource(address, &struct_tag)?;
let maybe_state_value = state_view.get_state_value(&state_key)?;
Ok(maybe_state_value)
}

fn translate_event_v2_to_v1(&self, v2: &ContractEventV2) -> Result<Option<ContractEventV1>> {
match v2.type_tag().to_canonical_string().as_str() {
COIN_DEPOSIT_TYPE_STR => {
let coin_deposit = CoinDeposit::try_from_bytes(v2.event_data())?;
let struct_tag_str = format!("0x1::coin::CoinStore<{}>", coin_deposit.coin_type());
// We can use `DummyCoinType` as it does not affect the correctness of deserialization.
let state_value = self
.get_resource(&AccountAddress::ONE, &struct_tag_str)?
.expect("Event handle resource not found");
let coin_store_resource: CoinStoreResource<DummyCoinType> =
bcs::from_bytes(state_value.bytes())?;

let key = *coin_store_resource.deposit_events().key();
let sequence_number = self.indexer_db.get_next_sequence_number(&key)?;

let deposit_event = DepositEvent::new(coin_deposit.amount());
Ok(Some(ContractEventV1::new(
key,
sequence_number,
DEPOSIT_EVENT_TYPE.clone(),
bcs::to_bytes(&deposit_event)?,
)))
},
_ => Ok(None),
}
}

pub fn get_account_transactions(
&self,
address: AccountAddress,
Expand Down
14 changes: 14 additions & 0 deletions storage/indexer_schemas/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub mod indexer_metadata;
pub mod state_keys;
pub mod table_info;
pub mod transaction_by_account;
pub mod translated_v1_event;

use anyhow::ensure;
use aptos_schemadb::ColumnFamilyName;

pub const DEFAULT_COLUMN_FAMILY_NAME: ColumnFamilyName = "default";
Expand All @@ -22,6 +25,7 @@ pub const EVENT_BY_KEY_CF_NAME: ColumnFamilyName = "event_by_key";
pub const EVENT_BY_VERSION_CF_NAME: ColumnFamilyName = "event_by_version";
pub const TRANSACTION_BY_ACCOUNT_CF_NAME: ColumnFamilyName = "transaction_by_account";
pub const STATE_KEYS_CF_NAME: ColumnFamilyName = "state_keys";
pub const TRANSLATED_V1_EVENT_CF_NAME: ColumnFamilyName = "translated_v1_event";

pub fn column_families() -> Vec<ColumnFamilyName> {
vec![
Expand All @@ -41,3 +45,13 @@ pub fn internal_indexer_column_families() -> Vec<ColumnFamilyName> {
STATE_KEYS_CF_NAME,
]
}

fn ensure_slice_len_eq(data: &[u8], len: usize) -> anyhow::Result<()> {
ensure!(
data.len() == len,
"Unexpected data len {}, expected {}.",
data.len(),
len,
);
Ok(())
}
66 changes: 66 additions & 0 deletions storage/indexer_schemas/src/schema/translated_v1_event/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright © Aptos Foundation
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

//! This module defines physical storage schema for the contract events.
//!
//! A translated v1 event is keyed by the version of the transaction it belongs to and the index of
//! the original v2 event among all events yielded by the same transaction.
//! ```text
//! |<-------key----->|<---value--->|
//! | version | index | event bytes |
//! ```

use crate::schema::{ensure_slice_len_eq, TRANSLATED_V1_EVENT_CF_NAME};
use anyhow::Result;
use aptos_schemadb::{
define_pub_schema,
schema::{KeyCodec, ValueCodec},
};
use aptos_types::{contract_event::ContractEventV1, transaction::Version};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use std::mem::size_of;

define_pub_schema!(
TranslatedV1EventSchema,
Key,
ContractEventV1,
TRANSLATED_V1_EVENT_CF_NAME
);

type Index = u64;
type Key = (Version, Index);

impl KeyCodec<TranslatedV1EventSchema> for Key {
fn encode_key(&self) -> Result<Vec<u8>> {
let (version, index) = *self;

let mut encoded_key = Vec::with_capacity(size_of::<Version>() + size_of::<Index>());
encoded_key.write_u64::<BigEndian>(version)?;
encoded_key.write_u64::<BigEndian>(index)?;
Ok(encoded_key)
}

fn decode_key(data: &[u8]) -> Result<Self> {
ensure_slice_len_eq(data, size_of::<Self>())?;

let version_size = size_of::<Version>();

let version = (&data[..version_size]).read_u64::<BigEndian>()?;
let index = (&data[version_size..]).read_u64::<BigEndian>()?;
Ok((version, index))
}
}

impl ValueCodec<TranslatedV1EventSchema> for ContractEventV1 {
fn encode_value(&self) -> Result<Vec<u8>> {
bcs::to_bytes(self).map_err(Into::into)
}

fn decode_value(data: &[u8]) -> Result<Self> {
bcs::from_bytes(data).map_err(Into::into)
}
}

#[cfg(test)]
mod test;
20 changes: 20 additions & 0 deletions storage/indexer_schemas/src/schema/translated_v1_event/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright © Aptos Foundation
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use super::*;
use aptos_schemadb::{schema::fuzzing::assert_encode_decode, test_no_panic_decoding};
use proptest::prelude::*;

proptest! {
#[test]
fn test_encode_decode(
version in any::<Version>(),
index in any::<u64>(),
event in any::<ContractEventV1>(),
) {
assert_encode_decode::<TranslatedV1EventSchema>(&(version, index), &event);
}
}

test_no_panic_decoding!(TranslatedV1EventSchema);
21 changes: 20 additions & 1 deletion types/src/account_config/events/deposit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use move_core_types::{ident_str, identifier::IdentStr, move_resource::MoveStructType};
use move_core_types::{
ident_str,
identifier::IdentStr,
language_storage::{StructTag, TypeTag, CORE_CODE_ADDRESS},
move_resource::MoveStructType,
};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};

/// Struct that represents a DepositPaymentEvent.
Expand All @@ -12,6 +18,10 @@ pub struct DepositEvent {
}

impl DepositEvent {
pub fn new(amount: u64) -> Self {
Self { amount }
}

pub fn try_from_bytes(bytes: &[u8]) -> Result<Self> {
bcs::from_bytes(bytes).map_err(Into::into)
}
Expand All @@ -26,3 +36,12 @@ impl MoveStructType for DepositEvent {
const MODULE_NAME: &'static IdentStr = ident_str!("coin");
const STRUCT_NAME: &'static IdentStr = ident_str!("DepositEvent");
}

pub static DEPOSIT_EVENT_TYPE: Lazy<TypeTag> = Lazy::new(|| {
TypeTag::Struct(Box::new(StructTag {
address: CORE_CODE_ADDRESS,
module: ident_str!("coin").to_owned(),
name: ident_str!("DepositEvent").to_owned(),
type_args: vec![],
}))
});
Loading

0 comments on commit 80c5021

Please sign in to comment.