From d7c986ca250f619e0e29ee312b64215eedb567f5 Mon Sep 17 00:00:00 2001 From: mwtian <81660174+mwtian@users.noreply.github.com> Date: Mon, 26 Aug 2024 11:50:03 -0700 Subject: [PATCH] [Narwhal] remove NarwhalManager, LazyNarwhalClient, TransactionValidator and a few other integrations (#19089) ## Description Remove a few integration points between Narwhal and Sui. Remove some usages of Narwhal types in Sui. ## Test plan CI --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 9 - consensus/core/src/block.rs | 29 +- consensus/core/src/block_verifier.rs | 20 +- consensus/core/src/commit.rs | 6 +- consensus/core/src/core.rs | 7 +- consensus/core/src/lib.rs | 13 +- consensus/core/src/transaction.rs | 46 +--- crates/sui-benchmark/Cargo.toml | 4 - crates/sui-core/Cargo.toml | 5 - .../authority/authority_per_epoch_store.rs | 45 +++- crates/sui-core/src/authority_server.rs | 9 +- crates/sui-core/src/consensus_adapter.rs | 48 +--- crates/sui-core/src/consensus_handler.rs | 111 +++----- crates/sui-core/src/consensus_manager/mod.rs | 133 +--------- .../consensus_manager/mysticeti_manager.rs | 4 +- .../src/consensus_manager/narwhal_manager.rs | 234 ---------------- .../consensus_types/consensus_output_api.rs | 2 +- crates/sui-core/src/consensus_types/mod.rs | 1 - crates/sui-core/src/consensus_validator.rs | 70 +---- crates/sui-core/src/mysticeti_adapter.rs | 1 + crates/sui-core/src/transaction_manager.rs | 2 +- .../src/unit_tests/mysticeti_manager_tests.rs | 38 ++- .../src/unit_tests/narwhal_manager_tests.rs | 249 ------------------ crates/sui-node/Cargo.toml | 1 - crates/sui-protocol-config/src/lib.rs | 12 + crates/sui-tool/Cargo.toml | 2 - crates/sui-tool/README.md | 1 - crates/sui-tool/src/db_tool/mod.rs | 16 +- crates/sui-tool/src/lib.rs | 47 ---- 29 files changed, 199 insertions(+), 966 deletions(-) delete mode 100644 crates/sui-core/src/consensus_manager/narwhal_manager.rs delete mode 100644 crates/sui-core/src/unit_tests/narwhal_manager_tests.rs diff --git a/Cargo.lock b/Cargo.lock index e3d83e379f77a..3aef21700580c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12800,7 +12800,6 @@ dependencies = [ "itertools 0.10.5", "move-core-types", "mysten-metrics", - "narwhal-node", "prometheus", "rand 0.8.5", "regex", @@ -13057,12 +13056,7 @@ dependencies = [ "mysten-network", "narwhal-config", "narwhal-crypto", - "narwhal-executor", - "narwhal-network", - "narwhal-node", - "narwhal-test-utils", "narwhal-types", - "narwhal-worker", "nonempty", "num-bigint 0.4.4", "num_cpus", @@ -14083,7 +14077,6 @@ dependencies = [ "mysten-network", "mysten-service", "narwhal-network", - "narwhal-worker", "parking_lot 0.12.1", "prometheus", "reqwest 0.12.5", @@ -14901,8 +14894,6 @@ dependencies = [ "indicatif", "itertools 0.10.5", "move-core-types", - "narwhal-storage", - "narwhal-types", "num_cpus", "object_store", "prometheus", diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index 5c1a12d2ee44a..de5c779125423 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -443,8 +443,8 @@ impl VerifiedBlock { } } - #[cfg(test)] - pub(crate) fn new_for_test(block: Block) -> Self { + /// This method is public for testing in other crates. + pub fn new_for_test(block: Block) -> Self { // Use empty signature in test. let signed_block = SignedBlock { inner: block, @@ -462,7 +462,7 @@ impl VerifiedBlock { } /// Returns reference to the block. - pub(crate) fn reference(&self) -> BlockRef { + pub fn reference(&self) -> BlockRef { BlockRef { round: self.round(), author: self.author(), @@ -544,15 +544,14 @@ pub(crate) fn genesis_blocks(context: Arc) -> Vec { } /// Creates fake blocks for testing. -#[cfg(test)] +/// This struct is public for testing in other crates. #[derive(Clone)] -pub(crate) struct TestBlock { +pub struct TestBlock { block: BlockV1, } -#[cfg(test)] impl TestBlock { - pub(crate) fn new(round: Round, author: u32) -> Self { + pub fn new(round: Round, author: u32) -> Self { Self { block: BlockV1 { round, @@ -562,42 +561,42 @@ impl TestBlock { } } - pub(crate) fn set_epoch(mut self, epoch: Epoch) -> Self { + pub fn set_epoch(mut self, epoch: Epoch) -> Self { self.block.epoch = epoch; self } - pub(crate) fn set_round(mut self, round: Round) -> Self { + pub fn set_round(mut self, round: Round) -> Self { self.block.round = round; self } - pub(crate) fn set_author(mut self, author: AuthorityIndex) -> Self { + pub fn set_author(mut self, author: AuthorityIndex) -> Self { self.block.author = author; self } - pub(crate) fn set_timestamp_ms(mut self, timestamp_ms: BlockTimestampMs) -> Self { + pub fn set_timestamp_ms(mut self, timestamp_ms: BlockTimestampMs) -> Self { self.block.timestamp_ms = timestamp_ms; self } - pub(crate) fn set_ancestors(mut self, ancestors: Vec) -> Self { + pub fn set_ancestors(mut self, ancestors: Vec) -> Self { self.block.ancestors = ancestors; self } - pub(crate) fn set_transactions(mut self, transactions: Vec) -> Self { + pub fn set_transactions(mut self, transactions: Vec) -> Self { self.block.transactions = transactions; self } - pub(crate) fn set_commit_votes(mut self, commit_votes: Vec) -> Self { + pub fn set_commit_votes(mut self, commit_votes: Vec) -> Self { self.block.commit_votes = commit_votes; self } - pub(crate) fn build(self) -> Block { + pub fn build(self) -> Block { Block::V1(self.block) } } diff --git a/consensus/core/src/block_verifier.rs b/consensus/core/src/block_verifier.rs index 2e876e9130f0b..0c0387d536112 100644 --- a/consensus/core/src/block_verifier.rs +++ b/consensus/core/src/block_verifier.rs @@ -145,9 +145,7 @@ impl BlockVerifier for SignedBlockVerifier { let batch: Vec<_> = block.transactions().iter().map(|t| t.data()).collect(); let max_transaction_size_limit = - self.context - .protocol_config - .consensus_max_transaction_size_bytes() as usize; + self.context.protocol_config.max_transaction_size_bytes() as usize; for t in &batch { if t.len() > max_transaction_size_limit && max_transaction_size_limit > 0 { return Err(ConsensusError::TransactionTooLarge { @@ -166,10 +164,10 @@ impl BlockVerifier for SignedBlockVerifier { }); } - let total_transactions_size_limit = - self.context - .protocol_config - .consensus_max_transactions_in_block_bytes() as usize; + let total_transactions_size_limit = self + .context + .protocol_config + .max_transactions_in_block_bytes() as usize; if batch.iter().map(|t| t.len()).sum::() > total_transactions_size_limit && total_transactions_size_limit > 0 { @@ -180,7 +178,7 @@ impl BlockVerifier for SignedBlockVerifier { } self.transaction_verifier - .verify_batch(&self.context.protocol_config, &batch) + .verify_batch(&batch) .map_err(|e| ConsensusError::InvalidTransaction(format!("{e:?}"))) } @@ -238,11 +236,7 @@ mod test { impl TransactionVerifier for TxnSizeVerifier { // Fails verification if any transaction is < 4 bytes. - fn verify_batch( - &self, - _protocol_config: &sui_protocol_config::ProtocolConfig, - transactions: &[&[u8]], - ) -> Result<(), ValidationError> { + fn verify_batch(&self, transactions: &[&[u8]]) -> Result<(), ValidationError> { for txn in transactions { if txn.len() < 4 { return Err(ValidationError::InvalidTransaction(format!( diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index f50d5ba469b89..406b482338f91 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -261,7 +261,7 @@ pub struct CommitRef { } impl CommitRef { - pub(crate) fn new(index: CommitIndex, digest: CommitDigest) -> Self { + pub fn new(index: CommitIndex, digest: CommitDigest) -> Self { Self { index, digest } } } @@ -305,8 +305,8 @@ pub struct CommittedSubDag { } impl CommittedSubDag { - /// Create new (empty) sub-dag. - pub(crate) fn new( + /// Creates a new committed sub dag. + pub fn new( leader: BlockRef, blocks: Vec, timestamp_ms: BlockTimestampMs, diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index f9d5f2041b0aa..e8d5fa9276ad9 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -1239,12 +1239,7 @@ mod test { let transaction: String = bcs::from_bytes(transaction.data()).unwrap(); assert_eq!(format!("Transaction {i}"), transaction); } - assert!( - total - <= context - .protocol_config - .consensus_max_transactions_in_block_bytes() - ); + assert!(total <= context.protocol_config.max_transactions_in_block_bytes()); // genesis blocks should be referenced let all_genesis = genesis_blocks(context); diff --git a/consensus/core/src/lib.rs b/consensus/core/src/lib.rs index 729fe63c18153..4c3cc17ca965c 100644 --- a/consensus/core/src/lib.rs +++ b/consensus/core/src/lib.rs @@ -9,8 +9,10 @@ mod block_manager; mod block_verifier; mod broadcaster; mod commit; +mod commit_consumer; mod commit_observer; mod commit_syncer; +mod commit_vote_monitor; mod context; mod core; mod core_thread; @@ -31,8 +33,9 @@ mod threshold_clock; mod transaction; mod universal_committer; -mod commit_consumer; -mod commit_vote_monitor; +#[cfg(test)] +#[path = "tests/randomized_tests.rs"] +mod randomized_tests; #[cfg(test)] mod test_dag; #[cfg(test)] @@ -40,12 +43,12 @@ mod test_dag_builder; #[cfg(test)] mod test_dag_parser; +/// Exported consensus API. pub use authority_node::ConsensusAuthority; pub use block::{BlockAPI, Round}; pub use commit::{CommitDigest, CommitIndex, CommitRef, CommittedSubDag}; pub use commit_consumer::{CommitConsumer, CommitConsumerMonitor}; pub use transaction::{ClientError, TransactionClient, TransactionVerifier, ValidationError}; -#[cfg(test)] -#[path = "tests/randomized_tests.rs"] -mod randomized_tests; +/// Exported API for testing. +pub use block::{TestBlock, Transaction, VerifiedBlock}; diff --git a/consensus/core/src/transaction.rs b/consensus/core/src/transaction.rs index 590e922e5eca0..f0977a5453788 100644 --- a/consensus/core/src/transaction.rs +++ b/consensus/core/src/transaction.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use mysten_metrics::monitored_mpsc::{channel, Receiver, Sender}; -use sui_protocol_config::ProtocolConfig; use tap::tap::TapFallible; use thiserror::Error; use tokio::sync::oneshot; @@ -46,7 +45,7 @@ impl TransactionConsumer { tx_receiver, max_consumed_bytes_per_request: context .protocol_config - .consensus_max_transactions_in_block_bytes(), + .max_transactions_in_block_bytes(), max_consumed_transactions_per_request: context .protocol_config .max_num_transactions_in_block(), @@ -158,9 +157,7 @@ impl TransactionClient { ( Self { sender, - max_transaction_size: context - .protocol_config - .consensus_max_transaction_size_bytes(), + max_transaction_size: context.protocol_config.max_transaction_size_bytes(), }, receiver, ) @@ -215,11 +212,7 @@ impl TransactionClient { /// before acceptance of the block. pub trait TransactionVerifier: Send + Sync + 'static { /// Determines if this batch can be voted on - fn verify_batch( - &self, - protocol_config: &ProtocolConfig, - batch: &[&[u8]], - ) -> Result<(), ValidationError>; + fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>; } #[derive(Debug, Error)] @@ -233,11 +226,7 @@ pub enum ValidationError { pub(crate) struct NoopTransactionVerifier; impl TransactionVerifier for NoopTransactionVerifier { - fn verify_batch( - &self, - _protocol_config: &ProtocolConfig, - _batch: &[&[u8]], - ) -> Result<(), ValidationError> { + fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> { Ok(()) } } @@ -338,14 +327,9 @@ mod tests { // ensure their total size is less than `max_bytes_to_fetch` let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum(); assert!( - total_size - <= context - .protocol_config - .consensus_max_transactions_in_block_bytes(), + total_size <= context.protocol_config.max_transactions_in_block_bytes(), "Should have fetched transactions up to {}", - context - .protocol_config - .consensus_max_transactions_in_block_bytes() + context.protocol_config.max_transactions_in_block_bytes() ); all_transactions.extend(transactions); @@ -356,14 +340,9 @@ mod tests { // ensure their total size is less than `max_bytes_to_fetch` let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum(); assert!( - total_size - <= context - .protocol_config - .consensus_max_transactions_in_block_bytes(), + total_size <= context.protocol_config.max_transactions_in_block_bytes(), "Should have fetched transactions up to {}", - context - .protocol_config - .consensus_max_transactions_in_block_bytes() + context.protocol_config.max_transactions_in_block_bytes() ); all_transactions.extend(transactions); @@ -435,14 +414,9 @@ mod tests { let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum(); assert!( - total_size - <= context - .protocol_config - .consensus_max_transactions_in_block_bytes(), + total_size <= context.protocol_config.max_transactions_in_block_bytes(), "Should have fetched transactions up to {}", - context - .protocol_config - .consensus_max_transactions_in_block_bytes() + context.protocol_config.max_transactions_in_block_bytes() ); all_transactions.extend(transactions); diff --git a/crates/sui-benchmark/Cargo.toml b/crates/sui-benchmark/Cargo.toml index daad4d2158c51..defbfb9d9bdfe 100644 --- a/crates/sui-benchmark/Cargo.toml +++ b/crates/sui-benchmark/Cargo.toml @@ -48,7 +48,6 @@ fastcrypto-zkp.workspace = true move-core-types.workspace = true mysten-metrics.workspace = true -narwhal-node.workspace = true test-cluster.workspace = true sysinfo.workspace = true @@ -58,6 +57,3 @@ sui-framework-snapshot.workspace = true sui-macros.workspace = true sui-simulator.workspace = true typed-store.workspace = true - -[features] -benchmark = ["narwhal-node/benchmark"] diff --git a/crates/sui-core/Cargo.toml b/crates/sui-core/Cargo.toml index 2c596b502a54b..0574e9f570216 100644 --- a/crates/sui-core/Cargo.toml +++ b/crates/sui-core/Cargo.toml @@ -73,12 +73,7 @@ typed-store.workspace = true mysten-metrics.workspace = true narwhal-config.workspace = true narwhal-crypto.workspace = true -narwhal-executor.workspace = true -narwhal-network.workspace = true -narwhal-node.workspace = true -narwhal-test-utils.workspace = true narwhal-types.workspace = true -narwhal-worker.workspace = true shared-crypto.workspace = true sui-archival.workspace = true sui-config.workspace = true diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index 9e922112eff57..5980cf4d2702d 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -11,7 +11,6 @@ use fastcrypto_zkp::bn254::zk_login_api::ZkLoginEnv; use futures::future::{join_all, select, Either}; use futures::FutureExt; use itertools::{izip, Itertools}; -use narwhal_executor::ExecutionIndices; use parking_lot::RwLock; use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard}; use serde::{Deserialize, Serialize}; @@ -19,7 +18,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::future::Future; use std::path::{Path, PathBuf}; use std::sync::Arc; -use sui_config::node::{ConsensusProtocol, ExpensiveSafetyCheckConfig}; +use sui_config::node::ExpensiveSafetyCheckConfig; use sui_macros::fail_point_arg; use sui_types::accumulator::Accumulator; use sui_types::authenticator_state::{get_authenticator_state, ActiveJwk}; @@ -66,7 +65,6 @@ use crate::consensus_handler::{ ConsensusCommitInfo, SequencedConsensusTransaction, SequencedConsensusTransactionKey, SequencedConsensusTransactionKind, VerifiedSequencedConsensusTransaction, }; -use crate::consensus_manager::ConsensusManager; use crate::epoch::epoch_metrics::EpochMetrics; use crate::epoch::randomness::{ DkgStatus, RandomnessManager, RandomnessReporter, VersionedProcessedMessage, @@ -234,6 +232,37 @@ impl ConsensusStatsAPI for ConsensusStatsV1 { } } +#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq, Copy)] +pub struct ExecutionIndices { + /// The round number of the last committed leader. + pub last_committed_round: u64, + /// The index of the last sub-DAG that was executed (either fully or partially). + pub sub_dag_index: u64, + /// The index of the last transaction was executed (used for crash-recovery). + pub transaction_index: u64, +} + +impl Ord for ExecutionIndices { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + ( + self.last_committed_round, + self.sub_dag_index, + self.transaction_index, + ) + .cmp(&( + other.last_committed_round, + other.sub_dag_index, + other.transaction_index, + )) + } +} + +impl PartialOrd for ExecutionIndices { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + #[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)] pub struct ExecutionIndicesWithStats { pub index: ExecutionIndices, @@ -1732,14 +1761,8 @@ impl AuthorityPerEpochStore { } fn get_max_accumulated_txn_cost_per_object_in_commit(&self) -> Option { - match ConsensusManager::get_consensus_protocol_in_epoch(self) { - ConsensusProtocol::Narwhal => self - .protocol_config() - .max_accumulated_txn_cost_per_object_in_narwhal_commit_as_option(), - ConsensusProtocol::Mysticeti => self - .protocol_config() - .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option(), - } + self.protocol_config() + .max_accumulated_txn_cost_per_object_in_mysticeti_commit_as_option() } fn should_defer( diff --git a/crates/sui-core/src/authority_server.rs b/crates/sui-core/src/authority_server.rs index cb819f18a1a16..f2414fd0726ae 100644 --- a/crates/sui-core/src/authority_server.rs +++ b/crates/sui-core/src/authority_server.rs @@ -6,7 +6,6 @@ use anyhow::Result; use async_trait::async_trait; use mysten_metrics::histogram::Histogram as MystenHistogram; use mysten_metrics::spawn_monitored_task; -use narwhal_worker::LazyNarwhalClient; use prometheus::{ register_int_counter_vec_with_registry, register_int_counter_with_registry, IntCounter, IntCounterVec, Registry, @@ -46,7 +45,10 @@ use tokio::task::JoinHandle; use tonic::metadata::{Ascii, MetadataValue}; use tracing::{error, error_span, info, Instrument}; -use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; +use crate::{ + authority::authority_per_epoch_store::AuthorityPerEpochStore, + mysticeti_adapter::LazyMysticetiClient, +}; use crate::{ authority::AuthorityState, consensus_adapter::{ConsensusAdapter, ConsensusAdapterMetrics}, @@ -119,9 +121,8 @@ impl AuthorityServer { } pub fn new_for_test(state: Arc) -> Self { - let consensus_address = new_local_tcp_address_for_testing(); let consensus_adapter = Arc::new(ConsensusAdapter::new( - Arc::new(LazyNarwhalClient::new(consensus_address)), + Arc::new(LazyMysticetiClient::new()), state.name, Arc::new(ConnectionMonitorStatusForTests {}), 100_000, diff --git a/crates/sui-core/src/consensus_adapter.rs b/crates/sui-core/src/consensus_adapter.rs index f67388061656c..b165188bca6b5 100644 --- a/crates/sui-core/src/consensus_adapter.rs +++ b/crates/sui-core/src/consensus_adapter.rs @@ -10,7 +10,6 @@ use futures::pin_mut; use futures::FutureExt; use itertools::Itertools; use narwhal_types::{TransactionProto, TransactionsClient}; -use narwhal_worker::LazyNarwhalClient; use parking_lot::RwLockReadGuard; use prometheus::Histogram; use prometheus::HistogramVec; @@ -218,41 +217,6 @@ impl SubmitToConsensus for TransactionsClient, - ) -> SuiResult { - let transactions = transactions - .iter() - .map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail")) - .collect::>(); - // The retrieved LocalNarwhalClient can be from the past epoch. Submit would fail after - // Narwhal shuts down, so there should be no correctness issue. - let client = { - let c = self.client.load(); - if c.is_some() { - c - } else { - self.client.store(Some(self.get().await)); - self.client.load() - } - }; - let client = client.as_ref().unwrap().load(); - client - .submit_transactions(transactions) - .await - .map_err(|e| SuiError::FailedToSubmitToConsensus(format!("{:?}", e))) - .tap_err(|r| { - // Will be logged by caller as well. - warn!("Submit transaction failed with: {:?}", r); - })?; - Ok(()) - } -} - /// Submit Sui certificates to the consensus. pub struct ConsensusAdapter { /// The network client connecting to the consensus node of this authority. @@ -372,7 +336,7 @@ impl ConsensusAdapter { } } debug!( - "Submitting {:?} recovered pending consensus transactions to Narwhal", + "Submitting {:?} recovered pending consensus transactions to consensus", recovered.len() ); for transaction in recovered { @@ -1133,8 +1097,8 @@ mod adapter_tests { use super::position_submit_certificate; use crate::consensus_adapter::{ ConnectionMonitorStatusForTests, ConsensusAdapter, ConsensusAdapterMetrics, - LazyNarwhalClient, }; + use crate::mysticeti_adapter::LazyMysticetiClient; use fastcrypto::traits::KeyPair; use rand::Rng; use rand::{rngs::StdRng, SeedableRng}; @@ -1171,9 +1135,7 @@ mod adapter_tests { // When we define max submit position and delay step let consensus_adapter = ConsensusAdapter::new( - Arc::new(LazyNarwhalClient::new( - "/ip4/127.0.0.1/tcp/0/http".parse().unwrap(), - )), + Arc::new(LazyMysticetiClient::new()), *committee.authority_by_index(0).unwrap(), Arc::new(ConnectionMonitorStatusForTests {}), 100_000, @@ -1203,9 +1165,7 @@ mod adapter_tests { // Without submit position and delay step let consensus_adapter = ConsensusAdapter::new( - Arc::new(LazyNarwhalClient::new( - "/ip4/127.0.0.1/tcp/0/http".parse().unwrap(), - )), + Arc::new(LazyMysticetiClient::new()), *committee.authority_by_index(0).unwrap(), Arc::new(ConnectionMonitorStatusForTests {}), 100_000, diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index 0897ed724e03f..9fd2d951e09ff 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -9,13 +9,10 @@ use std::{ }; use arc_swap::ArcSwap; -use async_trait::async_trait; use consensus_core::CommitConsumerMonitor; use lru::LruCache; use mysten_metrics::{monitored_mpsc::UnboundedReceiver, monitored_scope, spawn_monitored_task}; use narwhal_config::Committee; -use narwhal_executor::{ExecutionIndices, ExecutionState}; -use narwhal_types::ConsensusOutput; use serde::{Deserialize, Serialize}; use sui_macros::{fail_point_async, fail_point_if}; use sui_protocol_config::ProtocolConfig; @@ -33,7 +30,8 @@ use tracing::{debug, error, info, instrument, trace_span, warn}; use crate::{ authority::{ authority_per_epoch_store::{ - AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndicesWithStats, + AuthorityPerEpochStore, ConsensusStats, ConsensusStatsAPI, ExecutionIndices, + ExecutionIndicesWithStats, }, epoch_start_configuration::EpochStartConfigTrait, AuthorityMetrics, AuthorityState, @@ -167,6 +165,11 @@ impl ConsensusHandler { } } + /// Returns the last subdag index processed by the handler. + pub fn last_processed_subdag_index(&self) -> u64 { + self.last_consensus_stats.index.sub_dag_index + } + /// Updates the execution indexes based on the provided input. fn update_index_and_hash(&mut self, index: ExecutionIndices, v: &[u8]) { update_index_and_hash(&mut self.last_consensus_stats, index, v) @@ -199,31 +202,9 @@ fn update_index_and_hash( last_consensus_stats.hash = hash; } -#[async_trait] -impl ExecutionState for ConsensusHandler { - /// This function gets called by the consensus for each consensus commit. - #[instrument(level = "debug", skip_all)] - async fn handle_consensus_output(&mut self, consensus_output: ConsensusOutput) { - let _scope = monitored_scope("HandleConsensusOutput"); - self.handle_consensus_output_internal(consensus_output) - .await; - } - - fn last_executed_sub_dag_round(&self) -> u64 { - self.last_consensus_stats.index.last_committed_round - } - - fn last_executed_sub_dag_index(&self) -> u64 { - self.last_consensus_stats.index.sub_dag_index - } -} - impl ConsensusHandler { #[instrument(level = "debug", skip_all)] - async fn handle_consensus_output_internal( - &mut self, - consensus_output: impl ConsensusOutputAPI, - ) { + async fn handle_consensus_output(&mut self, consensus_output: impl ConsensusOutputAPI) { // This code no longer supports old protocol versions. assert!(self .epoch_store @@ -242,7 +223,7 @@ impl ConsensusHandler { // It is critical that the writes done by this function are atomic - otherwise we can // lose the later parts of a commit if we restart midway through processing it. warn!( - "Ignoring consensus output for round {} as it is already committed. NOTE: This is only expected if Narwhal is running.", + "Ignoring consensus output for round {} as it is already committed. NOTE: This is only expected if consensus is running.", round ); return; @@ -512,7 +493,7 @@ impl MysticetiConsensusHandler { while let Some(consensus_output) = receiver.recv().await { let commit_index = consensus_output.commit_ref.index; consensus_handler - .handle_consensus_output_internal(consensus_output) + .handle_consensus_output(consensus_output) .await; commit_consumer_monitor.set_highest_handled_commit(commit_index); } @@ -872,11 +853,9 @@ impl ConsensusCommitInfo { #[cfg(test)] mod tests { - use std::collections::BTreeSet; - - use narwhal_config::AuthorityIdentifier; - use narwhal_test_utils::latest_protocol_version; - use narwhal_types::{Batch, Certificate, CommittedSubDag, HeaderV1Builder, ReputationScores}; + use consensus_core::{ + BlockAPI, CommitDigest, CommitRef, CommittedSubDag, TestBlock, Transaction, VerifiedBlock, + }; use prometheus::Registry; use sui_protocol_config::ConsensusTransactionOrdering; use sui_types::{ @@ -911,8 +890,6 @@ mod tests { let shared_object = Object::shared_for_testing(); objects.push(shared_object.clone()); - let latest_protocol_config = &latest_protocol_version(); - let network_config = sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir() .with_objects(objects.clone()) @@ -945,59 +922,41 @@ mod tests { // AND // Create test transactions let transactions = test_certificates(&state, shared_object).await; - let mut certificates = Vec::new(); - let mut batches = Vec::new(); + let mut blocks = Vec::new(); - for transaction in transactions.iter() { + for (i, transaction) in transactions.iter().enumerate() { let transaction_bytes: Vec = bcs::to_bytes( &ConsensusTransaction::new_certificate_message(&state.name, transaction.clone()), ) .unwrap(); - let batch = Batch::new(vec![transaction_bytes], latest_protocol_config); - - batches.push(vec![batch.clone()]); - - // AND make batch as part of a commit - let header = HeaderV1Builder::default() - .author(AuthorityIdentifier(0)) - .round(5) - .epoch(0) - .parents(BTreeSet::new()) - .with_payload_batch(batch.clone(), 0, 0) - .build() - .unwrap(); - - let certificate = Certificate::new_unsigned( - latest_protocol_config, - &committee, - header.into(), - vec![], - ) - .unwrap(); + // AND create block for each transaction + let block = VerifiedBlock::new_for_test( + TestBlock::new(100 + i as u32, (i % committee.size()) as u32) + .set_transactions(vec![Transaction::new(transaction_bytes)]) + .build(), + ); - certificates.push(certificate); + blocks.push(block); } // AND create the consensus output - let consensus_output = ConsensusOutput { - sub_dag: Arc::new(CommittedSubDag::new( - certificates.clone(), - certificates[0].clone(), - 10, - ReputationScores::default(), - None, - )), - batches, - }; + let leader_block = blocks[0].clone(); + let committed_sub_dag = CommittedSubDag::new( + leader_block.reference(), + blocks.clone(), + leader_block.timestamp_ms(), + CommitRef::new(10, CommitDigest::MIN), + vec![], + ); // AND processing the consensus output once consensus_handler - .handle_consensus_output(consensus_output.clone()) + .handle_consensus_output(committed_sub_dag.clone()) .await; // AND capturing the consensus stats - let num_certificates = certificates.len(); + let num_blocks = blocks.len(); let num_transactions = transactions.len(); let last_consensus_stats_1 = consensus_handler.last_consensus_stats.clone(); assert_eq!( @@ -1005,11 +964,11 @@ mod tests { num_transactions as u64 ); assert_eq!(last_consensus_stats_1.index.sub_dag_index, 10_u64); - assert_eq!(last_consensus_stats_1.index.last_committed_round, 5_u64); + assert_eq!(last_consensus_stats_1.index.last_committed_round, 100_u64); assert_ne!(last_consensus_stats_1.hash, 0); assert_eq!( last_consensus_stats_1.stats.get_num_messages(0), - num_certificates as u64 + num_blocks as u64 ); assert_eq!( last_consensus_stats_1.stats.get_num_user_transactions(0), @@ -1020,7 +979,7 @@ mod tests { // THEN the consensus stats do not update for _ in 0..2 { consensus_handler - .handle_consensus_output(consensus_output.clone()) + .handle_consensus_output(committed_sub_dag.clone()) .await; let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone(); assert_eq!(last_consensus_stats_1, last_consensus_stats_2); diff --git a/crates/sui-core/src/consensus_manager/mod.rs b/crates/sui-core/src/consensus_manager/mod.rs index dd49d2ac768d3..5e3feba4936d8 100644 --- a/crates/sui-core/src/consensus_manager/mod.rs +++ b/crates/sui-core/src/consensus_manager/mod.rs @@ -4,7 +4,6 @@ use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; use crate::consensus_adapter::SubmitToConsensus; use crate::consensus_handler::ConsensusHandlerInitializer; use crate::consensus_manager::mysticeti_manager::MysticetiManager; -use crate::consensus_manager::narwhal_manager::{NarwhalConfiguration, NarwhalManager}; use crate::consensus_validator::SuiTxValidator; use crate::mysticeti_adapter::LazyMysticetiClient; use arc_swap::ArcSwapOption; @@ -12,23 +11,20 @@ use async_trait::async_trait; use enum_dispatch::enum_dispatch; use fastcrypto::traits::KeyPair as _; use mysten_metrics::RegistryService; -use narwhal_worker::LazyNarwhalClient; use prometheus::{register_int_gauge_with_registry, IntGauge, Registry}; use std::path::PathBuf; use std::sync::Arc; use std::time::{Duration, Instant}; -use sui_config::node::ConsensusProtocol; use sui_config::{ConsensusConfig, NodeConfig}; -use sui_protocol_config::{ConsensusChoice, ProtocolVersion}; +use sui_protocol_config::ProtocolVersion; use sui_types::committee::EpochId; use sui_types::error::SuiResult; use sui_types::messages_consensus::ConsensusTransaction; use tokio::sync::{Mutex, MutexGuard}; use tokio::time::{sleep, timeout}; -use tracing::{debug, info}; +use tracing::info; pub mod mysticeti_manager; -pub mod narwhal_manager; #[derive(PartialEq)] pub(crate) enum Running { @@ -56,29 +52,10 @@ pub trait ConsensusManagerTrait { // the ConsensusManagerTrait easier. #[enum_dispatch] enum ProtocolManager { - Narwhal(NarwhalManager), Mysticeti(MysticetiManager), } impl ProtocolManager { - /// Creates a new narwhal manager. - pub fn new_narwhal( - config: &NodeConfig, - consensus_config: &ConsensusConfig, - registry_service: &RegistryService, - metrics: Arc, - ) -> Self { - let narwhal_config = NarwhalConfiguration { - primary_keypair: config.protocol_key_pair().copy(), - network_keypair: config.network_key_pair().copy(), - worker_ids_and_keypairs: vec![(0, config.worker_key_pair().copy())], - storage_base_path: consensus_config.db_path().to_path_buf(), - parameters: consensus_config.narwhal_config().to_owned(), - registry_service: registry_service.clone(), - }; - Self::Narwhal(NarwhalManager::new(narwhal_config, metrics)) - } - /// Creates a new mysticeti manager. pub fn new_mysticeti( config: &NodeConfig, @@ -101,11 +78,9 @@ impl ProtocolManager { /// Used by Sui validator to start consensus protocol for each epoch. pub struct ConsensusManager { consensus_config: ConsensusConfig, - narwhal_manager: ProtocolManager, mysticeti_manager: ProtocolManager, - narwhal_client: Arc, mysticeti_client: Arc, - active: parking_lot::Mutex>, + active: parking_lot::Mutex, consensus_client: Arc, } @@ -119,15 +94,6 @@ impl ConsensusManager { let metrics = Arc::new(ConsensusManagerMetrics::new( ®istry_service.default_registry(), )); - let narwhal_client = Arc::new(LazyNarwhalClient::new( - consensus_config.address().to_owned(), - )); - let narwhal_manager = ProtocolManager::new_narwhal( - node_config, - consensus_config, - registry_service, - metrics.clone(), - ); let mysticeti_client = Arc::new(LazyMysticetiClient::new()); let mysticeti_manager = ProtocolManager::new_mysticeti( node_config, @@ -138,11 +104,9 @@ impl ConsensusManager { ); Self { consensus_config: consensus_config.clone(), - narwhal_manager, mysticeti_manager, - narwhal_client, mysticeti_client, - active: parking_lot::Mutex::new(vec![false; 2]), + active: parking_lot::Mutex::new(false), consensus_client, } } @@ -150,44 +114,6 @@ impl ConsensusManager { pub fn get_storage_base_path(&self) -> PathBuf { self.consensus_config.db_path().to_path_buf() } - - // Picks the consensus protocol based on the protocol config and the epoch. - pub fn get_consensus_protocol_in_epoch( - epoch_store: &AuthorityPerEpochStore, - ) -> ConsensusProtocol { - let protocol_config = epoch_store.protocol_config(); - if protocol_config.version >= ProtocolVersion::new(36) { - if let Ok(consensus_choice) = std::env::var("CONSENSUS") { - match consensus_choice.to_lowercase().as_str() { - "narwhal" => return ConsensusProtocol::Narwhal, - "mysticeti" => return ConsensusProtocol::Mysticeti, - "swap_each_epoch" => { - let protocol = if epoch_store.epoch() % 2 == 0 { - ConsensusProtocol::Narwhal - } else { - ConsensusProtocol::Mysticeti - }; - return protocol; - } - _ => { - debug!("Invalid consensus choice {} in env var. Continue to pick consensus with protocol config", consensus_choice); - } - }; - } - } - - match protocol_config.consensus_choice() { - ConsensusChoice::Narwhal => ConsensusProtocol::Narwhal, - ConsensusChoice::Mysticeti => ConsensusProtocol::Mysticeti, - ConsensusChoice::SwapEachEpoch => { - if epoch_store.epoch() % 2 == 0 { - ConsensusProtocol::Narwhal - } else { - ConsensusProtocol::Mysticeti - } - } - } - } } #[async_trait] @@ -201,26 +127,11 @@ impl ConsensusManagerTrait for ConsensusManager { ) { let protocol_manager = { let mut active = self.active.lock(); - active.iter().enumerate().for_each(|(index, active)| { - assert!( - !*active, - "Cannot start consensus. ConsensusManager protocol {index} is already running" - ); - }); - let protocol = Self::get_consensus_protocol_in_epoch(&epoch_store); - info!("Starting consensus protocol {protocol:?} ..."); - match protocol { - ConsensusProtocol::Narwhal => { - active[0] = true; - self.consensus_client.set(self.narwhal_client.clone()); - &self.narwhal_manager - } - ConsensusProtocol::Mysticeti => { - active[1] = true; - self.consensus_client.set(self.mysticeti_client.clone()); - &self.mysticeti_manager - } - } + assert!(!*active, "Cannot start consensus. It is already running!"); + info!("Starting consensus ..."); + *active = true; + self.consensus_client.set(self.mysticeti_client.clone()); + &self.mysticeti_manager }; protocol_manager @@ -234,14 +145,12 @@ impl ConsensusManagerTrait for ConsensusManager { } async fn shutdown(&self) { + info!("Shutting down consensus ..."); let prev_active = { let mut active = self.active.lock(); - std::mem::replace(&mut *active, vec![false; 2]) + std::mem::replace(&mut *active, false) }; - if prev_active[0] { - self.narwhal_manager.shutdown().await; - } - if prev_active[1] { + if prev_active { self.mysticeti_manager.shutdown().await; } self.consensus_client.clear(); @@ -249,7 +158,7 @@ impl ConsensusManagerTrait for ConsensusManager { async fn is_running(&self) -> bool { let active = self.active.lock(); - active.iter().any(|i| *i) + *active } } @@ -313,8 +222,6 @@ impl SubmitToConsensus for ConsensusClient { pub struct ConsensusManagerMetrics { start_latency: IntGauge, shutdown_latency: IntGauge, - start_primary_retries: IntGauge, - start_worker_retries: IntGauge, } impl ConsensusManagerMetrics { @@ -332,18 +239,6 @@ impl ConsensusManagerMetrics { registry, ) .unwrap(), - start_primary_retries: register_int_gauge_with_registry!( - "narwhal_manager_start_primary_retries", - "The number of retries took to start narwhal primary node", - registry - ) - .unwrap(), - start_worker_retries: register_int_gauge_with_registry!( - "narwhal_manager_start_worker_retries", - "The number of retries took to start narwhal worker node", - registry - ) - .unwrap(), } } } @@ -392,7 +287,7 @@ impl<'a> RunningLockGuard<'a> { "Shutting down consensus for epoch {epoch:?} & protocol version {version:?}" ); } else { - tracing::warn!("Consensus shutdown was called but Narwhal node is not running"); + tracing::warn!("Consensus shutdown was called but consensus is not running"); return None; } diff --git a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs index 6a0304f06b2d0..9694a53881285 100644 --- a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs +++ b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs @@ -8,7 +8,6 @@ use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair}; use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority}; use fastcrypto::ed25519; use mysten_metrics::{monitored_mpsc::unbounded_channel, RegistryID, RegistryService}; -use narwhal_executor::ExecutionState; use prometheus::Registry; use sui_config::NodeConfig; use sui_protocol_config::ConsensusNetwork; @@ -150,8 +149,7 @@ impl ConsensusManagerTrait for MysticetiManager { let consensus_handler = consensus_handler_initializer.new_consensus_handler(); let consumer = CommitConsumer::new( commit_sender, - // TODO(mysticeti): remove dependency on narwhal executor - consensus_handler.last_executed_sub_dag_index() as CommitIndex, + consensus_handler.last_processed_subdag_index() as CommitIndex, ); let monitor = consumer.monitor(); diff --git a/crates/sui-core/src/consensus_manager/narwhal_manager.rs b/crates/sui-core/src/consensus_manager/narwhal_manager.rs deleted file mode 100644 index 252d52e7b64a7..0000000000000 --- a/crates/sui-core/src/consensus_manager/narwhal_manager.rs +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 -use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore; -use crate::consensus_handler::ConsensusHandlerInitializer; -use crate::consensus_manager::{ - ConsensusManagerMetrics, ConsensusManagerTrait, Running, RunningLockGuard, -}; -use crate::consensus_validator::SuiTxValidator; -use async_trait::async_trait; -use fastcrypto::traits::KeyPair; -use mysten_metrics::RegistryService; -use narwhal_config::{Parameters, WorkerId}; -use narwhal_network::client::NetworkClient; -use narwhal_node::primary_node::PrimaryNode; -use narwhal_node::worker_node::WorkerNodes; -use narwhal_node::{CertificateStoreCacheMetrics, NodeStorage}; -use std::path::PathBuf; -use std::sync::Arc; -use sui_config::NodeConfig; -use sui_types::committee::EpochId; -use sui_types::crypto::{AuthorityKeyPair, NetworkKeyPair}; -use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait; -use tokio::sync::Mutex; - -#[cfg(test)] -#[path = "../unit_tests/narwhal_manager_tests.rs"] -pub mod narwhal_manager_tests; - -pub struct NarwhalConfiguration { - pub primary_keypair: AuthorityKeyPair, - pub network_keypair: NetworkKeyPair, - pub worker_ids_and_keypairs: Vec<(WorkerId, NetworkKeyPair)>, - - pub storage_base_path: PathBuf, - pub parameters: Parameters, - pub registry_service: RegistryService, -} - -pub struct NarwhalManager { - primary_keypair: AuthorityKeyPair, - network_keypair: NetworkKeyPair, - worker_ids_and_keypairs: Vec<(WorkerId, NetworkKeyPair)>, - primary_node: PrimaryNode, - worker_nodes: WorkerNodes, - storage_base_path: PathBuf, - running: Mutex, - metrics: Arc, - registry_service: RegistryService, - // NarwhalManager creates and unregisters the store cache metrics. - store_cache_metrics: parking_lot::Mutex>>, -} - -impl NarwhalManager { - pub fn new(config: NarwhalConfiguration, metrics: Arc) -> Self { - // Create the Narwhal Primary with configuration - let primary_node = - PrimaryNode::new(config.parameters.clone(), config.registry_service.clone()); - - // Create Narwhal Workers with configuration - let worker_nodes = - WorkerNodes::new(config.registry_service.clone(), config.parameters.clone()); - - Self { - primary_node, - worker_nodes, - primary_keypair: config.primary_keypair, - network_keypair: config.network_keypair, - worker_ids_and_keypairs: config.worker_ids_and_keypairs, - storage_base_path: config.storage_base_path, - running: Mutex::new(Running::False), - metrics, - registry_service: config.registry_service.clone(), - store_cache_metrics: parking_lot::Mutex::new(None), - } - } - - fn get_store_path(&self, epoch: EpochId) -> PathBuf { - let mut store_path = self.storage_base_path.clone(); - store_path.push(format!("{}", epoch)); - store_path - } -} - -#[async_trait] -impl ConsensusManagerTrait for NarwhalManager { - // Starts the Narwhal (primary & worker(s)) - if not already running. - // Note: After a binary is updated with the new protocol version and the node - // is restarted, the protocol config does not take effect until we have a quorum - // of validators have updated the binary. Because of this the protocol upgrade - // will happen in the following epoch after quorum is reached. In this case NarwhalManager - // is not recreated which is why we pass protocol config in at start and not at creation. - // To ensure correct behavior an updated protocol config must be passed in at the - // start of EACH epoch. - async fn start( - &self, - config: &NodeConfig, - epoch_store: Arc, - consensus_handler_initializer: ConsensusHandlerInitializer, - tx_validator: SuiTxValidator, - ) { - let system_state = epoch_store.epoch_start_state(); - let epoch = epoch_store.epoch(); - let committee = system_state.get_narwhal_committee(); - let protocol_config = epoch_store.protocol_config(); - - let Some(_guard) = RunningLockGuard::acquire_start( - &self.metrics, - &self.running, - epoch, - protocol_config.version, - ) - .await - else { - return; - }; - - let transactions_addr = &config - .consensus_config - .as_ref() - .expect("Validator is missing consensus config") - .address; - let worker_cache = system_state.get_narwhal_worker_cache(transactions_addr); - - // Register store cache metrics. - let store_cache_metrics = Arc::new(CertificateStoreCacheMetrics::new( - self.registry_service.clone(), - )); - *self.store_cache_metrics.lock() = Some(store_cache_metrics.clone()); - - // Create a new store - let store_path = self.get_store_path(epoch); - let store = NodeStorage::reopen(store_path, Some(store_cache_metrics)); - - // Create a new client. - let network_client = NetworkClient::new_from_keypair(&self.network_keypair); - - let name = self.primary_keypair.public().clone(); - - // start primary - const MAX_PRIMARY_RETRIES: u32 = 2; - let mut primary_retries = 0; - loop { - match self - .primary_node - .start( - self.primary_keypair.copy(), - self.network_keypair.copy(), - committee.clone(), - protocol_config.clone(), - worker_cache.clone(), - network_client.clone(), - &store, - consensus_handler_initializer.new_consensus_handler(), - ) - .await - { - Ok(_) => { - break; - } - Err(e) => { - primary_retries += 1; - if primary_retries >= MAX_PRIMARY_RETRIES { - panic!("Unable to start Narwhal Primary: {:?}", e); - } - tracing::error!("Unable to start Narwhal Primary: {:?}, retrying", e); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - } - } - - // Start Narwhal Workers with configuration - const MAX_WORKER_RETRIES: u32 = 2; - let mut worker_retries = 0; - loop { - // Copy the config for this iteration of the loop - let id_keypair_copy = self - .worker_ids_and_keypairs - .iter() - .map(|(id, keypair)| (*id, keypair.copy())) - .collect(); - - match self - .worker_nodes - .start( - name.clone(), - id_keypair_copy, - committee.clone(), - protocol_config.clone(), - worker_cache.clone(), - network_client.clone(), - &store, - tx_validator.clone(), - ) - .await - { - Ok(_) => { - break; - } - Err(e) => { - worker_retries += 1; - if worker_retries >= MAX_WORKER_RETRIES { - panic!("Unable to start Narwhal Worker: {:?}", e); - } - tracing::error!("Unable to start Narwhal Worker: {:?}, retrying", e); - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - } - } - - self.metrics - .start_primary_retries - .set(primary_retries as i64); - self.metrics.start_worker_retries.set(worker_retries as i64); - } - - // Shuts down whole Narwhal (primary & worker(s)) and waits until nodes have shutdown. - async fn shutdown(&self) { - let Some(_guard) = RunningLockGuard::acquire_shutdown(&self.metrics, &self.running).await - else { - return; - }; - - self.primary_node.shutdown().await; - self.worker_nodes.shutdown().await; - if let Some(store_cache_metrics) = self.store_cache_metrics.lock().take() { - store_cache_metrics.unregister(); - } - } - - async fn is_running(&self) -> bool { - let running = self.running.lock().await; - Running::False != *running - } -} diff --git a/crates/sui-core/src/consensus_types/consensus_output_api.rs b/crates/sui-core/src/consensus_types/consensus_output_api.rs index c04d73fa1af72..d75655ed754ac 100644 --- a/crates/sui-core/src/consensus_types/consensus_output_api.rs +++ b/crates/sui-core/src/consensus_types/consensus_output_api.rs @@ -82,7 +82,7 @@ impl ConsensusOutputAPI for narwhal_types::ConsensusOutput { ) { Ok(transaction) => transaction, Err(err) => { - // This should have been prevented by Narwhal batch verification. + // This should have been prevented by transaction verifications in consensus. panic!( "Unexpected malformed transaction (failed to deserialize): {}\nCertificate={:?} BatchDigest={:?} Transaction={:?}", err, cert, digest, serialized_transaction diff --git a/crates/sui-core/src/consensus_types/mod.rs b/crates/sui-core/src/consensus_types/mod.rs index 1bb7add7abcdf..65870c6dd7064 100644 --- a/crates/sui-core/src/consensus_types/mod.rs +++ b/crates/sui-core/src/consensus_types/mod.rs @@ -5,6 +5,5 @@ pub(crate) mod committee_api; pub(crate) mod consensus_output_api; /// An unique integer ID for a validator used by consensus. -/// In Narwhal, this is the inner value of the `AuthorityIdentifier` type. /// In Mysticeti, this is used the same way as the AuthorityIndex type there. pub type AuthorityIndex = u32; diff --git a/crates/sui-core/src/consensus_validator.rs b/crates/sui-core/src/consensus_validator.rs index b56236ed7e3f9..85d423b25be84 100644 --- a/crates/sui-core/src/consensus_validator.rs +++ b/crates/sui-core/src/consensus_validator.rs @@ -7,10 +7,7 @@ use consensus_core::{TransactionVerifier, ValidationError}; use eyre::WrapErr; use fastcrypto_tbls::dkg; use mysten_metrics::monitored_scope; -use narwhal_types::{validate_batch_version, BatchAPI}; -use narwhal_worker::TransactionValidator; use prometheus::{register_int_counter_with_registry, IntCounter, Registry}; -use sui_protocol_config::ProtocolConfig; use sui_types::{ error::SuiError, messages_consensus::{ConsensusTransaction, ConsensusTransactionKind}, @@ -135,41 +132,8 @@ fn tx_from_bytes(tx: &[u8]) -> Result { .wrap_err("Malformed transaction (failed to deserialize)") } -impl TransactionValidator for SuiTxValidator { - type Error = eyre::Report; - - fn validate(&self, _tx: &[u8]) -> Result<(), Self::Error> { - // We only accept transactions from local sui instance so no need to re-verify it - Ok(()) - } - - fn validate_batch( - &self, - b: &narwhal_types::Batch, - protocol_config: &ProtocolConfig, - ) -> Result<(), Self::Error> { - let _scope = monitored_scope("ValidateBatch"); - - // TODO: Remove once we have removed BatchV1 from the codebase. - validate_batch_version(b, protocol_config) - .map_err(|err| eyre::eyre!(format!("Invalid Batch: {err}")))?; - - let txs = b - .transactions() - .iter() - .map(|tx| tx_from_bytes(tx).map(|tx| tx.kind)) - .collect::, _>>()?; - - self.validate_transactions(txs) - } -} - impl TransactionVerifier for SuiTxValidator { - fn verify_batch( - &self, - _protocol_config: &ProtocolConfig, - batch: &[&[u8]], - ) -> Result<(), ValidationError> { + fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> { let _scope = monitored_scope("ValidateBatch"); let txs = batch @@ -214,9 +178,7 @@ impl SuiTxValidatorMetrics { mod tests { use std::sync::Arc; - use narwhal_test_utils::latest_protocol_version; - use narwhal_types::{Batch, BatchV1}; - use narwhal_worker::TransactionValidator; + use consensus_core::TransactionVerifier as _; use sui_macros::sim_test; use sui_types::{ crypto::Ed25519SuiSignature, messages_consensus::ConsensusTransaction, object::Object, @@ -238,8 +200,6 @@ mod tests { let shared_object = Object::shared_for_testing(); objects.push(shared_object.clone()); - let latest_protocol_config = &latest_protocol_version(); - let network_config = sui_swarm_config::network_config_builder::ConfigBuilder::new_with_temp_dir() .with_objects(objects.clone()) @@ -265,7 +225,7 @@ mod tests { state.transaction_manager().clone(), metrics, ); - let res = validator.validate(&first_transaction_bytes); + let res = validator.verify_batch(&[&first_transaction_bytes]); assert!(res.is_ok(), "{res:?}"); let transaction_bytes: Vec<_> = certificates @@ -276,8 +236,8 @@ mod tests { }) .collect(); - let batch = Batch::new(transaction_bytes, latest_protocol_config); - let res_batch = validator.validate_batch(&batch, latest_protocol_config); + let batch: Vec<_> = transaction_bytes.iter().map(|t| t.as_slice()).collect(); + let res_batch = validator.verify_batch(&batch); assert!(res_batch.is_ok(), "{res_batch:?}"); let bogus_transaction_bytes: Vec<_> = certificates @@ -292,21 +252,11 @@ mod tests { }) .collect(); - let batch = Batch::new(bogus_transaction_bytes, latest_protocol_config); - let res_batch = validator.validate_batch(&batch, latest_protocol_config); - assert!(res_batch.is_err()); - - // TODO: Remove once we have removed BatchV1 from the codebase. - let batch_v1 = Batch::V1(BatchV1::new(vec![])); - - // Case #1: Receive BatchV1 but network has upgraded past v11 so we fail because we expect BatchV2 - let res_batch = validator.validate_batch(&batch_v1, latest_protocol_config); + let batch: Vec<_> = bogus_transaction_bytes + .iter() + .map(|t| t.as_slice()) + .collect(); + let res_batch = validator.verify_batch(&batch); assert!(res_batch.is_err()); - - let batch_v2 = Batch::new(vec![], latest_protocol_config); - - // Case #2: Receive BatchV2 and network is upgraded past v11 so we are okay - let res_batch = validator.validate_batch(&batch_v2, latest_protocol_config); - assert!(res_batch.is_ok()); } } diff --git a/crates/sui-core/src/mysticeti_adapter.rs b/crates/sui-core/src/mysticeti_adapter.rs index 48bbb39b77a71..d5947f039fb8a 100644 --- a/crates/sui-core/src/mysticeti_adapter.rs +++ b/crates/sui-core/src/mysticeti_adapter.rs @@ -21,6 +21,7 @@ use crate::{ /// Gets a client to submit transactions to Mysticeti, or waits for one to be available. /// This hides the complexities of async consensus initialization and submitting to different /// instances of consensus across epochs. +// TODO: rename to LazyConsensusClient? #[derive(Default, Clone)] pub struct LazyMysticetiClient { client: Arc>, diff --git a/crates/sui-core/src/transaction_manager.rs b/crates/sui-core/src/transaction_manager.rs index d7796b69edbc2..368e259c2dbcd 100644 --- a/crates/sui-core/src/transaction_manager.rs +++ b/crates/sui-core/src/transaction_manager.rs @@ -43,7 +43,7 @@ const MIN_HASHMAP_CAPACITY: usize = 1000; /// TransactionManager is responsible for managing object dependencies of pending transactions, /// and publishing a stream of certified transactions (certificates) ready to execute. -/// It receives certificates from Narwhal, validator RPC handlers, and checkpoint executor. +/// It receives certificates from consensus, validator RPC handlers, and checkpoint executor. /// Execution driver subscribes to the stream of ready certificates from TransactionManager, and /// executes them in parallel. /// The actual execution logic is inside AuthorityState. After a transaction commits and updates diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs index f56e373ec8c54..00e8a77436624 100644 --- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs @@ -7,21 +7,47 @@ use fastcrypto::traits::KeyPair; use mysten_metrics::RegistryService; use prometheus::Registry; use sui_swarm_config::network_config_builder::ConfigBuilder; -use tokio::time::sleep; +use sui_types::messages_checkpoint::{ + CertifiedCheckpointSummary, CheckpointContents, CheckpointSummary, +}; +use tokio::{sync::mpsc, time::sleep}; use crate::{ - authority::test_authority_builder::TestAuthorityBuilder, - checkpoints::CheckpointServiceNoop, + authority::{test_authority_builder::TestAuthorityBuilder, AuthorityState}, + checkpoints::{CheckpointMetrics, CheckpointService, CheckpointServiceNoop}, consensus_handler::ConsensusHandlerInitializer, consensus_manager::{ - mysticeti_manager::MysticetiManager, - narwhal_manager::narwhal_manager_tests::checkpoint_service_for_testing, - ConsensusManagerMetrics, ConsensusManagerTrait, + mysticeti_manager::MysticetiManager, ConsensusManagerMetrics, ConsensusManagerTrait, }, consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics}, mysticeti_adapter::LazyMysticetiClient, + state_accumulator::StateAccumulator, }; +pub fn checkpoint_service_for_testing(state: Arc) -> Arc { + let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10); + let epoch_store = state.epoch_store_for_testing(); + let accumulator = Arc::new(StateAccumulator::new_for_tests( + state.get_accumulator_store().clone(), + &epoch_store, + )); + let (certified_output, _certified_result) = mpsc::channel::(10); + + let (checkpoint_service, _) = CheckpointService::spawn( + state.clone(), + state.get_checkpoint_store().clone(), + epoch_store.clone(), + state.get_transaction_cache_reader().clone(), + Arc::downgrade(&accumulator), + Box::new(output), + Box::new(certified_output), + CheckpointMetrics::new_for_tests(), + 3, + 100_000, + ); + checkpoint_service +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn test_mysticeti_manager() { // GIVEN diff --git a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs deleted file mode 100644 index 5f6e52ebfd15e..0000000000000 --- a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs +++ /dev/null @@ -1,249 +0,0 @@ -// Copyright (c) Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -use crate::authority::test_authority_builder::TestAuthorityBuilder; -use crate::authority::AuthorityState; -use crate::checkpoints::{CheckpointMetrics, CheckpointService, CheckpointServiceNoop}; -use crate::consensus_handler::ConsensusHandlerInitializer; -use crate::consensus_manager::narwhal_manager::{NarwhalConfiguration, NarwhalManager}; -use crate::consensus_manager::{ConsensusManagerMetrics, ConsensusManagerTrait}; -use crate::consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics}; -use crate::state_accumulator::StateAccumulator; -use bytes::Bytes; -use fastcrypto::bls12381; -use fastcrypto::traits::KeyPair; -use mysten_metrics::RegistryService; -use narwhal_config::{Epoch, WorkerCache}; -use narwhal_types::{TransactionProto, TransactionsClient}; -use prometheus::Registry; -use std::sync::Arc; -use std::time::Duration; -use sui_swarm_config::network_config_builder::ConfigBuilder; -use sui_types::messages_checkpoint::{ - CertifiedCheckpointSummary, CheckpointContents, CheckpointSummary, -}; -use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait; -use sui_types::sui_system_state::SuiSystemStateTrait; -use tokio::sync::{broadcast, mpsc}; -use tokio::time::{interval, sleep}; - -async fn send_transactions( - name: &bls12381::min_sig::BLS12381PublicKey, - worker_cache: WorkerCache, - epoch: Epoch, - mut rx_shutdown: broadcast::Receiver<()>, -) { - let target = worker_cache - .worker(name, /* id */ &0) - .expect("Our key or worker id is not in the worker cache") - .transactions; - let config = mysten_network::config::Config::new(); - let channel = config.connect_lazy(&target).unwrap(); - let mut client = TransactionsClient::new(channel); - // Make a transaction to submit forever. - let tx = TransactionProto { - transactions: vec![Bytes::from(epoch.to_be_bytes().to_vec())], - }; - // Repeatedly send transactions. - let interval = interval(Duration::from_millis(1)); - - tokio::pin!(interval); - let mut succeeded_once = false; - loop { - tokio::select! { - _ = interval.tick() => { - // Send a transactions. - let result = client.submit_transaction(tx.clone()).await; - if result.is_ok() { - succeeded_once = true; - } - - }, - _ = rx_shutdown.recv() => { - break - } - } - } - assert!(succeeded_once); -} - -pub fn checkpoint_service_for_testing(state: Arc) -> Arc { - let (output, _result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10); - let epoch_store = state.epoch_store_for_testing(); - let accumulator = Arc::new(StateAccumulator::new_for_tests( - state.get_accumulator_store().clone(), - &epoch_store, - )); - let (certified_output, _certified_result) = mpsc::channel::(10); - - let (checkpoint_service, _) = CheckpointService::spawn( - state.clone(), - state.get_checkpoint_store().clone(), - epoch_store.clone(), - state.get_transaction_cache_reader().clone(), - Arc::downgrade(&accumulator), - Box::new(output), - Box::new(certified_output), - CheckpointMetrics::new_for_tests(), - 3, - 100_000, - ); - checkpoint_service -} - -#[tokio::test(flavor = "current_thread", start_paused = true)] -async fn test_narwhal_manager() { - let configs = ConfigBuilder::new_with_temp_dir() - .committee_size(1.try_into().unwrap()) - .build(); - let mut narwhal_managers = Vec::new(); - let mut shutdown_senders = Vec::new(); - - for config in configs.validator_configs() { - let consensus_config = config.consensus_config().unwrap(); - let registry_service = RegistryService::new(Registry::new()); - let secret = Arc::pin(config.protocol_key_pair().copy()); - let genesis = config.genesis().unwrap(); - - let state = TestAuthorityBuilder::new() - .with_genesis_and_keypair(genesis, &secret) - .build() - .await; - - let system_state = state - .get_sui_system_state_object_for_testing() - .expect("Reading Sui system state object cannot fail") - .into_epoch_start_state(); - - let transactions_addr = &config.consensus_config.as_ref().unwrap().address; - let narwhal_committee = system_state.get_narwhal_committee(); - let worker_cache = system_state.get_narwhal_worker_cache(transactions_addr); - - let narwhal_config = NarwhalConfiguration { - primary_keypair: config.protocol_key_pair().copy(), - network_keypair: config.network_key_pair().copy(), - worker_ids_and_keypairs: vec![(0, config.worker_key_pair().copy())], - storage_base_path: consensus_config.db_path().to_path_buf(), - parameters: consensus_config.narwhal_config().to_owned(), - registry_service, - }; - - let metrics = Arc::new(ConsensusManagerMetrics::new(&Registry::new())); - let epoch_store = state.epoch_store_for_testing(); - - let narwhal_manager = NarwhalManager::new(narwhal_config, metrics); - - let consensus_handler_initializer = ConsensusHandlerInitializer::new_for_testing( - state.clone(), - checkpoint_service_for_testing(state.clone()), - ); - - // start narwhal - narwhal_manager - .start( - config, - epoch_store.clone(), - consensus_handler_initializer, - SuiTxValidator::new( - epoch_store.clone(), - Arc::new(CheckpointServiceNoop {}), - state.transaction_manager().clone(), - SuiTxValidatorMetrics::new(&Registry::new()), - ), - ) - .await; - - assert!(narwhal_manager.is_running().await); - - let name = config.protocol_key_pair().public().clone(); - narwhal_managers.push(( - narwhal_manager, - state, - transactions_addr.clone(), - name.clone(), - )); - - // Send some transactions - let (tx_shutdown, rx_shutdown) = broadcast::channel(1); - tokio::spawn(async move { - send_transactions( - &name, - worker_cache.clone(), - narwhal_committee.epoch(), - rx_shutdown, - ) - .await - }); - shutdown_senders.push(tx_shutdown); - } - - sleep(Duration::from_secs(1)).await; - for tr_shutdown in shutdown_senders { - _ = tr_shutdown.send(()); - } - let mut shutdown_senders = Vec::new(); - - for ((narwhal_manager, state, transactions_addr, name), config) in narwhal_managers - .into_iter() - .zip(configs.validator_configs()) - { - // stop narwhal instance - narwhal_manager.shutdown().await; - - // ensure that no primary or worker node is running - assert!(!narwhal_manager.is_running().await); - assert!(!narwhal_manager.primary_node.is_running().await); - assert!(narwhal_manager - .worker_nodes - .workers_running() - .await - .is_empty()); - - let system_state = state - .get_sui_system_state_object_for_testing() - .expect("Reading Sui system state object cannot fail") - .into_epoch_start_state(); - let narwhal_committee = system_state.get_narwhal_committee(); - let worker_cache = system_state.get_narwhal_worker_cache(&transactions_addr); - - let epoch_store = state.epoch_store_for_testing(); - - let consensus_handler_initializer = ConsensusHandlerInitializer::new_for_testing( - state.clone(), - checkpoint_service_for_testing(state.clone()), - ); - - // start narwhal with advanced epoch - narwhal_manager - .start( - config, - epoch_store.clone(), - consensus_handler_initializer, - SuiTxValidator::new( - epoch_store.clone(), - Arc::new(CheckpointServiceNoop {}), - state.transaction_manager().clone(), - SuiTxValidatorMetrics::new(&Registry::new()), - ), - ) - .await; - - // Send some transactions - let (tr_shutdown, rx_shutdown) = broadcast::channel(1); - tokio::spawn(async move { - send_transactions( - &name, - worker_cache.clone(), - narwhal_committee.epoch(), - rx_shutdown, - ) - .await - }); - - shutdown_senders.push(tr_shutdown); - } - sleep(Duration::from_secs(5)).await; - for tr_shutdown in shutdown_senders { - _ = tr_shutdown.send(()); - } -} diff --git a/crates/sui-node/Cargo.toml b/crates/sui-node/Cargo.toml index 1e3449067aaf3..4d46c42852e5a 100644 --- a/crates/sui-node/Cargo.toml +++ b/crates/sui-node/Cargo.toml @@ -50,7 +50,6 @@ mysten-metrics.workspace = true mysten-service.workspace = true mysten-common.workspace = true narwhal-network.workspace = true -narwhal-worker.workspace = true typed-store.workspace = true mysten-network.workspace = true telemetry-subscribers.workspace = true diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index 35ec99261a5e4..0f6832b9ea481 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -1537,6 +1537,18 @@ impl ProtocolConfig { self.feature_flags.authority_capabilities_v2 } + pub fn max_transaction_size_bytes(&self) -> u64 { + // Provide a default value if protocol config version is too low. + self.consensus_max_transaction_size_bytes + .unwrap_or(256 * 1024) + } + + pub fn max_transactions_in_block_bytes(&self) -> u64 { + // Provide a default value if protocol config version is too low. + self.consensus_max_transactions_in_block_bytes + .unwrap_or(512 * 1024) + } + pub fn max_num_transactions_in_block(&self) -> u64 { // 500 is the value used before this field is introduced. self.consensus_max_num_transactions_in_block.unwrap_or(500) diff --git a/crates/sui-tool/Cargo.toml b/crates/sui-tool/Cargo.toml index a19bfaa194b94..cb9bff954df3e 100644 --- a/crates/sui-tool/Cargo.toml +++ b/crates/sui-tool/Cargo.toml @@ -36,8 +36,6 @@ tokio = { workspace = true, features = ["full"] } typed-store.workspace = true fastcrypto.workspace = true -narwhal-storage.workspace = true -narwhal-types.workspace = true sui-config.workspace = true sui-core.workspace = true sui-network.workspace = true diff --git a/crates/sui-tool/README.md b/crates/sui-tool/README.md index d0d63b382a788..7041d44c36999 100644 --- a/crates/sui-tool/README.md +++ b/crates/sui-tool/README.md @@ -12,7 +12,6 @@ cargo run --bin sui-tool -- You can use the anemo CLI tools to ping or call an RPC on an Anemo server. Note that (for now) this uses randomly generated keys, so a server or method that restricts access to allowlisted peers will reject connections from this tool. Anemo networks are identified by a "server name" that the client must match. Server names you may want to use: -- Narwhal primary and worker: `narwhal` - Sui discovery and state sync: `sui` ### ping diff --git a/crates/sui-tool/src/db_tool/mod.rs b/crates/sui-tool/src/db_tool/mod.rs index a05a75a58c915..705eb7feb8df6 100644 --- a/crates/sui-tool/src/db_tool/mod.rs +++ b/crates/sui-tool/src/db_tool/mod.rs @@ -6,7 +6,6 @@ use self::index_search::{search_index, SearchRange}; use crate::db_tool::db_dump::{compact, print_table_metadata, prune_checkpoints, prune_objects}; use anyhow::{anyhow, bail}; use clap::Parser; -use narwhal_storage::NodeStorage; use std::path::{Path, PathBuf}; use sui_core::authority::authority_per_epoch_store::AuthorityEpochTables; use sui_core::authority::authority_store_tables::AuthorityPerpetualTables; @@ -264,15 +263,12 @@ pub fn print_last_consensus_index(path: &Path) -> anyhow::Result<()> { Ok(()) } -pub fn print_consensus_commit(path: &Path, opt: PrintConsensusCommitOptions) -> anyhow::Result<()> { - let consensus_db = NodeStorage::reopen(path, None); - let consensus_commit = consensus_db - .consensus_store - .read_consensus_commit(&opt.seqnum)?; - match consensus_commit { - Some(commit) => println!("Consensus commit at {} is {:?}", opt.seqnum, commit), - None => println!("Consensus commit at {} is not found!", opt.seqnum), - } +// TODO: implement for consensus. +pub fn print_consensus_commit( + _path: &Path, + _opt: PrintConsensusCommitOptions, +) -> anyhow::Result<()> { + println!("Printing consensus commit is unimplemented"); Ok(()) } diff --git a/crates/sui-tool/src/lib.rs b/crates/sui-tool/src/lib.rs index a49ac1b5e9498..c748e6c76f49a 100644 --- a/crates/sui-tool/src/lib.rs +++ b/crates/sui-tool/src/lib.rs @@ -489,58 +489,11 @@ async fn get_object_impl( } pub(crate) fn make_anemo_config() -> anemo_cli::Config { - use narwhal_types::*; use sui_network::discovery::*; use sui_network::state_sync::*; // TODO: implement `ServiceInfo` generation in anemo-build and use here. anemo_cli::Config::new() - // Narwhal primary-to-primary - .add_service( - "PrimaryToPrimary", - anemo_cli::ServiceInfo::new() - .add_method( - "SendCertificate", - anemo_cli::ron_method!( - PrimaryToPrimaryClient, - send_certificate, - SendCertificateRequest - ), - ) - .add_method( - "RequestVote", - anemo_cli::ron_method!( - PrimaryToPrimaryClient, - request_vote, - RequestVoteRequest - ), - ) - .add_method( - "FetchCertificates", - anemo_cli::ron_method!( - PrimaryToPrimaryClient, - fetch_certificates, - FetchCertificatesRequest - ), - ), - ) - // Narwhal worker-to-worker - .add_service( - "WorkerToWorker", - anemo_cli::ServiceInfo::new() - .add_method( - "ReportBatch", - anemo_cli::ron_method!(WorkerToWorkerClient, report_batch, WorkerBatchMessage), - ) - .add_method( - "RequestBatches", - anemo_cli::ron_method!( - WorkerToWorkerClient, - request_batches, - RequestBatchesRequest - ), - ), - ) // Sui discovery .add_service( "Discovery",