Skip to content

Commit

Permalink
Review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
timorleph committed Feb 5, 2024
1 parent 7c1c298 commit 05e8835
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

141 changes: 71 additions & 70 deletions consensus/src/extension/election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ fn common_vote(relative_round: Round) -> bool {
relative_round % 2 == 1
}

enum VoteError<H: Hasher> {
EliminateCandidate,
enum CandidateOutcome<H: Hasher> {
Eliminate,
ElectionDone(H::Hash),
}

Expand All @@ -32,26 +32,27 @@ struct CandidateElection<H: Hasher> {
impl<H: Hasher> CandidateElection<H> {
/// Creates an election for the given candidate.
/// The candidate will eventually either get elected or eliminated.
pub fn new(candidate: &ExtenderUnit<H>) -> Self {
/// Might immediately return an outcome.
pub fn for_candidate(
candidate: &ExtenderUnit<H>,
units: &Units<H>,
) -> Result<Self, CandidateOutcome<H>> {
CandidateElection {
round: candidate.round,
candidate_creator: candidate.creator,
candidate_hash: candidate.hash,
votes: HashMap::new(),
}
.compute_votes(units)
}

fn parent_votes(
&mut self,
parents: &NodeMap<H::Hash>,
units: &Units<H>,
) -> Result<(NodeCount, NodeCount), VoteError<H>> {
) -> Result<(NodeCount, NodeCount), CandidateOutcome<H>> {
let (mut votes_for, mut votes_against) = (NodeCount(0), NodeCount(0));
for parent in parents.values() {
// Since this code is being called round-by-round we should only ever hit the cache here.
// In particular this should always return `Ok`, but this way the code is fine even if
// someone were to call it in a different order.
match self.vote(*parent, units)? {
match self.votes.get(parent).expect("units are added in order") {
true => votes_for += NodeCount(1),
false => votes_against += NodeCount(1),
}
Expand All @@ -63,12 +64,11 @@ impl<H: Hasher> CandidateElection<H> {
&mut self,
parents: &NodeMap<H::Hash>,
relative_round: Round,
units: &Units<H>,
) -> Result<bool, VoteError<H>> {
use VoteError::*;
) -> Result<bool, CandidateOutcome<H>> {
use CandidateOutcome::*;
let threshold = (parents.size() * 2) / 3 + NodeCount(1);
// Gather parents' votes.
let (votes_for, votes_against) = self.parent_votes(parents, units)?;
let (votes_for, votes_against) = self.parent_votes(parents)?;
assert!(votes_for + votes_against >= threshold);
let common_vote = common_vote(relative_round);
// If the round is sufficiently high we are done voting for the candidate if
Expand All @@ -77,7 +77,7 @@ impl<H: Hasher> CandidateElection<H> {
// the default vote is for the candidate and the parents' votes are for over the threshold,
true if votes_for >= threshold => return Err(ElectionDone(self.candidate_hash)),
// or the default vote is against the candidate and the parents' votes are against over the threshold.
false if votes_against >= threshold => return Err(EliminateCandidate),
false if votes_against >= threshold => return Err(Eliminate),
_ => (),
// Note that this means the earliest we can have a head elected is round 4.
}
Expand All @@ -91,51 +91,41 @@ impl<H: Hasher> CandidateElection<H> {
})
}

fn compute_vote(
&mut self,
voter: &ExtenderUnit<H>,
units: &Units<H>,
) -> Result<bool, VoteError<H>> {
// This should never happen, but this preserves correctness even if someone calls this for too old units.
fn vote(&mut self, voter: &ExtenderUnit<H>) -> Result<(), CandidateOutcome<H>> {
// If the vote is already computed we are done.
if self.votes.get(&voter.hash).is_some() {
return Ok(());
}
// Votes for old units are never used, so we just return.
if voter.round <= self.round {
return Ok(false);
return Ok(());
}
let relative_round = voter.round - self.round;
// Direct descendands vote for, all other units of that round against.
if relative_round == 1 {
return Ok(voter.parents.get(self.candidate_creator) == Some(&self.candidate_hash));
}
// Otherwise we compute the vote based on the parents' votes.
self.vote_from_parents(&voter.parents, relative_round, units)
}

fn vote(&mut self, voter: H::Hash, units: &Units<H>) -> Result<bool, VoteError<H>> {
// Check the vote cache.
if let Some(vote) = self.votes.get(&voter) {
return Ok(*vote);
}
let voter = units
.get(&voter)
.expect("only requesting votes from units we have");
let result = self.compute_vote(voter, units)?;
self.votes.insert(voter.hash, result);
Ok(result)
}

/// Add a single voter and copute their vote. This might end up electing or eliminating the candidate.
pub fn add_voter(mut self, voter: H::Hash, units: &Units<H>) -> Result<Self, VoteError<H>> {
self.vote(voter, units).map(|_| self)
let vote = match relative_round {
0 => unreachable!("just checked that voter and election rounds are not equal"),
// Direct descendands vote for, all other units of that round against.
1 => voter.parents.get(self.candidate_creator) == Some(&self.candidate_hash),
// Otherwise we compute the vote based on the parents' votes.
_ => self.vote_from_parents(&voter.parents, relative_round)?,
};
self.votes.insert(voter.hash, vote);
Ok(())
}

/// Compute all the votes for units we have at the moment. This might end up electing or eliminating the candidate.
pub fn compute_votes(mut self, units: &Units<H>) -> Result<Self, VoteError<H>> {
fn compute_votes(mut self, units: &Units<H>) -> Result<Self, CandidateOutcome<H>> {
for round in self.round + 1..=units.highest_round() {
for voter in units.in_round(round).expect("units come in order") {
self.vote(voter, units)?;
for voter in units.in_round(round).expect("units are added in order") {
self.vote(voter)?;
}
}
Ok(self)
}

/// Add a single voter and compute their vote. This might end up electing or eliminating the candidate.
/// Might panic if called for a unit before its parents.
pub fn add_voter(mut self, voter: &ExtenderUnit<H>) -> Result<Self, CandidateOutcome<H>> {
self.vote(voter).map(|()| self)
}
}

/// Election for a single round.
Expand All @@ -156,59 +146,65 @@ pub enum ElectionResult<H: Hasher> {
impl<H: Hasher> RoundElection<H> {
/// Create a new round election. It might immediately be decided, so this might return an election result rather than a pending election.
/// Returns an error when it's too early to finalize the candidate list, i.e. we are not at least 3 rounds ahead of the election round.
///
/// Note: it is crucial that units are added to `Units` only when all their parents are there, otherwise this might panic.
pub fn for_round(round: Round, units: &Units<H>) -> Result<ElectionResult<H>, ()> {
// If we don't yet have a unit of round + 3 we might not know about the winning candidate, so we cannot start the election.
if units.highest_round() < round + 3 {
return Err(());
}
// We might be missing units from this round, but they are guaranteed to get consistently eliminated.
let mut candidates = units
// We might be missing units from this round, but any unit that is not an ancestor of an arbitrary unit from round + 3
// will always eventually be eliminated in the voting, so we can freely skip it.
let mut candidates: Vec<_> = units
.in_round(round)
.expect("units come in order, so we definitely have units from this round");
.expect("units come in order, so we definitely have units from this round")
.iter()
.map(|candidate| candidate.hash)
.collect();
candidates.sort();
// We will be `pop`ing the candidates from the back.
candidates.reverse();
let candidate = units
.get(&candidates.pop().expect("there is a candidate"))
.expect("we have all the units we work with");
let voting = CandidateElection::new(candidate);
let election = RoundElection { candidates, voting };
// We might already have enough units to pick a head.
Ok(election.compute_votes(units))
Ok(Self::handle_candidate_election_result(
CandidateElection::for_candidate(candidate, units),
candidates,
units,
))
}

fn handle_candidate_election_result(
result: Result<CandidateElection<H>, VoteError<H>>,
result: Result<CandidateElection<H>, CandidateOutcome<H>>,
mut candidates: Vec<H::Hash>,
units: &Units<H>,
) -> ElectionResult<H> {
use CandidateOutcome::*;
use ElectionResult::*;
use VoteError::*;
match result {
// Wait for more voters.
Ok(voting) => Pending(RoundElection { candidates, voting }),
// Pick the next candidate and keep trying.
Err(EliminateCandidate) => {
Err(Eliminate) => {
let candidate = units
.get(&candidates.pop().expect("there is a candidate"))
.expect("we have all the units we work with");
let voting = CandidateElection::new(candidate);
RoundElection { candidates, voting }.compute_votes(units)
Self::handle_candidate_election_result(
CandidateElection::for_candidate(candidate, units),
candidates,
units,
)
}
// Yay, we picked a head.
Err(ElectionDone(head)) => Elected(head),
}
}

fn compute_votes(self, units: &Units<H>) -> ElectionResult<H> {
let RoundElection { candidates, voting } = self;
Self::handle_candidate_election_result(voting.compute_votes(units), candidates, units)
}

/// Add a single voter to the election.
pub fn add_voter(self, voter: H::Hash, units: &Units<H>) -> ElectionResult<H> {
/// Might panic if not all parents were added previously.
pub fn add_voter(self, voter: &ExtenderUnit<H>, units: &Units<H>) -> ElectionResult<H> {
let RoundElection { candidates, voting } = self;
Self::handle_candidate_election_result(voting.add_voter(voter, units), candidates, units)
Self::handle_candidate_election_result(voting.add_voter(voter), candidates, units)
}
}

Expand Down Expand Up @@ -261,7 +257,7 @@ mod test {
};
let last_voter = construct_unit_all_parents(NodeIndex(0), 4, n_members);
units.add_unit(last_voter.clone());
match election.add_voter(last_voter.hash, &units) {
match election.add_voter(&last_voter, &units) {
Pending(_) => panic!("failed to elect obvious head"),
Elected(head) => {
assert_eq!(units.get(&head).expect("we have the head").round, 0);
Expand Down Expand Up @@ -298,7 +294,12 @@ mod test {
for creator in n_members.into_iterator() {
units.add_unit(construct_unit_all_parents(creator, 0, n_members));
}
let mut candidate_hashes = units.in_round(0).expect("just added these");
let mut candidate_hashes: Vec<_> = units
.in_round(0)
.expect("just added these")
.iter()
.map(|candidate| candidate.hash)
.collect();
candidate_hashes.sort();
let skipped_parent = units
.get(&candidate_hashes[0])
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/extension/extender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ impl<H: Hasher> Extender<H> {

/// Add a unit to the extender. Might return several batches of ordered units as a result.
pub fn add_unit(&mut self, u: ExtenderUnit<H>) -> Vec<Vec<H::Hash>> {
let unit = u.hash;
let hash = u.hash;
self.units.add_unit(u);
let unit = self.units.get(&hash).expect("just added");
let mut result = Vec::new();
// If we have an ongoing election try to finish it.
if let Some(election) = self.election.take() {
Expand Down
22 changes: 13 additions & 9 deletions consensus/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,24 @@ impl<H: Hasher> Service<H> {
let mut round = 0;
loop {
futures::select! {
v = self.electors.next() => {
if let Some(u) = v {
v = self.electors.next() => match v {
Some(u) => {
debug!(target: LOG_TARGET, "{:?} New unit in Extender round {:?} creator {:?} hash {:?}.", self.node_id, u.round, u.creator, u.hash);
for batch in self.extender.add_unit(u) {
let head = *batch.last().expect("all batches are nonempty");
if self.finalizer_tx.unbounded_send(batch).is_err() {
warn!(target: LOG_TARGET, "{:?} Channel for batches should be open", self.node_id);
exiting = true;
}
debug!(target: LOG_TARGET, "{:?} Finalized round {:?} with head {:?}.", self.node_id, round, head);
round += 1;
let head = *batch.last().expect("all batches are nonempty");
if self.finalizer_tx.unbounded_send(batch).is_err() {
warn!(target: LOG_TARGET, "{:?} Channel for batches should be open", self.node_id);
exiting = true;
}
debug!(target: LOG_TARGET, "{:?} Finalized round {:?} with head {:?}.", self.node_id, round, head);
round += 1;
}
},
None => {
warn!(target: LOG_TARGET, "{:?} Units for extender unexpectedly ended.", self.node_id);
exiting = true;
}
},
_ = terminator.get_exit().fuse() => {
debug!(target: LOG_TARGET, "{:?} received exit signal.", self.node_id);
exiting = true;
Expand Down
41 changes: 38 additions & 3 deletions consensus/src/extension/units.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ impl<H: Hasher> Units<H> {
}

/// Get the list of unit hashes from the given round.
pub fn in_round(&self, round: Round) -> Option<Vec<H::Hash>> {
self.by_round.get(&round).cloned()
/// Panics if called for a round greater or equal to the round
/// of the highest head of a removed batch.
pub fn in_round(&self, round: Round) -> Option<Vec<&ExtenderUnit<H>>> {
self.by_round.get(&round).map(|hashes| {
hashes
.iter()
.map(|hash| self.units.get(hash).expect("we have all the units"))
.collect()
})
}

/// The highest round among all added units, or 0 if there are none.
Expand Down Expand Up @@ -91,7 +98,7 @@ mod test {
let unit = construct_unit_all_parents(NodeIndex(0), 0, n_members);
units.add_unit(unit.clone());
assert_eq!(units.highest_round(), 0);
assert_eq!(units.in_round(0), Some(vec![unit.hash]));
assert_eq!(units.in_round(0), Some(vec![&unit]));
assert_eq!(units.get(&unit.hash), Some(&unit));
}

Expand All @@ -117,4 +124,32 @@ mod test {
assert_eq!(batch.pop(), Some(head));
}
}

#[test]
fn batch_order_constant_with_different_insertion_order() {
let mut units = Units::new();
let mut units_but_backwards = Units::new();
let n_members = NodeCount(4);
let max_round = 43;
let mut heads = Vec::new();
for round in 0..=max_round {
let mut round_units = Vec::new();
for creator in n_members.into_iterator() {
let unit = construct_unit_all_parents(creator, round, n_members);
if round as usize % n_members.0 == creator.0 {
heads.push(unit.hash)
}
round_units.push(unit.clone());
units.add_unit(unit);
}
for unit in round_units.into_iter().rev() {
units_but_backwards.add_unit(unit);
}
}
for head in heads {
let batch1 = units.remove_batch(head);
let batch2 = units_but_backwards.remove_batch(head);
assert_eq!(batch1, batch2);
}
}
}
2 changes: 1 addition & 1 deletion docs/src/internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ To explain the inner workings of AlephBFT it is instructive to follow the path o
5. At a suitable moment the units from the store are further moved to a component called `Terminal` -- implemented in `src/terminal.rs`.
6. Roughly speaking, terminal is expected to "unpack" the unit, so that their parents become explicit (instead of being control hashes only).
7. Each unit whose parents are successfully decoded, is added to the "Dag". Each unit in the Dag is legit + has all its parents in the Dag.
8. Dag units are passed to a component called the `Extender` -- see the files in `src/extender/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md).
8. Dag units are passed to a component called the `Extender` -- see the files in `src/extension/`. The role of the extender is to efficiently run the `OrderData` algorithm, described in the [section on AlephBFT](how_alephbft_does_it.md).
9. Once a unit's data is placed in one of batches by the `Extender` then its path is over and can be safely discarded.

### 5.1 Creator
Expand Down

0 comments on commit 05e8835

Please sign in to comment.