Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove async cache provider #1720

Merged
merged 5 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions nucliadb_node/src/bin/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio::signal::{ctrl_c, unix};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tonic::transport::Server;
type GrpcServer = NodeWriterServer<NodeWriterGRPCDriver>;
use nucliadb_node::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache;
use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardWriterCache;
use tokio::sync::Notify;

#[derive(Debug)]
Expand Down Expand Up @@ -98,7 +98,7 @@ async fn main() -> NodeResult<()> {
nucliadb_node::analytics::sync::start_analytics_loop();

let (shutdown_notifier, shutdown_notified) = get_shutdown_notifier();
let shard_cache = Arc::new(AsyncUnboundedShardWriterCache::new(settings.clone()));
let shard_cache = Arc::new(UnboundedShardWriterCache::new(settings.clone()));

let mut replication_task = None;
if settings.node_role() == NodeRole::Secondary {
Expand Down Expand Up @@ -165,7 +165,7 @@ async fn wait_for_sigkill(shutdown_notifier: Arc<Notify>) -> NodeResult<()> {

pub async fn start_grpc_service(
settings: Settings,
shard_cache: Arc<AsyncUnboundedShardWriterCache>,
shard_cache: Arc<UnboundedShardWriterCache>,
metadata_sender: UnboundedSender<NodeWriterEvent>,
node_id: String,
shutdown_notifier: Arc<Notify>,
Expand Down
145 changes: 102 additions & 43 deletions nucliadb_node/src/grpc/grpc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ use Shard as ShardPB;

use crate::settings::Settings;
use crate::shards::errors::ShardNotFoundError;
use crate::shards::providers::unbounded_cache::AsyncUnboundedShardReaderCache;
use crate::shards::providers::AsyncShardReaderProvider;
use crate::shards::providers::unbounded_cache::UnboundedShardReaderCache;
use crate::shards::reader::{ShardFileChunkIterator, ShardReader};
use crate::telemetry::run_with_telemetry;

pub struct NodeReaderGRPCDriver {
shards: AsyncUnboundedShardReaderCache,
shards: Arc<UnboundedShardReaderCache>,
settings: Settings,
}

Expand All @@ -44,35 +43,53 @@ impl NodeReaderGRPCDriver {
let cache_settings = settings.clone();
Self {
settings,
shards: AsyncUnboundedShardReaderCache::new(cache_settings),
shards: Arc::new(UnboundedShardReaderCache::new(cache_settings)),
}
}

/// This function must be called before using this service
pub async fn initialize(&self) -> NodeResult<()> {
if !self.settings.lazy_loading() {
// If lazy loading is disabled, load all shards eagerly during initialization
self.shards.load_all().await?
let shards = Arc::clone(&self.shards);
tokio::task::spawn_blocking(move || shards.load_all()).await??
}
Ok(())
}

async fn obtain_shard(&self, id: impl Into<String>) -> Result<Arc<ShardReader>, tonic::Status> {
let id = id.into();
if let Some(shard) = self.shards.get(id.clone()).await {
return Ok(shard);
}
let shard = self.shards.load(id.clone()).await.map_err(|error| {
if error.is::<ShardNotFoundError>() {
tonic::Status::not_found(error.to_string())
} else {
let id_clone = id.clone();
let shards = Arc::clone(&self.shards);

tokio::task::spawn_blocking(move || obtain_shard(shards, id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
}
})?;
Ok(shard)
})?
}
}

/// Returns the reader for shard `id`, loading it through the cache on a miss.
///
/// This is a synchronous function: it calls `get` and `load` on the cache
/// directly, without awaiting anything.
///
/// # Errors
///
/// * `tonic::Status::not_found` when `load` fails with a
///   [`ShardNotFoundError`].
/// * `tonic::Status::internal` for any other `load` failure; the message
///   includes the shard id and the debug rendering of the error.
fn obtain_shard(
    shards: Arc<UnboundedShardReaderCache>,
    id: impl Into<String>,
) -> Result<Arc<ShardReader>, tonic::Status> {
    let id = id.into();

    // Fast path: the shard is already cached.
    if let Some(shard) = shards.get(id.clone()) {
        return Ok(shard);
    }

    // Slow path: load the shard. `id` itself is kept (only a clone is handed
    // to `load`) because the error closure below still needs it for the message.
    shards.load(id.clone()).map_err(|error| {
        if error.is::<ShardNotFoundError>() {
            tonic::Status::not_found(error.to_string())
        } else {
            tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
        }
    })
}

pub struct GrpcStreaming<T>(T);

impl futures_core::Stream for GrpcStreaming<ParagraphIterator> {
Expand Down Expand Up @@ -156,9 +173,12 @@ impl NodeReader for NodeReaderGRPCDriver {
Some(ref shard_id) => shard_id.id.clone(),
None => return Err(tonic::Status::invalid_argument("Shard ID must be provided")),
};
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "get shard");
let task = || run_with_telemetry(info, move || shard.get_info(&request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_info(&request))
};
let shard_info = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -175,9 +195,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let search_request = request.into_inner();
let shard_id = search_request.shard.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "search");
let task = || run_with_telemetry(info, move || shard.search(search_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.search(search_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -194,9 +217,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let suggest_request = request.into_inner();
let shard_id = suggest_request.shard.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "suggest");
let task = || run_with_telemetry(info, move || shard.suggest(suggest_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.suggest(suggest_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -213,9 +239,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let vector_request = request.into_inner();
let shard_id = vector_request.id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "vector search");
let task = || run_with_telemetry(info, move || shard.vector_search(vector_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.vector_search(vector_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -232,9 +261,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let relation_request = request.into_inner();
let shard_id = relation_request.shard_id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "relations search");
let task = || run_with_telemetry(info, move || shard.relation_search(relation_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.relation_search(relation_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -251,9 +283,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let document_request = request.into_inner();
let shard_id = document_request.id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "document search");
let task = || run_with_telemetry(info, move || shard.document_search(document_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.document_search(document_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -270,9 +305,12 @@ impl NodeReader for NodeReaderGRPCDriver {
let span = Span::current();
let paragraph_request = request.into_inner();
let shard_id = paragraph_request.id.clone();
let shard = self.obtain_shard(shard_id).await?;
let info = info_span!(parent: &span, "paragraph search");
let task = || run_with_telemetry(info, move || shard.paragraph_search(paragraph_request));
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.paragraph_search(paragraph_request))
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -288,9 +326,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "document ids");
let task = || run_with_telemetry(info, move || shard.get_text_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_text_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -306,9 +347,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "paragraph ids");
let task = || run_with_telemetry(info, move || shard.get_paragraphs_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_paragraphs_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -324,9 +368,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "vector ids");
let task = || run_with_telemetry(info, move || shard.get_vectors_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_vectors_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -343,9 +390,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<IdCollection>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "relation ids");
let task = || run_with_telemetry(info, move || shard.get_relations_keys());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_relations_keys())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -361,9 +411,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<EdgeList>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "relation edges");
let task = || run_with_telemetry(info, move || shard.get_relations_edges());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_relations_edges())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -379,9 +432,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<TypeList>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "relation types");
let task = || run_with_telemetry(info, move || shard.get_relations_types());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_relations_types())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand All @@ -397,9 +453,12 @@ impl NodeReader for NodeReaderGRPCDriver {
) -> Result<tonic::Response<ShardFileList>, tonic::Status> {
let span = Span::current();
let shard_id = request.into_inner().shard_id;
let shard = self.obtain_shard(shard_id.clone()).await?;
let info = info_span!(parent: &span, "get shard files");
let task = || run_with_telemetry(info, move || shard.get_shard_files());
let shards = Arc::clone(&self.shards);
let task = move || {
let shard = obtain_shard(shards, shard_id)?;
run_with_telemetry(info, move || shard.get_shard_files())
};
let response = tokio::task::spawn_blocking(task).await.map_err(|error| {
tonic::Status::internal(format!("Blocking task panicked: {error:?}"))
})?;
Expand Down
Loading
Loading