From 91f13673bd460ce755d265d18e5d24c4e3c786d0 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Fri, 12 Jan 2024 10:39:19 +0100 Subject: [PATCH] Run a single blocking task per grpc call --- nucliadb_node/src/grpc/grpc_reader.rs | 122 +++++++++++++++++++------- nucliadb_node/src/grpc/grpc_writer.rs | 82 ++++++++++------- nucliadb_node/src/shards/metadata.rs | 4 +- 3 files changed, 142 insertions(+), 66 deletions(-) diff --git a/nucliadb_node/src/grpc/grpc_reader.rs b/nucliadb_node/src/grpc/grpc_reader.rs index a6a738cb11..0391542d3a 100644 --- a/nucliadb_node/src/grpc/grpc_reader.rs +++ b/nucliadb_node/src/grpc/grpc_reader.rs @@ -52,7 +52,7 @@ impl NodeReaderGRPCDriver { pub async fn initialize(&self) -> NodeResult<()> { if !self.settings.lazy_loading() { // If lazy loading is disabled, load - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); tokio::task::spawn_blocking(move || shards.load_all()).await?? } Ok(()) @@ -64,7 +64,7 @@ impl NodeReaderGRPCDriver { return Ok(shard); } let id_clone = id.clone(); - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) .await .map_err(|error| { @@ -81,6 +81,26 @@ impl NodeReaderGRPCDriver { } } +fn obtain_shard( + shards: Arc, + id: impl Into, +) -> Result, tonic::Status> { + let id = id.into(); + if let Some(shard) = shards.get(id.clone()) { + return Ok(shard); + } + let id_clone = id.clone(); + let shards = shards.clone(); + let shard = shards.load(id_clone).map_err(|error| { + if error.is::() { + tonic::Status::not_found(error.to_string()) + } else { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + } + })?; + Ok(shard) +} + pub struct GrpcStreaming(T); impl futures_core::Stream for GrpcStreaming { @@ -164,9 +184,12 @@ impl NodeReader for NodeReaderGRPCDriver { Some(ref shard_id) => shard_id.id.clone(), None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")), }; - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "get shard"); - let task = || run_with_telemetry(info, move || shard.get_info(&request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_info(&request)) + }; let shard_info = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -183,9 +206,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let search_request = request.into_inner(); let shard_id = search_request.shard.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "search"); - let task = || run_with_telemetry(info, move || shard.search(search_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.search(search_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -202,9 +228,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let suggest_request = request.into_inner(); let shard_id = suggest_request.shard.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "suggest"); - let task = || run_with_telemetry(info, move || shard.suggest(suggest_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.suggest(suggest_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -221,9 +250,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let vector_request = request.into_inner(); let shard_id = vector_request.id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "vector search"); - let task = || run_with_telemetry(info, move || shard.vector_search(vector_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.vector_search(vector_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -240,9 +272,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let relation_request = request.into_inner(); let shard_id = relation_request.shard_id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "relations search"); - let task = || run_with_telemetry(info, move || shard.relation_search(relation_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.relation_search(relation_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -259,9 +294,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let document_request = request.into_inner(); let shard_id = document_request.id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "document search"); - let task = || run_with_telemetry(info, move || shard.document_search(document_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.document_search(document_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -278,9 +316,12 @@ impl NodeReader for NodeReaderGRPCDriver { let span = Span::current(); let paragraph_request = request.into_inner(); let shard_id = paragraph_request.id.clone(); - let shard = self.obtain_shard(shard_id).await?; let info = info_span!(parent: &span, "paragraph search"); - let task = || run_with_telemetry(info, move || shard.paragraph_search(paragraph_request)); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.paragraph_search(paragraph_request)) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -296,9 +337,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "document ids"); - let task = || run_with_telemetry(info, move || shard.get_text_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_text_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -314,9 +358,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "paragraph ids"); - let task = || run_with_telemetry(info, move || shard.get_paragraphs_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_paragraphs_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -332,9 +379,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "vector ids"); - let task = || run_with_telemetry(info, move || shard.get_vectors_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_vectors_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -351,9 +401,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "relation ids"); - let task = || run_with_telemetry(info, move || shard.get_relations_keys()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_relations_keys()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -369,9 +422,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "relation edges"); - let task = || run_with_telemetry(info, move || shard.get_relations_edges()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_relations_edges()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -387,9 +443,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "relation types"); - let task = || run_with_telemetry(info, move || shard.get_relations_types()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_relations_types()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -405,9 +464,12 @@ impl NodeReader for NodeReaderGRPCDriver { ) -> Result, tonic::Status> { let span = Span::current(); let shard_id = request.into_inner().shard_id; - let shard = self.obtain_shard(shard_id.clone()).await?; let info = info_span!(parent: &span, "get shard files"); - let task = || run_with_telemetry(info, move || shard.get_shard_files()); + let shards = Arc::clone(&self.shards); + let task = move || { + let shard = obtain_shard(shards, shard_id)?; + run_with_telemetry(info, move || shard.get_shard_files()) + }; let response = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; diff --git a/nucliadb_node/src/grpc/grpc_writer.rs b/nucliadb_node/src/grpc/grpc_writer.rs index ad8c77d1ab..e428c133b6 100644 --- a/nucliadb_node/src/grpc/grpc_writer.rs +++ b/nucliadb_node/src/grpc/grpc_writer.rs @@ -67,7 +67,7 @@ impl NodeWriterGRPCDriver { pub async fn initialize(&self) -> NodeResult<()> { if !self.settings.lazy_loading() { // If lazy loading is disabled, load - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); tokio::task::spawn_blocking(move || shards.load_all()).await?? } Ok(()) @@ -80,28 +80,6 @@ impl NodeWriterGRPCDriver { } } - async fn obtain_shard(&self, id: impl Into) -> Result, tonic::Status> { - let id = id.into(); - if let Some(shard) = self.shards.get(id.clone()) { - return Ok(shard); - } - let id_clone = id.clone(); - let shards = self.shards.clone(); - let shard = tokio::task::spawn_blocking(move || shards.load(id_clone)) - .await - .map_err(|error| { - tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) - })?; - let shard = shard.map_err(|error| { - if error.is::() { - tonic::Status::not_found(error.to_string()) - } else { - tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) - } - })?; - Ok(shard) - } - #[tracing::instrument(skip_all)] fn emit_event(&self, event: NodeWriterEvent) { if let Some(sender) = &self.sender { @@ -110,6 +88,26 @@ impl NodeWriterGRPCDriver { } } +fn obtain_shard( + shards: Arc, + id: impl Into, +) -> Result, tonic::Status> { + let id = id.into(); + if let Some(shard) = shards.get(id.clone()) { + return Ok(shard); + } + let id_clone = id.clone(); + let shards = shards.clone(); + let shard = shards.load(id_clone).map_err(|error| { + if error.is::() { + tonic::Status::not_found(error.to_string()) + } else { + tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}")) + } + })?; + Ok(shard) +} + #[tonic::async_trait] impl NodeWriter for NodeWriterGRPCDriver { async fn new_shard( @@ -128,7 +126,7 @@ impl NodeWriter for NodeWriterGRPCDriver { Some(Channel::from(request.release_channel)), ); - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let new_shard = tokio::task::spawn_blocking(move || shards.create(metadata)) .await .map_err(|error| tonic::Status::internal(format!("Error creating shard: {error:?}")))?; @@ -153,7 +151,7 @@ impl NodeWriter for NodeWriterGRPCDriver { // Deletion does not require for the shard to be loaded. let shard_id = request.into_inner(); - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let shard_id_clone = shard_id.id.clone(); let deleted = tokio::task::spawn_blocking(move || shards.delete(shard_id_clone)) .await @@ -181,7 +179,7 @@ impl NodeWriter for NodeWriterGRPCDriver { let shard_id = request.into_inner().id; // No need to load shard to upgrade it - let shards = self.shards.clone(); + let shards = Arc::clone(&self.shards); let shard_id_clone = shard_id.clone(); let upgraded = tokio::task::spawn_blocking(move || shards.upgrade(shard_id_clone)) .await @@ -213,10 +211,12 @@ impl NodeWriter for NodeWriterGRPCDriver { let span = Span::current(); let resource = request.into_inner(); let shard_id = resource.shard_id.clone(); - let shard = self.obtain_shard(&shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "set resource"); let write_task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .set_resource(&resource) .and_then(|()| shard.get_opstatus()) @@ -253,10 +253,12 @@ impl NodeWriter for NodeWriterGRPCDriver { let span = Span::current(); let resource = request.into_inner(); let shard_id = resource.shard_id.clone(); - let shard = self.obtain_shard(&shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "remove resource"); let write_task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .remove_resource(&resource) .and_then(|()| shard.get_opstatus()) @@ -305,10 +307,12 @@ impl NodeWriter for NodeWriterGRPCDriver { None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")), }; - let shard = self.obtain_shard(shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "add vector set"); let task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .add_vectorset(&vectorset_id, request.similarity()) .and_then(|()| shard.get_opstatus()) @@ -337,10 +341,12 @@ impl NodeWriter for NodeWriterGRPCDriver { Some(ref shard_id) => &shard_id.id, None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")), }; - let shard = self.obtain_shard(shard_id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.clone(); let info = info_span!(parent: &span, "remove vector set"); let task = || { run_with_telemetry(info, move || { + let shard = obtain_shard(shards, shard_id_clone)?; shard .remove_vectorset(&request) .and_then(|()| shard.get_opstatus()) @@ -365,9 +371,13 @@ impl NodeWriter for NodeWriterGRPCDriver { ) -> Result, Status> { let span = Span::current(); let shard_id = request.into_inner(); - let shard = self.obtain_shard(shard_id.id.clone()).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.id.clone(); let info = info_span!(parent: &span, "list vector sets"); - let task = || run_with_telemetry(info, move || shard.list_vectorsets()); + let task = || { + let shard = obtain_shard(shards, shard_id_clone)?; + run_with_telemetry(info, move || shard.list_vectorsets()) + }; let status = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; @@ -401,10 +411,14 @@ impl NodeWriter for NodeWriterGRPCDriver { ) -> Result, Status> { send_analytics_event(AnalyticsEvent::GarbageCollect).await; let shard_id = request.into_inner(); - let shard = self.obtain_shard(&shard_id.id).await?; + let shards = Arc::clone(&self.shards); + let shard_id_clone = shard_id.id.clone(); let span = Span::current(); let info = info_span!(parent: &span, "list vector sets"); - let task = || run_with_telemetry(info, move || shard.gc()); + let task = || { + let shard = obtain_shard(shards, shard_id_clone)?; + run_with_telemetry(info, move || shard.gc()) + }; let result = tokio::task::spawn_blocking(task).await.map_err(|error| { tonic::Status::internal(format!("Blocking task panicked: {error:?}")) })?; diff --git a/nucliadb_node/src/shards/metadata.rs b/nucliadb_node/src/shards/metadata.rs index 9c6cf36e8f..be389b456a 100644 --- a/nucliadb_node/src/shards/metadata.rs +++ b/nucliadb_node/src/shards/metadata.rs @@ -20,7 +20,7 @@ use std::collections::HashMap; use std::fs::File; use std::io::{BufReader, BufWriter, Write}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use nucliadb_core::{node_error, protos, Channel, NodeResult}; @@ -132,7 +132,7 @@ impl ShardMetadata { generation_id: RwLock::new(None), } } - pub fn exists(shard_path: &PathBuf) -> bool { + pub fn exists(shard_path: &Path) -> bool { let metadata_path = shard_path.join(disk_structure::METADATA_FILE); metadata_path.exists() }