Add a vectors index resource wrapper to set resources in the proper vectorset #2152

Merged
71 changes: 65 additions & 6 deletions nucliadb_core/src/vectors.rs
@@ -18,22 +18,25 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//

use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;

use nucliadb_protos::nodereader;
use nucliadb_protos::utils;
use uuid::Uuid;

use crate::metrics::vectors::MergeSource;
use crate::prelude::*;
use crate::protos::*;
use crate::query_language::BooleanExpression;
use crate::Channel;
use crate::IndexFiles;
use nucliadb_protos::noderesources;

pub type VectorsReaderPointer = Box<dyn VectorReader>;
pub type VectorsWriterPointer = Box<dyn VectorWriter>;
pub type ProtosRequest = VectorSearchRequest;
pub type ProtosResponse = VectorSearchResponse;
pub type ProtosRequest = nodereader::VectorSearchRequest;
pub type ProtosResponse = nodereader::VectorSearchResponse;

#[derive(Debug, Clone, Copy)]
pub struct MergeParameters {
@@ -49,7 +52,7 @@ pub struct MergeContext {

#[derive(Clone)]
pub struct VectorConfig {
pub similarity: VectorSimilarity,
pub similarity: utils::VectorSimilarity,
pub path: PathBuf,
pub channel: Channel,
pub shard_id: String,
@@ -94,9 +97,65 @@ pub trait VectorWriter: std::fmt::Debug + Send + Sync {

fn prepare_merge(&self, parameters: MergeParameters) -> NodeResult<Option<Box<dyn MergeRunner>>>;
fn record_merge(&mut self, merge_result: Box<dyn MergeResults>, source: MergeSource) -> NodeResult<MergeMetrics>;
fn set_resource(&mut self, resource: &Resource) -> NodeResult<()>;
fn delete_resource(&mut self, resource_id: &ResourceId) -> NodeResult<()>;
fn set_resource(&mut self, resource: ResourceWrapper) -> NodeResult<()>;
fn delete_resource(&mut self, resource_id: &noderesources::ResourceId) -> NodeResult<()>;
fn garbage_collection(&mut self) -> NodeResult<()>;
fn force_garbage_collection(&mut self) -> NodeResult<()>;
fn reload(&mut self) -> NodeResult<()>;
}

pub struct ResourceWrapper<'a> {
resource: &'a noderesources::Resource,
vectorset: Option<String>,
}

impl<'a> From<&'a noderesources::Resource> for ResourceWrapper<'a> {
fn from(value: &'a noderesources::Resource) -> Self {
Self {
resource: value,
vectorset: None,
}
}
}

impl<'a> ResourceWrapper<'a> {
pub fn new_vectorset_resource(resource: &'a noderesources::Resource, vectorset: &str) -> Self {
Self {
resource,
vectorset: Some(vectorset.to_string()),
}
}

pub fn id(&self) -> &String {
&self.resource.shard_id
}

pub fn fields(&self) -> impl Iterator<Item = (&String, impl Iterator<Item = ParagraphVectors>)> {
self.resource.paragraphs.iter().map(|(field_id, paragraphs_wrapper)| {
let sentences_iterator = paragraphs_wrapper.paragraphs.iter().filter_map(|(_paragraph_id, paragraph)| {
let sentences = if let Some(vectorset) = &self.vectorset {
// Indexing a named vectorset: return only the sentences stored under it.
// If the vectorset is not present, this paragraph is skipped.
paragraph.vectorsets_sentences.get(vectorset).map(|x| &x.sentences)
} else {
// Default vectors index (no vectorset)
Some(&paragraph.sentences)
};
sentences.map(|s| ParagraphVectors {
vectors: s,
labels: &paragraph.labels,
})
});
(field_id, sentences_iterator)
})
}

pub fn sentences_to_delete(&self) -> impl Iterator<Item = &str> {
self.resource.paragraphs_to_delete.iter().map(|paragraph_id| paragraph_id.as_str())
}
}

pub struct ParagraphVectors<'a> {
pub vectors: &'a HashMap<String, noderesources::VectorSentence>,
pub labels: &'a Vec<String>,
}
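
The wrapper gives the default index and any named vectorset a uniform view over a resource's sentence vectors. A minimal usage sketch, assuming the crate paths shown in this diff (`count_indexable_vectors` is a hypothetical helper, not part of the PR):

```rust
use nucliadb_core::vectors::ResourceWrapper;
use nucliadb_protos::noderesources;

// Hypothetical helper: count the vectors the wrapper would hand to a
// vectors index for one resource. An empty name selects the default index.
fn count_indexable_vectors(resource: &noderesources::Resource, vectorset: &str) -> usize {
    let wrapper = if vectorset.is_empty() {
        ResourceWrapper::from(resource)
    } else {
        ResourceWrapper::new_vectorset_resource(resource, vectorset)
    };

    let mut total = 0;
    for (_field_id, paragraphs) in wrapper.fields() {
        for paragraph in paragraphs {
            // `vectors` maps sentence ids to embeddings; `labels` (unused
            // here) carries the paragraph labels alongside them.
            total += paragraph.vectors.len();
        }
    }
    total
}
```

For a named vectorset, `fields()` silently skips paragraphs that carry no sentences under that name, which is what lets each per-index `set_resource` call stay oblivious to the other vectorsets.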
2 changes: 2 additions & 0 deletions nucliadb_node/src/shards/mod.rs
@@ -24,6 +24,8 @@ pub mod indexes;
pub mod metadata;
pub mod shard_reader;
pub mod shard_writer;
#[cfg(test)]
mod tests;
pub mod versioning;

// Alias for more readable imports
37 changes: 27 additions & 10 deletions nucliadb_node/src/shards/shard_reader.rs
@@ -271,34 +271,51 @@ impl ShardReader {
let indexes = ShardIndexes::load(shard_path).unwrap_or_else(|_| ShardIndexes::new(shard_path));

let versions = Versions::load(&shard_path.join(VERSION_FILE))?;
let text_task = || Some(open_texts_reader(versions.texts, &indexes.texts_path()));
let paragraph_task = || Some(open_paragraphs_reader(versions.paragraphs, &indexes.paragraphs_path()));
let vector_task = || Some(open_vectors_reader(versions.vectors, &indexes.vectors_path()));
let relation_task = || Some(open_relations_reader(versions.relations, &indexes.relations_path()));

let text_task = || Some(open_texts_reader(versions.texts, &indexes.texts_path()));
let info = info_span!(parent: &span, "text open");
let text_task = || run_with_telemetry(info, text_task);

let paragraph_task = || Some(open_paragraphs_reader(versions.paragraphs, &indexes.paragraphs_path()));
let info = info_span!(parent: &span, "paragraph open");
let paragraph_task = || run_with_telemetry(info, paragraph_task);
let info = info_span!(parent: &span, "vector open");
let vector_task = || run_with_telemetry(info, vector_task);

let mut vector_tasks = vec![];
for (name, path) in indexes.iter_vectors_indexes() {
vector_tasks.push(|| {
run_with_telemetry(info_span!(parent: &span, "vector open"), move || {
Some((name, open_vectors_reader(versions.vectors, &path)))
})
});
}

let relation_task = || Some(open_relations_reader(versions.relations, &indexes.relations_path()));
let info = info_span!(parent: &span, "relation open");
let relation_task = || run_with_telemetry(info, relation_task);

let mut text_result = None;
let mut paragraph_result = None;
let mut vector_result = None;
let mut vector_results = Vec::with_capacity(vector_tasks.len());
for _ in 0..vector_tasks.len() {
vector_results.push(None);
}
let mut relation_result = None;
crossbeam_thread::scope(|s| {
s.spawn(|_| text_result = text_task());
s.spawn(|_| paragraph_result = paragraph_task());
s.spawn(|_| vector_result = vector_task());
for (vector_task, vector_result) in vector_tasks.into_iter().zip(vector_results.iter_mut()) {
s.spawn(|_| *vector_result = vector_task());
}
s.spawn(|_| relation_result = relation_task());
})
.expect("Failed to join threads");
let fields = text_result.transpose()?;
let paragraphs = paragraph_result.transpose()?;
let vectors = vector_result.transpose()?;
let mut vectors = HashMap::with_capacity(vector_results.len());
for result in vector_results {
let (name, vector_writer) = result.unwrap();
vectors.insert(name, vector_writer?);
}
let relations = relation_result.transpose()?;
let suffixed_root_path = shard_path.to_str().unwrap().to_owned() + "/";

@@ -309,7 +326,7 @@
root_path: shard_path.to_path_buf(),
text_reader: RwLock::new(fields.unwrap()),
paragraph_reader: RwLock::new(paragraphs.unwrap()),
vector_readers: RwLock::new(HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), vectors.unwrap())])),
vector_readers: RwLock::new(vectors),
relation_reader: RwLock::new(relations.unwrap()),
versions,
})
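
Both the reader here and the writer below fan out one open task per vectors index, giving each scoped thread its own pre-allocated result slot so no locking is needed around the results. A self-contained sketch of that pattern using std's scoped threads (the shard code itself uses crossbeam's scope plus `run_with_telemetry`; `open_index` is a stand-in for `open_vectors_reader`):

```rust
use std::thread;

// Stand-in for open_vectors_reader: pretend to open one index by name.
fn open_index(name: &str) -> Result<String, String> {
    Ok(format!("reader for {name}"))
}

fn open_all(names: Vec<String>) -> Result<Vec<(String, String)>, String> {
    // One slot per task: each spawned thread writes through a disjoint
    // `&mut Option<_>`, mirroring the `vector_results` vector above.
    let mut results: Vec<Option<(String, Result<String, String>)>> = Vec::new();
    results.resize_with(names.len(), || None);

    thread::scope(|s| {
        for (name, slot) in names.into_iter().zip(results.iter_mut()) {
            s.spawn(move || {
                let reader = open_index(&name);
                *slot = Some((name, reader));
            });
        }
    });

    // All threads have joined; propagate the first open error, as the
    // shard reader does when building its reader map.
    let mut readers = Vec::with_capacity(results.len());
    for result in results {
        let (name, reader) = result.expect("every slot was filled");
        readers.push((name, reader?));
    }
    Ok(readers)
}
```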
16 changes: 13 additions & 3 deletions nucliadb_node/src/shards/shard_writer.rs
@@ -270,7 +270,11 @@ impl ShardWriter {
let mut vector_tasks = vec![];
for (name, path) in indexes.iter_vectors_indexes() {
let id = metadata.id();
vector_tasks.push(move || Some((name, open_vectors_writer(versions.vectors, &path, id))));
vector_tasks.push(|| {
run_with_telemetry(info_span!(parent: &span, "Open vectors index writer"), move || {
Some((name, open_vectors_writer(versions.vectors, &path, id)))
})
});
}

let rsc = RelationConfig {
@@ -383,11 +387,17 @@
};

let mut vector_tasks = vec![];
for (_, vector_writer) in indexes.vectors_indexes.iter_mut() {
for (vectorset, vector_writer) in indexes.vectors_indexes.iter_mut() {
vector_tasks.push(|| {
run_with_telemetry(info_span!(parent: &span, "vector set_resource"), || {
debug!("Vector service starts set_resource");
let result = vector_writer.set_resource(&resource);
let vectorset_resource = match vectorset.as_str() {
"" | DEFAULT_VECTORS_INDEX_NAME => (&resource).into(),
vectorset => {
nucliadb_core::vectors::ResourceWrapper::new_vectorset_resource(&resource, vectorset)
}
};
let result = vector_writer.set_resource(vectorset_resource);
debug!("Vector service ends set_resource");
result
})
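
The match above is the crux of the change: the empty name and `DEFAULT_VECTORS_INDEX_NAME` both map to the legacy unnamed index, anything else to a named vectorset. Extracted as a sketch (the constant's value below is a placeholder, assumed to match the node crate's):

```rust
use nucliadb_core::vectors::ResourceWrapper;
use nucliadb_protos::noderesources;

// Placeholder value; the real constant lives in the node crate.
const DEFAULT_VECTORS_INDEX_NAME: &str = "__default__";

fn wrapper_for<'a>(resource: &'a noderesources::Resource, vectorset: &str) -> ResourceWrapper<'a> {
    match vectorset {
        // Both spellings of "no vectorset" select the default index.
        "" | DEFAULT_VECTORS_INDEX_NAME => resource.into(),
        name => ResourceWrapper::new_vectorset_resource(resource, name),
    }
}
```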