-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #45 from pshenmic/feat/index-blocks-sts
Persist blocks and state transitions in the database
- Loading branch information
Showing
11 changed files
with
323 additions
and
109 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,7 +89,7 @@ jobs: | |
docker pull ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer | ||
docker run -d --name postgres -e POSTGRES_PASSWORD=indexer -e POSTGRES_USER=indexer -e POSTGRES_DB=indexer -p 5432:5432 postgres | ||
sleep 10 | ||
docker run --rm -e DATABASE_URL=postgres://indexer:[email protected]:5432/indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer refinery migrate -e DATABASE_URL -p /app/migrations -t 1 | ||
docker run --rm --env-file api.env ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer refinery migrate -e DATABASE_URL -p /app/migrations -t 1 | ||
docker run -d -p 3005:3005 --restart always --env-file api.env --name platform-explorer-api ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:api | ||
sleep 3 | ||
docker run -d --name platform-explorer-indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer cargo run | ||
docker run -d --env-file api.env --name platform-explorer-indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer cargo run |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
POSTGRES_HOST=127.0.0.1 | ||
POSTGRES_DB=indexer | ||
POSTGRES_USER=indexer | ||
POSTGRES_PASS=indexer |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,21 @@ | ||
CREATE TABLE data_contracts ( | ||
id SERIAL PRIMARY KEY, | ||
identifier varchar(255), | ||
CONSTRAINT production UNIQUE(identifier) | ||
CONSTRAINT identifier_unique UNIQUE(identifier) | ||
); | ||
|
||
CREATE TABLE blocks ( | ||
id SERIAL PRIMARY KEY, | ||
hash varchar(255) NOT NULL, | ||
block_height int NOT NULL, | ||
CONSTRAINT block_hash UNIQUE(hash) | ||
); | ||
|
||
CREATE TABLE state_transitions ( | ||
id SERIAL PRIMARY KEY, | ||
hash varchar(255) NOT NULL, | ||
data TEXT NOT NULL, | ||
type int NOT NULL, | ||
block_id int NOT NULL references blocks(id), | ||
CONSTRAINT state_transition_hash UNIQUE(hash) | ||
); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,86 +1,79 @@ | ||
use std::any::Any; | ||
use std::cell::Cell; | ||
use std::sync::{Arc, Mutex}; | ||
use std::time::Duration; | ||
use dpp::state_transition::StateTransition; | ||
use crate::decoder::decoder::StateTransitionDecoder; | ||
use base64::{Engine as _, engine::{general_purpose}}; | ||
use crate::models::{PlatformExplorerSearchResponse, PlatformExplorerStatusResponse}; | ||
use futures::stream; | ||
use tokio::{task, time}; | ||
use tokio::time::{Instant, Interval}; | ||
use crate::models::{PlatformExplorerSearchResponse, PlatformExplorerStatusResponse, TDBlock, TDBlockHeader}; | ||
use crate::processor::psql::PSQLProcessor; | ||
use crate::processor::STProcessorLike; | ||
|
||
pub struct Indexer { | ||
decoder: StateTransitionDecoder, | ||
processor: PSQLProcessor, | ||
last_block_height: Cell<i32>, | ||
} | ||
|
||
/** | ||
Indexer is responsible for indexing platform chain data. | ||
Indexer is responsible for indexing platform chain data. | ||
It sync up with the network and sends incoming state transitions events to the lower level | ||
**/ | ||
impl Indexer { | ||
pub fn new() -> Indexer { | ||
let decoder = StateTransitionDecoder::new(); | ||
let processor = PSQLProcessor::new(); | ||
|
||
return Indexer { decoder, processor }; | ||
return Indexer { processor, last_block_height: Cell::new(1) }; | ||
} | ||
|
||
pub async fn start(&self) { | ||
self.init().await; | ||
println!("Indexer loop started"); | ||
|
||
println!("Indexer started"); | ||
let mut interval = time::interval(Duration::from_millis(3000)); | ||
|
||
loop { | ||
interval.tick().await; | ||
|
||
let current_block_height:i32 = self.last_block_height.get(); | ||
let last_block_height:i32 = self.fetch_last_block().await; | ||
|
||
let diff = last_block_height.clone() - current_block_height.clone(); | ||
|
||
if diff > 0 { | ||
for block_height in current_block_height..last_block_height+1 { | ||
self.index_block(block_height).await; | ||
} | ||
} | ||
} | ||
} | ||
|
||
async fn init(&self) { | ||
let resp = reqwest::get("https://platform-explorer-api.rd.dash.org/status") | ||
async fn index_block(&self, block_height: i32) { | ||
let url = format!("https://platform-explorer-api.rd.dash.org/search?query={}", &block_height); | ||
|
||
let resp = reqwest::get(url) | ||
.await | ||
.unwrap() | ||
.json::<PlatformExplorerStatusResponse>() | ||
.json::<PlatformExplorerSearchResponse>() | ||
.await | ||
.unwrap(); | ||
|
||
let txs = resp.block.block.data.txs; | ||
let hash = resp.block.block_id.hash; | ||
|
||
let blocks_count = resp.blocks_count.parse::<i32>().unwrap(); | ||
println!("Latest platform block: {}", &blocks_count); | ||
let block = TDBlock{txs: txs.clone(), header: TDBlockHeader{hash: hash.clone(), block_height: block_height.clone(), tx_count: txs.len() as i32 }}; | ||
|
||
for _i in 1..blocks_count { | ||
let url = format!("https://platform-explorer-api.rd.dash.org/search?query={}", _i); | ||
self.processor.handle_block(block).await; | ||
|
||
let resp = reqwest::get(url) | ||
.await | ||
.unwrap() | ||
.json::<PlatformExplorerSearchResponse>() | ||
.await | ||
.unwrap(); | ||
self.last_block_height.set(block_height); | ||
} | ||
|
||
let txs = resp.block.block.data.txs; | ||
let block_height = resp.block.block.header.height; | ||
async fn fetch_last_block(&self) -> i32 { | ||
let resp = reqwest::get("https://platform-explorer-api.rd.dash.org/status") | ||
.await | ||
.unwrap() | ||
.json::<PlatformExplorerStatusResponse>() | ||
.await | ||
.unwrap(); | ||
|
||
if txs.len() == usize::try_from(0).unwrap() { | ||
println!("No platform transactions at block height {}", &block_height); | ||
} | ||
let blocks_count = resp.blocks_count.parse::<i32>().unwrap(); | ||
|
||
for tx_base_64 in txs.iter() { | ||
let bytes = general_purpose::STANDARD.decode(tx_base_64).unwrap(); | ||
let st_result = self.decoder.decode(bytes).await; | ||
|
||
let st_type = match st_result { | ||
Ok(st) => match st { | ||
StateTransition::DataContractCreate(state_transition) => { | ||
self.processor.handle_data_contract_create(state_transition).await; | ||
"DataContractCreate" | ||
} | ||
StateTransition::DataContractUpdate(_) => "DataContractUpdate", | ||
StateTransition::DocumentsBatch(_) => "DocumentsBatch", | ||
StateTransition::IdentityCreate(_) => "IdentityCreate", | ||
StateTransition::IdentityTopUp(_) => "IdentityTopUp", | ||
StateTransition::IdentityCreditWithdrawal(_) => "DataContractCreate", | ||
StateTransition::IdentityUpdate(_) => "IdentityUpdate", | ||
StateTransition::IdentityCreditTransfer(_) => "IdentityCreditTransfer" | ||
} | ||
Err(e) => "UnknownStateTransition" | ||
}; | ||
println!("{} happened at block height {}", &st_type, &block_height); | ||
} | ||
} | ||
return blocks_count; | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,5 @@ | ||
pub mod psql; | ||
|
||
use async_trait::async_trait; | ||
|
||
use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition; | ||
use dpp::state_transition::data_contract_update_transition::DataContractUpdateTransition; | ||
use dpp::state_transition::StateTransition; | ||
use crate::processor::psql::PSQLProcessor; | ||
|
||
pub enum STProcessorType { | ||
PSQL, | ||
} | ||
|
||
pub trait STProcessorLike<T> { | ||
fn handle(state_transition: StateTransition) -> (); | ||
} | ||
|
||
#[async_trait] | ||
pub trait STProcessorHandlerSet { | ||
async fn handle_data_contract_create(&self, state_transition: DataContractCreateTransition) -> bool; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
use std::env; | ||
use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod, Runtime, tokio_postgres, Transaction}; | ||
use deadpool_postgres::tokio_postgres::{Error, IsolationLevel, NoTls, Row}; | ||
use dpp::platform_value::string_encoding::Encoding; | ||
use dpp::state_transition::data_contract_create_transition::accessors::DataContractCreateTransitionAccessorsV0; | ||
use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition; | ||
use dpp::state_transition::{StateTransition, StateTransitionType}; | ||
use crate::models::{TDBlock, TDBlockHeader}; | ||
use sha256::{digest, try_digest}; | ||
use base64::{Engine as _, engine::{general_purpose}}; | ||
|
||
pub struct PostgresDAO { | ||
connection_pool: Pool, | ||
} | ||
|
||
impl PostgresDAO { | ||
pub fn new() -> PostgresDAO { | ||
let mut cfg = Config::new(); | ||
|
||
let postgres_host = env::var("POSTGRES_HOST").expect("You've not set the POSTGRES_HOST"); | ||
let postgres_db = env::var("POSTGRES_DB").expect("You've not set the POSTGRES_DB"); | ||
let postgres_user = env::var("POSTGRES_USER").expect("You've not set the POSTGRES_USER"); | ||
let postgres_pass = env::var("POSTGRES_PASS").expect("You've not set the POSTGRES_HOST"); | ||
|
||
cfg.host = Some(postgres_host); | ||
cfg.dbname = Some(postgres_db); | ||
cfg.user = Some(postgres_user); | ||
cfg.password = Some(postgres_pass); | ||
cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast }); | ||
|
||
let connection_pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap(); | ||
|
||
return PostgresDAO { connection_pool }; | ||
} | ||
|
||
pub async fn create_state_transition(&self, block_id: i32, st_type: i32, bytes: Vec<u8>) { | ||
let data = general_purpose::STANDARD.encode(&bytes); | ||
let hash = digest(bytes.clone()).to_uppercase(); | ||
|
||
let query = "INSERT INTO state_transitions(hash, data, type, block_id) VALUES ($1, $2, $3, $4);"; | ||
|
||
let client = self.connection_pool.get().await.unwrap(); | ||
let stmt = client.prepare_cached(query).await.unwrap(); | ||
|
||
client.query(&stmt, &[&hash, &data, &st_type, &block_id]).await.unwrap(); | ||
} | ||
|
||
pub async fn create_data_contract(&self, state_transition: DataContractCreateTransition) { | ||
let id = state_transition.data_contract().id(); | ||
let id_str = id.to_string(Encoding::Base58); | ||
|
||
let query = "INSERT INTO data_contracts(identifier) VALUES ($1);"; | ||
|
||
let client = self.connection_pool.get().await.unwrap(); | ||
let stmt = client.prepare_cached(query).await.unwrap(); | ||
client.query(&stmt, &[&id_str]).await.unwrap(); | ||
} | ||
|
||
pub async fn get_latest_block(&self) -> i32 { | ||
return 0; | ||
} | ||
|
||
pub async fn get_block_header_by_height(&self, block_height: i32) -> Option<TDBlockHeader> { | ||
let client = self.connection_pool.get().await.unwrap(); | ||
|
||
let stmt = client.prepare_cached("SELECT hash,block_height FROM blocks where block_height = $1;").await.unwrap(); | ||
|
||
let rows: Vec<Row> = client.query(&stmt, &[&block_height]) | ||
.await.unwrap(); | ||
|
||
let blocks: Vec<TDBlockHeader> = rows | ||
.into_iter() | ||
.map(|row| { | ||
row.into() | ||
}).collect::<Vec<TDBlockHeader>>(); | ||
|
||
let block = blocks.first(); | ||
|
||
return block.cloned(); | ||
} | ||
|
||
pub async fn create_block(&self, block_header: TDBlockHeader) -> i32 { | ||
let client = self.connection_pool.get().await.unwrap(); | ||
|
||
let stmt = client.prepare_cached("INSERT INTO blocks(block_height, hash) VALUES ($1, $2) RETURNING id;").await.unwrap(); | ||
|
||
let rows = client.query(&stmt, &[&block_header.block_height, &block_header.hash]).await.unwrap(); | ||
|
||
let block_id: i32 = rows[0].get(0); | ||
|
||
return block_id; | ||
} | ||
} | ||
|
||
impl From<Row> for TDBlockHeader { | ||
fn from(row: Row) -> Self { | ||
// id,hash,block_height | ||
let hash: String = row.get(0); | ||
let block_height: i32 = row.get(1); | ||
|
||
return TDBlockHeader { hash, block_height, tx_count: 0 }; | ||
} | ||
} |
Oops, something went wrong.