Skip to content

Commit

Permalink
feat: run rewards service every epoch (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
mateuszjasiuk authored Sep 11, 2024
1 parent 58da92f commit 9f85ffb
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 73 deletions.
1 change: 0 additions & 1 deletion chain/src/repository/pos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub fn insert_bonds(
.into_iter()
.map(|bond| {
let validator: ValidatorDb = validators::table
// Epoch for validators is problematic?
.filter(
validators::namada_address
.eq(&bond.target.to_string()),
Expand Down
86 changes: 40 additions & 46 deletions rewards/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use shared::crawler;
use shared::crawler_state::{CrawlerName, IntervalCrawlerState};
use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError};
use tendermint_rpc::HttpClient;
use tokio::sync::{Mutex, MutexGuard};
use tokio::time::{sleep, Instant};
use tokio::time::sleep;
use tracing::Level;
use tracing_subscriber::FmtSubscriber;

Expand Down Expand Up @@ -49,61 +48,49 @@ async fn main() -> Result<(), MainError> {

let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?);

// Initially set the instant to the current time minus the sleep_for
// so we can start processing right away
let instant = Arc::new(Mutex::new(
Instant::now()
.checked_sub(Duration::from_secs(config.sleep_for))
.unwrap(),
));

// Run migrations
run_migrations(&conn)
.await
.context_db_interact_error()
.into_db_error()?;

tracing::info!("Query epoch...");
let epoch = namada_service::get_current_epoch(&client)
.await
.into_rpc_error()?;

if epoch < 2 {
tracing::info!("Waiting for first epoch to happen...");
sleep(Duration::from_secs(config.sleep_for)).await;
return Ok(());
let mut epoch;
loop {
epoch = namada_service::get_current_epoch(&client)
.await
.into_rpc_error()?;

if epoch < 2 {
tracing::info!("Waiting for first epoch to happen...");
sleep(Duration::from_secs(config.sleep_for)).await;
} else {
break;
}
}

crawler::crawl(
move |_| {
crawling_fn(
conn.clone(),
client.clone(),
instant.clone(),
config.sleep_for,
)
},
0,
move |epoch| crawling_fn(conn.clone(), client.clone(), epoch),
epoch,
)
.await
}

async fn crawling_fn(
conn: Arc<Object>,
client: Arc<HttpClient>,
instant: Arc<Mutex<Instant>>,
sleep_for: u64,
epoch_to_process: u32,
) -> Result<(), MainError> {
let mut instant = instant.lock().await;

let should_process = can_process(&instant, sleep_for);
let should_process = can_process(epoch_to_process, client.clone()).await?;

if !should_process {
let timestamp = Utc::now().naive_utc();
update_crawler_timestamp(&conn, timestamp).await?;

tracing::warn!(
"Not enough time has passed since last crawl, skipping..."
"Epoch {} was not processed, retry...",
epoch_to_process
);

return Err(MainError::NoAction);
Expand All @@ -118,21 +105,20 @@ async fn crawling_fn(
let rewards = namada_service::query_rewards(&client, delegations_pairs)
.await
.into_rpc_error()?;
let non_zero_rewards = rewards
.into_iter()
.filter(|reward| !reward.amount.is_zero())
.collect();

let timestamp = DateTimeUtc::now().0.timestamp();
let crawler_state = IntervalCrawlerState { timestamp };

conn.interact(move |conn| {
conn.build_transaction().read_write().run(
|transaction_conn: &mut diesel::prelude::PgConnection| {
let rewards_db = repository::pos_rewards::query_rewards(
transaction_conn,
rewards,
);

repository::pos_rewards::upsert_rewards(
transaction_conn,
rewards_db,
non_zero_rewards,
)?;

repository::crawler_state::upsert_crawler_state(
Expand All @@ -149,17 +135,25 @@ async fn crawling_fn(
.and_then(identity)
.into_db_error()?;

// Once we are done processing, we reset the instant
*instant = Instant::now();

Ok(())
}

fn can_process(instant: &MutexGuard<Instant>, sleep_for: u64) -> bool {
tracing::info!("Attempting to process rewards data");

let time_elapsed = instant.elapsed().as_secs();
time_elapsed >= sleep_for
async fn can_process(
epoch: u32,
client: Arc<HttpClient>,
) -> Result<bool, MainError> {
tracing::info!("Attempting to process epoch: {}...", epoch);
let current_epoch = namada_service::get_current_epoch(&client.clone())
.await
.map_err(|e| {
tracing::error!(
"Failed to query Namada's last committed block: {}",
e
);
MainError::RpcError
})?;

Ok(current_epoch >= epoch)
}

async fn update_crawler_timestamp(
Expand Down
47 changes: 21 additions & 26 deletions rewards/src/repository/pos_rewards.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,33 @@
use diesel::upsert::excluded;
use diesel::{
ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper,
};
use diesel::{ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl};
use orm::pos_rewards::PosRewardInsertDb;
use orm::schema::{pos_rewards, validators};
use orm::validators::ValidatorDb;
use shared::rewards::Reward;

pub fn query_rewards(
transaction_conn: &mut PgConnection,
rewards: Vec<Reward>,
) -> Vec<PosRewardInsertDb> {
rewards
.into_iter()
.map(|reward| {
let validator: ValidatorDb =
validators::table
.filter(validators::dsl::namada_address.eq(
&reward.delegation_pair.validator_address.to_string(),
))
.select(ValidatorDb::as_select())
.first(transaction_conn)
.expect("Failed to get validator");
PosRewardInsertDb::from_reward(reward, validator.id)
})
.collect::<Vec<_>>()
}

pub fn upsert_rewards(
transaction_conn: &mut PgConnection,
rewards: Vec<PosRewardInsertDb>,
rewards: Vec<Reward>,
) -> anyhow::Result<()> {
diesel::insert_into(pos_rewards::table)
.values::<&Vec<PosRewardInsertDb>>(&rewards)
.values::<Vec<PosRewardInsertDb>>(
rewards
.into_iter()
.map(|reward| {
let validator_id: i32 = validators::table
.filter(
validators::namada_address.eq(&reward
.delegation_pair
.validator_address
.to_string()),
)
.select(validators::id)
.first(transaction_conn)
.expect("Failed to get validator");

PosRewardInsertDb::from_reward(reward, validator_id)
})
.collect::<Vec<_>>(),
)
.on_conflict((
pos_rewards::columns::owner,
pos_rewards::columns::validator_id,
Expand Down
4 changes: 4 additions & 0 deletions shared/src/balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ impl Amount {
self.0.checked_add(other.0).map(Self)
}

pub fn is_zero(&self) -> bool {
self.0.is_zero()
}

pub fn fake() -> Self {
Self(NamadaAmount::from_u64((0..10000000).fake::<u64>()))
}
Expand Down

0 comments on commit 9f85ffb

Please sign in to comment.