New LRU cache implementation (#1738)
javitonino authored Jan 25, 2024
1 parent 417c9b4 commit 4ac0037
Showing 25 changed files with 974 additions and 537 deletions.
58 changes: 55 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 0 additions & 3 deletions charts/nucliadb_node/templates/node.cm.yaml
@@ -18,9 +18,6 @@ data:
SIDECAR_LISTEN_ADDRESS: 0.0.0.0:{{ .Values.serving.grpc_sidecar }}
METRICS_PORT: {{ .Values.serving.metricsPort | quote }}
DATA_PATH: "{{.Values.config.data_path}}"
- {{- if .Values.config.lazyloading }}
- LAZY_LOADING: "true"
- {{- end }}
{{- with .Values.indexing.index_jetstream_auth }}
INDEX_JETSTREAM_AUTH: {{ . }}
{{- end }}
1 change: 0 additions & 1 deletion nucliadb/nucliadb/search/tests/node.py
@@ -48,7 +48,6 @@
"DATA_PATH": "/data",
"READER_LISTEN_ADDRESS": "0.0.0.0:4445",
"NUCLIADB_DISABLE_ANALYTICS": "True",
"LAZY_LOADING": "true",
"RUST_BACKTRACE": "full",
"DEBUG": "1",
"RUST_LOG": "nucliadb_*=DEBUG",
1 change: 1 addition & 0 deletions nucliadb_node/Cargo.toml
@@ -96,6 +96,7 @@ hostname = "0.3"
username = "0.2"
md5 = "0.7"
lazy_static = "1.4.0"
+ lru = "0.12.1"


[build-dependencies]
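For context, the `lru` crate added here provides the bounded map the new shard caches are built on. Below is a minimal sketch of its eviction behavior; `LruCache::new`, `put`, and `get` are the crate's actual 0.12 API, while the shard-style keys and tiny capacity are made up for illustration:

use std::num::NonZeroUsize;

use lru::LruCache;

fn main() {
    // A cache bounded to two entries; a third insert evicts the
    // least recently used one.
    let mut cache: LruCache<String, u64> =
        LruCache::new(NonZeroUsize::new(2).unwrap());
    cache.put("shard-a".to_string(), 1);
    cache.put("shard-b".to_string(), 2);

    // Reading "shard-a" marks it as most recently used.
    assert_eq!(cache.get("shard-a"), Some(&1));

    // "shard-b" is now the coldest entry, so this insert evicts it.
    cache.put("shard-c".to_string(), 3);
    assert!(cache.get("shard-b").is_none());
    assert!(cache.get("shard-a").is_some());
}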
1 change: 0 additions & 1 deletion nucliadb_node/nucliadb_node/tests/fixtures.py
@@ -53,7 +53,6 @@
"NUCLIADB_DISABLE_ANALYTICS": "True",
"DATA_PATH": "/data",
"READER_LISTEN_ADDRESS": "0.0.0.0:4445",
"LAZY_LOADING": "true",
"RUST_BACKTRACE": "full",
"RUST_LOG": "nucliadb_*=DEBUG", # noqa
"DEBUG": "1",
1 change: 0 additions & 1 deletion nucliadb_node/src/bin/reader.rs
@@ -131,7 +131,6 @@ pub async fn start_grpc_service(
tokio::spawn(health_checker(health_reporter, settings.clone()));

let grpc_driver = NodeReaderGRPCDriver::new(settings.clone());
- grpc_driver.initialize().await?;

Server::builder()
.layer(tracing_middleware)
7 changes: 3 additions & 4 deletions nucliadb_node/src/bin/writer.rs
@@ -45,7 +45,7 @@ use tokio::signal::{ctrl_c, unix};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tonic::transport::Server;
type GrpcServer = NodeWriterServer<NodeWriterGRPCDriver>;
- use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardWriterCache;
+ use nucliadb_node::shards::providers::shard_cache::ShardWriterCache;
use tokio::sync::Notify;

#[derive(Debug)]
@@ -98,7 +98,7 @@ async fn main() -> NodeResult<()> {
nucliadb_node::analytics::sync::start_analytics_loop();

let (shutdown_notifier, shutdown_notified) = get_shutdown_notifier();
- let shard_cache = Arc::new(UnboundedShardWriterCache::new(settings.clone()));
+ let shard_cache = Arc::new(ShardWriterCache::new(settings.clone()));

let mut replication_task = None;
if settings.node_role() == NodeRole::Secondary {
@@ -165,7 +165,7 @@ async fn wait_for_sigkill(shutdown_notifier: Arc<Notify>) -> NodeResult<()> {

pub async fn start_grpc_service(
settings: Settings,
- shard_cache: Arc<UnboundedShardWriterCache>,
+ shard_cache: Arc<ShardWriterCache>,
metadata_sender: UnboundedSender<NodeWriterEvent>,
node_id: String,
shutdown_notifier: Arc<Notify>,
@@ -188,7 +188,6 @@ pub async fn start_grpc_service(
if settings.node_role() == NodeRole::Primary {
let grpc_driver = NodeWriterGRPCDriver::new(settings.clone(), shard_cache.clone())
.with_sender(metadata_sender);
- grpc_driver.initialize().await?;
server_builder = server_builder.add_service(GrpcServer::new(grpc_driver));
}
let replication_server = replication::replication_service_server::ReplicationServiceServer::new(
31 changes: 7 additions & 24 deletions nucliadb_node/src/grpc/grpc_reader.rs
@@ -29,34 +29,22 @@ use Shard as ShardPB;

use crate::settings::Settings;
use crate::shards::errors::ShardNotFoundError;
- use crate::shards::providers::unbounded_cache::UnboundedShardReaderCache;
+ use crate::shards::providers::shard_cache::ShardReaderCache;
use crate::shards::reader::{ShardFileChunkIterator, ShardReader};
use crate::telemetry::run_with_telemetry;

pub struct NodeReaderGRPCDriver {
- shards: Arc<UnboundedShardReaderCache>,
- settings: Settings,
+ shards: Arc<ShardReaderCache>,
}

impl NodeReaderGRPCDriver {
pub fn new(settings: Settings) -> Self {
let cache_settings = settings.clone();
Self {
- settings,
- shards: Arc::new(UnboundedShardReaderCache::new(cache_settings)),
+ shards: Arc::new(ShardReaderCache::new(cache_settings)),
}
}

- /// This function must be called before using this service
- pub async fn initialize(&self) -> NodeResult<()> {
- if !self.settings.lazy_loading() {
- // If lazy loading is disabled, load
- let shards = Arc::clone(&self.shards);
- tokio::task::spawn_blocking(move || shards.load_all()).await??
- }
- Ok(())
- }

async fn obtain_shard(&self, id: impl Into<String>) -> Result<Arc<ShardReader>, tonic::Status> {
let id = id.into();
let id_clone = id.clone();
@@ -65,26 +53,21 @@ impl NodeReaderGRPCDriver {
tokio::task::spawn_blocking(move || obtain_shard(shards, id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
tonic::Status::internal(format!("Error loading shard {id}: {error:?}"))
})?
}
}

fn obtain_shard(
- shards: Arc<UnboundedShardReaderCache>,
+ shards: Arc<ShardReaderCache>,
id: impl Into<String>,
) -> Result<Arc<ShardReader>, tonic::Status> {
let id = id.into();
- if let Some(shard) = shards.get(id.clone()) {
- return Ok(shard);
- }
- let id_clone = id.clone();
- let shards = shards.clone();
- let shard = shards.load(id_clone).map_err(|error| {
+ let shard = shards.get(&id).map_err(|error| {
if error.is::<ShardNotFoundError>() {
tonic::Status::not_found(error.to_string())
} else {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
tonic::Status::internal(format!("Error loading shard {id}: {error:?}"))
}
})?;
Ok(shard)
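The call-site rewrite above is the visible half of the new design: `ShardReaderCache::get` looks a shard up and loads it on a miss, so callers no longer branch between a cache hit and an explicit `load`. Below is a hedged sketch of that contract; the `Mutex<HashMap>` interior, the `load_from_disk` helper, and the boxed error type are illustrative assumptions, not the commit's actual implementation, which also bounds the map with the LRU policy and reports `ShardNotFoundError`:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type NodeResult<T> = Result<T, Box<dyn std::error::Error>>;

struct ShardReader; // stand-in for the real reader type

struct ShardReaderCache {
    shards: Mutex<HashMap<String, Arc<ShardReader>>>,
}

impl ShardReaderCache {
    // Returns the cached shard, loading it from disk on a miss.
    fn get(&self, id: &str) -> NodeResult<Arc<ShardReader>> {
        let mut shards = self.shards.lock().unwrap();
        if let Some(shard) = shards.get(id) {
            return Ok(Arc::clone(shard));
        }
        let shard = Arc::new(load_from_disk(id)?);
        shards.insert(id.to_string(), Arc::clone(&shard));
        Ok(shard)
    }
}

fn load_from_disk(_id: &str) -> NodeResult<ShardReader> {
    // Placeholder: the real code opens the shard's index files.
    Ok(ShardReader)
}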
30 changes: 8 additions & 22 deletions nucliadb_node/src/grpc/grpc_writer.rs
@@ -36,13 +36,13 @@ use crate::analytics::sync::send_analytics_event;
use crate::settings::Settings;
use crate::shards::errors::ShardNotFoundError;
use crate::shards::metadata::ShardMetadata;
- use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache;
+ use crate::shards::providers::shard_cache::ShardWriterCache;
use crate::shards::writer::ShardWriter;
use crate::telemetry::run_with_telemetry;
use crate::utils::list_shards;

pub struct NodeWriterGRPCDriver {
- shards: Arc<UnboundedShardWriterCache>,
+ shards: Arc<ShardWriterCache>,
sender: Option<UnboundedSender<NodeWriterEvent>>,
settings: Settings,
}
@@ -54,24 +54,14 @@ pub enum NodeWriterEvent {
}

impl NodeWriterGRPCDriver {
- pub fn new(settings: Settings, shard_cache: Arc<UnboundedShardWriterCache>) -> Self {
+ pub fn new(settings: Settings, shard_cache: Arc<ShardWriterCache>) -> Self {
Self {
settings,
shards: shard_cache,
sender: None,
}
}

- /// This function must be called before using this service
- pub async fn initialize(&self) -> NodeResult<()> {
- if !self.settings.lazy_loading() {
- // If lazy loading is disabled, load
- let shards = Arc::clone(&self.shards);
- tokio::task::spawn_blocking(move || shards.load_all()).await??
- }
- Ok(())
- }

pub fn with_sender(self, sender: UnboundedSender<NodeWriterEvent>) -> Self {
Self {
sender: Some(sender),
@@ -88,16 +78,12 @@ impl NodeWriterGRPCDriver {
}

fn obtain_shard(
- shards: Arc<UnboundedShardWriterCache>,
+ shards: Arc<ShardWriterCache>,
id: impl Into<String>,
) -> Result<Arc<ShardWriter>, tonic::Status> {
let id = id.into();
- if let Some(shard) = shards.get(id.clone()) {
- return Ok(shard);
- }
- let id_clone = id.clone();
- let shards = shards.clone();
- let shard = shards.load(id_clone).map_err(|error| {
+
+ let shard = shards.get(&id).map_err(|error| {
if error.is::<ShardNotFoundError>() {
tonic::Status::not_found(error.to_string())
} else {
@@ -152,7 +138,7 @@ impl NodeWriter for NodeWriterGRPCDriver {

let shards = Arc::clone(&self.shards);
let shard_id_clone = shard_id.id.clone();
- let deleted = tokio::task::spawn_blocking(move || shards.delete(shard_id_clone))
+ let deleted = tokio::task::spawn_blocking(move || shards.delete(&shard_id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error deleted shard {}: {error:?}", shard_id.id))
@@ -180,7 +166,7 @@
// No need to load shard to upgrade it
let shards = Arc::clone(&self.shards);
let shard_id_clone = shard_id.clone();
- let upgraded = tokio::task::spawn_blocking(move || shards.upgrade(shard_id_clone))
+ let upgraded = tokio::task::spawn_blocking(move || shards.upgrade(&shard_id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error deleted shard {}: {error:?}", shard_id))
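One design point worth noting in these handlers: a cache `get` may now block on disk I/O (loading a missing shard), so it stays wrapped in `tokio::task::spawn_blocking`. A sketch of that wrapper pattern follows; the cache and shard types are stand-ins, and only `spawn_blocking` and `tonic::Status` are real APIs here:

use std::sync::Arc;

struct ShardWriter; // stand-in
struct ShardWriterCache; // stand-in

impl ShardWriterCache {
    fn get(&self, _id: &str) -> Result<Arc<ShardWriter>, String> {
        Ok(Arc::new(ShardWriter)) // the real call loads from disk on a miss
    }
}

async fn obtain_shard(
    shards: Arc<ShardWriterCache>,
    id: String,
) -> Result<Arc<ShardWriter>, tonic::Status> {
    // Run the potentially blocking lookup on Tokio's blocking pool so
    // the async executor threads are never stalled by disk I/O.
    tokio::task::spawn_blocking(move || {
        shards.get(&id).map_err(|error| {
            tonic::Status::internal(format!("Error loading shard {id}: {error:?}"))
        })
    })
    .await
    .map_err(|error| tonic::Status::internal(format!("Join error: {error:?}")))?
}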
16 changes: 8 additions & 8 deletions nucliadb_node/src/replication/replicator.rs
@@ -35,7 +35,7 @@ use tonic::Request;
use crate::replication::health::ReplicationHealthManager;
use crate::settings::Settings;
use crate::shards::metadata::ShardMetadata;
- use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache;
+ use crate::shards::providers::shard_cache::ShardWriterCache;
use crate::shards::writer::ShardWriter;
use crate::utils::{list_shards, set_primary_node_id};

@@ -200,7 +200,7 @@ impl ReplicateWorkerPool {

pub async fn connect_to_primary_and_replicate(
settings: Settings,
- shard_cache: Arc<UnboundedShardWriterCache>,
+ shard_cache: Arc<ShardWriterCache>,
secondary_id: String,
shutdown_notified: Arc<AtomicBool>,
) -> NodeResult<()> {
@@ -274,10 +274,11 @@ pub async fn connect_to_primary_and_replicate(
let shard_id = shard_state.shard_id.clone();
let shard_lookup;
if existing_shards.contains(&shard_id) {
- let id_clone = shard_id.clone();
- let shard_cache_clone = Arc::clone(&shard_cache);
+ let shard_cache_clone = shard_cache.clone();
+ let shard_id_clone = shard_id.clone();
shard_lookup =
- tokio::task::spawn_blocking(move || shard_cache_clone.load(id_clone)).await?;
+ tokio::task::spawn_blocking(move || shard_cache_clone.get(&shard_id_clone))
+ .await?;
} else {
let metadata = ShardMetadata::new(
shards_path.join(shard_id.clone()),
@@ -330,10 +331,9 @@ pub async fn connect_to_primary_and_replicate(
if !existing_shards.contains(&shard_id) {
continue;
}
- let id_clone = shard_id.clone();
let shard_cache_clone = shard_cache.clone();
let shard_lookup =
- tokio::task::spawn_blocking(move || shard_cache_clone.delete(id_clone)).await?;
+ tokio::task::spawn_blocking(move || shard_cache_clone.delete(&shard_id)).await?;
if shard_lookup.is_err() {
warn!("Failed to delete shard: {:?}", shard_lookup);
continue;
@@ -365,7 +365,7 @@

pub async fn connect_to_primary_and_replicate_forever(
settings: Settings,
- shard_cache: Arc<UnboundedShardWriterCache>,
+ shard_cache: Arc<ShardWriterCache>,
secondary_id: String,
shutdown_notified: Arc<AtomicBool>,
) -> NodeResult<()> {
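Taken together, the replicator now funnels every shard lifecycle step through the cache: shards the secondary already has come back via `get`, missing ones are created from the primary's metadata, and removals go through `delete`. A condensed, hypothetical rendering of the branch shown above (the `create` signature and unit-struct metadata are assumptions for illustration, not the commit's real types):

use std::sync::Arc;

type NodeResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

// Stand-ins mirroring the types in the replicator diff above.
struct ShardWriter;
struct ShardMetadata;
struct ShardWriterCache;

impl ShardWriterCache {
    fn get(&self, _id: &str) -> NodeResult<Arc<ShardWriter>> {
        Ok(Arc::new(ShardWriter)) // real code loads from disk on a miss
    }
    fn create(&self, _metadata: ShardMetadata) -> NodeResult<Arc<ShardWriter>> {
        Ok(Arc::new(ShardWriter)) // real code builds a new shard on disk
    }
}

fn sync_shard(
    cache: &ShardWriterCache,
    shard_id: &str,
    existing_shards: &[String],
) -> NodeResult<Arc<ShardWriter>> {
    if existing_shards.iter().any(|s| s.as_str() == shard_id) {
        // Shard already exists locally: the cache returns or loads it.
        cache.get(shard_id)
    } else {
        // New shard announced by the primary: create it from metadata.
        cache.create(ShardMetadata)
    }
}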
(Diff for the remaining changed files is not rendered here.)

1 comment on commit 4ac0037

@github-actions (Contributor)


Benchmark

Benchmark suite: nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error
Current  (4ac0037): 13307.601378764546 iter/sec (stddev: 9.744836110424792e-7)
Previous (c67870a): 12887.24555746259 iter/sec (stddev: 2.385970996903907e-7)
Ratio: 0.97

This comment was automatically generated by workflow using github-action-benchmark.
