Skip to content

Commit

Permalink
Cleanly handle shard deletion in replication (#2499)
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino authored Sep 26, 2024
1 parent 73e484c commit 570fc48
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
2 changes: 1 addition & 1 deletion nucliadb_node/src/cache/reader_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ShardReaderCache {
let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), id);

if !ShardMetadata::exists(&shard_path) {
return Err(node_error!(ShardNotFoundError("Shard {shard_path:?} is not on disk")));
return Err(node_error!(ShardNotFoundError(id.clone())));
}
let shard = ShardReader::new(id.clone(), &shard_path)
.map_err(|error| node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}"))?;
Expand Down
6 changes: 3 additions & 3 deletions nucliadb_node/src/cache/writer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl InnerCache {

pub fn get(&mut self, id: &ShardId) -> NodeResult<CacheResult<ShardId, ShardWriter>> {
if self.blocked_shards.contains(id) {
return Err(node_error!(ShardNotFoundError("Shard {shard_path:?} is not on disk")));
return Err(node_error!(ShardNotFoundError(id.clone())));
}

Ok(self.active_shards.get(id))
Expand All @@ -77,7 +77,7 @@ impl InnerCache {
shard: Arc<ShardWriter>,
) -> NodeResult<Arc<ShardWriter>> {
if self.blocked_shards.contains(&shard.id) {
return Err(node_error!(ShardNotFoundError("Shard {shard_path:?} is not on disk")));
return Err(node_error!(ShardNotFoundError(shard.id.clone())));
}

self.active_shards.loaded(guard, &shard);
Expand Down Expand Up @@ -157,7 +157,7 @@ impl ShardWriterCache {
let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), id);

if !ShardMetadata::exists(&shard_path) {
return Err(node_error!(ShardNotFoundError("Shard {shard_path:?} is not on disk")));
return Err(node_error!(ShardNotFoundError(id.clone())));
}
let metadata = metadata_manager.get(id.clone()).expect("Shard metadata not found. This should not happen");
let shard = ShardWriter::open(metadata)
Expand Down
2 changes: 1 addition & 1 deletion nucliadb_node/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::fmt::Display;

#[derive(Debug)]
pub struct ShardNotFoundError(pub &'static str);
pub struct ShardNotFoundError(pub String);

impl Display for ShardNotFoundError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down
28 changes: 23 additions & 5 deletions nucliadb_node/src/replication/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,15 @@ impl ReplicateWorkerPool {
tokio::spawn(async move {
let result = worker.await;
drop(permit);
if result.is_err() {
error!("Error replicating shard: {:?}", result);
if let Err(e) = result {
if let Some(status) = e.downcast_ref::<tonic::Status>() {
if status.code() == tonic::Code::NotFound {
warn!("Error replicating shard, not found in primary: {status:?}");
return;
}
}

error!("Error replicating shard: {e:?}");
}
});
Ok(())
Expand Down Expand Up @@ -303,13 +310,24 @@ pub async fn connect_to_primary_and_replicate(
}

let shard_id = shard_state.shard_id.clone();
let shard_lookup;
let shard_path = disk_structure::shard_path_by_id(&settings.shards_path(), &shard_id);
let shard = if existing_shards.contains(&shard_id) {
let shard_cache_clone = shard_cache.clone();
let shard_id_clone = shard_id.clone();
shard_lookup = tokio::task::spawn_blocking(move || shard_cache_clone.get(&shard_id_clone)).await?;
ShardStub::Shard(shard_lookup?)
let shard_lookup = tokio::task::spawn_blocking(move || shard_cache_clone.get(&shard_id_clone)).await?;
if let Ok(shard) = shard_lookup {
ShardStub::Shard(shard)
} else {
// The shard directory exists on disk but the shard could not be opened: delete it and recreate it from scratch
warn!("Could not open replica shard {shard_id}, starting from scratch...");
std::fs::remove_dir_all(&shard_path)?;
std::fs::create_dir(&shard_path)?;
ShardStub::New {
id: shard_id.clone(),
path: shard_path.clone(),
cache: shard_cache.clone(),
}
}
} else {
std::fs::create_dir(&shard_path)?;
ShardStub::New {
Expand Down

0 comments on commit 570fc48

Please sign in to comment.