Skip to content

Commit

Permalink
can work with both keystore or provider as signer
Browse files Browse the repository at this point in the history
  • Loading branch information
Ho committed Jan 11, 2022
1 parent e5e3e3b commit 4962291
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 41 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ serde_json = "1.0.64"
sqlx = { version = "0.5.1", features = [ "runtime-tokio-rustls", "postgres", "chrono", "decimal", "json", "migrate" ] }
tokio = { version = "1.0", features = [ "full" ] }
tonic = "0.5.2"
async-trait = "0.1.52"

[build-dependencies]
prost = "0.7.0"
Expand Down
11 changes: 6 additions & 5 deletions src/bin/block_submitter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::Parser;
use fluidex_common::non_blocking_tracing;
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
use regnbue_bridge::block_submitter::{storage, types, EthSender, Settings, TaskFetcher};
use regnbue_bridge::block_submitter::{storage, types, EthSenderConfigure, Settings, TaskFetcher};
use std::cell::RefCell;

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -44,23 +44,24 @@ async fn main() -> anyhow::Result<()> {

// TODO: maybe separate and have: 1. consumer 2. producer 3. sender
let dbpool = storage::from_config(&settings).await?;
let eth_sender = EthSender::from_config_with_pool(&settings, dbpool.clone()).await?;
//let eth_sender = eth
let client = EthSenderConfigure::from_config(&settings).await?.build(dbpool.clone());

// one-block mode
if let Some(sub_cmd) = opts.command {
match sub_cmd {
SubCommand::Verify(opts) => {
let block_id = opts.block_id;
let block = types::SubmitBlockArgs::fetch_by_blockid(block_id, &dbpool).await?;
let ret = eth_sender
let ret = client
.verify_block(block.ok_or_else(|| anyhow::anyhow!("block {} not existed", block_id))?)
.await?;
println!("verify block {} result: {}", block_id, ret);
}
SubCommand::Manual(_) => {
let block = types::SubmitBlockArgs::fetch_latest(None, &dbpool).await?;
let block = block.ok_or_else(|| anyhow::anyhow!("no pending block for submitting"))?;
eth_sender.submit_block(block).await?;
client.submit_block(block).await?;
}
};

Expand All @@ -83,7 +84,7 @@ async fn main() -> anyhow::Result<()> {
let (tx, rx) = crossbeam_channel::unbounded();
let mut fetcher = TaskFetcher::from_config_with_pool(&settings, dbpool.clone());
let fetcher_task_handle = tokio::spawn(async move { fetcher.run(tx).await });
let eth_sender_task_handle = tokio::spawn(async move { eth_sender.run(rx).await });
let eth_sender_task_handle = tokio::spawn(async move { client.run(rx).await });

tokio::select! {
_ = async { fetcher_task_handle.await } => {
Expand Down
6 changes: 3 additions & 3 deletions src/block_submitter/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct Settings {
pub contract_abi_file_path: String,
pub confirmations: usize, // TODO: default
pub web3_url: String,
pub keystore: String,
pub password: String,
pub chain_id: u64,
pub keystore: Option<String>,
pub password: Option<String>,
pub chain_id: Option<u64>,
}
125 changes: 96 additions & 29 deletions src/block_submitter/eth_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,83 +2,150 @@ use super::types::{ContractCall, SubmitBlockArgs};
use crate::block_submitter::Settings;
use crate::contracts;
use crate::storage::PoolType;
use async_trait::async_trait;
use crossbeam_channel::Receiver;
use ethers::abi::Abi;
use ethers::prelude::*;
use ethers::types::H256;
use fluidex_common::db::models;
use std::convert::TryFrom;
use std::{boxed, pin};

type SignedClient = SignerMiddleware<Provider<Http>, LocalWallet>;

#[async_trait]
pub trait EthSend: Sync + Send {
async fn verify_block(&self, args: SubmitBlockArgs) -> Result<u64, anyhow::Error>;
async fn submit_block(&self, args: SubmitBlockArgs) -> Result<(), anyhow::Error>;
async fn run(&self, rx: Receiver<ContractCall>);
}

#[derive(Debug)]
pub struct EthSender {
connpool: PoolType,
client: SignedClient,
account: Address,
contract: Contract<SignedClient>,
pub struct EthSenderConfigure {
abi: Abi,
base_cli: Provider<Http>,
address: Address,
confirmations: usize,
account: Address,
wallet: Option<LocalWallet>,
}

impl EthSender {
pub async fn from_config_with_pool(config: &Settings, connpool: PoolType) -> Result<Self, anyhow::Error> {
impl EthSenderConfigure {
pub async fn from_config(config: &Settings) -> Result<Self, anyhow::Error> {
let address = config.contract_address.parse::<Address>()?;
let abi: Abi = contracts::get_abi(&config.contract_abi_file_path)?;
let base_cli = Provider::<Http>::try_from(config.web3_url.as_str())?;

let client = Provider::<Http>::try_from(config.web3_url.as_str())?;
let wallet = LocalWallet::decrypt_keystore(config.keystore.as_str(), config.password.as_str())?.with_chain_id(config.chain_id);
let account = wallet.address();
let client = SignerMiddleware::new(client, wallet);
let wallet = if let Some(keystore) = &config.keystore {
let psw = config.password.as_ref().map_or_else(Default::default, Clone::clone);
let wallet = LocalWallet::decrypt_keystore(keystore.as_str(), &psw)?.with_chain_id(config.chain_id.unwrap_or(5u64)); //use goerli's chain as default
Some(wallet)
} else {
None
};

let contract = Contract::new(address, abi, client.clone());
let account = if let Some(wallet) = &wallet {
wallet.address()
} else {
base_cli.get_accounts().await?[0]
};

Ok(Self {
connpool,
client,
abi,
address,
base_cli,
wallet,
account,
contract,
confirmations: config.confirmations,
})
}

pub async fn run(&self, rx: Receiver<ContractCall>) {
fn build_with_keystore(self, connpool: PoolType) -> EthSender<SignedClient> {
let wallet = self.wallet.unwrap();
let client = SignerMiddleware::new(self.base_cli, wallet);

EthSender {
connpool,
account: self.account,
contract: Contract::new(self.address, self.abi, client),
confirmations: self.confirmations,
}
}

fn build_with_account(self, connpool: PoolType) -> EthSender<Provider<Http>> {
EthSender {
connpool,
account: self.account,
contract: Contract::new(self.address, self.abi, self.base_cli),
confirmations: self.confirmations,
}
}

pub fn build(self, connpool: PoolType) -> pin::Pin<boxed::Box<dyn EthSend>> {
if self.wallet.is_some() {
boxed::Box::pin(self.build_with_keystore(connpool))
} else {
boxed::Box::pin(self.build_with_account(connpool))
}
}
}

#[derive(Debug)]
pub struct EthSender<M> {
connpool: PoolType,
//client: SignedClient,
contract: Contract<M>,
account: Address,
confirmations: usize,
}

#[async_trait]
impl<M: Middleware> EthSend for EthSender<M> {
async fn run(&self, rx: Receiver<ContractCall>) {
for call in rx.iter() {
log::debug!("{:?}", call);
if let Err(e) = self.run_inner(call).await {
let ret = match call {
ContractCall::SubmitBlock(args) => self.submit_block(args).await,
};
if let Err(e) = ret {
log::error!("{:?}", e);
};
}
}

async fn run_inner(&self, call: ContractCall) -> Result<(), anyhow::Error> {
match call {
ContractCall::SubmitBlock(args) => self.submit_block(args).await,
}
}

pub async fn verify_block(&self, args: SubmitBlockArgs) -> Result<bool, anyhow::Error> {
async fn verify_block(&self, args: SubmitBlockArgs) -> Result<u64, anyhow::Error> {
// println!("block aux {:02x?}", args.deposit_aux);
let ret = self
.contract
.method::<_, bool>("verifyBlock", (args.public_inputs, args.serialized_proof, args.public_data))?
.method::<_, u64>(
"verifyBlock",
(args.public_inputs, args.serialized_proof, args.public_data, args.deposit_aux),
)?
.call()
.await?;
.await
.map_err(|e| anyhow::anyhow!("verify fail: {}", e))?;

Ok(ret)
}

pub async fn submit_block(&self, args: SubmitBlockArgs) -> Result<(), anyhow::Error> {
async fn submit_block(&self, args: SubmitBlockArgs) -> Result<(), anyhow::Error> {
let call = self
.contract
.method::<_, H256>(
"submitBlock",
(args.block_id, args.public_inputs, args.serialized_proof, args.public_data),
(
args.block_id,
args.public_inputs,
args.serialized_proof,
args.public_data,
args.deposit_aux,
),
)?
.from(self.account);
// ganache does not support EIP-1559
#[cfg(feature = "ganache")]
let call = call.legacy();
let pending_tx = call.send().await?;
let pending_tx = call.send().await.map_err(|e| anyhow::anyhow!("submit fail: {}", e))?;
let receipt = pending_tx.confirmations(self.confirmations).await?;
log::info!("block {:?} confirmed. receipt: {:?}.", args.block_id, receipt);

Expand Down
2 changes: 1 addition & 1 deletion src/block_submitter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ pub mod task_fetcher;
pub mod types;

pub use config::Settings;
pub use eth_sender::EthSender;
pub use eth_sender::{EthSender, EthSenderConfigure};
pub use task_fetcher::TaskFetcher;

0 comments on commit 4962291

Please sign in to comment.