Support vectorsets at shard level (#2129)
* Basic vectorset support in shard writer

New, open, set and remove resource, GC and reload

* Skip implementation of vectorsets merge and replication (for now)

* Add create vectors index function and some renames

* Add normalize_vector parameter to new vectorset call

* Add normalize_vector parameter to new vectorset call

* Implement add_vector_set gRPC call

* Add basic test creating 2 vectors indexes and setting a resource

* Implement remove vectorset gRPC call

* Implement list vectorsets gRPC call

* Add more operations on the vectorset test

* Fix rebase

* Fix python lints after protos changes

* Better way to pass parameters to open_vectors_writer

Co-authored-by: Javier Torres <[email protected]>

* Fix

* No need to store ShardIndexes in the reader

* Remove print from test

* Start vectorsets support in shard reader

* Use proto vectorset on search

* More

* Merge all vectors indexes and return any error

* Add vectorset sentences to index paragraph proto

* Use it on the tests

* Fix merge so it doesn't block indexes lock

---------

Co-authored-by: Javier Torres <[email protected]>
jotare and javitonino authored May 14, 2024
1 parent 6c53c37 commit 55fd11d
Showing 51 changed files with 2,175 additions and 2,112 deletions.
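
As orientation before the per-file diff: a minimal client-side sketch of the three RPCs this commit fills in. The tonic-generated `NodeWriterClient` and its import path are assumptions here, not part of the diff; the message shapes follow the protos used below.

```rust
// Hypothetical usage sketch; `NodeWriterClient` is the tonic-generated
// client and its module path is assumed, not shown in this commit.
use nucliadb_core::protos::node_writer_client::NodeWriterClient;
use nucliadb_core::protos::{NewVectorSetRequest, ShardId, VectorSetId};

async fn vectorset_roundtrip(
    client: &mut NodeWriterClient<tonic::transport::Channel>,
    shard: String,
) -> Result<(), tonic::Status> {
    // Create a vectors index named "gecko" on the shard.
    let request = NewVectorSetRequest {
        id: Some(VectorSetId {
            shard: Some(ShardId { id: shard.clone() }),
            vectorset: "gecko".to_string(),
        }),
        normalize_vectors: true,
        ..Default::default() // similarity is left at its proto default
    };
    let status = client.add_vector_set(request).await?.into_inner();
    println!("create: {}", status.detail);

    // List the shard's vectorsets (now includes "gecko").
    let list = client.list_vector_sets(ShardId { id: shard.clone() }).await?.into_inner();
    println!("vectorsets: {:?}", list.vectorsets);

    // Remove it again.
    client
        .remove_vector_set(VectorSetId {
            shard: Some(ShardId { id: shard }),
            vectorset: "gecko".to_string(),
        })
        .await?;
    Ok(())
}
```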
6 changes: 3 additions & 3 deletions nucliadb/nucliadb/common/cluster/grpc_node_dummy.py
@@ -70,11 +70,11 @@ async def AddVectorSet(self, data): # pragma: no cover
         result = OpStatus()
         return result
 
-    async def ListVectorSet(self, data: ShardId): # pragma: no cover
-        self.calls.setdefault("ListVectorSet", []).append(data)
+    async def ListVectorSets(self, data: ShardId): # pragma: no cover
+        self.calls.setdefault("ListVectorSets", []).append(data)
         result = VectorSetList()
         result.shard.id = data.id
-        result.vectorset.append("base")
+        result.vectorsets.append("base")
         return result
 
     async def GC(self, request: ShardId) -> EmptyResponse: # pragma: no cover
123 changes: 115 additions & 8 deletions nucliadb_node/src/grpc/grpc_writer.rs
@@ -26,7 +26,7 @@ use crate::grpc::collect_garbage::{garbage_collection_loop, GCParameters};
 use crate::merge::{global_merger, MergePriority, MergeRequest, MergeWaiter};
 use crate::settings::Settings;
 use crate::shards::metadata::ShardMetadata;
-use crate::shards::writer::ShardWriter;
+use crate::shards::writer::{NewVectorsIndex, ShardWriter};
 use crate::telemetry::run_with_telemetry;
 use crate::utils::{get_primary_node_id, list_shards, read_host_key};
 use nucliadb_core::metrics::get_metrics;
@@ -38,7 +38,7 @@ use nucliadb_core::protos::{
     ShardId, ShardIds, VectorSetId, VectorSetList,
 };
 use nucliadb_core::tracing::{self, Span, *};
-use nucliadb_core::Channel;
+use nucliadb_core::{Channel, NodeResult};
 use object_store::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
@@ -280,16 +280,123 @@ impl NodeWriter for NodeWriterGRPCDriver {
         Ok(tonic::Response::new(status))
     }
 
-    async fn add_vector_set(&self, _: Request<NewVectorSetRequest>) -> Result<Response<OpStatus>, Status> {
-        Err(tonic::Status::internal("Coming soon.."))
+    async fn add_vector_set(&self, request: Request<NewVectorSetRequest>) -> Result<Response<OpStatus>, Status> {
+        let span = Span::current();
+
+        let request = request.into_inner();
+        let similarity = request.similarity();
+        let normalize_vectors = request.normalize_vectors;
+        let Some(VectorSetId {
+            shard: Some(ShardId {
+                id: shard_id,
+            }),
+            vectorset,
+        }) = request.id
+        else {
+            return Ok(tonic::Response::new(OpStatus {
+                status: op_status::Status::Error.into(),
+                detail: "Vectorset ID must be provided".to_string(),
+                ..Default::default()
+            }));
+        };
+
+        let shards = Arc::clone(&self.shards);
+        let task = move || {
+            run_with_telemetry(info_span!(parent: &span, "Add a vectorset"), move || {
+                let shard = obtain_shard(shards, shard_id.clone())?;
+                shard.create_vectors_index(NewVectorsIndex {
+                    shard_id,
+                    name: vectorset,
+                    channel: shard.metadata.channel(),
+                    similarity,
+                    normalize_vectors,
+                })
+            })
+        };
+        let result = tokio::task::spawn_blocking(task)
+            .await
+            .map_err(|error| tonic::Status::internal(format!("Blocking task panicked: {error:?}")))?;
+        let status = match result {
+            Ok(()) => OpStatus {
+                status: op_status::Status::Ok.into(),
+                detail: "Vectorset successfully created".to_string(),
+                ..Default::default()
+            },
+            Err(error) => OpStatus {
+                status: op_status::Status::Error.into(),
+                detail: error.to_string(),
+                ..Default::default()
+            },
+        };
+        Ok(tonic::Response::new(status))
     }
 
-    async fn remove_vector_set(&self, _: Request<VectorSetId>) -> Result<Response<OpStatus>, Status> {
-        Err(tonic::Status::internal("Coming soon.."))
+    async fn remove_vector_set(&self, request: Request<VectorSetId>) -> Result<Response<OpStatus>, Status> {
+        let span = Span::current();
+
+        let VectorSetId {
+            shard: Some(ShardId {
+                id: shard_id,
+            }),
+            vectorset,
+        } = request.into_inner()
+        else {
+            return Ok(tonic::Response::new(OpStatus {
+                status: op_status::Status::Error.into(),
+                detail: "Vectorset ID must be provided".to_string(),
+                ..Default::default()
+            }));
+        };
+
+        let shards = Arc::clone(&self.shards);
+        let task = move || {
+            run_with_telemetry(info_span!(parent: &span, "Remove vectorset"), move || {
+                let shard = obtain_shard(shards, shard_id.clone())?;
+                shard.remove_vectors_index(vectorset)
+            })
+        };
+        let result = tokio::task::spawn_blocking(task)
+            .await
+            .map_err(|error| tonic::Status::internal(format!("Blocking task panicked: {error:?}")))?;
+        let status = match result {
+            Ok(()) => OpStatus {
+                status: op_status::Status::Ok.into(),
+                detail: "Vectorset successfully deleted".to_string(),
+                ..Default::default()
+            },
+            Err(error) => OpStatus {
+                status: op_status::Status::Error.into(),
+                detail: error.to_string(),
+                ..Default::default()
+            },
+        };
+        Ok(tonic::Response::new(status))
     }
 
-    async fn list_vector_sets(&self, _: Request<ShardId>) -> Result<Response<VectorSetList>, Status> {
-        Err(tonic::Status::internal("Coming soon.."))
+    async fn list_vector_sets(&self, request: Request<ShardId>) -> Result<Response<VectorSetList>, Status> {
+        let span = Span::current();
+
+        let shard_id = request.into_inner().id;
+        let shard_id_clone = shard_id.clone();
+        let shards = Arc::clone(&self.shards);
+        let task = move || {
+            run_with_telemetry(info_span!(parent: &span, "List vectorsets"), move || {
+                let shard = obtain_shard(shards, shard_id_clone)?;
+                Ok(shard.list_vectors_indexes())
+            })
+        };
+        let result: NodeResult<Vec<String>> = tokio::task::spawn_blocking(task)
+            .await
+            .map_err(|error| tonic::Status::internal(format!("Blocking task panicked: {error:?}")))?;
+        match result {
+            Ok(vectorsets) => Ok(tonic::Response::new(VectorSetList {
+                shard: Some(ShardId {
+                    id: shard_id,
+                }),
+                vectorsets,
+            })),
+            Err(error) => Err(tonic::Status::internal(error.to_string())),
+        }
    }
 
     async fn get_metadata(&self, _request: Request<EmptyQuery>) -> Result<Response<NodeMetadata>, Status> {
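
The three handlers above share one shape: destructure and validate the request, run the blocking shard operation via `spawn_blocking`, then fold the outcome into an `OpStatus`. A condensed sketch of that shared pattern, assuming `obtain_shard` yields an `Arc<ShardWriter>` and using `ShardCache` as a stand-in for the writer's actual shard-cache type (neither name is confirmed by this diff):

```rust
// Condensed pattern sketch, not the committed code. `ShardCache` and the
// `Arc<ShardWriter>` return type of `obtain_shard` are assumptions.
async fn vectorset_op<F>(
    shards: Arc<ShardCache>,
    shard_id: String,
    op: F,
) -> Result<tonic::Response<OpStatus>, tonic::Status>
where
    F: FnOnce(Arc<ShardWriter>) -> NodeResult<()> + Send + 'static,
{
    // Index mutations are blocking I/O, so keep them off the async runtime.
    let task = move || {
        let shard = obtain_shard(shards, shard_id)?;
        op(shard)
    };
    let result = tokio::task::spawn_blocking(task)
        .await
        .map_err(|error| tonic::Status::internal(format!("Blocking task panicked: {error:?}")))?;
    // Domain failures become an error OpStatus rather than a gRPC error;
    // only a panicked task surfaces as tonic::Status::internal.
    let status = match result {
        Ok(()) => OpStatus {
            status: op_status::Status::Ok.into(),
            ..Default::default()
        },
        Err(error) => OpStatus {
            status: op_status::Status::Error.into(),
            detail: error.to_string(),
            ..Default::default()
        },
    };
    Ok(tonic::Response::new(status))
}
```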
73 changes: 37 additions & 36 deletions nucliadb_node/src/shards/indexes.rs
@@ -29,7 +29,7 @@ use uuid::Uuid;
 
 use crate::disk_structure;
 
-pub const DEFAULT_VECTOR_INDEX_NAME: &str = "__default__";
+pub const DEFAULT_VECTORS_INDEX_NAME: &str = "__default__";
 pub const MAX_ALLOWED_VECTORSETS: usize = 5;
 pub const SHARD_INDEXES_FILENAME: &str = "indexes.json";
 pub const TEMP_SHARD_INDEXES_FILENAME: &str = "indexes.temp.json";
@@ -70,7 +70,7 @@ impl ShardIndexes {
     }
 
     pub fn vectors_path(&self) -> PathBuf {
-        self.vectorset_path(DEFAULT_VECTOR_INDEX_NAME).expect("Default vectors index should always be present")
+        self.vectorset_path(DEFAULT_VECTORS_INDEX_NAME).expect("Default vectors index should always be present")
     }
 
     pub fn vectorset_path(&self, name: &str) -> Option<PathBuf> {
@@ -83,17 +83,16 @@ impl ShardIndexes {
 
     // Vectorsets
 
-    #[allow(dead_code)]
-    /// Add a new vectorset to the index and returns it's path
-    pub fn add_vectorset(&mut self, name: String) -> NodeResult<PathBuf> {
+    /// Add a new vectors index to the shard and returns its path
+    pub fn add_vectors_index(&mut self, name: String) -> NodeResult<PathBuf> {
         if self.inner.vectorsets.len() >= MAX_ALLOWED_VECTORSETS {
             return Err(node_error!(format!(
                 "Max amount of allowed vectorsets reached: {}",
                 self.inner.vectorsets.len()
             )));
         }
-        if name == DEFAULT_VECTOR_INDEX_NAME {
-            return Err(node_error!(format!("Vectorset id {DEFAULT_VECTOR_INDEX_NAME} is reserved for internal use")));
+        if name == DEFAULT_VECTORS_INDEX_NAME {
+            return Err(node_error!(format!("Vectorset id {DEFAULT_VECTORS_INDEX_NAME} is reserved for internal use")));
         }
         if self.inner.vectorsets.contains_key(&name) {
             return Err(node_error!(format!("Vectorset id {name} is already in use")));
@@ -105,22 +104,24 @@ impl ShardIndexes {
         Ok(path)
     }
 
-    #[allow(dead_code)]
-    /// Removes a vectorset from the shard and returns the index path
-    pub fn remove_vectorset(&mut self, name: &str) -> NodeResult<Option<PathBuf>> {
-        if name == DEFAULT_VECTOR_INDEX_NAME {
+    /// Removes a vectors index from the shard and returns its path
+    pub fn remove_vectors_index(&mut self, name: &str) -> NodeResult<Option<PathBuf>> {
+        if name == DEFAULT_VECTORS_INDEX_NAME {
             return Err(node_error!(format!(
-                "Vectorset id {DEFAULT_VECTOR_INDEX_NAME} is reserved and can't be removed"
+                "Vectorset id {DEFAULT_VECTORS_INDEX_NAME} is reserved and can't be removed"
             )));
         }
         let removed = self.inner.vectorsets.remove(name).map(|vectorset| self.shard_path.join(vectorset));
         Ok(removed)
     }
 
-    #[allow(dead_code)]
-    pub fn iter_vectorsets(&self) -> impl Iterator<Item = (String, PathBuf)> + '_ {
+    pub fn iter_vectors_indexes(&self) -> impl Iterator<Item = (String, PathBuf)> + '_ {
         self.inner.vectorsets.iter().map(|(name, vectorset)| (name.to_owned(), self.shard_path.join(vectorset)))
     }
+
+    pub fn count_vectors_indexes(&self) -> usize {
+        self.inner.vectorsets.len()
+    }
 }

#[cfg_attr(test, derive(PartialEq))]
@@ -135,7 +136,7 @@ struct ShardIndexesFile {
 impl ShardIndexesFile {
     pub fn load(shard_path: &Path) -> NodeResult<Self> {
         let mut reader = BufReader::new(File::open(shard_path.join(SHARD_INDEXES_FILENAME))?);
-        let indexes: ShardIndexesFile = serde_json::from_reader(&mut reader)?;
+        let indexes: Self = serde_json::from_reader(&mut reader)?;
         Ok(indexes)
     }

Expand All @@ -155,7 +156,7 @@ impl Default for ShardIndexesFile {
Self {
texts: disk_structure::TEXTS_DIR.into(),
paragraphs: disk_structure::PARAGRAPHS_DIR.into(),
vectorsets: HashMap::from([(DEFAULT_VECTOR_INDEX_NAME.to_string(), disk_structure::VECTORS_DIR.into())]),
vectorsets: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), disk_structure::VECTORS_DIR.into())]),
relations: disk_structure::RELATIONS_DIR.into(),
}
}
@@ -191,18 +192,18 @@ mod tests {
 
         let mut indexes = ShardIndexes::new(shard_path);
 
-        let vectorsets = indexes.iter_vectorsets().collect::<Vec<(String, PathBuf)>>();
+        let vectorsets = indexes.iter_vectors_indexes().collect::<Vec<(String, PathBuf)>>();
         assert_eq!(vectorsets.len(), 1);
-        assert_eq!(vectorsets[0].0, DEFAULT_VECTOR_INDEX_NAME.to_string());
+        assert_eq!(vectorsets[0].0, DEFAULT_VECTORS_INDEX_NAME.to_string());
         assert_eq!(vectorsets[0].1, shard_path.join(disk_structure::VECTORS_DIR));
 
         assert_eq!(
-            indexes.vectorset_path(DEFAULT_VECTOR_INDEX_NAME),
+            indexes.vectorset_path(DEFAULT_VECTORS_INDEX_NAME),
             Some(shard_path.join(disk_structure::VECTORS_DIR))
         );
 
         // Default vectorset can't be removed
-        assert!(indexes.remove_vectorset(DEFAULT_VECTOR_INDEX_NAME).is_err());
+        assert!(indexes.remove_vectors_index(DEFAULT_VECTORS_INDEX_NAME).is_err());
     }

#[test]
Expand All @@ -212,7 +213,7 @@ mod tests {

let mut indexes = ShardIndexes::new(shard_path);

indexes.add_vectorset("gecko".to_string()).unwrap();
indexes.add_vectors_index("gecko".to_string()).unwrap();

assert_eq!(indexes.texts_path(), shard_path.join(disk_structure::TEXTS_DIR));
assert_eq!(indexes.paragraphs_path(), shard_path.join(disk_structure::PARAGRAPHS_DIR));
@@ -233,13 +234,13 @@
 
         let mut indexes = ShardIndexes::new(shard_path);
 
-        indexes.add_vectorset("gecko".to_string()).unwrap();
-        indexes.add_vectorset("openai".to_string()).unwrap();
+        indexes.add_vectors_index("gecko".to_string()).unwrap();
+        indexes.add_vectors_index("openai".to_string()).unwrap();
 
-        let vectorsets = indexes.iter_vectorsets().sorted().collect::<Vec<(String, PathBuf)>>();
+        let vectorsets = indexes.iter_vectors_indexes().sorted().collect::<Vec<(String, PathBuf)>>();
         assert_eq!(vectorsets.len(), 3);
 
-        assert_eq!(vectorsets[0].0, DEFAULT_VECTOR_INDEX_NAME.to_string());
+        assert_eq!(vectorsets[0].0, DEFAULT_VECTORS_INDEX_NAME.to_string());
         assert_eq!(vectorsets[1].0, "gecko".to_string());
         assert_eq!(vectorsets[1].1, indexes.vectorset_path("gecko").unwrap());
         assert_eq!(vectorsets[2].0, "openai".to_string());
@@ -255,27 +256,27 @@
 
         // Add two vectorsets more
 
-        let added = indexes.add_vectorset("gecko".to_string()).is_ok();
+        let added = indexes.add_vectors_index("gecko".to_string()).is_ok();
         assert!(added);
-        let added = indexes.add_vectorset("openai".to_string()).is_ok();
+        let added = indexes.add_vectors_index("openai".to_string()).is_ok();
         assert!(added);
 
-        let vectorsets = indexes.iter_vectorsets().sorted().collect::<Vec<(String, PathBuf)>>();
+        let vectorsets = indexes.iter_vectors_indexes().sorted().collect::<Vec<(String, PathBuf)>>();
         assert_eq!(vectorsets.len(), 3);
 
-        assert_eq!(vectorsets[0].0, DEFAULT_VECTOR_INDEX_NAME.to_string());
+        assert_eq!(vectorsets[0].0, DEFAULT_VECTORS_INDEX_NAME.to_string());
         assert_eq!(vectorsets[1].0, "gecko".to_string());
         assert_eq!(vectorsets[1].1, indexes.vectorset_path("gecko").unwrap());
         assert_eq!(vectorsets[2].0, "openai".to_string());
         assert_eq!(vectorsets[2].1, indexes.vectorset_path("openai").unwrap());
 
         // Remove a regular vectorset
 
-        assert!(indexes.remove_vectorset("gecko").is_ok());
+        assert!(indexes.remove_vectors_index("gecko").is_ok());
 
-        let vectorsets = indexes.iter_vectorsets().sorted().collect::<Vec<(String, PathBuf)>>();
+        let vectorsets = indexes.iter_vectors_indexes().sorted().collect::<Vec<(String, PathBuf)>>();
         assert_eq!(vectorsets.len(), 2);
-        assert_eq!(vectorsets[0].0, DEFAULT_VECTOR_INDEX_NAME.to_string());
+        assert_eq!(vectorsets[0].0, DEFAULT_VECTORS_INDEX_NAME.to_string());
         assert_eq!(vectorsets[1].0, "openai".to_string());
         assert_eq!(vectorsets[1].1, indexes.vectorset_path("openai").unwrap());
     }
@@ -289,8 +290,8 @@
 
         // Add two vectorsets more
 
-        assert!(indexes.add_vectorset("gecko".to_string()).is_ok());
-        assert!(indexes.add_vectorset("gecko".to_string()).is_err());
+        assert!(indexes.add_vectors_index("gecko".to_string()).is_ok());
+        assert!(indexes.add_vectors_index("gecko".to_string()).is_err());
     }

#[test]
Expand All @@ -301,8 +302,8 @@ mod tests {
let mut indexes = ShardIndexes::new(shard_path);

for i in 0..(MAX_ALLOWED_VECTORSETS - 1) {
assert!(indexes.add_vectorset(format!("vectorset-{i}")).is_ok());
assert!(indexes.add_vectors_index(format!("vectorset-{i}")).is_ok());
}
assert!(indexes.add_vectorset("too-many".to_string()).is_err());
assert!(indexes.add_vectors_index("too-many".to_string()).is_err());
}
}
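
The tests above pin down the registry's contract. Below is a compressed lifecycle sketch built only from calls shown in this diff; `shard_path` is any directory the shard owns, and `ShardIndexes::new` taking `&Path` matches the test usage (an assumption, since the signature itself is behind the fold).

```rust
use std::path::Path;

// Condensed from the tests above; error handling via NodeResult as in the module.
fn vectors_indexes_lifecycle(shard_path: &Path) -> NodeResult<()> {
    let mut indexes = ShardIndexes::new(shard_path);

    // The default index exists from the start and is protected.
    assert_eq!(indexes.count_vectors_indexes(), 1);
    assert!(indexes.remove_vectors_index(DEFAULT_VECTORS_INDEX_NAME).is_err());

    // A named vectors index gets its own directory under the shard path...
    let gecko = indexes.add_vectors_index("gecko".to_string())?;
    assert_eq!(indexes.vectorset_path("gecko"), Some(gecko));
    assert_eq!(indexes.count_vectors_indexes(), 2);

    // ...duplicate names are rejected, as is exceeding MAX_ALLOWED_VECTORSETS.
    assert!(indexes.add_vectors_index("gecko".to_string()).is_err());

    // Removal hands back the path so the caller can delete the data on disk.
    let removed = indexes.remove_vectors_index("gecko")?;
    assert!(removed.is_some());
    Ok(())
}
```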
2 changes: 1 addition & 1 deletion nucliadb_node/src/shards/mod.rs
@@ -20,7 +20,7 @@
 //! This module provides tools for managing shards
 
 // pub mod errors;
-mod indexes;
+pub mod indexes;
 pub mod metadata;
 pub mod shard_reader;
 pub mod shard_writer;
(The remaining 47 changed files are not shown here.)

3 comments on commit 55fd11d

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 55fd11d | Previous: 6c53c37 | Ratio |
| --- | --- | --- | --- |
| nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error | 13140.148578835571 iter/sec (stddev: 8.343017325820474e-7) | 13198.084460244272 iter/sec (stddev: 3.4157717375989134e-7) | 1.00 |

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 55fd11d | Previous: 6c53c37 | Ratio |
| --- | --- | --- | --- |
| nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error | 13099.025494679947 iter/sec (stddev: 4.824226359743248e-7) | 13198.084460244272 iter/sec (stddev: 3.4157717375989134e-7) | 1.01 |

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions (Contributor)

Benchmark

| Benchmark suite | Current: 55fd11d | Previous: 6c53c37 | Ratio |
| --- | --- | --- | --- |
| nucliadb/search/tests/unit/search/test_fetch.py::test_highligh_error | 13229.232975222492 iter/sec (stddev: 2.8456513355174894e-7) | 13198.084460244272 iter/sec (stddev: 3.4157717375989134e-7) | 1.00 |

This comment was automatically generated by workflow using github-action-benchmark.
