diff --git a/nucliadb_core/src/paragraphs.rs b/nucliadb_core/src/paragraphs.rs index 0ef8f7b7f9..4f689e3b6e 100644 --- a/nucliadb_core/src/paragraphs.rs +++ b/nucliadb_core/src/paragraphs.rs @@ -18,14 +18,13 @@ // along with this program. If not, see . // use std::path::PathBuf; -use std::sync::{Arc, RwLock}; use crate::prelude::*; use crate::protos::*; use crate::query_language::BooleanExpression; use crate::IndexFiles; -pub type ParagraphsReaderPointer = Arc>; +pub type ParagraphsReaderPointer = Box; pub type ParagraphsWriterPointer = Box; pub type ProtosRequest = ParagraphSearchRequest; pub type ProtosResponse = ParagraphSearchResponse; diff --git a/nucliadb_core/src/relations.rs b/nucliadb_core/src/relations.rs index 1e9030221a..ee1f94932c 100644 --- a/nucliadb_core/src/relations.rs +++ b/nucliadb_core/src/relations.rs @@ -18,14 +18,13 @@ // along with this program. If not, see . // use std::path::PathBuf; -use std::sync::{Arc, RwLock}; use crate::prelude::*; use crate::protos::*; use crate::Channel; use crate::IndexFiles; -pub type RelationsReaderPointer = Arc>; +pub type RelationsReaderPointer = Box; pub type RelationsWriterPointer = Box; pub type ProtosRequest = RelationSearchRequest; pub type ProtosResponse = RelationSearchResponse; diff --git a/nucliadb_core/src/texts.rs b/nucliadb_core/src/texts.rs index 64b9348d7a..a2b189d38b 100644 --- a/nucliadb_core/src/texts.rs +++ b/nucliadb_core/src/texts.rs @@ -18,14 +18,13 @@ // along with this program. If not, see . // use std::path::PathBuf; -use std::sync::{Arc, RwLock}; use crate::prelude::*; use crate::protos::*; use crate::query_planner::{PreFilterRequest, PreFilterResponse}; use crate::IndexFiles; -pub type TextsReaderPointer = Arc>; +pub type TextsReaderPointer = Box; pub type TextsWriterPointer = Box; pub type ProtosRequest = DocumentSearchRequest; pub type ProtosResponse = DocumentSearchResponse; diff --git a/nucliadb_core/src/vectors.rs b/nucliadb_core/src/vectors.rs index f68854678e..62df9db121 100644 --- a/nucliadb_core/src/vectors.rs +++ b/nucliadb_core/src/vectors.rs @@ -20,7 +20,6 @@ use std::collections::HashSet; use std::path::PathBuf; -use std::sync::{Arc, RwLock}; use uuid::Uuid; @@ -31,7 +30,7 @@ use crate::query_language::BooleanExpression; use crate::Channel; use crate::IndexFiles; -pub type VectorsReaderPointer = Arc>; +pub type VectorsReaderPointer = Box; pub type VectorsWriterPointer = Box; pub type ProtosRequest = VectorSearchRequest; pub type ProtosResponse = VectorSearchResponse; @@ -84,8 +83,6 @@ pub trait VectorReader: std::fmt::Debug + Send + Sync { fn search(&self, request: &ProtosRequest, context: &VectorsContext) -> NodeResult; fn stored_ids(&self) -> NodeResult>; fn count(&self) -> NodeResult; - - fn update(&mut self) -> NodeResult<()>; } pub trait VectorWriter: std::fmt::Debug + Send + Sync { diff --git a/nucliadb_node/src/shards/shard_reader.rs b/nucliadb_node/src/shards/shard_reader.rs index cbf78897e1..ecb9f8f273 100644 --- a/nucliadb_node/src/shards/shard_reader.rs +++ b/nucliadb_node/src/shards/shard_reader.rs @@ -50,7 +50,7 @@ use std::collections::HashSet; use std::fs::{self, File}; use std::io::{BufReader, Read}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, RwLock}; +use std::sync::RwLock; const MAX_SUGGEST_COMPOUND_WORDS: usize = 3; const MIN_VIABLE_PREFIX_SUGGEST: usize = 1; @@ -58,27 +58,24 @@ const CHUNK_SIZE: usize = 65535; fn open_vectors_reader(version: u32, path: &Path) -> NodeResult { match version { - 1 => nucliadb_vectors::service::VectorReaderService::open(path) - .map(|i| Arc::new(RwLock::new(i)) as VectorsReaderPointer), - 2 => nucliadb_vectors::service::VectorReaderService::open(path) - .map(|i| Arc::new(RwLock::new(i)) as VectorsReaderPointer), + 1 => nucliadb_vectors::service::VectorReaderService::open(path).map(|i| Box::new(i) as VectorsReaderPointer), + 2 => nucliadb_vectors::service::VectorReaderService::open(path).map(|i| Box::new(i) as VectorsReaderPointer), v => Err(node_error!("Invalid vectors version {v}")), } } fn open_paragraphs_reader(version: u32, path: &Path) -> NodeResult { match version { 2 => nucliadb_paragraphs2::reader::ParagraphReaderService::open(path) - .map(|i| Arc::new(RwLock::new(i)) as ParagraphsReaderPointer), + .map(|i| Box::new(i) as ParagraphsReaderPointer), 3 => nucliadb_paragraphs3::reader::ParagraphReaderService::open(path) - .map(|i| Arc::new(RwLock::new(i)) as ParagraphsReaderPointer), + .map(|i| Box::new(i) as ParagraphsReaderPointer), v => Err(node_error!("Invalid paragraphs version {v}")), } } fn open_texts_reader(version: u32, path: &Path) -> NodeResult { match version { - 2 => nucliadb_texts2::reader::TextReaderService::open(path) - .map(|i| Arc::new(RwLock::new(i)) as TextsReaderPointer), + 2 => nucliadb_texts2::reader::TextReaderService::open(path).map(|i| Box::new(i) as TextsReaderPointer), v => Err(node_error!("Invalid text reader version {v}")), } } @@ -86,7 +83,7 @@ fn open_texts_reader(version: u32, path: &Path) -> NodeResult NodeResult { match version { 2 => nucliadb_relations2::reader::RelationsReaderService::open(path) - .map(|i| Arc::new(RwLock::new(i)) as RelationsReaderPointer), + .map(|i| Box::new(i) as RelationsReaderPointer), v => Err(node_error!("Invalid relations version {v}")), } } @@ -140,12 +137,13 @@ impl Iterator for ShardFileChunkIterator { pub struct ShardReader { pub id: String, pub metadata: ShardMetadata, + indexes: ShardIndexes, root_path: PathBuf, suffixed_root_path: String, - text_reader: TextsReaderPointer, - paragraph_reader: ParagraphsReaderPointer, - vector_reader: VectorsReaderPointer, - relation_reader: RelationsReaderPointer, + text_reader: RwLock, + paragraph_reader: RwLock, + vector_reader: RwLock, + relation_reader: RwLock, versions: Versions, } @@ -195,16 +193,12 @@ impl ShardReader { pub fn get_info(&self) -> NodeResult { let span = tracing::Span::current(); - let paragraphs = self.paragraph_reader.clone(); - let vectors = self.vector_reader.clone(); - let texts = self.text_reader.clone(); - let info = info_span!(parent: &span, "text count"); - let text_task = || run_with_telemetry(info, move || read_rw_lock(&texts).count()); + let text_task = || run_with_telemetry(info, || read_rw_lock(&self.text_reader).count()); let info = info_span!(parent: &span, "paragraph count"); - let paragraph_task = || run_with_telemetry(info, move || read_rw_lock(¶graphs).count()); + let paragraph_task = || run_with_telemetry(info, || read_rw_lock(&self.paragraph_reader).count()); let info = info_span!(parent: &span, "vector count"); - let vector_task = || run_with_telemetry(info, move || read_rw_lock(&vectors).count()); + let vector_task = || run_with_telemetry(info, || read_rw_lock(&self.vector_reader).count()); let mut text_result = Ok(0); let mut paragraph_result = Ok(0); @@ -298,12 +292,13 @@ impl ShardReader { Ok(ShardReader { id, metadata, - root_path: shard_path.to_path_buf(), suffixed_root_path, - text_reader: fields.unwrap(), - paragraph_reader: paragraphs.unwrap(), - vector_reader: vectors.unwrap(), - relation_reader: relations.unwrap(), + indexes, + root_path: shard_path.to_path_buf(), + text_reader: RwLock::new(fields.unwrap()), + paragraph_reader: RwLock::new(paragraphs.unwrap()), + vector_reader: RwLock::new(vectors.unwrap()), + relation_reader: RwLock::new(relations.unwrap()), versions, }) } @@ -336,9 +331,6 @@ impl ShardReader { let mut suggest_paragraphs = request.features.contains(&(SuggestFeatures::Paragraphs as i32)); let suggest_entities = request.features.contains(&(SuggestFeatures::Entities as i32)); - - let paragraphs_reader_service = self.paragraph_reader.clone(); - let relations_reader_service = self.relation_reader.clone(); let prefixes = Self::split_suggest_query(request.body.clone(), MAX_SUGGEST_COMPOUND_WORDS); // Prefilter to apply field label filters @@ -371,7 +363,7 @@ impl ShardReader { } let suggest_paragraphs_task = suggest_paragraphs.then(|| { - let paragraph_task = move || read_rw_lock(¶graphs_reader_service).suggest(&request); + let paragraph_task = move || read_rw_lock(&self.paragraph_reader).suggest(&request); let info = info_span!(parent: &span, "paragraph suggest"); || run_with_telemetry(info, paragraph_task) }); @@ -392,9 +384,8 @@ impl ShardReader { }, ); - let responses = requests - .map(|request| read_rw_lock(&relations_reader_service).search(&request)) - .collect::>(); + let responses: Vec<_> = + requests.map(|request| read_rw_lock(&self.relation_reader).search(&request)).collect(); let entities = responses .into_iter() @@ -466,34 +457,30 @@ impl ShardReader { // Run the rest of the plan let text_task = index_queries.texts_request.map(|mut request| { request.id = search_id.clone(); - let text_reader_service = self.text_reader.clone(); let info = info_span!(parent: &span, "text search"); - let task = move || read_rw_lock(&text_reader_service).search(&request); + let task = move || read_rw_lock(&self.text_reader).search(&request); || run_with_telemetry(info, task) }); let paragraph_task = index_queries.paragraphs_request.map(|mut request| { request.id = search_id.clone(); let paragraphs_context = &index_queries.paragraphs_context; - let paragraph_reader_service = self.paragraph_reader.clone(); let info = info_span!(parent: &span, "paragraph search"); - let task = move || read_rw_lock(¶graph_reader_service).search(&request, paragraphs_context); + let task = move || read_rw_lock(&self.paragraph_reader).search(&request, paragraphs_context); || run_with_telemetry(info, task) }); let vector_task = index_queries.vectors_request.map(|mut request| { request.id = search_id.clone(); let vectors_context = &index_queries.vectors_context; - let vector_reader_service = self.vector_reader.clone(); let info = info_span!(parent: &span, "vector search"); - let task = move || read_rw_lock(&vector_reader_service).search(&request, vectors_context); + let task = move || read_rw_lock(&self.vector_reader).search(&request, vectors_context); || run_with_telemetry(info, task) }); let relation_task = index_queries.relations_request.map(|request| { - let relation_reader_service = self.relation_reader.clone(); let info = info_span!(parent: &span, "relations search"); - let task = move || read_rw_lock(&relation_reader_service).search(&request); + let task = move || read_rw_lock(&self.relation_reader).search(&request); || run_with_telemetry(info, task) }); @@ -529,18 +516,16 @@ impl ShardReader { #[tracing::instrument(skip_all)] pub fn paragraph_iterator(&self, request: StreamRequest) -> NodeResult { let span = tracing::Span::current(); - let paragraph_task_copy = Arc::clone(&self.paragraph_reader); run_with_telemetry(info_span!(parent: &span, "paragraph iteration"), || { - read_rw_lock(¶graph_task_copy).iterator(&request) + read_rw_lock(&self.paragraph_reader).iterator(&request) }) } #[tracing::instrument(skip_all)] pub fn document_iterator(&self, request: StreamRequest) -> NodeResult { let span = tracing::Span::current(); - let text_task_copy = Arc::clone(&self.text_reader); run_with_telemetry(info_span!(parent: &span, "field iteration"), || { - read_rw_lock(&text_task_copy).iterator(&request) + read_rw_lock(&self.text_reader).iterator(&request) }) } @@ -592,39 +577,35 @@ impl ShardReader { #[tracing::instrument(skip_all)] pub fn paragraph_search(&self, search_request: ParagraphSearchRequest) -> NodeResult { let span = tracing::Span::current(); - let paragraph_task_copy = Arc::clone(&self.paragraph_reader); run_with_telemetry(info_span!(parent: &span, "paragraph reader search"), || { - read_rw_lock(¶graph_task_copy).search(&search_request, &ParagraphsContext::default()) + read_rw_lock(&self.paragraph_reader).search(&search_request, &ParagraphsContext::default()) }) } #[tracing::instrument(skip_all)] pub fn document_search(&self, search_request: DocumentSearchRequest) -> NodeResult { let span = tracing::Span::current(); - let text_task_copy = Arc::clone(&self.text_reader); run_with_telemetry(info_span!(parent: &span, "field reader search"), || { - read_rw_lock(&text_task_copy).search(&search_request) + read_rw_lock(&self.text_reader).search(&search_request) }) } #[tracing::instrument(skip_all)] pub fn vector_search(&self, search_request: VectorSearchRequest) -> NodeResult { let span = tracing::Span::current(); - let vector_task_copy = Arc::clone(&self.vector_reader); run_with_telemetry(info_span!(parent: &span, "vector reader search"), || { - read_rw_lock(&vector_task_copy).search(&search_request, &VectorsContext::default()) + read_rw_lock(&self.vector_reader).search(&search_request, &VectorsContext::default()) }) } #[tracing::instrument(skip_all)] pub fn relation_search(&self, search_request: RelationSearchRequest) -> NodeResult { let span = tracing::Span::current(); - let relation_task_copy = Arc::clone(&self.relation_reader); run_with_telemetry(info_span!(parent: &span, "relation reader search"), || { - read_rw_lock(&relation_task_copy).search(&search_request) + read_rw_lock(&self.relation_reader).search(&search_request) }) } @@ -644,7 +625,12 @@ impl ShardReader { } pub fn update(&self) -> NodeResult<()> { - write_rw_lock(&self.vector_reader).update() + let version = self.versions.vectors; + let path = self.indexes.vectors_path(); + let new_reader = open_vectors_reader(version, &path)?; + let mut writer = write_rw_lock(&self.vector_reader); + *writer = new_reader; + Ok(()) } } diff --git a/nucliadb_vectors/src/data_point_provider/reader.rs b/nucliadb_vectors/src/data_point_provider/reader.rs index 428f21573a..e99f2dffc4 100644 --- a/nucliadb_vectors/src/data_point_provider/reader.rs +++ b/nucliadb_vectors/src/data_point_provider/reader.rs @@ -27,11 +27,10 @@ use crate::data_types::DeleteLog; use crate::utils; use crate::{VectorErr, VectorR}; use fs2::FileExt; -use fxhash::{FxHashMap, FxHashSet}; +use fxhash::FxHashMap; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::fs::File; -use std::io; use std::path::{Path, PathBuf}; use std::time::SystemTime; @@ -99,19 +98,12 @@ impl Fssc { } } -fn last_modified(path: &Path) -> io::Result { - let meta = std::fs::metadata(path)?; - meta.modified() -} - pub struct Reader { metadata: IndexMetadata, path: PathBuf, open_data_points: FxHashMap, - data_point_pins: Vec, delete_log: DTrie, number_of_embeddings: usize, - version: SystemTime, dimension: Option, } @@ -129,8 +121,7 @@ impl Reader { })?; let state_path = path.join(STATE); - let state_file = File::open(&state_path)?; - let version = last_modified(&state_path)?; + let state_file = File::open(state_path)?; let state = read_state(&state_file)?; let data_point_list = state.data_point_list; let delete_log = state.delete_log; @@ -156,8 +147,6 @@ impl Reader { Ok(Reader { metadata, - version, - data_point_pins, open_data_points, delete_log, number_of_embeddings, @@ -166,66 +155,6 @@ impl Reader { }) } - pub fn update(&mut self) -> VectorR<()> { - let state_path = self.path.join(STATE); - let disk_version = last_modified(&state_path)?; - - if disk_version == self.version { - return Ok(()); - } - - let state_file = File::open(state_path)?; - let state = read_state(&state_file)?; - let data_point_list = state.data_point_list; - let new_delete_log = state.delete_log; - let mut new_dimension = self.dimension; - let mut new_number_of_embeddings = 0; - let mut new_data_point_pins = Vec::new(); - let mut new_open_data_points = Vec::new(); - let mut data_points_to_eject: FxHashSet<_> = self.open_data_points.keys().copied().collect(); - - for data_point_id in data_point_list { - let data_point_pin = DataPointPin::open_pin(&self.path, data_point_id)?; - - if let Some(open_data_point) = self.open_data_points.get(&data_point_id) { - let data_point_journal = open_data_point.journal(); - new_number_of_embeddings += data_point_journal.no_nodes(); - data_points_to_eject.remove(&data_point_id); - } else { - let open_data_point = data_point::open(&data_point_pin)?; - let data_point_journal = open_data_point.journal(); - new_number_of_embeddings += data_point_journal.no_nodes(); - new_open_data_points.push(open_data_point); - } - - new_data_point_pins.push(data_point_pin); - } - - for open_data_point in new_open_data_points { - let data_point_id = open_data_point.get_id(); - self.open_data_points.insert(data_point_id, open_data_point); - } - - for data_point_id in data_points_to_eject { - self.open_data_points.remove(&data_point_id); - } - - if new_dimension.is_none() { - if let Some(data_point_pin) = new_data_point_pins.first() { - let open_data_point = &self.open_data_points[&data_point_pin.id()]; - new_dimension = open_data_point.stored_len(); - } - } - - self.version = disk_version; - self.delete_log = new_delete_log; - self.data_point_pins = new_data_point_pins; - self.dimension = new_dimension; - self.number_of_embeddings = new_number_of_embeddings; - - Ok(()) - } - pub fn search(&self, request: &dyn SearchRequest) -> VectorR> { let Some(dimension) = self.dimension else { return Ok(Vec::with_capacity(0)); diff --git a/nucliadb_vectors/src/service/reader.rs b/nucliadb_vectors/src/service/reader.rs index 5b410f909b..02a3f0d53b 100644 --- a/nucliadb_vectors/src/service/reader.rs +++ b/nucliadb_vectors/src/service/reader.rs @@ -61,11 +61,6 @@ impl Debug for VectorReaderService { } impl VectorReader for VectorReaderService { - fn update(&mut self) -> NodeResult<()> { - self.index.update()?; - Ok(()) - } - #[tracing::instrument(skip_all)] fn count(&self) -> NodeResult { debug!("Id for the vectorset is empty");