Skip to content

Commit

Permalink
Add CLI option for chain worker LRU cache limit (#2874)
Browse files Browse the repository at this point in the history
* Add server `--max-loaded-chains` CLI option

Allow validators to tweak the maximum number of active chains stored in the
worker's chain worker LRU cache.

* Add `max_loaded_chains` field to `Client`

Prepare to make the chain worker LRU cache size configurable.

* Replace `CHAIN_WORKER_LIMIT` with parameter

Make the value configurable, in preparation for making it
user-configurable through a command-line option.

* Add client `--max-loaded-chains` CLI option

Allow users to tweak the maximum number of active chains stored in the node's
chain worker LRU cache.
  • Loading branch information
jvff authored Nov 13, 2024
1 parent 8baa3d2 commit f325fe1
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 16 deletions.
3 changes: 3 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ A Byzantine-fault tolerant sidechain with low-latency finality and high throughp

Default value: `10`
* `--wasm-runtime <WASM_RUNTIME>` — The WebAssembly runtime to use
* `--max-loaded-chains <MAX_LOADED_CHAINS>` — The maximal number of chains loaded in memory at a given time

Default value: `40`
* `--max-concurrent-queries <MAX_CONCURRENT_QUERIES>` — The maximal number of simultaneous queries to the database
* `--max-stream-queries <MAX_STREAM_QUERIES>` — The maximal number of simultaneous stream queries to the database

Expand Down
14 changes: 13 additions & 1 deletion linera-client/src/client_context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#[cfg(with_testing)]
use std::num::NonZeroUsize;
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
Expand Down Expand Up @@ -172,6 +174,7 @@ where
options.long_lived_services,
chain_ids,
name,
options.max_loaded_chains,
);

ClientContext {
Expand Down Expand Up @@ -207,7 +210,16 @@ where
1 => format!("Client node for {:.8}", chain_ids[0]),
n => format!("Client node for {:.8} and {} others", chain_ids[0], n - 1),
};
let client = Client::new(node_provider, storage, 10, delivery, false, chain_ids, name);
let client = Client::new(
node_provider,
storage,
10,
delivery,
false,
chain_ids,
name,
NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
);

ClientContext {
client: Arc::new(client),
Expand Down
12 changes: 11 additions & 1 deletion linera-client/src/client_options.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::{collections::HashSet, env, fmt, iter, num::NonZeroU16, path::PathBuf, time::Duration};
use std::{
collections::HashSet,
env, fmt, iter,
num::{NonZeroU16, NonZeroUsize},
path::PathBuf,
time::Duration,
};

use chrono::{DateTime, Utc};
use linera_base::{
Expand Down Expand Up @@ -96,6 +102,10 @@ pub struct ClientOptions {
#[arg(long)]
pub wasm_runtime: Option<WasmRuntime>,

/// The maximal number of chains loaded in memory at a given time.
#[arg(long, default_value = "40")]
pub max_loaded_chains: NonZeroUsize,

/// The maximal number of simultaneous queries to the database
#[arg(long)]
pub max_concurrent_queries: Option<usize>,
Expand Down
3 changes: 2 additions & 1 deletion linera-client/src/unit_tests/chain_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

#![allow(clippy::large_futures)]

use std::{collections::BTreeMap, sync::Arc};
use std::{collections::BTreeMap, num::NonZeroUsize, sync::Arc};

use async_trait::async_trait;
use futures::{lock::Mutex, FutureExt as _};
Expand Down Expand Up @@ -134,6 +134,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
false,
[chain_id0],
format!("Client node for {:.8}", chain_id0),
NonZeroUsize::new(20).expect("Chain worker LRU cache size must be non-zero"),
)),
};
let key_pair = KeyPair::generate_from(&mut rng);
Expand Down
25 changes: 14 additions & 11 deletions linera-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ mod metrics {
});
}

/// The number of chain workers that can be in memory at the same time. More workers improve
/// performance whenever the client interacts with multiple chains at the same time, but also
/// increase memory usage.
const CHAIN_WORKER_LIMIT: usize = 20;

/// A builder that creates [`ChainClient`]s which share the cache and notifiers.
pub struct Client<ValidatorNodeProvider, Storage>
where
Expand All @@ -184,10 +179,13 @@ where
storage: Storage,
/// Chain state for the managed chains.
chains: DashMap<ChainId, ChainClientState>,
/// The maximum number of active chain workers.
max_loaded_chains: NonZeroUsize,
}

impl<P, S: Storage + Clone> Client<P, S> {
/// Creates a new `Client` with a new cache and notifiers.
#[allow(clippy::too_many_arguments)]
#[instrument(level = "trace", skip_all)]
pub fn new(
validator_node_provider: P,
Expand All @@ -197,13 +195,14 @@ impl<P, S: Storage + Clone> Client<P, S> {
long_lived_services: bool,
tracked_chains: impl IntoIterator<Item = ChainId>,
name: impl Into<String>,
max_loaded_chains: NonZeroUsize,
) -> Self {
let tracked_chains = Arc::new(RwLock::new(tracked_chains.into_iter().collect()));
let state = WorkerState::new_for_client(
name.into(),
storage.clone(),
tracked_chains.clone(),
NonZeroUsize::new(CHAIN_WORKER_LIMIT).expect("Chain worker limit should not be zero"),
max_loaded_chains,
)
.with_long_lived_services(long_lived_services)
.with_allow_inactive_chains(true)
Expand All @@ -220,6 +219,7 @@ impl<P, S: Storage + Clone> Client<P, S> {
tracked_chains,
notifier: Arc::new(ChannelNotifier::default()),
storage,
max_loaded_chains,
}
}

Expand Down Expand Up @@ -1098,7 +1098,8 @@ where
let local_node = self.client.local_node.clone();
let nodes = self.make_nodes(committee)?;
let n_validators = nodes.len();
let chain_worker_count = std::cmp::max(1, CHAIN_WORKER_LIMIT / n_validators);
let chain_worker_count =
std::cmp::max(1, self.client.max_loaded_chains.get() / n_validators);
communicate_with_quorum(
&nodes,
committee,
Expand Down Expand Up @@ -1135,7 +1136,8 @@ where
let local_node = self.client.local_node.clone();
let nodes = self.make_nodes(committee)?;
let n_validators = nodes.len();
let chain_worker_count = std::cmp::max(1, CHAIN_WORKER_LIMIT / n_validators);
let chain_worker_count =
std::cmp::max(1, self.client.max_loaded_chains.get() / n_validators);
let ((votes_hash, votes_round), votes) = communicate_with_quorum(
&nodes,
committee,
Expand Down Expand Up @@ -1428,7 +1430,7 @@ where
// We would like to use all chain workers, but we need to keep some of them free, because
// handling the certificates can trigger messages to other chains, and putting these in
// the inbox requires the recipient chain's worker, too.
let chain_worker_limit = (CHAIN_WORKER_LIMIT / 2).max(1);
let chain_worker_limit = (self.client.max_loaded_chains.get() / 2).max(1);

// Process the certificates sorted by chain and in ascending order of block height.
let stream = stream::iter(certificates.into_values().map(|certificates| {
Expand Down Expand Up @@ -1492,7 +1494,8 @@ where
let client = self.clone();
// Proceed to downloading received certificates. Split the available chain workers so that
// the tasks don't use more than the limit in total.
let chain_worker_limit = (CHAIN_WORKER_LIMIT / local_committee.validators().len()).max(1);
let chain_worker_limit =
(self.client.max_loaded_chains.get() / local_committee.validators().len()).max(1);
let result = communicate_with_quorum(
&nodes,
&local_committee,
Expand Down Expand Up @@ -3295,7 +3298,7 @@ where
.synchronize_received_certificates_from_validator(
chain_id,
&remote_node,
CHAIN_WORKER_LIMIT,
self.client.max_loaded_chains.into(),
)
.await?;
// Process received certificates. If the client state has changed during the
Expand Down
1 change: 1 addition & 0 deletions linera-core/src/unit_tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ where
false,
[chain_id],
format!("Client node for {:.8}", chain_id),
NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
));
Ok(builder.create_chain_client(
chain_id,
Expand Down
4 changes: 3 additions & 1 deletion linera-service/src/linera/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
#![deny(clippy::large_futures)]

use std::{
borrow::Cow, collections::HashMap, env, path::PathBuf, process, sync::Arc, time::Instant,
borrow::Cow, collections::HashMap, env, num::NonZeroUsize, path::PathBuf, process, sync::Arc,
time::Instant,
};

use anyhow::{anyhow, bail, ensure, Context};
Expand Down Expand Up @@ -1156,6 +1157,7 @@ impl Job {
false,
vec![message_id.chain_id, chain_id],
"Temporary client for fetching the parent chain",
NonZeroUsize::new(20).expect("Chain worker limit should not be zero"),
);

// Take the latest committee we know of.
Expand Down
9 changes: 8 additions & 1 deletion linera-service/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ struct ServerContext {
notification_config: NotificationConfig,
shard: Option<usize>,
grace_period: Duration,
max_loaded_chains: NonZeroUsize,
}

impl ServerContext {
Expand All @@ -63,7 +64,7 @@ impl ServerContext {
format!("Shard {} @ {}:{}", shard_id, local_ip_addr, shard.port),
Some(self.server_config.key.copy()),
storage,
NonZeroUsize::new(400).expect("Chain worker limit should not be zero"),
self.max_loaded_chains,
)
.with_allow_inactive_chains(false)
.with_allow_messages_from_deprecated_epochs(false)
Expand Down Expand Up @@ -343,6 +344,10 @@ enum ServerCommand {
#[arg(long)]
wasm_runtime: Option<WasmRuntime>,

/// The maximal number of chains loaded in memory at a given time.
#[arg(long, default_value = "400")]
max_loaded_chains: NonZeroUsize,

/// The maximal number of simultaneous queries to the database
#[arg(long)]
max_concurrent_queries: Option<usize>,
Expand Down Expand Up @@ -492,6 +497,7 @@ async fn run(options: ServerOptions) {
shard,
grace_period,
wasm_runtime,
max_loaded_chains,
max_concurrent_queries,
max_stream_queries,
cache_size,
Expand All @@ -516,6 +522,7 @@ async fn run(options: ServerOptions) {
notification_config,
shard,
grace_period,
max_loaded_chains,
};
let wasm_runtime = wasm_runtime.with_wasm_default();
let common_config = CommonStoreConfig {
Expand Down

0 comments on commit f325fe1

Please sign in to comment.