diff --git a/CLI.md b/CLI.md index 5f562828d3a..8f31e8b5745 100644 --- a/CLI.md +++ b/CLI.md @@ -114,6 +114,9 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp Default value: `10` * `--wasm-runtime ` — The WebAssembly runtime to use +* `--max-loaded-chains ` — The maximal number of chains loaded in memory at a given time + + Default value: `40` * `--max-concurrent-queries ` — The maximal number of simultaneous queries to the database * `--max-stream-queries ` — The maximal number of simultaneous stream queries to the database diff --git a/linera-client/src/client_context.rs b/linera-client/src/client_context.rs index 67ee3729c09..f5e3e62ad3c 100644 --- a/linera-client/src/client_context.rs +++ b/linera-client/src/client_context.rs @@ -1,6 +1,8 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +#[cfg(with_testing)] +use std::num::NonZeroUsize; use std::{ collections::{BTreeMap, HashSet}, sync::Arc, @@ -172,6 +174,7 @@ where options.long_lived_services, chain_ids, name, + options.max_loaded_chains, ); ClientContext { @@ -207,7 +210,16 @@ where 1 => format!("Client node for {:.8}", chain_ids[0]), n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1), }; - let client = Client::new(node_provider, storage, 10, delivery, false, chain_ids, name); + let client = Client::new( + node_provider, + storage, + 10, + delivery, + false, + chain_ids, + name, + NonZeroUsize::new(20).expect("Chain worker limit should not be zero"), + ); ClientContext { client: Arc::new(client), diff --git a/linera-client/src/client_options.rs b/linera-client/src/client_options.rs index 26ac6b427b9..814f7a69d84 100644 --- a/linera-client/src/client_options.rs +++ b/linera-client/src/client_options.rs @@ -1,7 +1,13 @@ // Copyright (c) Zefchain Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashSet, env, fmt, iter, num::NonZeroU16, path::PathBuf, time::Duration}; +use std::{ + collections::HashSet, + env, fmt, iter, + num::{NonZeroU16, NonZeroUsize}, + path::PathBuf, + time::Duration, +}; use chrono::{DateTime, Utc}; use linera_base::{ @@ -96,6 +102,10 @@ pub struct ClientOptions { #[arg(long)] pub wasm_runtime: Option, + /// The maximal number of chains loaded in memory at a given time. + #[arg(long, default_value = "40")] + pub max_loaded_chains: NonZeroUsize, + /// The maximal number of simultaneous queries to the database #[arg(long)] pub max_concurrent_queries: Option, diff --git a/linera-client/src/unit_tests/chain_listener.rs b/linera-client/src/unit_tests/chain_listener.rs index 36a79b19f06..31972d33b5f 100644 --- a/linera-client/src/unit_tests/chain_listener.rs +++ b/linera-client/src/unit_tests/chain_listener.rs @@ -3,7 +3,7 @@ #![allow(clippy::large_futures)] -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc}; use async_trait::async_trait; use futures::{lock::Mutex, FutureExt as _}; @@ -134,6 +134,7 @@ async fn test_chain_listener() -> anyhow::Result<()> { false, [chain_id0], format!("Client node for {:.8}", chain_id0), + NonZeroUsize::new(20).expect("Chain worker LRU cache size must be non-zero"), )), }; let key_pair = KeyPair::generate_from(&mut rng); diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index e7f14255f5e..0943d4390cf 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -153,11 +153,6 @@ mod metrics { }); } -/// The number of chain workers that can be in memory at the same time. More workers improve -/// perfomance whenever the client interacts with multiple chains at the same time, but also -/// increases memory usage. -const CHAIN_WORKER_LIMIT: usize = 20; - /// A builder that creates [`ChainClient`]s which share the cache and notifiers. pub struct Client where @@ -184,10 +179,13 @@ where storage: Storage, /// Chain state for the managed chains. chains: DashMap, + /// The maximum active chain workers. + max_loaded_chains: NonZeroUsize, } impl Client { /// Creates a new `Client` with a new cache and notifiers. + #[allow(clippy::too_many_arguments)] #[instrument(level = "trace", skip_all)] pub fn new( validator_node_provider: P, @@ -197,13 +195,14 @@ impl Client { long_lived_services: bool, tracked_chains: impl IntoIterator, name: impl Into, + max_loaded_chains: NonZeroUsize, ) -> Self { let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect())); let state = WorkerState::new_for_client( name.into(), storage.clone(), tracked_chains.clone(), - NonZeroUsize::new(CHAIN_WORKER_LIMIT).expect("Chain worker limit should not be zero"), + max_loaded_chains, ) .with_long_lived_services(long_lived_services) .with_allow_inactive_chains(true) @@ -220,6 +219,7 @@ impl Client { tracked_chains, notifier: Arc::new(ChannelNotifier::default()), storage, + max_loaded_chains, } } @@ -1098,7 +1098,8 @@ where let local_node = self.client.local_node.clone(); let nodes = self.make_nodes(committee)?; let n_validators = nodes.len(); - let chain_worker_count = std::cmp::max(1, CHAIN_WORKER_LIMIT / n_validators); + let chain_worker_count = + std::cmp::max(1, self.client.max_loaded_chains.get() / n_validators); communicate_with_quorum( &nodes, committee, @@ -1135,7 +1136,8 @@ where let local_node = self.client.local_node.clone(); let nodes = self.make_nodes(committee)?; let n_validators = nodes.len(); - let chain_worker_count = std::cmp::max(1, CHAIN_WORKER_LIMIT / n_validators); + let chain_worker_count = + std::cmp::max(1, self.client.max_loaded_chains.get() / n_validators); let ((votes_hash, votes_round), votes) = communicate_with_quorum( &nodes, committee, @@ -1428,7 +1430,7 @@ where // We would like to use all chain workers, but we need to keep some of them free, because // handling the certificates can trigger messages to other chains, and putting these in // the inbox requires the recipient chain's worker, too. - let chain_worker_limit = (CHAIN_WORKER_LIMIT / 2).max(1); + let chain_worker_limit = (self.client.max_loaded_chains.get() / 2).max(1); // Process the certificates sorted by chain and in ascending order of block height. let stream = stream::iter(certificates.into_values().map(|certificates| { @@ -1492,7 +1494,8 @@ where let client = self.clone(); // Proceed to downloading received certificates. Split the available chain workers so that // the tasks don't use more than the limit in total. - let chain_worker_limit = (CHAIN_WORKER_LIMIT / local_committee.validators().len()).max(1); + let chain_worker_limit = + (self.client.max_loaded_chains.get() / local_committee.validators().len()).max(1); let result = communicate_with_quorum( &nodes, &local_committee, @@ -3295,7 +3298,7 @@ where .synchronize_received_certificates_from_validator( chain_id, &remote_node, - CHAIN_WORKER_LIMIT, + self.client.max_loaded_chains.into(), ) .await?; // Process received certificates. If the client state has changed during the diff --git a/linera-core/src/unit_tests/test_utils.rs b/linera-core/src/unit_tests/test_utils.rs index 22d8c722cc9..4e9c1c8c350 100644 --- a/linera-core/src/unit_tests/test_utils.rs +++ b/linera-core/src/unit_tests/test_utils.rs @@ -836,6 +836,7 @@ where false, [chain_id], format!("Client node for {:.8}", chain_id), + NonZeroUsize::new(20).expect("Chain worker limit should not be zero"), )); Ok(builder.create_chain_client( chain_id, diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 0d592ec2ec3..4acbbc0d69b 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -6,7 +6,8 @@ #![deny(clippy::large_futures)] use std::{ - borrow::Cow, collections::HashMap, env, path::PathBuf, process, sync::Arc, time::Instant, + borrow::Cow, collections::HashMap, env, num::NonZeroUsize, path::PathBuf, process, sync::Arc, + time::Instant, }; use anyhow::{anyhow, bail, ensure, Context}; @@ -1156,6 +1157,7 @@ impl Job { false, vec![message_id.chain_id, chain_id], "Temporary client for fetching the parent chain", + NonZeroUsize::new(20).expect("Chain worker limit should not be zero"), ); // Take the latest committee we know of. diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs index 0217e774eb9..48be7017b0c 100644 --- a/linera-service/src/server.rs +++ b/linera-service/src/server.rs @@ -45,6 +45,7 @@ struct ServerContext { notification_config: NotificationConfig, shard: Option, grace_period: Duration, + max_loaded_chains: NonZeroUsize, } impl ServerContext { @@ -63,7 +64,7 @@ impl ServerContext { format!("Shard {} @ {}:{}", shard_id, local_ip_addr, shard.port), Some(self.server_config.key.copy()), storage, - NonZeroUsize::new(400).expect("Chain worker limit should not be zero"), + self.max_loaded_chains, ) .with_allow_inactive_chains(false) .with_allow_messages_from_deprecated_epochs(false) @@ -343,6 +344,10 @@ enum ServerCommand { #[arg(long)] wasm_runtime: Option, + /// The maximal number of chains loaded in memory at a given time. + #[arg(long, default_value = "400")] + max_loaded_chains: NonZeroUsize, + /// The maximal number of simultaneous queries to the database #[arg(long)] max_concurrent_queries: Option, @@ -492,6 +497,7 @@ async fn run(options: ServerOptions) { shard, grace_period, wasm_runtime, + max_loaded_chains, max_concurrent_queries, max_stream_queries, cache_size, @@ -516,6 +522,7 @@ async fn run(options: ServerOptions) { notification_config, shard, grace_period, + max_loaded_chains, }; let wasm_runtime = wasm_runtime.with_wasm_default(); let common_config = CommonStoreConfig {