Skip to content

Commit

Permalink
Compute which sender chains' message deliveries to wait for. (#2693)
Browse files Browse the repository at this point in the history
* Break a few lines with > 100 characters.

* Add ReceivedCertificatesFromValidator type.

* Compute which sender chains' message deliveries to wait for.

* Address review comments.
  • Loading branch information
afck authored Oct 23, 2024
1 parent c44bb60 commit 9e03718
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 23 deletions.
90 changes: 69 additions & 21 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,8 @@ where
self.client
.download_certificates(&nodes, block.chain_id, block.height)
.await?;
// Process the received operations. Download required hashed certificate values if necessary.
// Process the received operations. Download required hashed certificate values if
// necessary.
if let Err(err) = self.process_certificate(certificate.clone(), vec![]).await {
match &err {
LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) => {
Expand All @@ -1187,7 +1188,7 @@ where
&self,
chain_id: ChainId,
remote_node: &RemoteNode<P::Node>,
) -> Result<(ValidatorName, u64, Vec<Certificate>), NodeError> {
) -> Result<ReceivedCertificatesFromValidator, NodeError> {
let tracker = self
.state()
.received_certificate_trackers()
Expand All @@ -1202,6 +1203,7 @@ where
let query = ChainInfoQuery::new(chain_id).with_received_log_excluding_first_n(tracker);
let info = remote_node.handle_chain_info_query(query).await?;
let mut tracker = MultichainTracker::new(tracker);
let mut other_sender_chains = Vec::new();
let remote_log = info.requested_received_log.clone();
let remote_node_chains_view = info.requested_received_log.into_iter().fold(
BTreeMap::<ChainId, Vec<BlockHeight>>::new(),
Expand All @@ -1220,16 +1222,22 @@ where
for (chain_id, mut block_batch) in remote_node_chains_view {
block_batch.sort();

let Some(last_height) = block_batch.last().copied() else {
continue;
};

self.advance_with_local(chain_id, &mut block_batch, &mut tracker)
.await?;

let Some(first_block) = block_batch.first() else {
let Some(first_height) = block_batch.first().copied() else {
// `advance_with_local` might have drained the whole `block_batch`.
// In that case, move to the next chain batch.
// In that case, move to the next chain batch, but remember to wait for
// the messages to be delivered to the inboxes.
other_sender_chains.push((chain_id, last_height));
continue;
};
let batch_size = block_batch.last().unwrap().saturating_sub(*first_block).0 + 1; // safe to unwrap because we checked that the vec is not empty.
let block_batch_range = BlockHeightRange::multi(*first_block, batch_size);
let batch_size = last_height.saturating_sub(first_height).0 + 1;
let block_batch_range = BlockHeightRange::multi(first_height, batch_size);
let query = ChainInfoQuery::new(chain_id)
.with_sent_certificate_hashes_in_range(block_batch_range.clone());

Expand Down Expand Up @@ -1259,8 +1267,13 @@ where
.await?
{
HandleCertificateResult::FutureEpoch => {
warn!("Postponing received certificate from {:.8} at height {} from future epoch {}",
chain_id, certificate.value().height(), certificate.value().epoch());
warn!(
"Postponing received certificate from {:.8} at height {} \
from future epoch {}",
chain_id,
certificate.value().height(),
certificate.value().epoch()
);
// Stop the synchronization here. Do not increment the tracker further so
// that this certificate can still be downloaded later, once our committee
// is updated.
Expand All @@ -1283,11 +1296,16 @@ where
}
}
}
let new_tracker = tracker.finalize(&remote_log);
Ok((remote_node.name, new_tracker, certificates))
Ok(ReceivedCertificatesFromValidator {
name: remote_node.name,
tracker: tracker.finalize(&remote_log),
certificates,
other_sender_chains,
})
}

/// Uses local information (about already-processed blocks) to advance the `block_batch` to a place where only new blocks are left.
/// Uses local information (about already-processed blocks) to advance the `block_batch` to a
/// place where only new blocks are left.
async fn advance_with_local(
&self,
chain_id: ChainId,
Expand All @@ -1305,8 +1323,8 @@ where
// Find the first block in the batch that is higher than the highest block known locally.
match block_batch.iter().position(|b| b >= &local_next) {
None => {
// Our highest, locally-known block is higher than any block height from the current batch.
// Move to the next chain batch.
// Our highest, locally-known block is higher than any block height from the
// current batch. Move to the next chain batch.
block_batch.clear();
}
Some(index) => {
Expand Down Expand Up @@ -1356,12 +1374,15 @@ where

/// Processes the result of [`synchronize_received_certificates_from_validator`] and updates
/// the tracker for this validator.
#[tracing::instrument(level = "trace", skip(tracker, certificates))]
#[tracing::instrument(level = "trace", skip(tracker, certificates, other_sender_chains))]
async fn receive_certificates_from_validator(
&self,
name: ValidatorName,
tracker: u64,
certificates: Vec<Certificate>,
ReceivedCertificatesFromValidator {
name,
tracker,
certificates,
other_sender_chains,
}: ReceivedCertificatesFromValidator,
) {
for certificate in certificates {
let hash = certificate.hash();
Expand All @@ -1375,6 +1396,19 @@ where
return;
}
}
for (chain_id, height) in other_sender_chains {
if let Err(error) = self
.client
.local_node
.wait_for_outgoing_messages(chain_id, height)
.await
{
error!(
"Failed trying to wait for outgoing messages from {chain_id} \
up to {height}: {error}"
);
}
}
// Update tracker.
self.state_mut()
.update_received_certificate_tracker(name, tracker);
Expand Down Expand Up @@ -1426,9 +1460,9 @@ where
return Err(error.into());
}
};
for (name, tracker, certificates) in responses {
for received_certificates in responses {
// Process received certificates.
self.receive_certificates_from_validator(name, tracker, certificates)
self.receive_certificates_from_validator(received_certificates)
.await;
}
Ok(())
Expand Down Expand Up @@ -3207,14 +3241,14 @@ where

let chain_id = self.chain_id;
// Proceed to downloading received certificates.
let (name, tracker, certificates) = self
let received_certificates = self
.synchronize_received_certificates_from_validator(chain_id, &remote_node)
.await?;

drop(_guard);
// Process received certificates. If the client state has changed during the
// network calls, we should still be fine.
self.receive_certificates_from_validator(name, tracker, certificates)
self.receive_certificates_from_validator(received_certificates)
.await;
Ok(())
}
Expand Down Expand Up @@ -3243,3 +3277,17 @@ impl Drop for AbortOnDrop {
self.0.abort();
}
}

/// The result of `synchronize_received_certificates_from_validator`.
struct ReceivedCertificatesFromValidator {
/// The name of the validator we downloaded from.
name: ValidatorName,
/// The new tracker value for that validator.
tracker: u64,
/// The downloaded certificates. The signatures were already checked and they are ready
/// to be processed.
certificates: Vec<Certificate>,
/// Sender chains that were already up to date locally. We need to wait for their messages
/// to be delivered, at least up to the given block height.
other_sender_chains: Vec<(ChainId, BlockHeight)>,
}
20 changes: 18 additions & 2 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ where
let hash = certificate.hash();
if !certificate.value().is_confirmed() || certificate.value().chain_id() != chain_id {
// The certificate is not as expected. Give up.
tracing::warn!("Failed to process network certificate {}", hash);
warn!("Failed to process network certificate {}", hash);
return info;
}
let mut result = self
Expand All @@ -324,7 +324,7 @@ where
Ok(response) => info = Some(response.info),
Err(error) => {
// The certificate is not as expected. Give up.
tracing::warn!("Failed to process network certificate {}: {}", hash, error);
warn!("Failed to process network certificate {}: {}", hash, error);
return info;
}
};
Expand All @@ -333,6 +333,22 @@ where
info
}

/// Returns only after the outbox of the given chain does not contain any entries up to
/// `height` anymore.
#[tracing::instrument(level = "trace", skip_all)]
pub async fn wait_for_outgoing_messages(
&self,
chain_id: ChainId,
height: BlockHeight,
) -> Result<(), LocalNodeError> {
// TODO(#2692): Implement this, once #2689 is merged.
warn!(
"Not waiting for outgoing messages from {chain_id:.8} up to height {height}: \
not implemented yet."
);
Ok(())
}

/// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
/// [`ChainId`].
///
Expand Down

0 comments on commit 9e03718

Please sign in to comment.