diff --git a/nucliadb_core/src/vectors.rs b/nucliadb_core/src/vectors.rs
index c850fcc05a..c0e1a03a21 100644
--- a/nucliadb_core/src/vectors.rs
+++ b/nucliadb_core/src/vectors.rs
@@ -18,22 +18,25 @@
 // along with this program. If not, see <http://www.gnu.org/licenses/>.
 //
 
+use std::collections::HashMap;
 use std::collections::HashSet;
 use std::path::PathBuf;
 
+use nucliadb_protos::nodereader;
+use nucliadb_protos::utils;
 use uuid::Uuid;
 
 use crate::metrics::vectors::MergeSource;
 use crate::prelude::*;
-use crate::protos::*;
 use crate::query_language::BooleanExpression;
 use crate::Channel;
 use crate::IndexFiles;
+use nucliadb_protos::noderesources;
 
 pub type VectorsReaderPointer = Box<dyn VectorReader>;
 pub type VectorsWriterPointer = Box<dyn VectorWriter>;
-pub type ProtosRequest = VectorSearchRequest;
-pub type ProtosResponse = VectorSearchResponse;
+pub type ProtosRequest = nodereader::VectorSearchRequest;
+pub type ProtosResponse = nodereader::VectorSearchResponse;
 
 #[derive(Debug, Clone, Copy)]
 pub struct MergeParameters {
@@ -49,7 +52,7 @@ pub struct MergeContext {
 
 #[derive(Clone)]
 pub struct VectorConfig {
-    pub similarity: VectorSimilarity,
+    pub similarity: utils::VectorSimilarity,
     pub path: PathBuf,
     pub channel: Channel,
     pub shard_id: String,
@@ -94,9 +97,65 @@ pub trait VectorWriter: std::fmt::Debug + Send + Sync {
     fn prepare_merge(&self, parameters: MergeParameters) -> NodeResult<Option<Box<dyn MergeRunner>>>;
     fn record_merge(&mut self, merge_result: Box<dyn MergeResults>, source: MergeSource) -> NodeResult<MergeMetrics>;
 
-    fn set_resource(&mut self, resource: &Resource) -> NodeResult<()>;
-    fn delete_resource(&mut self, resource_id: &ResourceId) -> NodeResult<()>;
+    fn set_resource(&mut self, resource: ResourceWrapper) -> NodeResult<()>;
+    fn delete_resource(&mut self, resource_id: &noderesources::ResourceId) -> NodeResult<()>;
     fn garbage_collection(&mut self) -> NodeResult<()>;
     fn force_garbage_collection(&mut self) -> NodeResult<()>;
     fn reload(&mut self) -> NodeResult<()>;
 }
+
+pub struct ResourceWrapper<'a> {
+    resource: &'a noderesources::Resource,
+    vectorset: Option<String>,
+}
+
+impl<'a> From<&'a noderesources::Resource> for ResourceWrapper<'a> {
+    fn from(value: &'a noderesources::Resource) -> Self {
+        Self {
+            resource: value,
+            vectorset: None,
+        }
+    }
+}
+
+impl<'a> ResourceWrapper<'a> {
+    pub fn new_vectorset_resource(resource: &'a noderesources::Resource, vectorset: &str) -> Self {
+        Self {
+            resource,
+            vectorset: Some(vectorset.to_string()),
+        }
+    }
+
+    pub fn id(&self) -> &String {
+        &self.resource.shard_id
+    }
+
+    pub fn fields(&self) -> impl Iterator<Item = (&String, impl Iterator<Item = ParagraphVectors>)> {
+        self.resource.paragraphs.iter().map(|(field_id, paragraphs_wrapper)| {
+            let sentences_iterator = paragraphs_wrapper.paragraphs.iter().filter_map(|(_paragraph_id, paragraph)| {
+                let sentences = if let Some(vectorset) = &self.vectorset {
+                    // Indexing a vectorset: we should return only sentences from this vectorset.
+                    // If the vectorset is not found, we'll skip this paragraph
+                    paragraph.vectorsets_sentences.get(vectorset).map(|x| &x.sentences)
+                } else {
+                    // Default vectors index (no vectorset)
+                    Some(&paragraph.sentences)
+                };
+                sentences.map(|s| ParagraphVectors {
+                    vectors: s,
+                    labels: &paragraph.labels,
+                })
+            });
+            (field_id, sentences_iterator)
+        })
+    }
+
+    pub fn sentences_to_delete(&self) -> impl Iterator<Item = &str> {
+        self.resource.sentences_to_delete.iter().map(|sentence_id| sentence_id.as_str())
+    }
+}
+
+pub struct ParagraphVectors<'a> {
+    pub vectors: &'a HashMap<String, noderesources::VectorSentence>,
+    pub labels: &'a Vec<String>,
+}
diff --git a/nucliadb_node/src/shards/mod.rs b/nucliadb_node/src/shards/mod.rs
index 3a2eec6277..0c23d4fcbd 100644
--- a/nucliadb_node/src/shards/mod.rs
+++ b/nucliadb_node/src/shards/mod.rs
@@ -24,6 +24,8 @@ pub mod indexes;
 pub mod metadata;
 pub mod shard_reader;
 pub mod shard_writer;
+#[cfg(test)]
+mod tests;
 pub mod versioning;
 
 // Alias for more readable imports
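Note: the following sketch is editorial context, not part of the diff. It shows how the two `ResourceWrapper` constructors above are meant to be consumed; the helper name and the "myvectorset" label are illustrative only.

```rust
use nucliadb_core::vectors::ResourceWrapper;
use nucliadb_protos::noderesources;

// Hypothetical helper: walk everything a vectors writer would index.
fn walk_indexable(resource: &noderesources::Resource) {
    // Default vectors index: `From<&Resource>` (i.e. `(&resource).into()`)
    // leaves `vectorset` unset, so `fields()` yields each paragraph's
    // legacy `sentences` map.
    let default_view = ResourceWrapper::from(resource);
    for (field_id, paragraphs) in default_view.fields() {
        for paragraph in paragraphs {
            for (sentence_key, sentence) in paragraph.vectors {
                // `sentence.vector` plus `paragraph.labels` is what ends up
                // indexed under `field_id`.
                let _ = (field_id, sentence_key, &sentence.vector, paragraph.labels);
            }
        }
    }

    // Named vectorset: only sentences stored under `vectorsets_sentences`
    // for that name are yielded; paragraphs without it are skipped.
    let vectorset_view = ResourceWrapper::new_vectorset_resource(resource, "myvectorset");
    for (_field_id, paragraphs) in vectorset_view.fields() {
        for _paragraph in paragraphs {}
    }
}
```

The `From` impl keeps existing call sites compiling unchanged, while `new_vectorset_resource` is the only way to opt into a named vectorset, which keeps the default index the implicit fast path.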
diff --git a/nucliadb_node/src/shards/shard_reader.rs b/nucliadb_node/src/shards/shard_reader.rs
index 0dbd3d71d2..2e4a5d16f9 100644
--- a/nucliadb_node/src/shards/shard_reader.rs
+++ b/nucliadb_node/src/shards/shard_reader.rs
@@ -271,34 +271,51 @@ impl ShardReader {
         let indexes = ShardIndexes::load(shard_path).unwrap_or_else(|_| ShardIndexes::new(shard_path));
         let versions = Versions::load(&shard_path.join(VERSION_FILE))?;
 
-        let text_task = || Some(open_texts_reader(versions.texts, &indexes.texts_path()));
-        let paragraph_task = || Some(open_paragraphs_reader(versions.paragraphs, &indexes.paragraphs_path()));
-        let vector_task = || Some(open_vectors_reader(versions.vectors, &indexes.vectors_path()));
-        let relation_task = || Some(open_relations_reader(versions.relations, &indexes.relations_path()));
-
+        let text_task = || Some(open_texts_reader(versions.texts, &indexes.texts_path()));
         let info = info_span!(parent: &span, "text open");
         let text_task = || run_with_telemetry(info, text_task);
+
+        let paragraph_task = || Some(open_paragraphs_reader(versions.paragraphs, &indexes.paragraphs_path()));
         let info = info_span!(parent: &span, "paragraph open");
         let paragraph_task = || run_with_telemetry(info, paragraph_task);
-        let info = info_span!(parent: &span, "vector open");
-        let vector_task = || run_with_telemetry(info, vector_task);
+
+        let mut vector_tasks = vec![];
+        for (name, path) in indexes.iter_vectors_indexes() {
+            vector_tasks.push(|| {
+                run_with_telemetry(info_span!(parent: &span, "vector open"), move || {
+                    Some((name, open_vectors_reader(versions.vectors, &path)))
+                })
+            });
+        }
+
+        let relation_task = || Some(open_relations_reader(versions.relations, &indexes.relations_path()));
         let info = info_span!(parent: &span, "relation open");
         let relation_task = || run_with_telemetry(info, relation_task);
 
         let mut text_result = None;
         let mut paragraph_result = None;
-        let mut vector_result = None;
+        let mut vector_results = Vec::with_capacity(vector_tasks.len());
+        for _ in 0..vector_tasks.len() {
+            vector_results.push(None);
+        }
         let mut relation_result = None;
         crossbeam_thread::scope(|s| {
             s.spawn(|_| text_result = text_task());
             s.spawn(|_| paragraph_result = paragraph_task());
-            s.spawn(|_| vector_result = vector_task());
+            for (vector_task, vector_result) in vector_tasks.into_iter().zip(vector_results.iter_mut()) {
+                s.spawn(|_| *vector_result = vector_task());
+            }
             s.spawn(|_| relation_result = relation_task());
         })
         .expect("Failed to join threads");
 
         let fields = text_result.transpose()?;
         let paragraphs = paragraph_result.transpose()?;
-        let vectors = vector_result.transpose()?;
+        let mut vectors = HashMap::with_capacity(vector_results.len());
+        for result in vector_results {
+            let (name, vector_reader) = result.unwrap();
+            vectors.insert(name, vector_reader?);
+        }
        let relations = relation_result.transpose()?;
 
         let suffixed_root_path = shard_path.to_str().unwrap().to_owned() + "/";
@@ -309,7 +326,7 @@ impl ShardReader {
             root_path: shard_path.to_path_buf(),
             text_reader: RwLock::new(fields.unwrap()),
             paragraph_reader: RwLock::new(paragraphs.unwrap()),
-            vector_readers: RwLock::new(HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), vectors.unwrap())])),
+            vector_readers: RwLock::new(vectors),
             relation_reader: RwLock::new(relations.unwrap()),
             versions,
         })
diff --git a/nucliadb_node/src/shards/shard_writer.rs b/nucliadb_node/src/shards/shard_writer.rs
index 46f388c5d4..a30dced324 100644
--- a/nucliadb_node/src/shards/shard_writer.rs
+++ b/nucliadb_node/src/shards/shard_writer.rs
@@ -270,7 +270,11 @@ impl ShardWriter {
         let mut vector_tasks = vec![];
         for (name, path) in indexes.iter_vectors_indexes() {
             let id = metadata.id();
-            vector_tasks.push(move || Some((name, open_vectors_writer(versions.vectors, &path, id))));
+            vector_tasks.push(|| {
+                run_with_telemetry(info_span!(parent: &span, "Open vectors index writer"), move || {
+                    Some((name, open_vectors_writer(versions.vectors, &path, id)))
+                })
+            });
         }
 
         let rsc = RelationConfig {
@@ -383,11 +387,17 @@ impl ShardWriter {
         };
 
         let mut vector_tasks = vec![];
-        for (_, vector_writer) in indexes.vectors_indexes.iter_mut() {
+        for (vectorset, vector_writer) in indexes.vectors_indexes.iter_mut() {
             vector_tasks.push(|| {
                 run_with_telemetry(info_span!(parent: &span, "vector set_resource"), || {
                     debug!("Vector service starts set_resource");
-                    let result = vector_writer.set_resource(&resource);
+                    let vectorset_resource = match vectorset.as_str() {
+                        "" | DEFAULT_VECTORS_INDEX_NAME => (&resource).into(),
+                        vectorset => {
+                            nucliadb_core::vectors::ResourceWrapper::new_vectorset_resource(&resource, vectorset)
+                        }
+                    };
+                    let result = vector_writer.set_resource(vectorset_resource);
                     debug!("Vector service ends set_resource");
                     result
                 })
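Note (editorial, not part of the diff): both open paths above fan out one task per vectors index inside a thread scope and collect results positionally. A minimal standalone sketch of that collection pattern, assuming placeholder `Reader`/`Err` types and using `std::thread::scope` rather than the crossbeam scope the node code uses:

```rust
use std::collections::HashMap;
use std::path::PathBuf;

// Stand-ins for the real index reader and error types.
struct Reader;
type Err = String;

fn open_reader(_path: &PathBuf) -> Result<Reader, Err> {
    Ok(Reader)
}

// Open every vectors index in parallel and collect the results by name,
// propagating the first open error (the counterpart of the `?` in the diff).
fn open_all(indexes: Vec<(String, PathBuf)>) -> Result<HashMap<String, Reader>, Err> {
    let mut results: Vec<Option<Result<(String, Reader), Err>>> =
        (0..indexes.len()).map(|_| None).collect();

    // One scoped thread per index; each task writes into its own slot,
    // so no synchronization beyond the scope join is needed.
    std::thread::scope(|s| {
        for ((name, path), slot) in indexes.into_iter().zip(results.iter_mut()) {
            s.spawn(move || *slot = Some(open_reader(&path).map(|r| (name, r))));
        }
    });

    let mut readers = HashMap::new();
    for result in results {
        let (name, reader) = result.expect("every slot is filled by its task")?;
        readers.insert(name, reader);
    }
    Ok(readers)
}
```

Writing into disjoint `iter_mut()` slots is what lets the tasks run without a mutex, mirroring how the diff pairs `vector_tasks` with `vector_results`.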
diff --git a/nucliadb_node/src/shards/tests.rs b/nucliadb_node/src/shards/tests.rs
new file mode 100644
index 0000000000..549a1d55a7
--- /dev/null
+++ b/nucliadb_node/src/shards/tests.rs
@@ -0,0 +1,197 @@
+// Copyright (C) 2021 Bosutech XXI S.L.
+//
+// nucliadb is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at info@nuclia.com.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+use std::collections::HashMap;
+use std::{sync::Arc, time::SystemTime};
+
+use nucliadb_core::protos::prost_types::Timestamp;
+use nucliadb_core::{Channel, NodeResult};
+use nucliadb_protos::nodereader;
+use nucliadb_protos::{noderesources, utils::VectorSimilarity};
+use tempfile;
+use uuid::Uuid;
+
+use crate::shards::indexes::DEFAULT_VECTORS_INDEX_NAME;
+use crate::shards::reader::ShardReader;
+
+use super::*;
+use super::{
+    metadata::{ShardMetadata, Similarity},
+    writer::ShardWriter,
+};
+
+#[test]
+fn test_vectorsets() -> NodeResult<()> {
+    let tempdir = tempfile::tempdir()?;
+    let shard_path = tempdir.path().join("shard");
+    let shard_id = "shard".to_string();
+    let kbid = "kbid".to_string();
+
+    let metadata = Arc::new(ShardMetadata::new(
+        shard_path.clone(),
+        shard_id.clone(),
+        kbid.clone(),
+        Similarity::default(),
+        Channel::default(),
+        false,
+    ));
+
+    let writer = ShardWriter::new(Arc::clone(&metadata))?;
+    writer.create_vectors_index(writer::NewVectorsIndex {
+        shard_id: shard_id.clone(),
+        name: "myvectorset".to_string(),
+        channel: Channel::STABLE,
+        similarity: VectorSimilarity::Cosine,
+        normalize_vectors: true,
+    })?;
+
+    let vectorsets = writer.list_vectors_indexes();
+    assert_eq!(vectorsets.len(), 2);
+
+    let resource = generate_resource(shard_id.clone());
+    writer.set_resource(resource)?;
+
+    let reader = ShardReader::new(shard_id.clone(), &shard_path)?;
+
+    // Search in default vectorset (by omitting vectorset)
+    let results = reader.search(nodereader::SearchRequest {
+        shard: shard_id.clone(),
+        vector: vec![0.7; 100],
+        min_score_semantic: 0.0,
+        result_per_page: 20,
+        ..Default::default()
+    })?;
+    assert_eq!(results.vector.as_ref().unwrap().documents.len(), 2);
+    assert!(results.vector.as_ref().unwrap().documents[0].doc_id.as_ref().unwrap().id.ends_with("#defaultvectors"));
+    assert!(results.vector.as_ref().unwrap().documents[1].doc_id.as_ref().unwrap().id.ends_with("#defaultvectors"));
+
+    // Search in default vectorset (using default vectorset string)
+    let results = reader.search(nodereader::SearchRequest {
+        shard: shard_id.clone(),
+        vector: vec![0.7; 100],
+        vectorset: DEFAULT_VECTORS_INDEX_NAME.to_string(),
+        min_score_semantic: 0.0,
+        result_per_page: 20,
+        ..Default::default()
+    })?;
+    assert_eq!(results.vector.as_ref().unwrap().documents.len(), 2);
+    assert!(results.vector.as_ref().unwrap().documents[0].doc_id.as_ref().unwrap().id.ends_with("#defaultvectors"));
+    assert!(results.vector.as_ref().unwrap().documents[1].doc_id.as_ref().unwrap().id.ends_with("#defaultvectors"));
+
+    // Search in a vectorset
+    let results = reader.search(nodereader::SearchRequest {
+        shard: shard_id.clone(),
+        vector: vec![0.7; 100],
+        vectorset: "myvectorset".to_string(),
+        min_score_semantic: 0.0,
+        result_per_page: 20,
+        ..Default::default()
+    })?;
+    assert_eq!(results.vector.as_ref().unwrap().documents.len(), 1);
+    assert!(results.vector.as_ref().unwrap().documents[0].doc_id.as_ref().unwrap().id.ends_with("#vectorset"));
+
+    Ok(())
+}
+
+/// Generate a Resource to index in multiple vectorsets.
+///
+/// Vector keys are suffixed so we'll be able to recognize where they come
+/// from in the tests
+fn generate_resource(shard_id: String) -> noderesources::Resource {
+    let mut resource = minimal_resource(shard_id);
+    let rid = &resource.resource.as_ref().unwrap().uuid;
+
+    resource.texts.insert(
+        "a/title".to_string(),
+        noderesources::TextInformation {
+            text: "Testing vectorsets".to_string(),
+            ..Default::default()
+        },
+    );
+
+    resource.paragraphs.insert(
+        "a/title".to_string(),
+        noderesources::IndexParagraphs {
+            paragraphs: HashMap::from([(
+                format!("{rid}/a/title/0-18"),
+                noderesources::IndexParagraph {
+                    start: 0,
+                    end: 18,
+                    field: "a/title".to_string(),
+                    sentences: HashMap::from([
+                        (
+                            format!("{rid}/a/title/0-9#defaultvectors"),
+                            noderesources::VectorSentence {
+                                vector: vec![0.5; 100],
+                                ..Default::default()
+                            },
+                        ),
+                        (
+                            format!("{rid}/a/title/9-18#defaultvectors"),
+                            noderesources::VectorSentence {
+                                vector: vec![0.7; 100],
+                                ..Default::default()
+                            },
+                        ),
+                    ]),
+                    vectorsets_sentences: HashMap::from([(
+                        "myvectorset".to_string(),
+                        noderesources::VectorsetSentences {
+                            sentences: HashMap::from([(
+                                format!("{rid}/a/title/0-18#vectorset"),
+                                noderesources::VectorSentence {
+                                    vector: vec![0.2; 100],
+                                    ..Default::default()
+                                },
+                            )]),
+                        },
+                    )]),
+                    ..Default::default()
+                },
+            )]),
+        },
+    );
+
+    resource
+}
+
+fn minimal_resource(shard_id: String) -> noderesources::Resource {
+    let resource_id = Uuid::new_v4().to_string();
+
+    let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
+    let timestamp = Timestamp {
+        seconds: now.as_secs() as i64,
+        nanos: 0,
+    };
+
+    let metadata = noderesources::IndexMetadata {
+        created: Some(timestamp.clone()),
+        modified: Some(timestamp),
+    };
+
+    noderesources::Resource {
+        shard_id: shard_id.clone(),
+        resource: Some(noderesources::ResourceId {
+            shard_id,
+            uuid: resource_id,
+        }),
+        status: noderesources::resource::ResourceStatus::Processed as i32,
+        metadata: Some(metadata),
+        ..Default::default()
+    }
+}
diff --git a/nucliadb_vectors/src/service/reader.rs b/nucliadb_vectors/src/service/reader.rs
index 9493b9668a..fb0349c6f6 100644
--- a/nucliadb_vectors/src/service/reader.rs
+++ b/nucliadb_vectors/src/service/reader.rs
@@ -261,7 +261,7 @@ mod tests {
         };
         // insert - delete - insert sequence
         let mut writer = VectorWriterService::create(vsc).unwrap();
-        writer.set_resource(&resource).unwrap();
+        writer.set_resource((&resource).into()).unwrap();
         let reader = VectorReaderService::open(&shard_path).unwrap();
 
         let mut request = VectorSearchRequest {
@@ -351,7 +351,7 @@ mod tests {
         };
         // insert - delete - insert sequence
         let mut writer = VectorWriterService::create(vsc).unwrap();
-        let res = writer.set_resource(&resource);
+        let res = writer.set_resource((&resource).into());
         assert!(res.is_ok());
         let reader = VectorReaderService::open(&shard_path).unwrap();
         let request = VectorSearchRequest {
@@ -475,7 +475,7 @@ mod tests {
             ..Default::default()
         };
         let mut writer = VectorWriterService::create(vsc).unwrap();
-        let res = writer.set_resource(&resource);
+        let res = writer.set_resource((&resource).into());
         assert!(res.is_ok());
         let reader = VectorReaderService::open(&shard_path).unwrap();
         let request = VectorSearchRequest {
diff --git a/nucliadb_vectors/src/service/writer.rs b/nucliadb_vectors/src/service/writer.rs
index 2d17825111..1f4997bf32 100644
--- a/nucliadb_vectors/src/service/writer.rs
+++ b/nucliadb_vectors/src/service/writer.rs
@@ -27,8 +27,7 @@ use nucliadb_core::metrics::request_time;
 use nucliadb_core::metrics::vectors::MergeSource;
 use nucliadb_core::prelude::*;
 use nucliadb_core::protos::prost::Message;
-use nucliadb_core::protos::resource::ResourceStatus;
-use nucliadb_core::protos::{Resource, ResourceId};
+use nucliadb_core::protos::ResourceId;
 use nucliadb_core::tracing::{self, *};
 use nucliadb_core::vectors::MergeMetrics;
 use nucliadb_core::vectors::*;
@@ -103,10 +102,10 @@ impl VectorWriter for VectorWriterService {
 
     #[measure(actor = "vectors", metric = "set_resource")]
     #[tracing::instrument(skip_all)]
-    fn set_resource(&mut self, resource: &Resource) -> NodeResult<()> {
+    fn set_resource(&mut self, resource: nucliadb_core::vectors::ResourceWrapper) -> NodeResult<()> {
         let time = Instant::now();
 
-        let id = resource.resource.as_ref().map(|i| &i.shard_id);
+        let id = resource.id();
         debug!("{id:?} - Updating main index");
         let v = time.elapsed().as_millis();
         debug!("{id:?} - Creating elements for the main index: starts {v} ms");
@@ -115,26 +114,24 @@ impl VectorWriter for VectorWriterService {
         let mut lengths: HashMap<usize, Vec<&String>> = HashMap::new();
         let mut elems = Vec::new();
         let normalize_vectors = self.index.metadata().normalize_vectors;
-        if resource.status != ResourceStatus::Delete as i32 {
-            for (field_id, field_paragraphs) in resource.paragraphs.iter() {
-                for paragraph in field_paragraphs.paragraphs.values() {
-                    let mut inner_labels = paragraph.labels.clone();
-                    inner_labels.push(field_id.clone());
-                    let labels = LabelDictionary::new(inner_labels);
-
-                    for (key, sentence) in paragraph.sentences.iter().clone() {
-                        let key = key.to_string();
-                        let labels = labels.clone();
-                        let vector = if normalize_vectors {
-                            utils::normalize_vector(&sentence.vector)
-                        } else {
-                            sentence.vector.clone()
-                        };
-                        let metadata = sentence.metadata.as_ref().map(|m| m.encode_to_vec());
-                        let bucket = lengths.entry(vector.len()).or_default();
-                        elems.push(Elem::new(key, vector, labels, metadata));
-                        bucket.push(field_id);
-                    }
+        for (field_id, field_paragraphs) in resource.fields() {
+            for paragraph in field_paragraphs {
+                let mut inner_labels = paragraph.labels.clone();
+                inner_labels.push(field_id.clone());
+                let labels = LabelDictionary::new(inner_labels);
+
+                for (key, sentence) in paragraph.vectors.iter().clone() {
+                    let key = key.to_string();
+                    let labels = labels.clone();
+                    let vector = if normalize_vectors {
+                        utils::normalize_vector(&sentence.vector)
+                    } else {
+                        sentence.vector.clone()
+                    };
+                    let metadata = sentence.metadata.as_ref().map(|m| m.encode_to_vec());
+                    let bucket = lengths.entry(vector.len()).or_default();
+                    elems.push(Elem::new(key, vector, labels, metadata));
+                    bucket.push(field_id);
                 }
             }
         }
@@ -157,7 +154,7 @@ impl VectorWriter for VectorWriterService {
             self.index.add_data_point(data_point_pin)?;
         }
 
-        for to_delete in &resource.sentences_to_delete {
+        for to_delete in resource.sentences_to_delete() {
             let key_as_bytes = to_delete.as_bytes();
             self.index.record_delete(key_as_bytes, temporal_mark);
         }
@@ -324,11 +321,11 @@ mod tests {
         };
         // insert - delete - insert sequence
         let mut writer = VectorWriterService::create(vsc).unwrap();
-        let res = writer.set_resource(&resource);
+        let res = writer.set_resource(nucliadb_core::vectors::ResourceWrapper::from(&resource));
         assert!(res.is_ok());
         let res = writer.delete_resource(&resource_id);
         assert!(res.is_ok());
-        let res = writer.set_resource(&resource);
+        let res = writer.set_resource(nucliadb_core::vectors::ResourceWrapper::from(&resource));
         assert!(res.is_ok());
     }
 
@@ -393,11 +390,11 @@ mod tests {
         };
         // insert - delete - insert sequence
         let mut writer = VectorWriterService::create(vsc).unwrap();
-        let res = writer.set_resource(&resource);
+        let res = writer.set_resource(nucliadb_core::vectors::ResourceWrapper::from(&resource));
         assert!(res.is_ok());
         let res = writer.delete_resource(&resource_id);
         assert!(res.is_ok());
-        let res = writer.set_resource(&resource);
+        let res = writer.set_resource(nucliadb_core::vectors::ResourceWrapper::from(&resource));
         assert!(res.is_ok());
 
         let segments = writer.get_segment_ids().unwrap();
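Note (editorial, not part of the diff): on the query side, the new shard test exercises index selection through the `vectorset` field of the search request. A trimmed sketch using the test's own values; the helper name is made up and `reader` is assumed to be an already-open `ShardReader`:

```rust
use nucliadb_core::NodeResult;
use nucliadb_protos::nodereader;

use crate::shards::reader::ShardReader;

// Count semantic matches for a query vector against one vectors index.
fn vectorset_search(reader: &ShardReader) -> NodeResult<usize> {
    let results = reader.search(nodereader::SearchRequest {
        shard: "shard".to_string(),
        vector: vec![0.7; 100],
        // An empty string (or DEFAULT_VECTORS_INDEX_NAME) routes to the
        // default vectors index; any other name routes to that vectorset.
        vectorset: "myvectorset".to_string(),
        min_score_semantic: 0.0,
        result_per_page: 20,
        ..Default::default()
    })?;
    Ok(results.vector.map(|v| v.documents.len()).unwrap_or(0))
}
```

Keeping the empty string and the default index name equivalent is what lets pre-vectorset clients keep working against upgraded shards.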