Skip to content

Commit

Permalink
Run a single blocking task per grpc call
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino committed Jan 12, 2024
1 parent 3300d31 commit 91f1367
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 66 deletions.
122 changes: 92 additions & 30 deletions nucliadb_node/src/grpc/grpc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl NodeReaderGRPCDriver {
pub async fn initialize(&self) -> NodeResult<()> {
if !self.settings.lazy_loading() {
// If lazy loading is disabled, load
let shards = self.shards.clone();
let shards = Arc::clone(&self.shards);
tokio::task::spawn_blocking(move || shards.load_all()).await??
}
Ok(())
Expand All @@ -64,7 +64,7 @@ impl NodeReaderGRPCDriver {
return Ok(shard);
}
let id_clone = id.clone();
let shards = self.shards.clone();
let shards = Arc::clone(&self.shards);
let shard = tokio::task::spawn_blocking(move || shards.load(id_clone))
.await
.map_err(|error| {
Expand All @@ -81,6 +81,26 @@ impl NodeReaderGRPCDriver {
}
}

fn obtain_shard(
shards: Arc<UnboundedShardReaderCache>,
id: impl Into<String>,
) -> Result<Arc<ShardReader>, tonic::Status> {
let id = id.into();
if let Some(shard) = shards.get(id.clone()) {
return Ok(shard);
}
let id_clone = id.clone();
let shards = shards.clone();
let shard = shards.load(id_clone).map_err(|error| {
if error.is::<ShardNotFoundError>() {
tonic::Status::not_found(error.to_string())
} else {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
}
})?;
Ok(shard)
}

pub struct GrpcStreaming<T>(T);

impl futures_core::Stream for GrpcStreaming<ParagraphIterator> {
Expand Down Expand Up @@ -164,9 +184,12 @@ impl NodeReader for NodeReaderGRPCDriver {
Some(ref shard_id) => shard_id.id.clone(),
None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")),
};
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "get shard");
let task = || run_with_telemetry(info, move || shard.get_info(&request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_info(&request))
};
let shard_info = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -183,9 +206,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let search_request = request.into_inner();
let shard_id = search_request.shard.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "search");
let task = || run_with_telemetry(info, move || shard.search(search_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.search(search_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -202,9 +228,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let suggest_request = request.into_inner();
let shard_id = suggest_request.shard.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "suggest");
let task = || run_with_telemetry(info, move || shard.suggest(suggest_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.suggest(suggest_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -221,9 +250,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let vector_request = request.into_inner();
let shard_id = vector_request.id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "vector search");
let task = || run_with_telemetry(info, move || shard.vector_search(vector_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.vector_search(vector_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -240,9 +272,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let relation_request = request.into_inner();
let shard_id = relation_request.shard_id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "relations search");
let task = || run_with_telemetry(info, move || shard.relation_search(relation_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.relation_search(relation_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -259,9 +294,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let document_request = request.into_inner();
let shard_id = document_request.id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "document search");
let task = || run_with_telemetry(info, move || shard.document_search(document_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.document_search(document_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -278,9 +316,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let paragraph_request = request.into_inner();
let shard_id = paragraph_request.id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "paragraph search");
let task = || run_with_telemetry(info, move || shard.paragraph_search(paragraph_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.paragraph_search(paragraph_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -296,9 +337,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "document ids");
let task = || run_with_telemetry(info, move || shard.get_text_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_text_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -314,9 +358,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "paragraph ids");
let task = || run_with_telemetry(info, move || shard.get_paragraphs_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_paragraphs_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -332,9 +379,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "vector ids");
let task = || run_with_telemetry(info, move || shard.get_vectors_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_vectors_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -351,9 +401,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "relation ids");
let task = || run_with_telemetry(info, move || shard.get_relations_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_relations_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -369,9 +422,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<EdgeList>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "relation edges");
let task = || run_with_telemetry(info, move || shard.get_relations_edges());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_relations_edges())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -387,9 +443,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<TypeList>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "relation types");
let task = || run_with_telemetry(info, move || shard.get_relations_types());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_relations_types())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -405,9 +464,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<ShardFileList>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().shard_id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "get shard files");
let task = || run_with_telemetry(info, move || shard.get_shard_files());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_shard_files())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand Down
Loading

0 comments on commit 91f1367

Please sign in to comment.