From 3300d319075da47d2437121980d5be80c927d6b7 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Thu, 11 Jan 2024 19:01:33 +0100 Subject: [PATCH 1/4] Remove async cache provider --- nucliadb_node/src/bin/writer.rs | 6 +- nucliadb_node/src/grpc/grpc_reader.rs | 22 +- nucliadb_node/src/grpc/grpc_writer.rs | 50 ++- nucliadb_node/src/replication/replicator.rs | 38 ++- nucliadb_node/src/replication/service.rs | 23 +- nucliadb_node/src/shards/metadata.rs | 6 +- nucliadb_node/src/shards/providers/mod.rs | 4 +- .../src/shards/providers/provider_traits.rs | 23 -- .../unbounded_cache/async_unbounded_reader.rs | 123 -------- .../unbounded_cache/async_unbounded_writer.rs | 296 ------------------ .../shards/providers/unbounded_cache/mod.rs | 4 - .../unbounded_cache/unbounded_reader.rs | 19 +- .../unbounded_cache/unbounded_writer.rs | 217 +++++++++---- nucliadb_node/src/shards/shard_writer.rs | 17 +- nucliadb_node/tests/common/node_services.rs | 14 +- nucliadb_node/tests/test_replication.rs | 9 +- 16 files changed, 295 insertions(+), 576 deletions(-) delete mode 100644 nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_reader.rs delete mode 100644 nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_writer.rs diff --git a/nucliadb_node/src/bin/writer.rs b/nucliadb_node/src/bin/writer.rs index d846eebb1e..a930c901bf 100644 --- a/nucliadb_node/src/bin/writer.rs +++ b/nucliadb_node/src/bin/writer.rs @@ -45,7 +45,7 @@ use tokio::signal::{ctrl_c, unix}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tonic::transport::Server; type GrpcServer = NodeWriterServer; -use nucliadb_node::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache; +use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardWriterCache; use tokio::sync::Notify; #[derive(Debug)] @@ -98,7 +98,7 @@ async fn main() -> NodeResult<()> { nucliadb_node::analytics::sync::start_analytics_loop(); let (shutdown_notifier, shutdown_notified) = get_shutdown_notifier(); - let shard_cache = Arc::new(AsyncUnboundedShardWriterCache::new(settings.clone())); + let shard_cache = Arc::new(UnboundedShardWriterCache::new(settings.clone())); let mut replication_task = None; if settings.node_role() == NodeRole::Secondary { @@ -165,7 +165,7 @@ async fn wait_for_sigkill(shutdown_notifier: Arc) -> NodeResult<()> { pub async fn start_grpc_service( settings: Settings, - shard_cache: Arc, + shard_cache: Arc, metadata_sender: UnboundedSender, node_id: String, shutdown_notifier: Arc, diff --git a/nucliadb_node/src/grpc/grpc_reader.rs b/nucliadb_node/src/grpc/grpc_reader.rs index ba69c25d0d..a6a738cb11 100644 --- a/nucliadb_node/src/grpc/grpc_reader.rs +++ b/nucliadb_node/src/grpc/grpc_reader.rs @@ -29,13 +29,13 @@ use Shard as ShardPB; use crate::settings::Settings; use crate::shards::errors::ShardNotFoundError; -use crate::shards::providers::unbounded_cache::AsyncUnboundedShardReaderCache; -use crate::shards::providers::AsyncShardReaderProvider; +use crate::shards::providers::unbounded_cache::UnboundedShardReaderCache; +use crate::shards::providers::ShardReaderProvider; use crate::shards::reader::{ShardFileChunkIterator, ShardReader}; use crate::telemetry::run_with_telemetry; pub struct NodeReaderGRPCDriver { - shards: AsyncUnboundedShardReaderCache, + shards: Arc, settings: Settings, } @@ -44,7 +44,7 @@ impl NodeReaderGRPCDriver { let cache_settings = settings.clone(); Self { settings, - shards: AsyncUnboundedShardReaderCache::new(cache_settings), + shards: 
Arc::new(UnboundedShardReaderCache::new(cache_settings)), } } @@ -52,17 +52,25 @@ impl NodeReaderGRPCDriver { pub async fn initialize(&self) -> NodeResult<()> { if !self.settings.lazy_loading() { // If lazy loading is disabled, load - self.shards.load_all().await? + let shards = self.shards.clone(); + tokio::task::spawn_blocking(move || shards.load_all()).await?? } Ok(()) } async fn obtain_shard(&self, id: impl Into) -> Result, tonic::Status> { let id = id.into(); - if let Some(shard) = self.shards.get(id.clone()).await { + if let Some(shard) = self.shards.get(id.clone()) { return Ok(shard); } - let shard = self.shards.load(id.clone()).await.map_err(|error| { + let id_clone = id.clone(); + let shards = self.shards.clone(); + let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) + .await + .map_err(|error| { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + })?; + let shard = shard.map_err(|error| { if error.is::() { tonic::Status::not_found(error.to_string()) } else { diff --git a/nucliadb_node/src/grpc/grpc_writer.rs b/nucliadb_node/src/grpc/grpc_writer.rs index b0b3ce6185..ad8c77d1ab 100644 --- a/nucliadb_node/src/grpc/grpc_writer.rs +++ b/nucliadb_node/src/grpc/grpc_writer.rs @@ -36,14 +36,14 @@ use crate::analytics::sync::send_analytics_event; use crate::settings::Settings; use crate::shards::errors::ShardNotFoundError; use crate::shards::metadata::ShardMetadata; -use crate::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache; -use crate::shards::providers::AsyncShardWriterProvider; +use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache; +use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::telemetry::run_with_telemetry; use crate::utils::list_shards; pub struct NodeWriterGRPCDriver { - shards: Arc, + shards: Arc, sender: Option>, settings: Settings, } @@ -55,7 +55,7 @@ pub enum NodeWriterEvent { } impl NodeWriterGRPCDriver { - pub fn new(settings: Settings, shard_cache: Arc) -> Self { + pub fn new(settings: Settings, shard_cache: Arc) -> Self { Self { settings, shards: shard_cache, @@ -67,7 +67,8 @@ impl NodeWriterGRPCDriver { pub async fn initialize(&self) -> NodeResult<()> { if !self.settings.lazy_loading() { // If lazy loading is disabled, load - self.shards.load_all().await? + let shards = self.shards.clone(); + tokio::task::spawn_blocking(move || shards.load_all()).await?? 
} Ok(()) } @@ -81,10 +82,17 @@ impl NodeWriterGRPCDriver { async fn obtain_shard(&self, id: impl Into) -> Result, tonic::Status> { let id = id.into(); - if let Some(shard) = self.shards.get(id.clone()).await { + if let Some(shard) = self.shards.get(id.clone()) { return Ok(shard); } - let shard = self.shards.load(id.clone()).await.map_err(|error| { + let id_clone = id.clone(); + let shards = self.shards.clone(); + let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) + .await + .map_err(|error| { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + })?; + let shard = shard.map_err(|error| { if error.is::() { tonic::Status::not_found(error.to_string()) } else { @@ -119,7 +127,12 @@ impl NodeWriter for NodeWriterGRPCDriver { request.similarity().into(), Some(Channel::from(request.release_channel)), ); - let new_shard = self.shards.create(metadata).await; + + let shards = self.shards.clone(); + let new_shard = tokio::task::spawn_blocking(move || shards.create(metadata)) + .await + .map_err(|error| tonic::Status::internal(format!("Error creating shard: {error:?}")))?; + match new_shard { Ok(new_shard) => { self.emit_event(NodeWriterEvent::ShardCreation); @@ -139,7 +152,15 @@ impl NodeWriter for NodeWriterGRPCDriver { send_analytics_event(AnalyticsEvent::Delete).await; // Deletion does not require for the shard to be loaded. let shard_id = request.into_inner(); - let deleted = self.shards.delete(shard_id.id.clone()).await; + + let shards = self.shards.clone(); + let shard_id_clone = shard_id.id.clone(); + let deleted = tokio::task::spawn_blocking(move || shards.delete(shard_id_clone)) + .await + .map_err(|error| { + tonic::Status::internal(format!("Error deleted shard {}: {error:?}", shard_id.id)) + })?; + match deleted { Ok(_) => { self.emit_event(NodeWriterEvent::ShardDeletion); @@ -158,8 +179,17 @@ impl NodeWriter for NodeWriterGRPCDriver { request: Request, ) -> Result, Status> { let shard_id = request.into_inner().id; + // No need to load shard to upgrade it - match self.shards.upgrade(shard_id).await { + let shards = self.shards.clone(); + let shard_id_clone = shard_id.clone(); + let upgraded = tokio::task::spawn_blocking(move || shards.upgrade(shard_id_clone)) + .await + .map_err(|error| { + tonic::Status::internal(format!("Error deleted shard {}: {error:?}", shard_id)) + })?; + + match upgraded { Ok(upgrade_details) => Ok(tonic::Response::new(upgrade_details)), Err(error) => Err(tonic::Status::internal(error.to_string())), } diff --git a/nucliadb_node/src/replication/replicator.rs b/nucliadb_node/src/replication/replicator.rs index 8e9cb54a1f..91ee43641d 100644 --- a/nucliadb_node/src/replication/replicator.rs +++ b/nucliadb_node/src/replication/replicator.rs @@ -35,8 +35,8 @@ use tonic::Request; use crate::replication::health::ReplicationHealthManager; use crate::settings::Settings; use crate::shards::metadata::ShardMetadata; -use crate::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache; -use crate::shards::providers::AsyncShardWriterProvider; +use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache; +use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::utils::{list_shards, set_primary_node_id}; @@ -201,7 +201,7 @@ impl ReplicateWorkerPool { pub async fn connect_to_primary_and_replicate( settings: Settings, - shard_cache: Arc, + shard_cache: Arc, secondary_id: String, shutdown_notified: Arc, ) -> NodeResult<()> { @@ -275,18 +275,23 @@ pub async fn 
connect_to_primary_and_replicate( let shard_id = shard_state.shard_id.clone(); let shard_lookup; if existing_shards.contains(&shard_id) { - shard_lookup = shard_cache.load(shard_id.clone()).await; + let id_clone = shard_id.clone(); + let shard_cache_clone = Arc::clone(&shard_cache); + shard_lookup = + tokio::task::spawn_blocking(move || shard_cache_clone.load(id_clone)).await?; } else { + let metadata = ShardMetadata::new( + shards_path.join(shard_id.clone()), + shard_state.shard_id.clone(), + Some(shard_state.kbid.clone()), + shard_state.similarity.clone().into(), + None, + ); + let shard_cache_clone = Arc::clone(&shard_cache); + warn!("Creating shard to replicate: {shard_id}"); - let shard_create = shard_cache - .create(ShardMetadata::new( - shards_path.join(shard_id.clone()), - shard_state.shard_id.clone(), - Some(shard_state.kbid.clone()), - shard_state.similarity.clone().into(), - None, - )) - .await; + let shard_create = + tokio::task::spawn_blocking(move || shard_cache_clone.create(metadata)).await?; if shard_create.is_err() { warn!("Failed to create shard: {:?}", shard_create); continue; @@ -326,7 +331,10 @@ pub async fn connect_to_primary_and_replicate( if !existing_shards.contains(&shard_id) { continue; } - let shard_lookup = shard_cache.delete(shard_id.clone()).await; + let id_clone = shard_id.clone(); + let shard_cache_clone = shard_cache.clone(); + let shard_lookup = + tokio::task::spawn_blocking(move || shard_cache_clone.delete(id_clone)).await?; if shard_lookup.is_err() { warn!("Failed to delete shard: {:?}", shard_lookup); continue; @@ -364,7 +372,7 @@ pub async fn connect_to_primary_and_replicate( pub async fn connect_to_primary_and_replicate_forever( settings: Settings, - shard_cache: Arc, + shard_cache: Arc, secondary_id: String, shutdown_notified: Arc, ) -> NodeResult<()> { diff --git a/nucliadb_node/src/replication/service.rs b/nucliadb_node/src/replication/service.rs index 37f71d32f6..3f2d4399bb 100644 --- a/nucliadb_node/src/replication/service.rs +++ b/nucliadb_node/src/replication/service.rs @@ -32,20 +32,20 @@ use tonic::Response; use crate::replication::NodeRole; use crate::settings::Settings; use crate::shards::metadata::Similarity; -use crate::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache; -use crate::shards::providers::AsyncShardWriterProvider; +use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache; +use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::utils::list_shards; pub struct ReplicationServiceGRPCDriver { settings: Settings, - shards: Arc, + shards: Arc, node_id: String, } impl ReplicationServiceGRPCDriver { pub fn new( settings: Settings, - shard_cache: Arc, + shard_cache: Arc, node_id: String, ) -> Self { Self { @@ -58,7 +58,8 @@ impl ReplicationServiceGRPCDriver { /// This function must be called before using this service pub async fn initialize(&self) -> NodeResult<()> { // should we do this? 
- self.shards.load_all().await?; + let shards = self.shards.clone(); + tokio::task::spawn_blocking(move || shards.load_all()).await??; Ok(()) } } @@ -256,11 +257,19 @@ impl replication::replication_service_server::ReplicationService for Replication Result, > = receiver.0.clone(); - let shard_lookup = self.shards.load(request.shard_id.clone()).await; + let id = request.shard_id; + let id_clone = id.clone(); + let shards = self.shards.clone(); + let shard_lookup = tokio::task::spawn_blocking(move || shards.load(id_clone)) + .await + .map_err(|error| { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + })?; + if let Err(error) = shard_lookup { return Err(tonic::Status::not_found(format!( "Shard {} not found, error: {}", - request.shard_id, error + id, error ))); } diff --git a/nucliadb_node/src/shards/metadata.rs b/nucliadb_node/src/shards/metadata.rs index cda6768c6a..9c6cf36e8f 100644 --- a/nucliadb_node/src/shards/metadata.rs +++ b/nucliadb_node/src/shards/metadata.rs @@ -132,7 +132,7 @@ impl ShardMetadata { generation_id: RwLock::new(None), } } - pub fn exists(shard_path: PathBuf) -> bool { + pub fn exists(shard_path: &PathBuf) -> bool { let metadata_path = shard_path.join(disk_structure::METADATA_FILE); metadata_path.exists() } @@ -237,7 +237,7 @@ impl ShardsMetadataManager { } } let shard_path = disk_structure::shard_path_by_id(&self.shards_path, &shard_id); - if !ShardMetadata::exists(shard_path.clone()) { + if !ShardMetadata::exists(&shard_path) { return None; } let sm = ShardMetadata::open(shard_path); @@ -275,7 +275,7 @@ mod test { #[test] fn open_empty() { let dir = TempDir::new().unwrap(); - assert!(!ShardMetadata::exists(dir.path().to_path_buf())); + assert!(!ShardMetadata::exists(&dir.path().to_path_buf())); let meta = ShardMetadata::open(dir.path().to_path_buf()); assert!(meta.is_err()); } diff --git a/nucliadb_node/src/shards/providers/mod.rs b/nucliadb_node/src/shards/providers/mod.rs index 6af1a91180..89d1d1d8aa 100644 --- a/nucliadb_node/src/shards/providers/mod.rs +++ b/nucliadb_node/src/shards/providers/mod.rs @@ -26,6 +26,4 @@ mod provider_traits; pub mod unbounded_cache; -pub use provider_traits::{ - AsyncShardReaderProvider, AsyncShardWriterProvider, ShardReaderProvider, ShardWriterProvider, -}; +pub use provider_traits::{ShardReaderProvider, ShardWriterProvider}; diff --git a/nucliadb_node/src/shards/providers/provider_traits.rs b/nucliadb_node/src/shards/providers/provider_traits.rs index d81627a692..ba7e080aa7 100644 --- a/nucliadb_node/src/shards/providers/provider_traits.rs +++ b/nucliadb_node/src/shards/providers/provider_traits.rs @@ -19,7 +19,6 @@ use std::sync::Arc; -use async_trait::async_trait; use nucliadb_core::protos::ShardCleaned; use nucliadb_core::NodeResult; @@ -35,14 +34,6 @@ pub trait ShardReaderProvider: Send + Sync { fn get(&self, id: ShardId) -> Option>; } -#[async_trait] -pub trait AsyncShardReaderProvider: Send + Sync { - async fn load(&self, id: ShardId) -> NodeResult>; - async fn load_all(&self) -> NodeResult<()>; - - async fn get(&self, id: ShardId) -> Option>; -} - pub trait ShardWriterProvider { fn load(&self, id: ShardId) -> NodeResult>; fn load_all(&self) -> NodeResult<()>; @@ -55,17 +46,3 @@ pub trait ShardWriterProvider { fn get_metadata(&self, id: ShardId) -> Option>; } - -#[async_trait] -pub trait AsyncShardWriterProvider { - async fn load(&self, id: ShardId) -> NodeResult>; - async fn load_all(&self) -> NodeResult<()>; - - async fn create(&self, metadata: ShardMetadata) -> NodeResult>; - async fn 
get(&self, id: ShardId) -> Option>; - async fn delete(&self, id: ShardId) -> NodeResult<()>; - - async fn upgrade(&self, id: ShardId) -> NodeResult; - - fn get_metadata(&self, id: ShardId) -> Option>; -} diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_reader.rs b/nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_reader.rs deleted file mode 100644 index 3608860cc0..0000000000 --- a/nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_reader.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright (C) 2021 Bosutech XXI S.L. -// -// nucliadb is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at info@nuclia.com. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::collections::HashMap; -use std::path::PathBuf; -use std::sync::Arc; - -use async_trait::async_trait; -use nucliadb_core::tracing::{debug, error}; -use nucliadb_core::{node_error, Context, NodeResult}; -use tokio::sync::RwLock; - -use crate::disk_structure; -use crate::settings::Settings; -use crate::shards::errors::ShardNotFoundError; -use crate::shards::metadata::ShardMetadata; -use crate::shards::providers::AsyncShardReaderProvider; -use crate::shards::reader::ShardReader; -use crate::shards::ShardId; - -#[derive(Default)] -pub struct AsyncUnboundedShardReaderCache { - cache: RwLock>>, - shards_path: PathBuf, -} - -impl AsyncUnboundedShardReaderCache { - pub fn new(settings: Settings) -> Self { - Self { - // NOTE: we use max shards per node as initial capacity to avoid - // hashmap resizing, as it would block the current thread while - // doing it. 
- // - // REVIEW: if resize don't take more than 10µs, it's acceptable - // (blocking in tokio means CPU bound during 10-100µs) - cache: RwLock::new(HashMap::with_capacity(settings.max_shards_per_node())), - shards_path: settings.shards_path(), - } - } -} - -#[async_trait] -impl AsyncShardReaderProvider for AsyncUnboundedShardReaderCache { - async fn load(&self, id: ShardId) -> NodeResult> { - let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); - let mut cache_writer = self.cache.write().await; - - if let Some(shard) = cache_writer.get(&id) { - debug!("Shard {shard_path:?} is already on memory"); - return Ok(Arc::clone(shard)); - } - - // Avoid blocking while interacting with the file system (reads and - // writes to disk) - let id_ = id.clone(); - let shard = tokio::task::spawn_blocking(move || { - if !ShardMetadata::exists(shard_path.clone()) { - return Err(node_error!(ShardNotFoundError( - "Shard {shard_path:?} is not on disk" - ))); - } - ShardReader::new(id.clone(), &shard_path).map_err(|error| { - node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}") - }) - }) - .await - .context("Blocking task panicked")??; - - let shard = Arc::new(shard); - cache_writer.insert(id_, Arc::clone(&shard)); - Ok(shard) - } - - async fn load_all(&self) -> NodeResult<()> { - let shards_path = self.shards_path.clone(); - let mut shards = tokio::task::spawn_blocking(move || -> NodeResult<_> { - let mut shards = HashMap::new(); - for entry in std::fs::read_dir(&shards_path)? { - let entry = entry?; - let file_name = entry.file_name().to_str().unwrap().to_string(); - let shard_path = entry.path(); - match ShardReader::new(file_name.clone(), &shard_path) { - Err(err) => error!("Loading shard {shard_path:?} from disk raised {err}"), - Ok(shard) => { - debug!("Shard loaded: {shard_path:?}"); - shards.insert(file_name, shard); - } - } - } - Ok(shards) - }) - .await - .context("Blocking task panicked")??; - - { - let mut cache = self.cache.write().await; - shards.drain().for_each(|(k, v)| { - cache.insert(k, Arc::new(v)); - }); - } - Ok(()) - } - - async fn get(&self, id: ShardId) -> Option> { - self.cache.read().await.get(&id).map(Arc::clone) - } -} diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_writer.rs b/nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_writer.rs deleted file mode 100644 index ff6eaf6997..0000000000 --- a/nucliadb_node/src/shards/providers/unbounded_cache/async_unbounded_writer.rs +++ /dev/null @@ -1,296 +0,0 @@ -// Copyright (C) 2021 Bosutech XXI S.L. -// -// nucliadb is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at info@nuclia.com. -// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . 
- -use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; -use std::sync::Arc; - -use async_trait::async_trait; -use nucliadb_core::protos::ShardCleaned; -use nucliadb_core::tracing::{debug, error, info}; -use nucliadb_core::{node_error, Context, NodeResult}; -use tokio::sync::RwLock; - -use crate::disk_structure; -use crate::settings::Settings; -use crate::shards::errors::ShardNotFoundError; -use crate::shards::metadata::{ShardMetadata, ShardsMetadataManager}; -use crate::shards::providers::AsyncShardWriterProvider; -use crate::shards::writer::ShardWriter; -use crate::shards::ShardId; - -/// Each shard may be in one of this states -enum ShardCacheStatus { - /// Not in cache and not being deleted, therefore if found in on disk, loading it is safe. - NotInCache, - /// The shard is cached, but there is a task in the process of deleting it. - BeingDeleted, - /// The shard is not being deleted and is cached - InCache(Arc), -} - -/// This cache allows the user to block shards, ensuring that they will not be loaded from disk. -/// Being able to do so is crucial, otherwise the only source of truth will be disk and that would -/// not be thread-safe. -#[derive(Default)] -struct InnerCache { - blocked_shards: HashSet, - active_shards: HashMap>, -} - -impl InnerCache { - pub fn new() -> InnerCache { - Self::default() - } - pub fn get_shard(&self, id: &ShardId) -> ShardCacheStatus { - match self.active_shards.get(id).cloned() { - _ if self.blocked_shards.contains(id) => ShardCacheStatus::BeingDeleted, - Some(shard) => ShardCacheStatus::InCache(shard), - None => ShardCacheStatus::NotInCache, - } - } - pub fn set_being_deleted(&mut self, id: ShardId) { - self.blocked_shards.insert(id); - } - pub fn remove(&mut self, id: &ShardId) { - self.blocked_shards.remove(id); - self.active_shards.remove(id); - } - pub fn add_active_shard(&mut self, id: ShardId, shard: Arc) { - // It would be a dangerous bug to have a path - // in the system that leads to this assertion failing. - assert!(!self.blocked_shards.contains(&id)); - - self.active_shards.insert(id, shard); - } -} - -#[derive(Default)] -pub struct AsyncUnboundedShardWriterCache { - pub shards_path: PathBuf, - cache: RwLock, - metadata_manager: Arc, -} - -impl AsyncUnboundedShardWriterCache { - pub fn new(settings: Settings) -> Self { - Self { - // NOTE: as it's not probable all shards will be written, we don't - // assign any initial capacity to the HashMap under the consideration - // a resize blocking is not performance critical while writing. 
- cache: RwLock::new(InnerCache::default()), - shards_path: settings.shards_path(), - metadata_manager: Arc::new(ShardsMetadataManager::new(settings.shards_path())), - } - } -} - -#[async_trait] -impl AsyncShardWriterProvider for AsyncUnboundedShardWriterCache { - async fn create(&self, metadata: ShardMetadata) -> NodeResult> { - let shard_id = metadata.id(); - let metadata = Arc::new(metadata); - let shard_metadata = Arc::clone(&metadata); - let shard = tokio::task::spawn_blocking(move || ShardWriter::new(shard_metadata)) - .await - .context("Blocking task panicked")??; - let shard = Arc::new(shard); - let shard_cache_clone = Arc::clone(&shard); - self.metadata_manager.add_metadata(metadata); - - let mut cache_writer = self.cache.write().await; - cache_writer.add_active_shard(shard_id, shard_cache_clone); - Ok(shard) - } - - async fn load(&self, id: ShardId) -> NodeResult> { - let shard_key = id.clone(); - let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); - let mut cache_writer = self.cache.write().await; - match cache_writer.get_shard(&id) { - ShardCacheStatus::InCache(shard) => Ok(shard), - ShardCacheStatus::BeingDeleted => Err(node_error!(ShardNotFoundError( - "Shard {shard_path:?} is not on disk" - ))), - ShardCacheStatus::NotInCache => { - let metadata_manager = Arc::clone(&self.metadata_manager); - // Avoid blocking while interacting with the file system - let shard = tokio::task::spawn_blocking(move || { - if !ShardMetadata::exists(shard_path.clone()) { - return Err(node_error!(ShardNotFoundError( - "Shard {shard_path:?} is not on disk" - ))); - } - let metadata = metadata_manager - .get(id.clone()) - .expect("Shard metadata not found. This should not happen"); - ShardWriter::open(Arc::clone(&metadata)).map_err(|error| { - node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}") - }) - }) - .await - .context("Blocking task panicked")??; - - let shard = Arc::new(shard); - let cache_shard = Arc::clone(&shard); - cache_writer.add_active_shard(shard_key, cache_shard); - Ok(shard) - } - } - } - - async fn load_all(&self) -> NodeResult<()> { - let shards_path = self.shards_path.clone(); - let metadata_manager = Arc::clone(&self.metadata_manager); - let shards = tokio::task::spawn_blocking(move || -> NodeResult<_> { - let mut shards = InnerCache::new(); - for entry in std::fs::read_dir(&shards_path)? { - let entry = entry?; - let shard_id = entry.file_name().to_str().unwrap().to_string(); - let shard_path = entry.path(); - if !ShardMetadata::exists(shard_path.clone()) { - info!( - "Shard {shard_path:?} is not on disk", - shard_path = shard_path - ); - continue; - } - let metadata = metadata_manager - .get(shard_id.clone()) - .expect("Shard metadata not found. 
This should not happen"); - match ShardWriter::open(metadata) { - Err(err) => error!("Loading shard {shard_path:?} from disk raised {err}"), - Ok(shard) => { - debug!("Shard loaded: {shard_path:?}"); - shards.add_active_shard(shard_id, Arc::new(shard)); - } - } - } - Ok(shards) - }) - .await - .context("Blocking task panicked")??; - - *self.cache.write().await = shards; - Ok(()) - } - - async fn get(&self, id: ShardId) -> Option> { - let cache_reader = self.cache.read().await; - let ShardCacheStatus::InCache(shard) = cache_reader.get_shard(&id) else { - return None; - }; - - Some(shard) - } - - async fn delete(&self, id: ShardId) -> NodeResult<()> { - let mut cache_writer = self.cache.write().await; - // First the shard must be marked as being deleted, this way - // concurrent tasks can not make the mistake of trying to use it. - cache_writer.set_being_deleted(id.clone()); - - // Even though the shard was marked as deleted, if it was already in the - // active shards list there may be operations running on it. We must ensure - // that all of them have finished before proceeding. - if let Some(shard) = cache_writer.active_shards.get(&id).cloned() { - std::mem::drop(cache_writer); - let blocking_token = shard.block_shard().await; - // At this point we can ensure that no operations - // are being performed in this shard. Next operations - // will require using the cache, where the shard is marked - // as deleted. - std::mem::drop(blocking_token); - } else { - // Dropping the cache writer because is not needed while deleting the shard. - std::mem::drop(cache_writer); - } - - // No need to hold the lock while deletion happens. - // In case of error while deleting the function will return without removing - // The deletion flag, this is to avoid accesses to a partially deleted shard. - let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); - tokio::task::spawn_blocking(move || -> NodeResult<()> { - if shard_path.exists() { - debug!("Deleting shard {shard_path:?}"); - std::fs::remove_dir_all(shard_path)?; - } - Ok(()) - }) - .await - .context("Blocking task panicked")??; - - // If the shard was successfully deleted is safe to remove - // the entry from the cache. - self.cache.write().await.remove(&id); - - Ok(()) - } - - async fn upgrade(&self, id: ShardId) -> NodeResult { - let mut cache_writer = self.cache.write().await; - // First the shard must be marked as being deleted, this way - // concurrent tasks can not make the mistake of trying to use it. - cache_writer.set_being_deleted(id.clone()); - - // Even though the shard was marked as deleted, if it was already in the - // active shards list there may be operations running on it. We must ensure - // that all of them have finished before proceeding. - if let Some(shard) = cache_writer.active_shards.get(&id).cloned() { - std::mem::drop(cache_writer); - let blocking_token = shard.block_shard().await; - // At this point we can ensure that no operations - // are being performed in this shard. Next operations - // will require using the cache, where the shard is marked - // as deleted. - std::mem::drop(blocking_token); - } else { - // Dropping the cache writer because is not needed while deleting the shard. 
- std::mem::drop(cache_writer); - } - - let metadata = self.metadata_manager.get(id.clone()); - // If upgrading fails, the safe thing is to keep the being deleted flag - let (upgraded, details) = tokio::task::spawn_blocking(move || -> NodeResult<_> { - let upgraded = ShardWriter::clean_and_create(metadata.unwrap())?; - let details = ShardCleaned { - document_service: upgraded.document_version() as i32, - paragraph_service: upgraded.paragraph_version() as i32, - vector_service: upgraded.vector_version() as i32, - relation_service: upgraded.relation_version() as i32, - }; - Ok((upgraded, details)) - }) - .await - .context("Blocking task panicked")??; - - // The shard was upgraded, is safe to allow access again - let shard = Arc::new(upgraded); - let mut cache_writer = self.cache.write().await; - // Old shard is completely removed - cache_writer.remove(&id); - // The clean and upgraded version takes its place - cache_writer.add_active_shard(id, shard); - Ok(details) - } - - fn get_metadata(&self, id: ShardId) -> Option> { - self.metadata_manager.get(id) - } -} diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/mod.rs b/nucliadb_node/src/shards/providers/unbounded_cache/mod.rs index af8ed61c7d..95c5d118a3 100644 --- a/nucliadb_node/src/shards/providers/unbounded_cache/mod.rs +++ b/nucliadb_node/src/shards/providers/unbounded_cache/mod.rs @@ -26,12 +26,8 @@ //! For faster reads at cost of slower initialization and memory consumption, //! all shards can be loaded at initialization time. -mod async_unbounded_reader; -mod async_unbounded_writer; mod unbounded_reader; mod unbounded_writer; -pub use async_unbounded_reader::AsyncUnboundedShardReaderCache; -pub use async_unbounded_writer::AsyncUnboundedShardWriterCache; pub use unbounded_reader::UnboundedShardReaderCache; pub use unbounded_writer::UnboundedShardWriterCache; diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs index c07eebbce1..2e093f4351 100644 --- a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs +++ b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs @@ -26,6 +26,8 @@ use nucliadb_core::{node_error, NodeResult}; use crate::disk_structure; use crate::settings::Settings; +use crate::shards::errors::ShardNotFoundError; +use crate::shards::metadata::ShardMetadata; use crate::shards::providers::ShardReaderProvider; use crate::shards::reader::ShardReader; use crate::shards::ShardId; @@ -39,7 +41,9 @@ pub struct UnboundedShardReaderCache { impl UnboundedShardReaderCache { pub fn new(settings: Settings) -> Self { Self { - cache: RwLock::new(HashMap::new()), + // NOTE: we use max shards per node as initial capacity to avoid + // hashmap resizing, as it would hold the lock while doing it. 
+ cache: RwLock::new(HashMap::with_capacity(settings.max_shards_per_node())), shards_path: settings.shards_path(), } } @@ -59,13 +63,15 @@ impl ShardReaderProvider for UnboundedShardReaderCache { let mut cache_writer = self.write(); if let Some(shard) = cache_writer.get(&id) { + debug!("Shard {shard_path:?} is already on memory"); return Ok(Arc::clone(shard)); } - if !shard_path.is_dir() { - return Err(node_error!("Shard {shard_path:?} is not on disk")); + if !ShardMetadata::exists(&shard_path) { + return Err(node_error!(ShardNotFoundError( + "Shard {shard_path:?} is not on disk" + ))); } - let shard = ShardReader::new(id.clone(), &shard_path).map_err(|error| { node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}") })?; @@ -79,19 +85,20 @@ impl ShardReaderProvider for UnboundedShardReaderCache { fn load_all(&self) -> NodeResult<()> { let mut cache = self.write(); let shards_path = self.shards_path.clone(); - debug!("Recovering shards from {shards_path:?}..."); + for entry in std::fs::read_dir(&shards_path)? { let entry = entry?; let file_name = entry.file_name().to_str().unwrap().to_string(); let shard_path = entry.path(); match ShardReader::new(file_name.clone(), &shard_path) { - Err(err) => error!("Loading {shard_path:?} raised {err}"), + Err(err) => error!("Loading shard {shard_path:?} from disk raised {err}"), Ok(shard) => { debug!("Shard loaded: {shard_path:?}"); cache.insert(file_name, Arc::new(shard)); } } } + Ok(()) } diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs index 976b868a07..8266f0f5ed 100644 --- a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs +++ b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs @@ -17,12 +17,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::path::PathBuf; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use nucliadb_core::protos::ShardCleaned; -use nucliadb_core::tracing::{debug, error, warn}; +use nucliadb_core::tracing::{debug, error, info}; use nucliadb_core::{node_error, NodeResult}; use crate::disk_structure; @@ -33,125 +33,228 @@ use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::shards::ShardId; +/// Each shard may be in one of this states +enum ShardCacheStatus { + /// Not in cache and not being deleted, therefore if found in on disk, loading it is safe. + NotInCache, + /// The shard is cached, but there is a task in the process of deleting it. + BeingDeleted, + /// The shard is not being deleted and is cached + InCache(Arc), +} + +/// This cache allows the user to block shards, ensuring that they will not be loaded from disk. +/// Being able to do so is crucial, otherwise the only source of truth will be disk and that would +/// not be thread-safe. 
+#[derive(Default)] +struct InnerCache { + blocked_shards: HashSet, + active_shards: HashMap>, +} + +impl InnerCache { + pub fn new() -> InnerCache { + Self::default() + } + pub fn get_shard(&self, id: &ShardId) -> ShardCacheStatus { + match self.active_shards.get(id).cloned() { + _ if self.blocked_shards.contains(id) => ShardCacheStatus::BeingDeleted, + Some(shard) => ShardCacheStatus::InCache(shard), + None => ShardCacheStatus::NotInCache, + } + } + pub fn set_being_deleted(&mut self, id: ShardId) { + self.blocked_shards.insert(id); + } + pub fn remove(&mut self, id: &ShardId) { + self.blocked_shards.remove(id); + self.active_shards.remove(id); + } + pub fn add_active_shard(&mut self, id: ShardId, shard: Arc) { + // It would be a dangerous bug to have a path + // in the system that leads to this assertion failing. + assert!(!self.blocked_shards.contains(&id)); + + self.active_shards.insert(id, shard); + } +} + #[derive(Default)] pub struct UnboundedShardWriterCache { - cache: RwLock>>, pub shards_path: PathBuf, - metadata_manager: ShardsMetadataManager, + cache: RwLock, + metadata_manager: Arc, } impl UnboundedShardWriterCache { pub fn new(settings: Settings) -> Self { Self { - cache: RwLock::new(HashMap::new()), + // NOTE: as it's not probable all shards will be written, we don't + // assign any initial capacity to the HashMap under the consideration + // a resize blocking is not performance critical while writing. + cache: RwLock::new(InnerCache::new()), shards_path: settings.shards_path(), - metadata_manager: ShardsMetadataManager::new(settings.shards_path()), + metadata_manager: Arc::new(ShardsMetadataManager::new(settings.shards_path())), } } - fn read(&self) -> RwLockReadGuard>> { + fn read(&self) -> RwLockReadGuard { self.cache.read().expect("Poisoned lock while reading") } - fn write(&self) -> RwLockWriteGuard>> { + fn write(&self) -> RwLockWriteGuard { self.cache.write().expect("Poisoned lock while reading") } } impl ShardWriterProvider for UnboundedShardWriterCache { fn create(&self, metadata: ShardMetadata) -> NodeResult> { - let metadata = Arc::new(metadata); let shard_id = metadata.id(); - self.metadata_manager.add_metadata(Arc::clone(&metadata)); - let new_shard = ShardWriter::new(metadata).map(Arc::new)?; - let returned_shard = Arc::clone(&new_shard); + let metadata = Arc::new(metadata); + let shard = Arc::new(ShardWriter::new(metadata.clone())?); + + let shard_cache_clone = Arc::clone(&shard); + self.metadata_manager.add_metadata(metadata); - self.write().insert(shard_id, new_shard); - Ok(returned_shard) + let mut cache_writer = self.write(); + cache_writer.add_active_shard(shard_id, shard_cache_clone); + Ok(shard) } fn load(&self, id: ShardId) -> NodeResult> { + let shard_key = id.clone(); let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); let mut cache_writer = self.write(); + match cache_writer.get_shard(&id) { + ShardCacheStatus::InCache(shard) => Ok(shard), + ShardCacheStatus::BeingDeleted => Err(node_error!(ShardNotFoundError( + "Shard {shard_path:?} is not on disk" + ))), + ShardCacheStatus::NotInCache => { + let metadata_manager = Arc::clone(&self.metadata_manager); - if let Some(shard) = cache_writer.get(&id) { - debug!("Shard {shard_path:?} is already on memory"); - return Ok(Arc::clone(shard)); - } + if !ShardMetadata::exists(&shard_path) { + return Err(node_error!(ShardNotFoundError( + "Shard {shard_path:?} is not on disk" + ))); + } + let metadata = metadata_manager + .get(id.clone()) + .expect("Shard metadata not found. 
This should not happen"); + let shard = ShardWriter::open(Arc::clone(&metadata)).map_err(|error| { + node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}") + })?; - // Avoid blocking while interacting with the file system - if !ShardMetadata::exists(shard_path.clone()) { - return Err(node_error!(ShardNotFoundError( - "Shard {shard_path:?} is not on disk" - ))); + let shard = Arc::new(shard); + let cache_shard = Arc::clone(&shard); + cache_writer.add_active_shard(shard_key, cache_shard); + Ok(shard) + } } - let sm = self - .metadata_manager - .get(id.clone()) - .expect("Shard metadata not found. This should not happen."); - let shard = ShardWriter::open(sm).map_err(|error| { - node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}") - })?; - - let shard = Arc::new(shard); - cache_writer.insert(id, Arc::clone(&shard)); - Ok(shard) } fn load_all(&self) -> NodeResult<()> { let mut cache = self.write(); - for entry in std::fs::read_dir(&self.shards_path)? { + let shards_path = self.shards_path.clone(); + let metadata_manager = Arc::clone(&self.metadata_manager); + + for entry in std::fs::read_dir(&shards_path)? { let entry = entry?; let shard_id = entry.file_name().to_str().unwrap().to_string(); let shard_path = entry.path(); - let metadata = self.metadata_manager.get(shard_id.clone()); - if metadata.is_none() { - warn!( + if !ShardMetadata::exists(&shard_path) { + info!( "Shard {shard_path:?} is not on disk", shard_path = shard_path ); continue; } - match ShardWriter::open(metadata.unwrap()) { + let metadata = metadata_manager + .get(shard_id.clone()) + .expect("Shard metadata not found. This should not happen"); + match ShardWriter::open(metadata) { Err(err) => error!("Loading shard {shard_path:?} from disk raised {err}"), Ok(shard) => { debug!("Shard loaded: {shard_path:?}"); - cache.insert(shard_id, Arc::new(shard)); + cache.add_active_shard(shard_id, Arc::new(shard)); } } } + Ok(()) } fn get(&self, id: ShardId) -> Option> { - self.read().get(&id).map(Arc::clone) + let cache_reader = self.read(); + match cache_reader.get_shard(&id) { + ShardCacheStatus::InCache(shard) => Some(shard), + _ => None, + } } fn delete(&self, id: ShardId) -> NodeResult<()> { - self.write().remove(&id); + let mut cache_writer = self.write(); + // First the shard must be marked as being deleted, this way + // concurrent tasks can not make the mistake of trying to use it. + cache_writer.set_being_deleted(id.clone()); + // Even though the shard was marked as deleted, if it was already in the + // active shards list there may be operations running on it. We must ensure + // that all of them have finished before proceeding. + if let Some(shard) = cache_writer.active_shards.get(&id).cloned() { + std::mem::drop(cache_writer); + let blocking_token = shard.block_shard(); + // At this point we can ensure that no operations + // are being performed in this shard. Next operations + // will require using the cache, where the shard is marked + // as deleted. + std::mem::drop(blocking_token); + } else { + // Dropping the cache writer because is not needed while deleting the shard. + std::mem::drop(cache_writer); + } + + // No need to hold the lock while deletion happens. + // In case of error while deleting the function will return without removing + // The deletion flag, this is to avoid accesses to a partially deleted shard. 
let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); if shard_path.exists() { debug!("Deleting shard {shard_path:?}"); std::fs::remove_dir_all(shard_path)?; } + + // If the shard was successfully deleted is safe to remove + // the entry from the cache. + self.write().remove(&id); + Ok(()) } fn upgrade(&self, id: ShardId) -> NodeResult { - self.write().remove(&id); + let mut cache_writer = self.write(); + // First the shard must be marked as being deleted, this way + // concurrent tasks can not make the mistake of trying to use it. + cache_writer.set_being_deleted(id.clone()); - let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); - let metadata = self.metadata_manager.get(id.clone()); - if metadata.is_none() { - warn!( - "Shard {shard_path:?} is not on disk", - shard_path = shard_path - ); - return Err(node_error!(ShardNotFoundError( - "Shard {shard_path:?} is not on disk" - ))); + // Even though the shard was marked as deleted, if it was already in the + // active shards list there may be operations running on it. We must ensure + // that all of them have finished before proceeding. + if let Some(shard) = cache_writer.active_shards.get(&id).cloned() { + std::mem::drop(cache_writer); + let blocking_token = shard.block_shard(); + // At this point we can ensure that no operations + // are being performed in this shard. Next operations + // will require using the cache, where the shard is marked + // as deleted. + std::mem::drop(blocking_token); + } else { + // Dropping the cache writer because is not needed while deleting the shard. + std::mem::drop(cache_writer); } + + let metadata = self.metadata_manager.get(id.clone()); + // If upgrading fails, the safe thing is to keep the being deleted flag + let upgraded = ShardWriter::clean_and_create(metadata.unwrap())?; let details = ShardCleaned { document_service: upgraded.document_version() as i32, @@ -160,7 +263,13 @@ impl ShardWriterProvider for UnboundedShardWriterCache { relation_service: upgraded.relation_version() as i32, }; - self.write().insert(id, Arc::new(upgraded)); + // The shard was upgraded, is safe to allow access again + let shard = Arc::new(upgraded); + let mut cache_writer = self.write(); + // Old shard is completely removed + cache_writer.remove(&id); + // The clean and upgraded version takes its place + cache_writer.add_active_shard(id, shard); Ok(details) } diff --git a/nucliadb_node/src/shards/shard_writer.rs b/nucliadb_node/src/shards/shard_writer.rs index 8e6e69b255..33d601a7a5 100644 --- a/nucliadb_node/src/shards/shard_writer.rs +++ b/nucliadb_node/src/shards/shard_writer.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::path::PathBuf; -use std::sync::Arc; +use std::sync::{Arc, Mutex, MutexGuard}; use nucliadb_core::prelude::*; use nucliadb_core::protos::shard_created::{ @@ -29,7 +29,6 @@ use nucliadb_core::tracing::{self, *}; use nucliadb_core::{thread, IndexFiles}; use nucliadb_procs::measure; use nucliadb_vectors::VectorErr; -use tokio::sync::{Mutex, MutexGuard}; use crate::disk_structure::*; use crate::shards::metadata::ShardMetadata; @@ -51,8 +50,8 @@ pub struct ShardWriter { paragraph_service_version: i32, vector_service_version: i32, relation_service_version: i32, - pub gc_lock: Mutex<()>, // lock to be able to do GC or not - write_lock: Mutex<()>, // be able to lock writes on the shard + pub gc_lock: tokio::sync::Mutex<()>, // lock to be able to do GC or not + write_lock: Mutex<()>, // be able to lock writes on the shard } impl ShardWriter { 
@@ -111,7 +110,7 @@ impl ShardWriter { paragraph_service_version: versions.version_paragraphs() as i32, vector_service_version: versions.version_vectors() as i32, relation_service_version: versions.version_relations() as i32, - gc_lock: Mutex::new(()), + gc_lock: tokio::sync::Mutex::new(()), write_lock: Mutex::new(()), }) } @@ -298,7 +297,7 @@ impl ShardWriter { let mut vector_result = Ok(()); let mut relation_result = Ok(()); - let _lock = self.write_lock.blocking_lock(); + let _lock = self.write_lock.lock().expect("Poisoned write lock"); thread::scope(|s| { s.spawn(|_| text_result = text_task()); s.spawn(|_| paragraph_result = paragraph_task()); @@ -358,7 +357,7 @@ impl ShardWriter { let mut vector_result = Ok(()); let mut relation_result = Ok(()); - let _lock = self.write_lock.blocking_lock(); + let _lock = self.write_lock.lock().expect("Poisoned write lock"); thread::scope(|s| { s.spawn(|_| text_result = text_task()); s.spawn(|_| paragraph_result = paragraph_task()); @@ -479,7 +478,7 @@ impl ShardWriter { } pub async fn block_shard(&self) -> BlockingToken { - let mutex_guard = self.write_lock.lock().await; + let mutex_guard = self.write_lock.lock().expect("Poisoned write lock"); BlockingToken(mutex_guard) } @@ -507,7 +506,7 @@ impl ShardWriter { ignored_segement_ids: &HashMap>, ) -> NodeResult> { let mut files = Vec::new(); - let _lock = self.write_lock.blocking_lock(); // need to make sure more writes don't happen while we are reading + let _lock = self.write_lock.lock().expect("Poisoned write lock"); // need to make sure more writes don't happen while we are reading files.push( paragraph_read(&self.paragraph_writer) diff --git a/nucliadb_node/tests/common/node_services.rs b/nucliadb_node/tests/common/node_services.rs index aa3cde6e8b..2f1f0d0306 100644 --- a/nucliadb_node/tests/common/node_services.rs +++ b/nucliadb_node/tests/common/node_services.rs @@ -33,7 +33,7 @@ use nucliadb_node::lifecycle; use nucliadb_node::replication::replicator::connect_to_primary_and_replicate; use nucliadb_node::replication::service::ReplicationServiceGRPCDriver; use nucliadb_node::settings::*; -use nucliadb_node::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache; +use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardWriterCache; use nucliadb_node::utils::read_or_create_host_key; use nucliadb_protos::replication; use tempfile::TempDir; @@ -95,10 +95,10 @@ pub struct NodeFixture { secondary_reader_addr: SocketAddr, reader_server_task: Option>, writer_server_task: Option>, - primary_shard_cache: Option>, + primary_shard_cache: Option>, secondary_reader_server_task: Option>, secondary_writer_server_task: Option>, - secondary_shard_cache: Option>, + secondary_shard_cache: Option>, tempdir: TempDir, secondary_tempdir: TempDir, shutdown_notifier: Arc, @@ -184,7 +184,7 @@ impl NodeFixture { let settings = self.settings.clone(); let cache_settings = self.settings.clone(); let addr = self.writer_addr; - let shards_cache = Arc::new(AsyncUnboundedShardWriterCache::new(cache_settings)); + let shards_cache = Arc::new(UnboundedShardWriterCache::new(cache_settings)); self.primary_shard_cache = Some(Arc::clone(&shards_cache)); let notifier = Arc::clone(&self.shutdown_notifier); self.writer_server_task = Some(tokio::spawn(async move { @@ -229,7 +229,7 @@ impl NodeFixture { let cache_settings = self.secondary_settings.clone(); let host_key_path = settings.host_key_path(); let node_id = read_or_create_host_key(host_key_path)?; - let shards_cache = 
Arc::new(AsyncUnboundedShardWriterCache::new(cache_settings)); + let shards_cache = Arc::new(UnboundedShardWriterCache::new(cache_settings)); self.secondary_shard_cache = Some(shards_cache.clone()); let notified = Arc::clone(&self.shutdown_notified); self.secondary_writer_server_task = Some(tokio::spawn(async move { @@ -332,14 +332,14 @@ impl NodeFixture { .clone() } - pub fn primary_shard_cache(&self) -> Arc { + pub fn primary_shard_cache(&self) -> Arc { self.primary_shard_cache .as_ref() .expect("Shard cache not initialized") .clone() } - pub fn secondary_shard_cache(&self) -> Arc { + pub fn secondary_shard_cache(&self) -> Arc { self.secondary_shard_cache .as_ref() .expect("Shard cache not initialized") diff --git a/nucliadb_node/tests/test_replication.rs b/nucliadb_node/tests/test_replication.rs index adad28a45c..73f9a4d8c4 100644 --- a/nucliadb_node/tests/test_replication.rs +++ b/nucliadb_node/tests/test_replication.rs @@ -28,7 +28,7 @@ use nucliadb_core::protos::{ ShardId, UserVector, UserVectors, VectorSetId, VectorSimilarity, }; use nucliadb_node::replication::health::ReplicationHealthManager; -use nucliadb_node::shards::providers::AsyncShardWriterProvider; +use nucliadb_node::shards::providers::ShardWriterProvider; use rstest::*; use tonic::Request; @@ -87,11 +87,8 @@ async fn test_search_replicated_data( assert_eq!(response.vector.unwrap().documents.len(), 1); // Validate generation id is the same - let primary_shard = fixture.primary_shard_cache().load(shard.id.clone()).await?; - let secondary_shard = fixture - .secondary_shard_cache() - .load(shard.id.clone()) - .await?; + let primary_shard = fixture.primary_shard_cache().load(shard.id.clone())?; + let secondary_shard = fixture.secondary_shard_cache().load(shard.id.clone())?; assert_eq!( primary_shard.metadata.get_generation_id(), From 37bba75e3d049deb6cc235b068954eacd2c3a804 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Fri, 12 Jan 2024 10:39:19 +0100 Subject: [PATCH 2/4] Run a single blocking task per grpc call --- nucliadb_node/src/grpc/grpc_reader.rs | 122 +++++++++++++++++++------- nucliadb_node/src/grpc/grpc_writer.rs | 82 ++++++++++------- nucliadb_node/src/shards/metadata.rs | 6 +- 3 files changed, 143 insertions(+), 67 deletions(-) diff --git a/nucliadb_node/src/grpc/grpc_reader.rs b/nucliadb_node/src/grpc/grpc_reader.rs index a6a738cb11..0391542d3a 100644 --- a/nucliadb_node/src/grpc/grpc_reader.rs +++ b/nucliadb_node/src/grpc/grpc_reader.rs @@ -52,7 +52,7 @@ impl NodeReaderGRPCDriver { pub async fn initialize(&self) -> NodeResult<()> { if !self.settings.lazy_loading() { // If lazy loading is disabled, load - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); tokio::task::spawn_blocking(move || shards.load_all()).await?? 
} Ok(()) @@ -64,7 +64,7 @@ impl NodeReaderGRPCDriver { return Ok(shard); } let id_clone = id.clone(); - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) .await .map_err(|error| { @@ -81,6 +81,26 @@ impl NodeReaderGRPCDriver { } } +fn obtain_shard( + shards: Arc, + id: impl Into, +) -> Result, tonic::Status> { + let id = id.into(); + if let Some(shard) = shards.get(id.clone()) { + return Ok(shard); + } + let id_clone = id.clone(); + let shards = shards.clone(); + let shard = shards.load(id_clone).map_err(|error| { + if error.is::() { + tonic::Status::not_found(error.to_string()) + } else { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + } + })?; + Ok(shard) +} + pub struct GrpcStreaming(T); impl futures_core::Stream for GrpcStreaming { @@ -164,9 +184,12 @@ impl NodeReader for NodeReaderGRPCDriver { Some(ref shard_id) => shard_id.id.clone(), None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")), }; - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "get shard"); - let task = || run_with_telemetry(info, move || shard.get_info(&request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_info(&request)) + }; let shard_info = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -183,9 +206,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let search_request = request.into_inner(); let shard_id = search_request.shard.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "search"); - let task = || run_with_telemetry(info, move || shard.search(search_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.search(search_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -202,9 +228,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let suggest_request = request.into_inner(); let shard_id = suggest_request.shard.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "suggest"); - let task = || run_with_telemetry(info, move || shard.suggest(suggest_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.suggest(suggest_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -221,9 +250,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let vector_request = request.into_inner(); let shard_id = vector_request.id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "vector search"); - let task = || run_with_telemetry(info, move || shard.vector_search(vector_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.vector_search(vector_request)) + }; let response = 
tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -240,9 +272,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let relation_request = request.into_inner(); let shard_id = relation_request.shard_id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "relations search"); - let task = || run_with_telemetry(info, move || shard.relation_search(relation_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.relation_search(relation_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -259,9 +294,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let document_request = request.into_inner(); let shard_id = document_request.id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "document search"); - let task = || run_with_telemetry(info, move || shard.document_search(document_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.document_search(document_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -278,9 +316,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let paragraph_request = request.into_inner(); let shard_id = paragraph_request.id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "paragraph search"); - let task = || run_with_telemetry(info, move || shard.paragraph_search(paragraph_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.paragraph_search(paragraph_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -296,9 +337,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "document ids"); - let task = || run_with_telemetry(info, move || shard.get_text_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_text_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -314,9 +358,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "paragraph ids"); - let task = || run_with_telemetry(info, move || shard.get_paragraphs_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_paragraphs_keys()) + }; let response = 
tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -332,9 +379,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "vector ids"); - let task = || run_with_telemetry(info, move || shard.get_vectors_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_vectors_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -351,9 +401,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "relation ids"); - let task = || run_with_telemetry(info, move || shard.get_relations_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_relations_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -369,9 +422,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "relation edges"); - let task = || run_with_telemetry(info, move || shard.get_relations_edges()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_relations_edges()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -387,9 +443,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "relation types"); - let task = || run_with_telemetry(info, move || shard.get_relations_types()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_relations_types()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -405,9 +464,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().shard_id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "get shard files"); - let task = || run_with_telemetry(info, move || shard.get_shard_files()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_shard_files()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; diff --git 
a/nucliadb_node/src/grpc/grpc_writer.rs b/nucliadb_node/src/grpc/grpc_writer.rs index ad8c77d1ab..e428c133b6 100644 --- a/nucliadb_node/src/grpc/grpc_writer.rs +++ b/nucliadb_node/src/grpc/grpc_writer.rs @@ -67,7 +67,7 @@ impl NodeWriterGRPCDriver { pub async fn initialize(&self) -> NodeResult<()> { if !self.settings.lazy_loading() { // If lazy loading is disabled, load - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); tokio::task::spawn_blocking(move || shards.load_all()).await?? } Ok(()) @@ -80,28 +80,6 @@ impl NodeWriterGRPCDriver { } } - async fn obtain_shard(&self, id: impl Into) -> Result, tonic::Status> { - let id = id.into(); - if let Some(shard) = self.shards.get(id.clone()) { - return Ok(shard); - } - let id_clone = id.clone(); - let shards = self.shards.clone(); - let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) - .await - .map_err(|error| { - tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) - })?; - let shard = shard.map_err(|error| { - if error.is::() { - tonic::Status::not_found(error.to_string()) - } else { - tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) - } - })?; - Ok(shard) - } - #[tracing::instrument(skip_all)] fn emit_event(&self, event: NodeWriterEvent) { if let Some(sender) = &self.sender { @@ -110,6 +88,26 @@ impl NodeWriterGRPCDriver { } } +fn obtain_shard( + shards: Arc, + id: impl Into, +) -> Result, tonic::Status> { + let id = id.into(); + if let Some(shard) = shards.get(id.clone()) { + return Ok(shard); + } + let id_clone = id.clone(); + let shards = shards.clone(); + let shard = shards.load(id_clone).map_err(|error| { + if error.is::() { + tonic::Status::not_found(error.to_string()) + } else { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + } + })?; + Ok(shard) +} + #[tonic::async_trait] impl NodeWriter for NodeWriterGRPCDriver { async fn new_shard( @@ -128,7 +126,7 @@ impl NodeWriter for NodeWriterGRPCDriver { Some(Channel::from(request.release_channel)), ); - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let new_shard = tokio::task::spawn_blocking(move || shards.create(metadata)) .await .map_err(|error| tonic::Status::internal(format!("Error creating shard: {error:?}")))?; @@ -153,7 +151,7 @@ impl NodeWriter for NodeWriterGRPCDriver { // Deletion does not require for the shard to be loaded. 
let shard_id = request.into_inner(); - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let shard_id_clone = shard_id.id.clone(); let deleted = tokio::task::spawn_blocking(move || shards.delete(shard_id_clone)) .await @@ -181,7 +179,7 @@ impl NodeWriter for NodeWriterGRPCDriver { let shard_id = request.into_inner().id; // No need to load shard to upgrade it - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let shard_id_clone = shard_id.clone(); let upgraded = tokio::task::spawn_blocking(move || shards.upgrade(shard_id_clone)) .await @@ -213,10 +211,12 @@ impl NodeWriter for NodeWriterGRPCDriver { let span = Span::current(); let resource = request.into_inner(); let shard_id = resource.shard_id.clone(); - let shard = self.obtain_shard(&shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "set resource"); let write_task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .set_resource(&resource) .and_then(|()| shard.get_opstatus()) @@ -253,10 +253,12 @@ impl NodeWriter for NodeWriterGRPCDriver { let span = Span::current(); let resource = request.into_inner(); let shard_id = resource.shard_id.clone(); - let shard = self.obtain_shard(&shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "remove resource"); let write_task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .remove_resource(&resource) .and_then(|()| shard.get_opstatus()) @@ -305,10 +307,12 @@ impl NodeWriter for NodeWriterGRPCDriver { None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")), }; - let shard = self.obtain_shard(shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "add vector set"); let task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .add_vectorset(&vectorset_id, request.similarity()) .and_then(|()| shard.get_opstatus()) @@ -337,10 +341,12 @@ impl NodeWriter for NodeWriterGRPCDriver { Some(ref shard_id) => &shard_id.id, None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")), }; - let shard = self.obtain_shard(shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "remove vector set"); let task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .remove_vectorset(&request) .and_then(|()| shard.get_opstatus()) @@ -365,9 +371,13 @@ impl NodeWriter for NodeWriterGRPCDriver { ) -> Result, Status> { let span = Span::current(); let shard_id = request.into_inner(); - let shard = self.obtain_shard(shard_id.id.clone()).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.id.clone(); let info = info_span!(parent: &span, "list vector sets"); - let task = || run_with_telemetry(info, move || shard.list_vectorsets()); + let task = || { + let shard = obtain_shard(shards, shard_id_clone)?; + run_with_telemetry(info, move || shard.list_vectorsets()) + }; let status = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -401,10 +411,14 @@ impl NodeWriter for NodeWriterGRPCDriver { ) -> Result, 
Status> { send_analytics_event(AnalyticsEvent::GarbageCollect).await; let shard_id = request.into_inner(); - let shard = self.obtain_shard(&shard_id.id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.id.clone(); let span = Span::current(); let info = info_span!(parent: &span, "list vector sets"); - let task = || run_with_telemetry(info, move || shard.gc()); + let task = || { + let shard = obtain_shard(shards, shard_id_clone)?; + run_with_telemetry(info, move || shard.gc()) + }; let result = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; diff --git a/nucliadb_node/src/shards/metadata.rs b/nucliadb_node/src/shards/metadata.rs index 9c6cf36e8f..9586f6c73e 100644 --- a/nucliadb_node/src/shards/metadata.rs +++ b/nucliadb_node/src/shards/metadata.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; use std::fs::File; use std::io::{BufReader, BufWriter, Write}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use nucliadb_core::{node_error, protos, Channel, NodeResult}; @@ -132,7 +132,7 @@ impl ShardMetadata { generation_id: RwLock::new(None), } } - pub fn exists(shard_path: &PathBuf) -> bool { + pub fn exists(shard_path: &Path) -> bool { let metadata_path = shard_path.join(disk_structure::METADATA_FILE); metadata_path.exists() } @@ -275,7 +275,7 @@ mod test { #[test] fn open_empty() { let dir = TempDir::new().unwrap(); - assert!(!ShardMetadata::exists(&dir.path().to_path_buf())); + assert!(!ShardMetadata::exists(dir.path())); let meta = ShardMetadata::open(dir.path().to_path_buf()); assert!(meta.is_err()); } From 027de1868d19585e5b5bbcc338db5aa30214cd37 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Fri, 12 Jan 2024 12:50:38 +0100 Subject: [PATCH 3/4] Remove shard cache traits --- nucliadb_node/src/grpc/grpc_reader.rs | 17 ++----- nucliadb_node/src/grpc/grpc_writer.rs | 1 - nucliadb_node/src/replication/replicator.rs | 5 +- nucliadb_node/src/replication/service.rs | 1 - nucliadb_node/src/shards/providers/mod.rs | 3 -- .../src/shards/providers/provider_traits.rs | 48 ------------------- .../unbounded_cache/unbounded_reader.rs | 13 ++--- .../unbounded_cache/unbounded_writer.rs | 17 +++---- nucliadb_node/tests/test_replication.rs | 1 - nucliadb_node_binding/src/reader.rs | 1 - nucliadb_node_binding/src/writer.rs | 1 - 11 files changed, 18 insertions(+), 90 deletions(-) delete mode 100644 nucliadb_node/src/shards/providers/provider_traits.rs diff --git a/nucliadb_node/src/grpc/grpc_reader.rs b/nucliadb_node/src/grpc/grpc_reader.rs index 0391542d3a..15062410ae 100644 --- a/nucliadb_node/src/grpc/grpc_reader.rs +++ b/nucliadb_node/src/grpc/grpc_reader.rs @@ -30,7 +30,6 @@ use Shard as ShardPB; use crate::settings::Settings; use crate::shards::errors::ShardNotFoundError; use crate::shards::providers::unbounded_cache::UnboundedShardReaderCache; -use crate::shards::providers::ShardReaderProvider; use crate::shards::reader::{ShardFileChunkIterator, ShardReader}; use crate::telemetry::run_with_telemetry; @@ -60,24 +59,14 @@ impl NodeReaderGRPCDriver { async fn obtain_shard(&self, id: impl Into) -> Result, tonic::Status> { let id = id.into(); - if let Some(shard) = self.shards.get(id.clone()) { - return Ok(shard); - } let id_clone = id.clone(); let shards = Arc::clone(&self.shards); - let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) + + tokio::task::spawn_blocking(move || obtain_shard(shards, id_clone)) .await .map_err(|error| 
{ tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) - })?; - let shard = shard.map_err(|error| { - if error.is::() { - tonic::Status::not_found(error.to_string()) - } else { - tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) - } - })?; - Ok(shard) + })? } } diff --git a/nucliadb_node/src/grpc/grpc_writer.rs b/nucliadb_node/src/grpc/grpc_writer.rs index e428c133b6..a0798dc012 100644 --- a/nucliadb_node/src/grpc/grpc_writer.rs +++ b/nucliadb_node/src/grpc/grpc_writer.rs @@ -37,7 +37,6 @@ use crate::settings::Settings; use crate::shards::errors::ShardNotFoundError; use crate::shards::metadata::ShardMetadata; use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache; -use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::telemetry::run_with_telemetry; use crate::utils::list_shards; diff --git a/nucliadb_node/src/replication/replicator.rs b/nucliadb_node/src/replication/replicator.rs index 91ee43641d..f59f09a889 100644 --- a/nucliadb_node/src/replication/replicator.rs +++ b/nucliadb_node/src/replication/replicator.rs @@ -36,7 +36,6 @@ use crate::replication::health::ReplicationHealthManager; use crate::settings::Settings; use crate::shards::metadata::ShardMetadata; use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache; -use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::utils::{list_shards, set_primary_node_id}; @@ -176,7 +175,9 @@ impl ReplicateWorkerPool { } pub async fn add(&mut self, worker: F) -> NodeResult<()> - where F: Future> + Send + 'static { + where + F: Future> + Send + 'static, + { let work_lock = Arc::clone(&self.work_lock); let permit = work_lock.acquire_owned().await.unwrap(); diff --git a/nucliadb_node/src/replication/service.rs b/nucliadb_node/src/replication/service.rs index 3f2d4399bb..ac20fb89e6 100644 --- a/nucliadb_node/src/replication/service.rs +++ b/nucliadb_node/src/replication/service.rs @@ -33,7 +33,6 @@ use crate::replication::NodeRole; use crate::settings::Settings; use crate::shards::metadata::Similarity; use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache; -use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::utils::list_shards; pub struct ReplicationServiceGRPCDriver { diff --git a/nucliadb_node/src/shards/providers/mod.rs b/nucliadb_node/src/shards/providers/mod.rs index 89d1d1d8aa..370d4d8774 100644 --- a/nucliadb_node/src/shards/providers/mod.rs +++ b/nucliadb_node/src/shards/providers/mod.rs @@ -23,7 +23,4 @@ //! Providers, for example, can use cache strategies to provide faster readers //! and writers to the users. -mod provider_traits; pub mod unbounded_cache; - -pub use provider_traits::{ShardReaderProvider, ShardWriterProvider}; diff --git a/nucliadb_node/src/shards/providers/provider_traits.rs b/nucliadb_node/src/shards/providers/provider_traits.rs deleted file mode 100644 index ba7e080aa7..0000000000 --- a/nucliadb_node/src/shards/providers/provider_traits.rs +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (C) 2021 Bosutech XXI S.L. -// -// nucliadb is offered under the AGPL v3.0 and as commercial software. -// For commercial licensing, contact us at info@nuclia.com. 
-// -// AGPL: -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::sync::Arc; - -use nucliadb_core::protos::ShardCleaned; -use nucliadb_core::NodeResult; - -use crate::shards::metadata::ShardMetadata; -use crate::shards::reader::ShardReader; -use crate::shards::writer::ShardWriter; -use crate::shards::ShardId; - -pub trait ShardReaderProvider: Send + Sync { - fn load(&self, id: ShardId) -> NodeResult>; - fn load_all(&self) -> NodeResult<()>; - - fn get(&self, id: ShardId) -> Option>; -} - -pub trait ShardWriterProvider { - fn load(&self, id: ShardId) -> NodeResult>; - fn load_all(&self) -> NodeResult<()>; - - fn create(&self, metadata: ShardMetadata) -> NodeResult>; - fn get(&self, id: ShardId) -> Option>; - fn delete(&self, id: ShardId) -> NodeResult<()>; - - fn upgrade(&self, id: ShardId) -> NodeResult; - - fn get_metadata(&self, id: ShardId) -> Option>; -} diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs index 2e093f4351..6602281b53 100644 --- a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs +++ b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_reader.rs @@ -28,7 +28,6 @@ use crate::disk_structure; use crate::settings::Settings; use crate::shards::errors::ShardNotFoundError; use crate::shards::metadata::ShardMetadata; -use crate::shards::providers::ShardReaderProvider; use crate::shards::reader::ShardReader; use crate::shards::ShardId; @@ -48,17 +47,15 @@ impl UnboundedShardReaderCache { } } - fn read(&self) -> RwLockReadGuard>> { + pub fn read(&self) -> RwLockReadGuard>> { self.cache.read().expect("Poisoned lock while reading") } - fn write(&self) -> RwLockWriteGuard>> { + pub fn write(&self) -> RwLockWriteGuard>> { self.cache.write().expect("Poisoned lock while reading") } -} -impl ShardReaderProvider for UnboundedShardReaderCache { - fn load(&self, id: ShardId) -> NodeResult> { + pub fn load(&self, id: ShardId) -> NodeResult> { let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); let mut cache_writer = self.write(); @@ -82,7 +79,7 @@ impl ShardReaderProvider for UnboundedShardReaderCache { Ok(shard) } - fn load_all(&self) -> NodeResult<()> { + pub fn load_all(&self) -> NodeResult<()> { let mut cache = self.write(); let shards_path = self.shards_path.clone(); @@ -102,7 +99,7 @@ impl ShardReaderProvider for UnboundedShardReaderCache { Ok(()) } - fn get(&self, id: ShardId) -> Option> { + pub fn get(&self, id: ShardId) -> Option> { self.read().get(&id).map(Arc::clone) } } diff --git a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs index 8266f0f5ed..f4d3725451 100644 --- a/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs +++ b/nucliadb_node/src/shards/providers/unbounded_cache/unbounded_writer.rs @@ -29,7 
+29,6 @@ use crate::disk_structure; use crate::settings::Settings; use crate::shards::errors::ShardNotFoundError; use crate::shards::metadata::{ShardMetadata, ShardsMetadataManager}; -use crate::shards::providers::ShardWriterProvider; use crate::shards::writer::ShardWriter; use crate::shards::ShardId; @@ -105,10 +104,8 @@ impl UnboundedShardWriterCache { fn write(&self) -> RwLockWriteGuard { self.cache.write().expect("Poisoned lock while reading") } -} -impl ShardWriterProvider for UnboundedShardWriterCache { - fn create(&self, metadata: ShardMetadata) -> NodeResult> { + pub fn create(&self, metadata: ShardMetadata) -> NodeResult> { let shard_id = metadata.id(); let metadata = Arc::new(metadata); let shard = Arc::new(ShardWriter::new(metadata.clone())?); @@ -121,7 +118,7 @@ impl ShardWriterProvider for UnboundedShardWriterCache { Ok(shard) } - fn load(&self, id: ShardId) -> NodeResult> { + pub fn load(&self, id: ShardId) -> NodeResult> { let shard_key = id.clone(); let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id); let mut cache_writer = self.write(); @@ -153,7 +150,7 @@ impl ShardWriterProvider for UnboundedShardWriterCache { } } - fn load_all(&self) -> NodeResult<()> { + pub fn load_all(&self) -> NodeResult<()> { let mut cache = self.write(); let shards_path = self.shards_path.clone(); let metadata_manager = Arc::clone(&self.metadata_manager); @@ -184,7 +181,7 @@ impl ShardWriterProvider for UnboundedShardWriterCache { Ok(()) } - fn get(&self, id: ShardId) -> Option> { + pub fn get(&self, id: ShardId) -> Option> { let cache_reader = self.read(); match cache_reader.get_shard(&id) { ShardCacheStatus::InCache(shard) => Some(shard), @@ -192,7 +189,7 @@ impl ShardWriterProvider for UnboundedShardWriterCache { } } - fn delete(&self, id: ShardId) -> NodeResult<()> { + pub fn delete(&self, id: ShardId) -> NodeResult<()> { let mut cache_writer = self.write(); // First the shard must be marked as being deleted, this way // concurrent tasks can not make the mistake of trying to use it. @@ -230,7 +227,7 @@ impl ShardWriterProvider for UnboundedShardWriterCache { Ok(()) } - fn upgrade(&self, id: ShardId) -> NodeResult { + pub fn upgrade(&self, id: ShardId) -> NodeResult { let mut cache_writer = self.write(); // First the shard must be marked as being deleted, this way // concurrent tasks can not make the mistake of trying to use it. 
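Taken together, the unbounded_writer.rs hunks turn the former ShardWriterProvider methods into inherent methods on UnboundedShardWriterCache. Because this rendering of the diff drops generic type parameters (for example `NodeResult>` where the code reads `NodeResult<Arc<ShardWriter>>`), the sketch below reconstructs what the resulting inherent API looks like. It is a minimal sketch, not the crate's actual code: `ShardId = String`, the placeholder `ShardWriter`, `ShardMetadata`, `ShardCleaned`, the `NodeResult` alias, and the plain `HashMap` cache are all stand-ins (the real cache tracks per-shard states such as "being deleted" via an inner cache type).

// Reviewer's sketch, not crate code: stub types stand in for the real
// nucliadb_node / nucliadb_core items so the reconstructed signatures compile.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type ShardId = String;                  // stand-in for crate::shards::ShardId
type NodeResult<T> = Result<T, String>; // stand-in for nucliadb_core::NodeResult
pub struct ShardWriter;                 // placeholder
pub struct ShardMetadata;               // placeholder
pub struct ShardCleaned;                // placeholder for protos::ShardCleaned

// Assumed shape of the writer cache once the provider traits are gone:
// the former trait methods become inherent methods with these signatures.
pub struct UnboundedShardWriterCache {
    cache: RwLock<HashMap<ShardId, Arc<ShardWriter>>>,
    // the real type also keeps the shards path and a ShardsMetadataManager
}

impl UnboundedShardWriterCache {
    pub fn create(&self, _metadata: ShardMetadata) -> NodeResult<Arc<ShardWriter>> {
        todo!("build a ShardWriter, insert it into the cache, return it")
    }
    pub fn load(&self, _id: ShardId) -> NodeResult<Arc<ShardWriter>> {
        todo!("open the shard from disk when it is not cached yet")
    }
    pub fn load_all(&self) -> NodeResult<()> {
        todo!("walk the shards directory and load every shard")
    }
    pub fn get(&self, id: ShardId) -> Option<Arc<ShardWriter>> {
        // Same read-lock-and-clone pattern the reader cache uses in the diff.
        self.cache.read().expect("Poisoned lock").get(&id).map(Arc::clone)
    }
    pub fn delete(&self, _id: ShardId) -> NodeResult<()> {
        todo!("mark the shard as being deleted, then remove it from disk")
    }
    pub fn upgrade(&self, _id: ShardId) -> NodeResult<ShardCleaned> {
        todo!("rebuild the shard and report the cleaned details")
    }
    pub fn get_metadata(&self, _id: ShardId) -> Option<Arc<ShardMetadata>> {
        todo!("delegate to the metadata manager")
    }
}

Callers keep the same call sites as before; only the trait imports disappear, which is why the remaining hunks in this patch are import removals.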
@@ -273,7 +270,7 @@ impl ShardWriterProvider for UnboundedShardWriterCache {
         Ok(details)
     }

-    fn get_metadata(&self, id: ShardId) -> Option<Arc<ShardMetadata>> {
+    pub fn get_metadata(&self, id: ShardId) -> Option<Arc<ShardMetadata>> {
         self.metadata_manager.get(id)
     }
 }
diff --git a/nucliadb_node/tests/test_replication.rs b/nucliadb_node/tests/test_replication.rs
index 73f9a4d8c4..8476590181 100644
--- a/nucliadb_node/tests/test_replication.rs
+++ b/nucliadb_node/tests/test_replication.rs
@@ -28,7 +28,6 @@ use nucliadb_core::protos::{
     ShardId, UserVector, UserVectors, VectorSetId, VectorSimilarity,
 };
 use nucliadb_node::replication::health::ReplicationHealthManager;
-use nucliadb_node::shards::providers::ShardWriterProvider;
 use rstest::*;
 use tonic::Request;

diff --git a/nucliadb_node_binding/src/reader.rs b/nucliadb_node_binding/src/reader.rs
index 55d847d356..71a36ba573 100644
--- a/nucliadb_node_binding/src/reader.rs
+++ b/nucliadb_node_binding/src/reader.rs
@@ -28,7 +28,6 @@ use nucliadb_node::settings::providers::env::EnvSettingsProvider;
 use nucliadb_node::settings::providers::SettingsProvider;
 use nucliadb_node::settings::Settings;
 use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardReaderCache;
-use nucliadb_node::shards::providers::ShardReaderProvider;
 use nucliadb_node::shards::reader::ShardReader;
 use prost::Message;
 use pyo3::exceptions::{PyStopIteration, PyValueError};
diff --git a/nucliadb_node_binding/src/writer.rs b/nucliadb_node_binding/src/writer.rs
index b06b45accd..d5ab5b6ef6 100644
--- a/nucliadb_node_binding/src/writer.rs
+++ b/nucliadb_node_binding/src/writer.rs
@@ -32,7 +32,6 @@ use nucliadb_node::settings::providers::SettingsProvider;
 use nucliadb_node::settings::Settings;
 use nucliadb_node::shards::metadata::ShardMetadata;
 use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardWriterCache;
-use nucliadb_node::shards::providers::ShardWriterProvider;
 use nucliadb_node::shards::writer::ShardWriter;
 use prost::Message;
 use pyo3::exceptions::PyValueError;

From b0cc3299c8a9d845faeb45bad4aa1bb31684f24e Mon Sep 17 00:00:00 2001
From: Javier Torres
Date: Fri, 12 Jan 2024 13:12:25 +0100
Subject: [PATCH 4/4] Format

---
 nucliadb_node/src/replication/replicator.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/nucliadb_node/src/replication/replicator.rs b/nucliadb_node/src/replication/replicator.rs
index f59f09a889..d6af5e7748 100644
--- a/nucliadb_node/src/replication/replicator.rs
+++ b/nucliadb_node/src/replication/replicator.rs
@@ -175,9 +175,7 @@ impl ReplicateWorkerPool {
     }

     pub async fn add<F>(&mut self, worker: F) -> NodeResult<()>
-    where
-        F: Future<Output = NodeResult<()>> + Send + 'static,
-    {
+    where F: Future<Output = NodeResult<()>> + Send + 'static {
         let work_lock = Arc::clone(&self.work_lock);
         let permit = work_lock.acquire_owned().await.unwrap();
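Across the series, every gRPC handler ends up following one pattern: a synchronous, free-standing obtain_shard helper plus tokio::task::spawn_blocking around both shard resolution and the query itself, so a cold shard load never stalls the async executor. The sketch below illustrates that pattern under stated assumptions: UnboundedShardReaderCache, ShardReader, the NodeResult alias, and handler_skeleton are stubs for illustration, the generic parameters are reconstructed (this rendering of the diff drops them), and the real helper additionally maps ShardNotFoundError to tonic::Status::not_found and wraps the query in run_with_telemetry.

// Reviewer's sketch of the end state, not crate code.
use std::sync::Arc;

type NodeResult<T> = Result<T, String>; // stand-in for nucliadb_core::NodeResult

pub struct ShardReader;               // placeholder
pub struct UnboundedShardReaderCache; // placeholder

impl UnboundedShardReaderCache {
    pub fn get(&self, _id: String) -> Option<Arc<ShardReader>> {
        None // always a cache miss in this sketch
    }
    pub fn load(&self, _id: String) -> NodeResult<Arc<ShardReader>> {
        Ok(Arc::new(ShardReader)) // the real method opens the shard from disk
    }
}

// Synchronous free function replacing the old async obtain_shard method:
// check the cache, fall back to a blocking load.
fn obtain_shard(
    shards: Arc<UnboundedShardReaderCache>,
    id: impl Into<String>,
) -> Result<Arc<ShardReader>, tonic::Status> {
    let id = id.into();
    if let Some(shard) = shards.get(id.clone()) {
        return Ok(shard);
    }
    shards.load(id.clone()).map_err(|error| {
        tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
    })
}

// Handlers resolve the shard and run the query on the blocking thread pool.
async fn handler_skeleton(
    shards: Arc<UnboundedShardReaderCache>,
    shard_id: String,
) -> Result<(), tonic::Status> {
    let task = move || {
        let _shard = obtain_shard(shards, shard_id)?;
        // ...run the actual search/suggest/info call here, wrapped in telemetry...
        Ok::<(), tonic::Status>(())
    };
    tokio::task::spawn_blocking(task)
        .await
        .map_err(|error| tonic::Status::internal(format!("Blocking task panicked: {error:?}")))?
}

Moving the cache lookup into the blocking task is what makes the dedicated async cache provider unnecessary, which appears to be the motivation for removing it in the first patch of this series.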