Skip to content

Commit

Permalink
safe delete in node (#1668)
Browse files Browse the repository at this point in the history
* mark as deleted functionality added

* add additional safety

* ifxes

* ..
  • Loading branch information
hermeGarcia authored Dec 18, 2023
1 parent 55b59ae commit 050596e
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;

Expand All @@ -35,21 +35,66 @@ use crate::shards::providers::AsyncShardWriterProvider;
use crate::shards::writer::ShardWriter;
use crate::shards::ShardId;

/// Status of a shard as seen through the cache; a shard is in exactly one of these states.
enum ShardCacheStatus {
/// Not in cache and not being deleted, therefore if it is found on disk, loading it is safe.
NotInCache,
/// A task has marked the shard as being deleted. The shard may or may not still be
/// cached; either way it must not be handed out nor loaded from disk.
BeingDeleted,
/// The shard is cached and is not being deleted.
InCache(Arc<ShardWriter>),
}

/// This cache allows the user to block shards, ensuring that they will not be loaded from disk.
/// Being able to do so is crucial: otherwise the only source of truth would be the disk, and
/// that would not be thread-safe.
#[derive(Default)]
struct InnerCache {
    /// Ids of the shards that are currently blocked (marked as being deleted).
    /// Keyed by `ShardId`, like `active_shards`, so both collections share one key type.
    blocked_shards: HashSet<ShardId>,
    /// Shards that are loaded in memory, indexed by their id.
    active_shards: HashMap<ShardId, Arc<ShardWriter>>,
}

impl InnerCache {
pub fn new() -> InnerCache {
Self::default()
}
pub fn get_shard(&self, id: &ShardId) -> ShardCacheStatus {
match self.active_shards.get(id).cloned() {
_ if self.blocked_shards.contains(id) => ShardCacheStatus::BeingDeleted,
Some(shard) => ShardCacheStatus::InCache(shard),
None => ShardCacheStatus::NotInCache,
}
}
pub fn set_being_deleted(&mut self, id: ShardId) {
self.blocked_shards.insert(id);
}
pub fn remove(&mut self, id: &ShardId) {
self.blocked_shards.remove(id);
self.active_shards.remove(id);
}
pub fn add_active_shard(&mut self, id: ShardId, shard: Arc<ShardWriter>) {
// It would be a dangerous bug to have a path
// in the system that leads to this assertion failing.
assert!(!self.blocked_shards.contains(&id));

self.active_shards.insert(id, shard);
}
}

/// Asynchronous provider of shard writers that keeps the loaded shards behind a lock.
#[derive(Default)]
pub struct AsyncUnboundedShardWriterCache {
cache: RwLock<HashMap<ShardId, Arc<ShardWriter>>>,
// Base directory from which the on-disk path of each shard is derived.
pub shards_path: PathBuf,
// Cached shards plus the set of shards blocked for deletion; every access awaits the lock.
cache: RwLock<InnerCache>,
// Source of the metadata needed to open or upgrade a shard.
metadata_manager: Arc<ShardsMetadataManager>,
}

impl AsyncUnboundedShardWriterCache {
pub fn new(settings: Settings) -> Self {
Self {
// NOTE: as it's not probable all shards will be written, we don't
// assign any initial capacity to the HashMap under the
// consideration a resize blocking is not performance critical while
// writting.
cache: RwLock::new(HashMap::new()),
// assign any initial capacity to the HashMap under the consideration
// a resize blocking is not performance critical while writing.
cache: RwLock::new(InnerCache::default()),
shards_path: settings.shards_path(),
metadata_manager: Arc::new(ShardsMetadataManager::new(settings.shards_path())),
}
Expand All @@ -61,56 +106,60 @@ impl AsyncShardWriterProvider for AsyncUnboundedShardWriterCache {
/// Creates a new shard writer from `metadata`, registers the metadata in the
/// metadata manager and stores the writer in the cache as an active shard.
///
/// # Errors
/// Fails when the blocking task cannot be joined or when `ShardWriter::new` fails.
///
/// # Panics
/// `add_active_shard` asserts that the shard id is not marked as being deleted.
async fn create(&self, metadata: ShardMetadata) -> NodeResult<Arc<ShardWriter>> {
let shard_id = metadata.id();
let metadata = Arc::new(metadata);
let mmetadata = Arc::clone(&metadata);
let new_shard = Arc::new(
tokio::task::spawn_blocking(move || ShardWriter::new(mmetadata))
.await
.context("Blocking task panicked")??,
);
let shard_metadata = Arc::clone(&metadata);
// The writer is built on a blocking task so the async executor is not stalled.
let shard = tokio::task::spawn_blocking(move || ShardWriter::new(shard_metadata))
.await
.context("Blocking task panicked")??;
let shard = Arc::new(shard);
let shard_cache_clone = Arc::clone(&shard);
self.metadata_manager.add_metadata(metadata);
self.cache.write().await.insert(shard_id, new_shard.clone());
Ok(new_shard)

// The write lock is only taken once the shard has been fully created.
let mut cache_writer = self.cache.write().await;
cache_writer.add_active_shard(shard_id, shard_cache_clone);
Ok(shard)
}

/// Returns the writer for shard `id`, opening it from disk when it is not cached yet.
///
/// A cached shard is returned directly. A shard marked as being deleted yields a
/// `ShardNotFoundError`. Otherwise the shard is opened inside a blocking task and,
/// once opened, registered in the cache as an active shard.
///
/// # Errors
/// Fails when the shard is being deleted, when it is not on disk, when it cannot be
/// opened, or when the blocking task cannot be joined.
///
/// # Panics
/// Panics if the shard exists on disk but the metadata manager has no metadata for it.
async fn load(&self, id: ShardId) -> NodeResult<Arc<ShardWriter>> {
let shard_key = id.clone();
let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id);
let mut cache = self.cache.write().await;

if let Some(shard) = cache.get(&id) {
debug!("Shard {shard_path:?} is already on memory");
return Ok(Arc::clone(shard));
}
// The write guard stays alive for the whole match below, including the await
// on the blocking task that opens the shard.
let mut cache_writer = self.cache.write().await;
match cache_writer.get_shard(&id) {
ShardCacheStatus::InCache(shard) => Ok(shard),
ShardCacheStatus::BeingDeleted => Err(node_error!(ShardNotFoundError(
"Shard {shard_path:?} is not on disk"
))),
ShardCacheStatus::NotInCache => {
let metadata_manager = Arc::clone(&self.metadata_manager);
// Avoid blocking while interacting with the file system
let shard = tokio::task::spawn_blocking(move || {
if !ShardMetadata::exists(shard_path.clone()) {
return Err(node_error!(ShardNotFoundError(
"Shard {shard_path:?} is not on disk"
)));
}
let metadata = metadata_manager
.get(id.clone())
.expect("Shard metadata not found. This should not happen");
ShardWriter::open(Arc::clone(&metadata)).map_err(|error| {
node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}")
})
})
.await
.context("Blocking task panicked")??;

let metadata_manager = Arc::clone(&self.metadata_manager);
// Avoid blocking while interacting with the file system
let shard = tokio::task::spawn_blocking(move || {
if !ShardMetadata::exists(shard_path.clone()) {
return Err(node_error!(ShardNotFoundError(
"Shard {shard_path:?} is not on disk"
)));
let shard = Arc::new(shard);
let cache_shard = Arc::clone(&shard);
cache_writer.add_active_shard(shard_key, cache_shard);
Ok(shard)
}
let metadata = metadata_manager
.get(id.clone())
.expect("Shard metadata not found. This should not happen");
ShardWriter::open(Arc::clone(&metadata)).map_err(|error| {
node_error!("Shard {shard_path:?} could not be loaded from disk: {error:?}")
})
})
.await
.context("Blocking task panicked")??;

let shard = Arc::new(shard);
let cache_shard = Arc::clone(&shard);
cache.insert(shard_key, cache_shard);
Ok(shard)
}
}

async fn load_all(&self) -> NodeResult<()> {
let shards_path = self.shards_path.clone();
let metadata_manager = Arc::clone(&self.metadata_manager);
let shards = tokio::task::spawn_blocking(move || -> NodeResult<_> {
let mut shards = HashMap::new();
let mut shards = InnerCache::new();
for entry in std::fs::read_dir(&shards_path)? {
let entry = entry?;
let shard_id = entry.file_name().to_str().unwrap().to_string();
Expand All @@ -129,7 +178,7 @@ impl AsyncShardWriterProvider for AsyncUnboundedShardWriterCache {
Err(err) => error!("Loading shard {shard_path:?} from disk raised {err}"),
Ok(shard) => {
debug!("Shard loaded: {shard_path:?}");
shards.insert(shard_id, Arc::new(shard));
shards.add_active_shard(shard_id, Arc::new(shard));
}
}
}
Expand All @@ -143,28 +192,81 @@ impl AsyncShardWriterProvider for AsyncUnboundedShardWriterCache {
}

/// Returns the cached writer for shard `id`, or `None` when the shard is not in the
/// cache or is marked as being deleted. The disk is never consulted here.
async fn get(&self, id: ShardId) -> Option<Arc<ShardWriter>> {
self.cache.read().await.get(&id).map(Arc::clone)
let cache_reader = self.cache.read().await;
// Any status other than `InCache` is reported as an absent shard.
let ShardCacheStatus::InCache(shard) = cache_reader.get_shard(&id) else {
return None;
};

Some(shard)
}

/// Deletes shard `id` from disk and, once that succeeded, from the cache.
///
/// The shard is first marked as being deleted. If it is active, the function then
/// waits on `block_shard` before removing the shard directory.
///
/// # Errors
/// Fails when the blocking task cannot be joined or when removing the directory fails;
/// in that case the being-deleted mark is left in place.
async fn delete(&self, id: ShardId) -> NodeResult<()> {
self.cache.write().await.remove(&id);
let mut cache_writer = self.cache.write().await;
// First the shard must be marked as being deleted; this way
// concurrent tasks cannot make the mistake of trying to use it.
cache_writer.set_being_deleted(id.clone());

// Even though the shard was marked as deleted, if it was already in the
// active shards list there may be operations running on it. We must ensure
// that all of them have finished before proceeding.
if let Some(shard) = cache_writer.active_shards.get(&id).cloned() {
std::mem::drop(cache_writer);
let blocking_token = shard.block_shard().await;
// At this point we can ensure that no operations
// are being performed on this shard. Subsequent operations
// will have to go through the cache, where the shard is marked
// as deleted.
std::mem::drop(blocking_token);
} else {
// Dropping the cache writer because it is not needed while deleting the shard.
std::mem::drop(cache_writer);
}

// No need to hold the lock while deletion happens.
// If deleting fails, the function returns without removing
// the deletion flag, to avoid accesses to a partially deleted shard.
let shard_path = disk_structure::shard_path_by_id(&self.shards_path.clone(), &id);
tokio::task::spawn_blocking(move || {
tokio::task::spawn_blocking(move || -> NodeResult<()> {
if shard_path.exists() {
debug!("Deleting shard {shard_path:?}");
std::fs::remove_dir_all(shard_path)?;
}
Ok(())
})
.await
.context("Blocking task panicked")?
.context("Blocking task panicked")??;

// If the shard was successfully deleted, it is safe to remove
// the entry from the cache.
self.cache.write().await.remove(&id);

Ok(())
}

async fn upgrade(&self, id: ShardId) -> NodeResult<ShardCleaned> {
self.cache.write().await.remove(&id);
let mut cache_writer = self.cache.write().await;
// First the shard must be marked as being deleted, this way
// concurrent tasks can not make the mistake of trying to use it.
cache_writer.set_being_deleted(id.clone());

// Even though the shard was marked as deleted, if it was already in the
// active shards list there may be operations running on it. We must ensure
// that all of them have finished before proceeding.
if let Some(shard) = cache_writer.active_shards.get(&id).cloned() {
std::mem::drop(cache_writer);
let blocking_token = shard.block_shard().await;
// At this point we can ensure that no operations
// are being performed in this shard. Next operations
// will require using the cache, where the shard is marked
// as deleted.
std::mem::drop(blocking_token);
} else {
// Dropping the cache writer because is not needed while deleting the shard.
std::mem::drop(cache_writer);
}

let id_ = id.clone();
let metadata = self.metadata_manager.get(id.clone());
// If upgrading fails, the safe thing is to keep the being deleted flag
let (upgraded, details) = tokio::task::spawn_blocking(move || -> NodeResult<_> {
let upgraded = ShardWriter::clean_and_create(metadata.unwrap())?;
let details = ShardCleaned {
Expand All @@ -178,7 +280,13 @@ impl AsyncShardWriterProvider for AsyncUnboundedShardWriterCache {
.await
.context("Blocking task panicked")??;

self.cache.write().await.insert(id_, Arc::new(upgraded));
// The shard was upgraded, is safe to allow access again
let shard = Arc::new(upgraded);
let mut cache_writer = self.cache.write().await;
// Old shard is completely removed
cache_writer.remove(&id);
// The clean and upgraded version takes its place
cache_writer.add_active_shard(id, shard);
Ok(details)
}

Expand Down
12 changes: 10 additions & 2 deletions nucliadb_node/src/shards/shard_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ use nucliadb_core::tracing::{self, *};
use nucliadb_core::{thread, IndexFiles};
use nucliadb_procs::measure;
use nucliadb_vectors::VectorErr;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, MutexGuard};

use crate::disk_structure::*;
use crate::shards::metadata::ShardMetadata;
use crate::shards::versions::Versions;
use crate::telemetry::run_with_telemetry;

/// Token returned by `ShardWriter::block_shard`. It owns the guard of the shard's
/// write lock, so the lock is released when the token is dropped.
pub struct BlockingToken<'a>(MutexGuard<'a, ()>);

#[derive(Debug)]
pub struct ShardWriter {
pub metadata: Arc<ShardMetadata>,
Expand Down Expand Up @@ -171,7 +173,8 @@ impl ShardWriter {
path: path.clone().join(RELATIONS_DIR),
channel,
};
let sw = ShardWriter::initialize(Arc::clone(&metadata), tsc, psc, vsc, rsc)?;
let shard_metadata = Arc::clone(&metadata);
let sw = ShardWriter::initialize(shard_metadata, tsc, psc, vsc, rsc)?;
metadata.new_generation_id();

Ok(sw)
Expand Down Expand Up @@ -474,6 +477,11 @@ impl ShardWriter {
}
}

/// Waits until the shard's `write_lock` can be acquired and returns a token holding it.
///
/// The lock stays held for as long as the returned [`BlockingToken`] is alive; the
/// token borrows from `self`, which the `'_` in the return type makes explicit.
// NOTE(review): presumably every mutating operation takes `write_lock` as well, which
// is what lets callers treat the token as "no operation is running" — confirm.
pub async fn block_shard(&self) -> BlockingToken<'_> {
    BlockingToken(self.write_lock.lock().await)
}

pub fn get_shard_segments(&self) -> NodeResult<HashMap<String, Vec<String>>> {
let mut segments = HashMap::new();

Expand Down

1 comment on commit 050596e

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 050596e Previous: 5a633b0 Ratio
nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error 12827.01645027661 iter/sec (stddev: 0.000002265121685922756) 12745.686329086004 iter/sec (stddev: 1.7317806991721728e-7) 0.99

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.