Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Nov 5, 2024
1 parent 96a00d4 commit dfa9202
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 41 deletions.
13 changes: 8 additions & 5 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::consensus_adapter::ConsensusOverloadChecker;
use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::jsonrpc_index::CoinIndexKey2;
Expand Down Expand Up @@ -142,7 +143,6 @@ use crate::authority::authority_store_pruner::{
use crate::authority::epoch_start_configuration::EpochStartConfigTrait;
use crate::authority::epoch_start_configuration::EpochStartConfiguration;
use crate::checkpoints::CheckpointStore;
use crate::consensus_adapter::ConsensusAdapter;
use crate::epoch::committee_store::CommitteeStore;
use crate::execution_cache::{
CheckpointCache, ExecutionCacheCommit, ExecutionCacheReconfigAPI, ExecutionCacheWrite,
Expand Down Expand Up @@ -1027,6 +1027,7 @@ impl AuthorityState {
debug!("handle_transaction_v2");

// Ensure an idempotent answer.
// TODO(fastpath): refactor this to work with Mysticeti fastpath where signatures do not exist.
let tx_status = self.get_transaction_status(&tx_digest, epoch_store)?;
if tx_status.is_some() {
return Ok(tx_status);
Expand Down Expand Up @@ -1080,7 +1081,7 @@ impl AuthorityState {

pub(crate) fn check_system_overload(
&self,
consensus_adapter: &Arc<ConsensusAdapter>,
consensus_overload_checker: &(impl ConsensusOverloadChecker + ?Sized),
tx_data: &SenderSignedData,
do_authority_overload_check: bool,
) -> SuiResult {
Expand All @@ -1094,9 +1095,11 @@ impl AuthorityState {
.tap_err(|_| {
self.update_overload_metrics("execution_pending");
})?;
consensus_adapter.check_consensus_overload().tap_err(|_| {
self.update_overload_metrics("consensus");
})?;
consensus_overload_checker
.check_consensus_overload()
.tap_err(|_| {
self.update_overload_metrics("consensus");
})?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl ValidatorService {
// higher chance to succeed.
let mut validator_pushback_error = None;
let overload_check_res = state.check_system_overload(
&consensus_adapter,
&*consensus_adapter,
transaction.data(),
state.check_system_overload_at_signing(),
);
Expand Down Expand Up @@ -511,7 +511,7 @@ impl ValidatorService {

// Check system overload
let overload_check_res = self.state.check_system_overload(
&consensus_adapter,
&*consensus_adapter,
transaction.data(),
state.check_system_overload_at_signing(),
);
Expand Down Expand Up @@ -683,7 +683,7 @@ impl ValidatorService {
// Check system overload
for certificate in &certificates {
let overload_check_res = self.state.check_system_overload(
&self.consensus_adapter,
&*self.consensus_adapter,
certificate.data(),
self.state.check_system_overload_at_execution(),
);
Expand Down
34 changes: 25 additions & 9 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ impl ConsensusAdapterMetrics {
}
}

/// An object that can be used to check if the consensus is overloaded.
pub trait ConsensusOverloadChecker: Sync + Send + 'static {
fn check_consensus_overload(&self) -> SuiResult;
}

#[mockall::automock]
#[async_trait::async_trait]
pub trait SubmitToConsensus: Sync + Send + 'static {
Expand All @@ -193,6 +198,7 @@ pub trait SubmitToConsensus: Sync + Send + 'static {
epoch_store: &Arc<AuthorityPerEpochStore>,
) -> SuiResult;
}

/// Submit Sui certificates to the consensus.
pub struct ConsensusAdapter {
/// The network client connecting to the consensus node of this authority.
Expand Down Expand Up @@ -576,7 +582,7 @@ impl ConsensusAdapter {

/// Performs weakly consistent checks on internal buffers to quickly
/// discard transactions if we are overloaded
pub fn check_limits(&self) -> bool {
fn check_limits(&self) -> bool {
// First check total transactions (waiting and in submission)
if self.num_inflight_transactions.load(Ordering::Relaxed) as usize
> self.max_pending_transactions
Expand All @@ -587,14 +593,6 @@ impl ConsensusAdapter {
self.submit_semaphore.available_permits() > 0
}

pub(crate) fn check_consensus_overload(&self) -> SuiResult {
fp_ensure!(
self.check_limits(),
SuiError::TooManyTransactionsPendingConsensus
);
Ok(())
}

fn submit_unchecked(
self: &Arc<Self>,
transactions: &[ConsensusTransaction],
Expand Down Expand Up @@ -978,6 +976,24 @@ pub fn get_position_in_list(
.0
}

impl ConsensusOverloadChecker for ConsensusAdapter {
fn check_consensus_overload(&self) -> SuiResult {
fp_ensure!(
self.check_limits(),
SuiError::TooManyTransactionsPendingConsensus
);
Ok(())
}
}

pub struct NoopConsensusOverloadChecker {}

impl ConsensusOverloadChecker for NoopConsensusOverloadChecker {
fn check_consensus_overload(&self) -> SuiResult {
Ok(())
}
}

impl ReconfigurationInitiator for Arc<ConsensusAdapter> {
/// This method is called externally to begin reconfiguration
/// It transition reconfig state to reject new certificates from user
Expand Down
64 changes: 40 additions & 24 deletions crates/sui-core/src/consensus_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@ use fastcrypto_tbls::dkg;
use mysten_metrics::monitored_scope;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use sui_types::{
error::SuiError,
error::{SuiError, SuiResult},
messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
transaction::Transaction,
};
use tap::TapFallible;
use tracing::{info, warn};
use tracing::{debug, info, warn};

use crate::{
authority::AuthorityState, checkpoints::CheckpointServiceNotify,
authority::{authority_per_epoch_store::AuthorityPerEpochStore, AuthorityState},
checkpoints::CheckpointServiceNotify,
consensus_adapter::ConsensusOverloadChecker,
transaction_manager::TransactionManager,
};

/// Allows verifying the validity of transactions
#[derive(Clone)]
pub struct SuiTxValidator {
authority_state: Arc<AuthorityState>,
consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
_transaction_manager: Arc<TransactionManager>,
metrics: Arc<SuiTxValidatorMetrics>,
Expand All @@ -31,6 +35,7 @@ pub struct SuiTxValidator {
impl SuiTxValidator {
pub fn new(
authority_state: Arc<AuthorityState>,
consensus_overload_checker: Arc<dyn ConsensusOverloadChecker>,
checkpoint_service: Arc<dyn CheckpointServiceNotify + Send + Sync>,
transaction_manager: Arc<TransactionManager>,
metrics: Arc<SuiTxValidatorMetrics>,
Expand All @@ -42,6 +47,7 @@ impl SuiTxValidator {
);
Self {
authority_state,
consensus_overload_checker,
checkpoint_service,
_transaction_manager: transaction_manager,
metrics,
Expand Down Expand Up @@ -131,32 +137,38 @@ impl SuiTxValidator {
continue;
};

// Currently validity_check() and verify_transaction() are not required to be consistent across validators,
// so they do not run in validate_transactions(). They can run there once we confirm it is safe.
if tx
.validity_check(epoch_store.protocol_config(), epoch_store.epoch())
.is_err()
{
result.push(i as TransactionIndex);
continue;
}
let Ok(tx) = epoch_store.verify_transaction(*tx.clone()) else {
result.push(i as TransactionIndex);
continue;
};

if self
.authority_state
.handle_transaction_v2(&epoch_store, tx)
.await
.is_err()
{
if let Err(e) = self.vote_transaction(&epoch_store, tx).await {
debug!("Failed to vote transaction: {:?}", e);
result.push(i as TransactionIndex);
}
}

result
}

async fn vote_transaction(
&self,
epoch_store: &Arc<AuthorityPerEpochStore>,
tx: Box<Transaction>,
) -> SuiResult<()> {
// Currently validity_check() and verify_transaction() are not required to be consistent across validators,
// so they do not run in validate_transactions(). They can run there once we confirm it is safe.
tx.validity_check(epoch_store.protocol_config(), epoch_store.epoch())?;

self.authority_state.check_system_overload(
&*self.consensus_overload_checker,
tx.data(),
self.authority_state.check_system_overload_at_signing(),
)?;

let tx = epoch_store.verify_transaction(*tx)?;

self.authority_state
.handle_transaction_v2(epoch_store, tx)
.await?;

Ok(())
}
}

fn tx_kind_from_bytes(tx: &[u8]) -> Result<ConsensusTransactionKind, ValidationError> {
Expand Down Expand Up @@ -240,7 +252,10 @@ mod tests {
use crate::{
authority::test_authority_builder::TestAuthorityBuilder,
checkpoints::CheckpointServiceNoop,
consensus_adapter::consensus_tests::{test_certificates, test_gas_objects},
consensus_adapter::{
consensus_tests::{test_certificates, test_gas_objects},
NoopConsensusOverloadChecker,
},
consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
};

Expand Down Expand Up @@ -273,6 +288,7 @@ mod tests {
let metrics = SuiTxValidatorMetrics::new(&Default::default());
let validator = SuiTxValidator::new(
state.clone(),
Arc::new(NoopConsensusOverloadChecker {}),
Arc::new(CheckpointServiceNoop {}),
state.transaction_manager().clone(),
metrics,
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tokio::{sync::mpsc, time::sleep};
use crate::{
authority::{test_authority_builder::TestAuthorityBuilder, AuthorityState},
checkpoints::{CheckpointMetrics, CheckpointService, CheckpointServiceNoop},
consensus_adapter::NoopConsensusOverloadChecker,
consensus_handler::ConsensusHandlerInitializer,
consensus_manager::{
mysticeti_manager::MysticetiManager, ConsensusManagerMetrics, ConsensusManagerTrait,
Expand Down Expand Up @@ -97,6 +98,7 @@ async fn test_mysticeti_manager() {
consensus_handler_initializer,
SuiTxValidator::new(
state.clone(),
Arc::new(NoopConsensusOverloadChecker {}),
Arc::new(CheckpointServiceNoop {}),
state.transaction_manager().clone(),
SuiTxValidatorMetrics::new(&Registry::new()),
Expand Down
1 change: 1 addition & 0 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,7 @@ impl SuiNode {
consensus_handler_initializer,
SuiTxValidator::new(
state.clone(),
consensus_adapter.clone(),
checkpoint_service.clone(),
state.transaction_manager().clone(),
sui_tx_validator_metrics.clone(),
Expand Down

0 comments on commit dfa9202

Please sign in to comment.