Skip to content

Commit

Permalink
Fix catching up (#344)
Browse files Browse the repository at this point in the history
  • Loading branch information
kostekIV authored and timorl committed Mar 18, 2022
1 parent deeda8d commit cc582b1
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 32 deletions.
122 changes: 90 additions & 32 deletions finality-aleph/src/session_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use tokio::sync::{
RwLock,
};

const PRUNING_THRESHOLD: u32 = 10;
type SessionMap = HashMap<SessionId, Vec<AuthorityId>>;
type SessionSubscribers = HashMap<SessionId, Vec<OneShotSender<Vec<AuthorityId>>>>;

Expand Down Expand Up @@ -221,18 +222,6 @@ where
}
}

/// Returns None if the num is not a first block of some session otherwise returns Some(id) where
/// id is id of a session that block starts.
fn is_first_block<B: Block>(num: NumberFor<B>, period: SessionPeriod) -> Option<SessionId> {
let session = session_id_from_block_num::<B>(num, period);

if first_block_of_session::<B>(session, period) == num {
Some(session)
} else {
None
}
}

/// Struct responsible for updating session map
pub struct SessionMapUpdater<AP, FN, B>
where
Expand Down Expand Up @@ -289,39 +278,56 @@ where
.await;
}

if session_id.0 >= 10 && session_id.0 % 10 == 0 {
debug!(target: "aleph-session-updater", "Pruning session map below session #{:?}", session_id.0 - 10);
if session_id.0 >= PRUNING_THRESHOLD && session_id.0 % PRUNING_THRESHOLD == 0 {
debug!(target: "aleph-session-updater", "Pruning session map below session #{:?}", session_id.0 - PRUNING_THRESHOLD);
self.session_map
.prune_below(SessionId(session_id.0 - 10))
.prune_below(SessionId(session_id.0 - PRUNING_THRESHOLD))
.await;
}
}

async fn update_session(&mut self, session_id: SessionId, period: SessionPeriod) {
let first_block = first_block_of_session::<B>(session_id, period);
self.handle_first_block_of_session(first_block, session_id)
.await;
}

fn catch_up_boundaries(&self, period: SessionPeriod) -> (SessionId, SessionId) {
let last_finalized = self.finality_notificator.last_finalized();

let current_session = session_id_from_block_num::<B>(last_finalized, period);
let starting_session = SessionId(current_session.0.saturating_sub(PRUNING_THRESHOLD));

(starting_session, current_session)
}

pub async fn run(mut self, period: SessionPeriod) {
let mut notifications = self.finality_notificator.notification_stream();

let (starting_session, current_session) = self.catch_up_boundaries(period);

// lets catch up
for block_num in 0..=self
.finality_notificator
.last_finalized()
.saturated_into::<u32>()
{
let block_num = block_num.saturated_into();
if let Some(session_id) = is_first_block::<B>(block_num, period) {
self.handle_first_block_of_session(block_num, session_id)
.await;
}
for session in starting_session.0..=current_session.0 {
self.update_session(SessionId(session), period).await;
}

let mut last_updated = current_session;

while let Some(FinalityNotification { header, .. }) = notifications.next().await {
let last_finalized = header.number();
trace!(target: "aleph-session-updater", "got FinalityNotification about #{:?}", last_finalized);

if let Some(session_id) = is_first_block::<B>(*last_finalized, period) {
// we have finalized first block of some session, now we can put the next known authorities into session map
self.handle_first_block_of_session(*last_finalized, session_id)
.await;
let session_id = session_id_from_block_num::<B>(*last_finalized, period);

if last_updated >= session_id {
continue;
}

for session in (last_updated.0 + 1)..=session_id.0 {
self.update_session(SessionId(session), period).await;
}

last_updated = session_id;
}
}
}
Expand All @@ -335,7 +341,7 @@ mod tests {
use sc_utils::mpsc::tracing_unbounded;
use sp_consensus::BlockOrigin;
use sp_runtime::testing::UintAuthorityId;
use std::time::Duration;
use std::{sync::Mutex, time::Duration};
use substrate_test_runtime_client::{
ClientBlockImportExt, DefaultTestClientBuilderExt, TestClient, TestClientBuilder,
TestClientBuilderExt,
Expand All @@ -345,19 +351,20 @@ mod tests {
struct MockProvider {
pub session_map: HashMap<NumberFor<TBlock>, Vec<AuthorityId>>,
pub next_session_map: HashMap<NumberFor<TBlock>, Vec<AuthorityId>>,
pub asked_for: Arc<Mutex<Vec<NumberFor<TBlock>>>>,
}

struct MockNotificator {
pub last_finalized: NumberFor<TBlock>,
pub receiver:
std::sync::Mutex<Option<TracingUnboundedReceiver<FinalityNotification<TBlock>>>>,
pub receiver: Mutex<Option<TracingUnboundedReceiver<FinalityNotification<TBlock>>>>,
}

impl MockProvider {
fn new() -> Self {
Self {
session_map: HashMap::new(),
next_session_map: HashMap::new(),
asked_for: Arc::new(Mutex::new(Vec::new())),
}
}
}
Expand All @@ -373,10 +380,14 @@ mod tests {

impl AuthorityProvider<NumberFor<TBlock>> for MockProvider {
fn authorities(&self, b: NumberFor<TBlock>) -> Option<Vec<AuthorityId>> {
let mut asked = self.asked_for.lock().unwrap();
asked.push(b);
self.session_map.get(&b).cloned()
}

fn next_authorities(&self, b: NumberFor<TBlock>) -> Option<Vec<AuthorityId>> {
let mut asked = self.asked_for.lock().unwrap();
asked.push(b);
self.next_session_map.get(&b).cloned()
}
}
Expand Down Expand Up @@ -501,6 +512,53 @@ mod tests {
);
}

#[tokio::test(flavor = "multi_thread")]
async fn prunes_old_sessions() {
let (_sender, receiver) = tracing_unbounded("test");
let mut mock_provider = MockProvider::new();
let mut mock_notificator = MockNotificator::new(receiver);

mock_provider.session_map.insert(0, authorities(0, 4));
for i in 0..=2 * PRUNING_THRESHOLD {
mock_provider.next_session_map.insert(
i as u64,
authorities(4 * (i + 1) as u64, 4 * (i + 2) as u64),
);
}

mock_notificator.last_finalized = 20;

let asked = mock_provider.asked_for.clone();
let updater = SessionMapUpdater::new(mock_provider, mock_notificator);
let session_map = updater.readonly_session_map();

let _handle = tokio::spawn(updater.run(SessionPeriod(1)));

// wait a bit
Delay::new(Duration::from_millis(50)).await;

{
let asked = asked.lock().unwrap();
assert_eq!((10..=20).into_iter().collect::<Vec<_>>(), *asked);
}
for i in 0..=20 - PRUNING_THRESHOLD {
assert_eq!(
session_map.get(SessionId(i)).await,
None,
"Session {:?} should be pruned",
i
);
}
for i in 21 - PRUNING_THRESHOLD..=20 {
assert_eq!(
session_map.get(SessionId(i)).await,
Some(authorities(4 * i as u64, 4 * (i + 1) as u64)),
"Session {:?} should not be pruned",
i
);
}
}

#[tokio::test(flavor = "multi_thread")]
async fn subscription_with_already_defined_session_works() {
let mut shared = SharedSessionMap::new();
Expand Down
1 change: 1 addition & 0 deletions scripts/run_nodes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ run_node() {
-laleph-finality=debug \
-laleph-justification=debug \
-laleph-data-store=debug \
-laleph-updater=debug \
-laleph-metrics=debug \
2> $auth.log > /dev/null & \
}
Expand Down

0 comments on commit cc582b1

Please sign in to comment.