Skip to content

Commit

Permalink
Remove async cache provider
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino committed Jan 12, 2024
1 parent 4589798 commit 3300d31
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 576 deletions.
6 changes: 3 additions & 3 deletions nucliadb_node/src/bin/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio::signal::{ctrl_c, unix};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tonic::transport::Server;
type GrpcServer = NodeWriterServer<NodeWriterGRPCDriver>;
use nucliadb_node::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache;
use nucliadb_node::shards::providers::unbounded_cache::UnboundedShardWriterCache;
use tokio::sync::Notify;

#[derive(Debug)]
Expand Down Expand Up @@ -98,7 +98,7 @@ async fn main() -> NodeResult<()> {
nucliadb_node::analytics::sync::start_analytics_loop();

let (shutdown_notifier, shutdown_notified) = get_shutdown_notifier();
let shard_cache = Arc::new(AsyncUnboundedShardWriterCache::new(settings.clone()));
let shard_cache = Arc::new(UnboundedShardWriterCache::new(settings.clone()));

let mut replication_task = None;
if settings.node_role() == NodeRole::Secondary {
Expand Down Expand Up @@ -165,7 +165,7 @@ async fn wait_for_sigkill(shutdown_notifier: Arc<Notify>) -> NodeResult<()> {

pub async fn start_grpc_service(
settings: Settings,
shard_cache: Arc<AsyncUnboundedShardWriterCache>,
shard_cache: Arc<UnboundedShardWriterCache>,
metadata_sender: UnboundedSender<NodeWriterEvent>,
node_id: String,
shutdown_notifier: Arc<Notify>,
Expand Down
22 changes: 15 additions & 7 deletions nucliadb_node/src/grpc/grpc_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ use Shard as ShardPB;

use crate::settings::Settings;
use crate::shards::errors::ShardNotFoundError;
use crate::shards::providers::unbounded_cache::AsyncUnboundedShardReaderCache;
use crate::shards::providers::AsyncShardReaderProvider;
use crate::shards::providers::unbounded_cache::UnboundedShardReaderCache;
use crate::shards::providers::ShardReaderProvider;
use crate::shards::reader::{ShardFileChunkIterator, ShardReader};
use crate::telemetry::run_with_telemetry;

pub struct NodeReaderGRPCDriver {
shards: AsyncUnboundedShardReaderCache,
shards: Arc<UnboundedShardReaderCache>,
settings: Settings,
}

Expand All @@ -44,25 +44,33 @@ impl NodeReaderGRPCDriver {
let cache_settings = settings.clone();
Self {
settings,
shards: AsyncUnboundedShardReaderCache::new(cache_settings),
shards: Arc::new(UnboundedShardReaderCache::new(cache_settings)),
}
}

/// This function must be called before using this service
pub async fn initialize(&self) -> NodeResult<()> {
if !self.settings.lazy_loading() {
// If lazy loading is disabled, load
self.shards.load_all().await?
let shards = self.shards.clone();
tokio::task::spawn_blocking(move || shards.load_all()).await??
}
Ok(())
}

async fn obtain_shard(&self, id: impl Into<String>) -> Result<Arc<ShardReader>, tonic::Status> {
let id = id.into();
if let Some(shard) = self.shards.get(id.clone()).await {
if let Some(shard) = self.shards.get(id.clone()) {
return Ok(shard);
}
let shard = self.shards.load(id.clone()).await.map_err(|error| {
let id_clone = id.clone();
let shards = self.shards.clone();
let shard = tokio::task::spawn_blocking(move || shards.load(id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
})?;
let shard = shard.map_err(|error| {
if error.is::<ShardNotFoundError>() {
tonic::Status::not_found(error.to_string())
} else {
Expand Down
50 changes: 40 additions & 10 deletions nucliadb_node/src/grpc/grpc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ use crate::analytics::sync::send_analytics_event;
use crate::settings::Settings;
use crate::shards::errors::ShardNotFoundError;
use crate::shards::metadata::ShardMetadata;
use crate::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache;
use crate::shards::providers::AsyncShardWriterProvider;
use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache;
use crate::shards::providers::ShardWriterProvider;
use crate::shards::writer::ShardWriter;
use crate::telemetry::run_with_telemetry;
use crate::utils::list_shards;

pub struct NodeWriterGRPCDriver {
shards: Arc<AsyncUnboundedShardWriterCache>,
shards: Arc<UnboundedShardWriterCache>,
sender: Option<UnboundedSender<NodeWriterEvent>>,
settings: Settings,
}
Expand All @@ -55,7 +55,7 @@ pub enum NodeWriterEvent {
}

impl NodeWriterGRPCDriver {
pub fn new(settings: Settings, shard_cache: Arc<AsyncUnboundedShardWriterCache>) -> Self {
pub fn new(settings: Settings, shard_cache: Arc<UnboundedShardWriterCache>) -> Self {
Self {
settings,
shards: shard_cache,
Expand All @@ -67,7 +67,8 @@ impl NodeWriterGRPCDriver {
pub async fn initialize(&self) -> NodeResult<()> {
if !self.settings.lazy_loading() {
// If lazy loading is disabled, load
self.shards.load_all().await?
let shards = self.shards.clone();
tokio::task::spawn_blocking(move || shards.load_all()).await??
}
Ok(())
}
Expand All @@ -81,10 +82,17 @@ impl NodeWriterGRPCDriver {

async fn obtain_shard(&self, id: impl Into<String>) -> Result<Arc<ShardWriter>, tonic::Status> {
let id = id.into();
if let Some(shard) = self.shards.get(id.clone()).await {
if let Some(shard) = self.shards.get(id.clone()) {
return Ok(shard);
}
let shard = self.shards.load(id.clone()).await.map_err(|error| {
let id_clone = id.clone();
let shards = self.shards.clone();
let shard = tokio::task::spawn_blocking(move || shards.load(id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
})?;
let shard = shard.map_err(|error| {
if error.is::<ShardNotFoundError>() {
tonic::Status::not_found(error.to_string())
} else {
Expand Down Expand Up @@ -119,7 +127,12 @@ impl NodeWriter for NodeWriterGRPCDriver {
request.similarity().into(),
Some(Channel::from(request.release_channel)),
);
let new_shard = self.shards.create(metadata).await;

let shards = self.shards.clone();
let new_shard = tokio::task::spawn_blocking(move || shards.create(metadata))
.await
.map_err(|error| tonic::Status::internal(format!("Error creating shard: {error:?}")))?;

match new_shard {
Ok(new_shard) => {
self.emit_event(NodeWriterEvent::ShardCreation);
Expand All @@ -139,7 +152,15 @@ impl NodeWriter for NodeWriterGRPCDriver {
send_analytics_event(AnalyticsEvent::Delete).await;
// Deletion does not require for the shard to be loaded.
let shard_id = request.into_inner();
let deleted = self.shards.delete(shard_id.id.clone()).await;

let shards = self.shards.clone();
let shard_id_clone = shard_id.id.clone();
let deleted = tokio::task::spawn_blocking(move || shards.delete(shard_id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error deleted shard {}: {error:?}", shard_id.id))
})?;

match deleted {
Ok(_) => {
self.emit_event(NodeWriterEvent::ShardDeletion);
Expand All @@ -158,8 +179,17 @@ impl NodeWriter for NodeWriterGRPCDriver {
request: Request<ShardId>,
) -> Result<Response<ShardCleaned>, Status> {
let shard_id = request.into_inner().id;

// No need to load shard to upgrade it
match self.shards.upgrade(shard_id).await {
let shards = self.shards.clone();
let shard_id_clone = shard_id.clone();
let upgraded = tokio::task::spawn_blocking(move || shards.upgrade(shard_id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error deleted shard {}: {error:?}", shard_id))
})?;

match upgraded {
Ok(upgrade_details) => Ok(tonic::Response::new(upgrade_details)),
Err(error) => Err(tonic::Status::internal(error.to_string())),
}
Expand Down
38 changes: 23 additions & 15 deletions nucliadb_node/src/replication/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use tonic::Request;
use crate::replication::health::ReplicationHealthManager;
use crate::settings::Settings;
use crate::shards::metadata::ShardMetadata;
use crate::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache;
use crate::shards::providers::AsyncShardWriterProvider;
use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache;
use crate::shards::providers::ShardWriterProvider;
use crate::shards::writer::ShardWriter;
use crate::utils::{list_shards, set_primary_node_id};

Expand Down Expand Up @@ -201,7 +201,7 @@ impl ReplicateWorkerPool {

pub async fn connect_to_primary_and_replicate(
settings: Settings,
shard_cache: Arc<AsyncUnboundedShardWriterCache>,
shard_cache: Arc<UnboundedShardWriterCache>,
secondary_id: String,
shutdown_notified: Arc<AtomicBool>,
) -> NodeResult<()> {
Expand Down Expand Up @@ -275,18 +275,23 @@ pub async fn connect_to_primary_and_replicate(
let shard_id = shard_state.shard_id.clone();
let shard_lookup;
if existing_shards.contains(&shard_id) {
shard_lookup = shard_cache.load(shard_id.clone()).await;
let id_clone = shard_id.clone();
let shard_cache_clone = Arc::clone(&shard_cache);
shard_lookup =
tokio::task::spawn_blocking(move || shard_cache_clone.load(id_clone)).await?;
} else {
let metadata = ShardMetadata::new(
shards_path.join(shard_id.clone()),
shard_state.shard_id.clone(),
Some(shard_state.kbid.clone()),
shard_state.similarity.clone().into(),
None,
);
let shard_cache_clone = Arc::clone(&shard_cache);

warn!("Creating shard to replicate: {shard_id}");
let shard_create = shard_cache
.create(ShardMetadata::new(
shards_path.join(shard_id.clone()),
shard_state.shard_id.clone(),
Some(shard_state.kbid.clone()),
shard_state.similarity.clone().into(),
None,
))
.await;
let shard_create =
tokio::task::spawn_blocking(move || shard_cache_clone.create(metadata)).await?;
if shard_create.is_err() {
warn!("Failed to create shard: {:?}", shard_create);
continue;
Expand Down Expand Up @@ -326,7 +331,10 @@ pub async fn connect_to_primary_and_replicate(
if !existing_shards.contains(&shard_id) {
continue;
}
let shard_lookup = shard_cache.delete(shard_id.clone()).await;
let id_clone = shard_id.clone();
let shard_cache_clone = shard_cache.clone();
let shard_lookup =
tokio::task::spawn_blocking(move || shard_cache_clone.delete(id_clone)).await?;
if shard_lookup.is_err() {
warn!("Failed to delete shard: {:?}", shard_lookup);
continue;
Expand Down Expand Up @@ -364,7 +372,7 @@ pub async fn connect_to_primary_and_replicate(

pub async fn connect_to_primary_and_replicate_forever(
settings: Settings,
shard_cache: Arc<AsyncUnboundedShardWriterCache>,
shard_cache: Arc<UnboundedShardWriterCache>,
secondary_id: String,
shutdown_notified: Arc<AtomicBool>,
) -> NodeResult<()> {
Expand Down
23 changes: 16 additions & 7 deletions nucliadb_node/src/replication/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ use tonic::Response;
use crate::replication::NodeRole;
use crate::settings::Settings;
use crate::shards::metadata::Similarity;
use crate::shards::providers::unbounded_cache::AsyncUnboundedShardWriterCache;
use crate::shards::providers::AsyncShardWriterProvider;
use crate::shards::providers::unbounded_cache::UnboundedShardWriterCache;
use crate::shards::providers::ShardWriterProvider;
use crate::shards::writer::ShardWriter;
use crate::utils::list_shards;
pub struct ReplicationServiceGRPCDriver {
settings: Settings,
shards: Arc<AsyncUnboundedShardWriterCache>,
shards: Arc<UnboundedShardWriterCache>,
node_id: String,
}

impl ReplicationServiceGRPCDriver {
pub fn new(
settings: Settings,
shard_cache: Arc<AsyncUnboundedShardWriterCache>,
shard_cache: Arc<UnboundedShardWriterCache>,
node_id: String,
) -> Self {
Self {
Expand All @@ -58,7 +58,8 @@ impl ReplicationServiceGRPCDriver {
/// This function must be called before using this service
pub async fn initialize(&self) -> NodeResult<()> {
// should we do this?
self.shards.load_all().await?;
let shards = self.shards.clone();
tokio::task::spawn_blocking(move || shards.load_all()).await??;
Ok(())
}
}
Expand Down Expand Up @@ -256,11 +257,19 @@ impl replication::replication_service_server::ReplicationService for Replication
Result<replication::ReplicateShardResponse, tonic::Status>,
> = receiver.0.clone();

let shard_lookup = self.shards.load(request.shard_id.clone()).await;
let id = request.shard_id;
let id_clone = id.clone();
let shards = self.shards.clone();
let shard_lookup = tokio::task::spawn_blocking(move || shards.load(id_clone))
.await
.map_err(|error| {
tonic::Status::internal(format!("Error lazy loading shard {id}: {error:?}"))
})?;

if let Err(error) = shard_lookup {
return Err(tonic::Status::not_found(format!(
"Shard {} not found, error: {}",
request.shard_id, error
id, error
)));
}

Expand Down
6 changes: 3 additions & 3 deletions nucliadb_node/src/shards/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ShardMetadata {
generation_id: RwLock::new(None),
}
}
pub fn exists(shard_path: PathBuf) -> bool {
pub fn exists(shard_path: &PathBuf) -> bool {
let metadata_path = shard_path.join(disk_structure::METADATA_FILE);
metadata_path.exists()
}
Expand Down Expand Up @@ -237,7 +237,7 @@ impl ShardsMetadataManager {
}
}
let shard_path = disk_structure::shard_path_by_id(&self.shards_path, &shard_id);
if !ShardMetadata::exists(shard_path.clone()) {
if !ShardMetadata::exists(&shard_path) {
return None;
}
let sm = ShardMetadata::open(shard_path);
Expand Down Expand Up @@ -275,7 +275,7 @@ mod test {
#[test]
fn open_empty() {
let dir = TempDir::new().unwrap();
assert!(!ShardMetadata::exists(dir.path().to_path_buf()));
assert!(!ShardMetadata::exists(&dir.path().to_path_buf()));
let meta = ShardMetadata::open(dir.path().to_path_buf());
assert!(meta.is_err());
}
Expand Down
4 changes: 1 addition & 3 deletions nucliadb_node/src/shards/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,4 @@
mod provider_traits;
pub mod unbounded_cache;

pub use provider_traits::{
AsyncShardReaderProvider, AsyncShardWriterProvider, ShardReaderProvider, ShardWriterProvider,
};
pub use provider_traits::{ShardReaderProvider, ShardWriterProvider};
23 changes: 0 additions & 23 deletions nucliadb_node/src/shards/providers/provider_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::sync::Arc;

use async_trait::async_trait;
use nucliadb_core::protos::ShardCleaned;
use nucliadb_core::NodeResult;

Expand All @@ -35,14 +34,6 @@ pub trait ShardReaderProvider: Send + Sync {
fn get(&self, id: ShardId) -> Option<Arc<ShardReader>>;
}

#[async_trait]
pub trait AsyncShardReaderProvider: Send + Sync {
async fn load(&self, id: ShardId) -> NodeResult<Arc<ShardReader>>;
async fn load_all(&self) -> NodeResult<()>;

async fn get(&self, id: ShardId) -> Option<Arc<ShardReader>>;
}

pub trait ShardWriterProvider {
fn load(&self, id: ShardId) -> NodeResult<Arc<ShardWriter>>;
fn load_all(&self) -> NodeResult<()>;
Expand All @@ -55,17 +46,3 @@ pub trait ShardWriterProvider {

fn get_metadata(&self, id: ShardId) -> Option<Arc<ShardMetadata>>;
}

#[async_trait]
pub trait AsyncShardWriterProvider {
async fn load(&self, id: ShardId) -> NodeResult<Arc<ShardWriter>>;
async fn load_all(&self) -> NodeResult<()>;

async fn create(&self, metadata: ShardMetadata) -> NodeResult<Arc<ShardWriter>>;
async fn get(&self, id: ShardId) -> Option<Arc<ShardWriter>>;
async fn delete(&self, id: ShardId) -> NodeResult<()>;

async fn upgrade(&self, id: ShardId) -> NodeResult<ShardCleaned>;

fn get_metadata(&self, id: ShardId) -> Option<Arc<ShardMetadata>>;
}
Loading

0 comments on commit 3300d31

Please sign in to comment.