diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 64cbfb1bc..6218db42d 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -89,7 +89,7 @@ jobs:
 docker pull ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer
 docker run -d --name postgres -e POSTGRES_PASSWORD=indexer -e POSTGRES_USER=indexer -e POSTGRES_DB=indexer -p 5432:5432 postgres
 sleep 10
-docker run --rm -e DATABASE_URL=postgres://indexer:indexer@172.17.0.1:5432/indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer refinery migrate -e DATABASE_URL -p /app/migrations -t 1
+docker run --rm --env-file api.env ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer refinery migrate -e DATABASE_URL -p /app/migrations -t 1
 docker run -d -p 3005:3005 --restart always --env-file api.env --name platform-explorer-api ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:api
 sleep 3
-docker run -d --name platform-explorer-indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer cargo run
+docker run -d --env-file api.env --name platform-explorer-indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer cargo run
diff --git a/packages/indexer/.env b/packages/indexer/.env
new file mode 100644
index 000000000..cb2f1bb0c
--- /dev/null
+++ b/packages/indexer/.env
@@ -0,0 +1,4 @@
+POSTGRES_HOST=127.0.0.1
+POSTGRES_DB=indexer
+POSTGRES_USER=indexer
+POSTGRES_PASS=indexer
diff --git a/packages/indexer/Cargo.lock b/packages/indexer/Cargo.lock
index 39d0134b6..736f994ee 100644
--- a/packages/indexer/Cargo.lock
+++ b/packages/indexer/Cargo.lock
@@ -662,6 +662,12 @@ dependencies = [
  "subtle",
 ]
 
+[[package]]
+name = "dotenv"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
+
 [[package]]
 name = "downcast"
 version = "0.11.0"
@@ -1264,6 +1270,7 @@ dependencies = [
  "base64 0.21.4",
  "daemonize",
  "deadpool-postgres",
+ "dotenv",
  "dpp",
  "futures",
  "mockall",
@@ -1271,6 +1278,7 @@ dependencies = [
  "reqwest",
  "serde",
  "serde_json",
+ "sha256",
  "tokio",
 ]
 
@@ -2495,6 +2503,19 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "sha256"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7895c8ae88588ccead14ff438b939b0c569cd619116f14b4d13fdff7b8333386"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "hex",
+ "sha2",
+ "tokio",
+]
+
 [[package]]
 name = "shlex"
 version = "1.2.0"
diff --git a/packages/indexer/Cargo.toml b/packages/indexer/Cargo.toml
index 094d31dd6..b4d478408 100644
--- a/packages/indexer/Cargo.toml
+++ b/packages/indexer/Cargo.toml
@@ -19,6 +19,8 @@ mockall = { version="0.11.3", optional=true}
 base64 = "0.21.2"
 deadpool-postgres = "0.10.5"
 refinery = { version = "0.8.10", features = ["postgres"] }
+sha256 = "1.4.0"
+dotenv = "0.15.0"
 
 [features]
 default = ["fixtures-and-mocks"]
diff --git a/packages/indexer/migrations/V1__initial.sql b/packages/indexer/migrations/V1__initial.sql
index b9b3c8c25..0dae84e9b 100644
--- a/packages/indexer/migrations/V1__initial.sql
+++ b/packages/indexer/migrations/V1__initial.sql
@@ -1,5 +1,21 @@
 CREATE TABLE data_contracts (
     id SERIAL PRIMARY KEY,
     identifier varchar(255),
-    CONSTRAINT production UNIQUE(identifier)
+    CONSTRAINT identifier_unique UNIQUE(identifier)
+);
+
+CREATE TABLE blocks (
+    id SERIAL PRIMARY KEY,
+    hash varchar(255) NOT NULL,
+    block_height int NOT NULL,
+    CONSTRAINT block_hash UNIQUE(hash)
+);
+
+CREATE TABLE state_transitions (
+    id SERIAL PRIMARY KEY,
+    hash varchar(255) NOT NULL,
+    data TEXT NOT NULL,
+    type int NOT NULL,
+    block_id int NOT NULL references blocks(id),
+    CONSTRAINT state_transition_hash UNIQUE(hash)
 );
diff --git a/packages/indexer/src/indexer/mod.rs b/packages/indexer/src/indexer/mod.rs
index 293453c39..7cbda7d05 100644
--- a/packages/indexer/src/indexer/mod.rs
+++ b/packages/indexer/src/indexer/mod.rs
@@ -1,86 +1,79 @@
+use std::any::Any;
+use std::cell::Cell;
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
 use dpp::state_transition::StateTransition;
-use crate::decoder::decoder::StateTransitionDecoder;
 use base64::{Engine as _, engine::{general_purpose}};
-use crate::models::{PlatformExplorerSearchResponse, PlatformExplorerStatusResponse};
+use futures::stream;
+use tokio::{task, time};
+use tokio::time::{Instant, Interval};
+use crate::models::{PlatformExplorerSearchResponse, PlatformExplorerStatusResponse, TDBlock, TDBlockHeader};
 use crate::processor::psql::PSQLProcessor;
-use crate::processor::STProcessorLike;
 
 pub struct Indexer {
-    decoder: StateTransitionDecoder,
     processor: PSQLProcessor,
+    last_block_height: Cell<i32>,
 }
 
-/**
-Indexer is responsible for indexing platform chain data.
-Indexer is responsible for indexing platform chain data.
-It sync up with the network and sends incoming state transitions events to the lower level
- **/
 impl Indexer {
     pub fn new() -> Indexer {
-        let decoder = StateTransitionDecoder::new();
         let processor = PSQLProcessor::new();
 
-        return Indexer { decoder, processor };
+        return Indexer { processor, last_block_height: Cell::new(1) };
    }
 
     pub async fn start(&self) {
-        self.init().await;
+        println!("Indexer loop started");
 
-        println!("Indexer started");
+        let mut interval = time::interval(Duration::from_millis(3000));
+
+        loop {
+            interval.tick().await;
+
+            let current_block_height:i32 = self.last_block_height.get();
+            let last_block_height:i32 = self.fetch_last_block().await;
+
+            let diff = last_block_height.clone() - current_block_height.clone();
+
+            if diff > 0 {
+                for block_height in current_block_height..last_block_height+1 {
+                    self.index_block(block_height).await;
+                }
+            }
+        }
     }
 
-    async fn init(&self) {
-        let resp = reqwest::get("https://platform-explorer-api.rd.dash.org/status")
+    async fn index_block(&self, block_height: i32) {
+        let url = format!("https://platform-explorer-api.rd.dash.org/search?query={}", &block_height);
+
+        let resp = reqwest::get(url)
             .await
             .unwrap()
-            .json::<PlatformExplorerStatusResponse>()
+            .json::<PlatformExplorerSearchResponse>()
             .await
             .unwrap();
 
+        let txs = resp.block.block.data.txs;
+        let hash = resp.block.block_id.hash;
-        let blocks_count = resp.blocks_count.parse::<i32>().unwrap();
-        println!("Latest platform block: {}", &blocks_count);
+        let block = TDBlock{txs: txs.clone(), header: TDBlockHeader{hash: hash.clone(), block_height: block_height.clone(), tx_count: txs.len() as i32 }};
 
-        for _i in 1..blocks_count {
-            let url = format!("https://platform-explorer-api.rd.dash.org/search?query={}", _i);
+        self.processor.handle_block(block).await;
 
-            let resp = reqwest::get(url)
-                .await
-                .unwrap()
-                .json::<PlatformExplorerSearchResponse>()
-                .await
-                .unwrap();
+        self.last_block_height.set(block_height);
+    }
 
-            let txs = resp.block.block.data.txs;
-            let block_height = resp.block.block.header.height;
+    async fn fetch_last_block(&self) -> i32 {
+        let resp = reqwest::get("https://platform-explorer-api.rd.dash.org/status")
+            .await
+            .unwrap()
+            .json::<PlatformExplorerStatusResponse>()
+            .await
+            .unwrap();
 
-            if txs.len() == usize::try_from(0).unwrap() {
-                println!("No platform transactions at block height {}", &block_height);
-            }
+        let blocks_count = resp.blocks_count.parse::<i32>().unwrap();
 
-            for tx_base_64 in txs.iter() {
-                let bytes = general_purpose::STANDARD.decode(tx_base_64).unwrap();
-                let st_result = self.decoder.decode(bytes).await;
-
-                let st_type = match st_result {
-                    Ok(st) => match st {
-                        StateTransition::DataContractCreate(state_transition) => {
-                            self.processor.handle_data_contract_create(state_transition).await;
-                            "DataContractCreate"
-                        }
-                        StateTransition::DataContractUpdate(_) => "DataContractUpdate",
-                        StateTransition::DocumentsBatch(_) => "DocumentsBatch",
-                        StateTransition::IdentityCreate(_) => "IdentityCreate",
-                        StateTransition::IdentityTopUp(_) => "IdentityTopUp",
-                        StateTransition::IdentityCreditWithdrawal(_) => "DataContractCreate",
-                        StateTransition::IdentityUpdate(_) => "IdentityUpdate",
-                        StateTransition::IdentityCreditTransfer(_) => "IdentityCreditTransfer"
-                    }
-                    Err(e) => "UnknownStateTransition"
-                };
-                println!("{} happened at block height {}", &st_type, &block_height);
-            }
-        }
+        return blocks_count;
     }
 }
diff --git a/packages/indexer/src/main.rs b/packages/indexer/src/main.rs
index b387ce40b..da471d484 100644
--- a/packages/indexer/src/main.rs
+++ b/packages/indexer/src/main.rs
@@ -1,4 +1,5 @@
 use std::io;
+use dotenv::dotenv;
 
 mod indexer;
 mod decoder;
@@ -7,10 +8,9 @@ mod processor;
 
 #[tokio::main]
 async fn main() {
+    dotenv().ok(); // This line loads the environment variables from the ".env" file
+
     let indexer = indexer::Indexer::new();
 
     indexer.start().await;
-
-    println!("Press enter to stop daemon");
-    io::stdin().read_line(&mut String::new()).unwrap();
 }
diff --git a/packages/indexer/src/models/mod.rs b/packages/indexer/src/models/mod.rs
index 4a20831ba..21d5ae3e4 100644
--- a/packages/indexer/src/models/mod.rs
+++ b/packages/indexer/src/models/mod.rs
@@ -1,3 +1,4 @@
+use deadpool_postgres::tokio_postgres::Row;
 use serde::{Deserialize};
 
 #[derive(Debug, Deserialize)]
@@ -42,3 +43,19 @@ pub struct BlockWrapper {
 pub struct PlatformExplorerSearchResponse {
     pub block: BlockWrapper,
 }
+
+#[derive(Clone)]
+pub struct TDBlockHeader {
+    pub hash: String,
+    pub block_height: i32,
+    pub tx_count: i32,
+}
+
+pub struct TDBlock {
+    pub header: TDBlockHeader,
+    pub txs: Vec<String>,
+}
+
+pub struct PlatformStateTransition {
+
+}
diff --git a/packages/indexer/src/processor/mod.rs b/packages/indexer/src/processor/mod.rs
index 29626c79f..053004418 100644
--- a/packages/indexer/src/processor/mod.rs
+++ b/packages/indexer/src/processor/mod.rs
@@ -1,21 +1,5 @@
 pub mod psql;
 
-use async_trait::async_trait;
-
-use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition;
-use dpp::state_transition::data_contract_update_transition::DataContractUpdateTransition;
-use dpp::state_transition::StateTransition;
-use crate::processor::psql::PSQLProcessor;
-
 pub enum STProcessorType {
     PSQL,
 }
-
-pub trait STProcessorLike {
-    fn handle(state_transition: StateTransition) -> ();
-}
-
-#[async_trait]
-pub trait STProcessorHandlerSet {
-    async fn handle_data_contract_create(&self, state_transition: DataContractCreateTransition) -> bool;
-}
diff --git a/packages/indexer/src/processor/psql/dao/mod.rs b/packages/indexer/src/processor/psql/dao/mod.rs
new file mode 100644
index 000000000..118aca891
--- /dev/null
+++ b/packages/indexer/src/processor/psql/dao/mod.rs
@@ -0,0 +1,103 @@
+use std::env;
+use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod, Runtime, tokio_postgres, Transaction};
+use deadpool_postgres::tokio_postgres::{Error, IsolationLevel, NoTls, Row};
+use dpp::platform_value::string_encoding::Encoding;
+use dpp::state_transition::data_contract_create_transition::accessors::DataContractCreateTransitionAccessorsV0;
+use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition;
+use dpp::state_transition::{StateTransition, StateTransitionType};
+use crate::models::{TDBlock, TDBlockHeader};
+use sha256::{digest, try_digest};
+use base64::{Engine as _, engine::{general_purpose}};
+
+pub struct PostgresDAO {
+    connection_pool: Pool,
+}
+
+impl PostgresDAO {
+    pub fn new() -> PostgresDAO {
+        let mut cfg = Config::new();
+
+        let postgres_host = env::var("POSTGRES_HOST").expect("You've not set the POSTGRES_HOST");
+        let postgres_db = env::var("POSTGRES_DB").expect("You've not set the POSTGRES_DB");
+        let postgres_user = env::var("POSTGRES_USER").expect("You've not set the POSTGRES_USER");
+        let postgres_pass = env::var("POSTGRES_PASS").expect("You've not set the POSTGRES_PASS");
+
+        cfg.host = Some(postgres_host);
+        cfg.dbname = Some(postgres_db);
+        cfg.user = Some(postgres_user);
+        cfg.password = Some(postgres_pass);
+        cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });
+
+        let connection_pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
+
+        return PostgresDAO { connection_pool };
+    }
+
+    pub async fn create_state_transition(&self, block_id: i32, st_type: i32, bytes: Vec<u8>) {
+        let data = general_purpose::STANDARD.encode(&bytes);
+        let hash = digest(bytes.clone()).to_uppercase();
+
+        let query = "INSERT INTO state_transitions(hash, data, type, block_id) VALUES ($1, $2, $3, $4);";
+
+        let client = self.connection_pool.get().await.unwrap();
+        let stmt = client.prepare_cached(query).await.unwrap();
+
+        client.query(&stmt, &[&hash, &data, &st_type, &block_id]).await.unwrap();
+    }
+
+    pub async fn create_data_contract(&self, state_transition: DataContractCreateTransition) {
+        let id = state_transition.data_contract().id();
+        let id_str = id.to_string(Encoding::Base58);
+
+        let query = "INSERT INTO data_contracts(identifier) VALUES ($1);";
+
+        let client = self.connection_pool.get().await.unwrap();
+        let stmt = client.prepare_cached(query).await.unwrap();
+        client.query(&stmt, &[&id_str]).await.unwrap();
+    }
+
+    pub async fn get_latest_block(&self) -> i32 {
+        return 0;
+    }
+
+    pub async fn get_block_header_by_height(&self, block_height: i32) -> Option<TDBlockHeader> {
+        let client = self.connection_pool.get().await.unwrap();
+
+        let stmt = client.prepare_cached("SELECT hash,block_height FROM blocks where block_height = $1;").await.unwrap();
+
+        let rows: Vec<Row> = client.query(&stmt, &[&block_height])
+            .await.unwrap();
+
+        let blocks: Vec<TDBlockHeader> = rows
+            .into_iter()
+            .map(|row| {
+                row.into()
+            }).collect::<Vec<TDBlockHeader>>();
+
+        let block = blocks.first();
+
+        return block.cloned();
+    }
+
+    pub async fn create_block(&self, block_header: TDBlockHeader) -> i32 {
+        let client = self.connection_pool.get().await.unwrap();
+
+        let stmt = client.prepare_cached("INSERT INTO blocks(block_height, hash) VALUES ($1, $2) RETURNING id;").await.unwrap();
+
+        let rows = client.query(&stmt, &[&block_header.block_height, &block_header.hash]).await.unwrap();
+
+        let block_id: i32 = rows[0].get(0);
+
+        return block_id;
+    }
+}
+
+impl From<Row> for TDBlockHeader {
+    fn from(row: Row) -> Self {
+        // id,hash,block_height
+        let hash: String = row.get(0);
+        let block_height: i32 = row.get(1);
+
+        return TDBlockHeader { hash, block_height, tx_count: 0 };
+    }
+}
diff --git a/packages/indexer/src/processor/psql/mod.rs b/packages/indexer/src/processor/psql/mod.rs
index 17549af8b..9adf86238 100644
--- a/packages/indexer/src/processor/psql/mod.rs
+++ b/packages/indexer/src/processor/psql/mod.rs
@@ -1,57 +1,131 @@
+mod dao;
+
 use std::ops::DerefMut;
-use dpp::state_transition::StateTransition;
-use crate::processor::{STProcessorHandlerSet, STProcessorLike, STProcessorType};
-use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod, Runtime, tokio_postgres};
-use deadpool_postgres::tokio_postgres::{Error, NoTls, Row};
+use dpp::state_transition::{StateTransition, StateTransitionLike, StateTransitionType};
+use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod, Runtime, tokio_postgres, Transaction};
 use dpp::dashcore::bech32::ToBase32;
 use dpp::platform_value::string_encoding::Encoding;
 use dpp::state_transition::data_contract_create_transition::accessors::DataContractCreateTransitionAccessorsV0;
 use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition;
+use crate::models::{TDBlock, TDBlockHeader};
+use crate::processor::psql::dao::PostgresDAO;
+use base64::{Engine as _, engine::{general_purpose}};
+use dpp::serialization::PlatformSerializable;
+use dpp::state_transition::StateTransition::DataContractCreate;
+use crate::decoder::decoder::StateTransitionDecoder;
 
 pub struct PSQLProcessor {
-    connection_pool: Pool,
+    decoder: StateTransitionDecoder,
+    dao: PostgresDAO,
 }
 
-
 impl PSQLProcessor {
     pub fn new() -> PSQLProcessor {
-        let mut cfg = Config::new();
+        let dao = PostgresDAO::new();
+        let decoder = StateTransitionDecoder::new();
 
-        cfg.host = Some("172.17.0.1".to_string());
-        cfg.dbname = Some("indexer".to_string());
-        cfg.user = Some("indexer".to_string());
-        cfg.password = Some("indexer".to_string());
-        cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });
+        return PSQLProcessor { decoder, dao };
+    }
+
+    pub async fn handle_data_contract_create(&self, state_transition: DataContractCreateTransition) -> () {
+        self.dao.create_data_contract(state_transition).await;
+    }
 
-        let connection_pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
+    pub async fn get_latest_block(&self, state_transition: DataContractCreateTransition) -> i32 {
+        let block = self.dao.get_latest_block().await;
 
-        return PSQLProcessor { connection_pool };
+        return block;
     }
 
-    pub async fn handle_data_contract_create(&self, state_transition: DataContractCreateTransition) -> () {
-        let client = self.connection_pool.get().await.unwrap();
+    pub async fn handle_st(&self, block_id: i32, state_transition: StateTransition) -> () {
+        let mut st_type: i32 = 999;
+        let mut bytes: Vec<u8> = Vec::new();
 
-        let id = state_transition.data_contract().id();
-        let id_str = id.to_string(Encoding::Base58);
+        match state_transition {
+            StateTransition::DataContractCreate(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::DataContractCreate(
+                    st.clone()
+                )).unwrap();
 
-        let stmt = client.prepare_cached("INSERT INTO data_contracts(identifier) VALUES ($1);").await.unwrap();
+                self.handle_data_contract_create(st).await
+            }
+            StateTransition::DataContractUpdate(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::DataContractUpdate(
+                    st.clone()
+                )).unwrap();
+            }
+            StateTransition::DocumentsBatch(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::DocumentsBatch(
+                    st.clone()
+                )).unwrap();
+            }
+            StateTransition::IdentityCreate(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::IdentityCreate(
+                    st.clone()
+                )).unwrap();
+            }
+            StateTransition::IdentityTopUp(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::IdentityTopUp(
+                    st.clone()
+                )).unwrap();
+            }
+            StateTransition::IdentityCreditWithdrawal(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::IdentityCreditWithdrawal(
+                    st.clone()
+                )).unwrap();
+            }
+            StateTransition::IdentityUpdate(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::IdentityUpdate(
+                    st.clone()
+                )).unwrap();
+            }
+            StateTransition::IdentityCreditTransfer(st) => {
+                st_type = st.state_transition_type() as i32;
+                bytes = PlatformSerializable::serialize_to_bytes(&StateTransition::IdentityCreditTransfer(
+                    st.clone()
+                )).unwrap();
+            }
+        }
 
-        client.query(&stmt, &[&id_str]).await.unwrap();
+        self.dao.create_state_transition(block_id, st_type, bytes).await;
     }
-}
 
-impl STProcessorLike for PSQLProcessor {
-    fn handle(state_transition: StateTransition) -> () {
-        match state_transition {
-            StateTransition::DataContractCreate(_) => {},
-            StateTransition::DataContractUpdate(_) => {}
-            StateTransition::DocumentsBatch(_) => {}
-            StateTransition::IdentityCreate(_) => {}
-            StateTransition::IdentityTopUp(_) => {}
-            StateTransition::IdentityCreditWithdrawal(_) => {}
-            StateTransition::IdentityUpdate(_) => {}
-            StateTransition::IdentityCreditTransfer(_) => {}
+    pub async fn handle_block(&self, block: TDBlock) -> () {
+        let processed = self.dao.get_block_header_by_height(block.header.block_height.clone()).await;
+
+        match processed {
+            None => {
+                // TODO IMPLEMENT PSQL TRANSACTION
+
+                let block_height = block.header.block_height.clone();
+
+                let block_id = self.dao.create_block(block.header).await;
+
+                if block.txs.len() as i32 == 0 {
+                    println!("No platform transactions at block height {}", block_height.clone());
+                }
+
+                for tx_base_64 in block.txs.iter() {
+                    let bytes = general_purpose::STANDARD.decode(tx_base_64).unwrap();
+                    let st_result = self.decoder.decode(bytes).await;
+
+                    let state_transition = st_result.unwrap();
+
+                    self.handle_st(block_id, state_transition).await;
+
+                    println!("Processed state transition at height {}", block_height)
+                }
+            }
+            Some(st) => {
+                println!("Block at the height {} has been already processed", &block.header.block_height);
+            }
         }
     }
 }
-