Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

box readers + swap update #2122

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions nucliadb_core/src/paragraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::prelude::*;
use crate::protos::*;
use crate::query_language::BooleanExpression;
use crate::IndexFiles;

pub type ParagraphsReaderPointer = Arc<RwLock<dyn ParagraphReader>>;
pub type ParagraphsReaderPointer = Box<dyn ParagraphReader>;
pub type ParagraphsWriterPointer = Box<dyn ParagraphWriter>;
pub type ProtosRequest = ParagraphSearchRequest;
pub type ProtosResponse = ParagraphSearchResponse;
Expand Down
3 changes: 1 addition & 2 deletions nucliadb_core/src/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::prelude::*;
use crate::protos::*;
use crate::Channel;
use crate::IndexFiles;

pub type RelationsReaderPointer = Arc<RwLock<dyn RelationsReader>>;
pub type RelationsReaderPointer = Box<dyn RelationsReader>;
pub type RelationsWriterPointer = Box<dyn RelationsWriter>;
pub type ProtosRequest = RelationSearchRequest;
pub type ProtosResponse = RelationSearchResponse;
Expand Down
3 changes: 1 addition & 2 deletions nucliadb_core/src/texts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::prelude::*;
use crate::protos::*;
use crate::query_planner::{PreFilterRequest, PreFilterResponse};
use crate::IndexFiles;

pub type TextsReaderPointer = Arc<RwLock<dyn FieldReader>>;
pub type TextsReaderPointer = Box<dyn FieldReader>;
pub type TextsWriterPointer = Box<dyn FieldWriter>;
pub type ProtosRequest = DocumentSearchRequest;
pub type ProtosResponse = DocumentSearchResponse;
Expand Down
5 changes: 1 addition & 4 deletions nucliadb_core/src/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use uuid::Uuid;

Expand All @@ -31,7 +30,7 @@ use crate::query_language::BooleanExpression;
use crate::Channel;
use crate::IndexFiles;

pub type VectorsReaderPointer = Arc<RwLock<dyn VectorReader>>;
pub type VectorsReaderPointer = Box<dyn VectorReader>;
pub type VectorsWriterPointer = Box<dyn VectorWriter>;
pub type ProtosRequest = VectorSearchRequest;
pub type ProtosResponse = VectorSearchResponse;
Expand Down Expand Up @@ -84,8 +83,6 @@ pub trait VectorReader: std::fmt::Debug + Send + Sync {
fn search(&self, request: &ProtosRequest, context: &VectorsContext) -> NodeResult<ProtosResponse>;
fn stored_ids(&self) -> NodeResult<Vec<String>>;
fn count(&self) -> NodeResult<usize>;

fn update(&mut self) -> NodeResult<()>;
}

pub trait VectorWriter: std::fmt::Debug + Send + Sync {
Expand Down
94 changes: 40 additions & 54 deletions nucliadb_node/src/shards/shard_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,43 +50,40 @@ use std::collections::HashSet;
use std::fs::{self, File};
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::RwLock;

const MAX_SUGGEST_COMPOUND_WORDS: usize = 3;
const MIN_VIABLE_PREFIX_SUGGEST: usize = 1;
const CHUNK_SIZE: usize = 65535;

fn open_vectors_reader(version: u32, path: &Path) -> NodeResult<VectorsReaderPointer> {
match version {
1 => nucliadb_vectors::service::VectorReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as VectorsReaderPointer),
2 => nucliadb_vectors::service::VectorReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as VectorsReaderPointer),
1 => nucliadb_vectors::service::VectorReaderService::open(path).map(|i| Box::new(i) as VectorsReaderPointer),
2 => nucliadb_vectors::service::VectorReaderService::open(path).map(|i| Box::new(i) as VectorsReaderPointer),
v => Err(node_error!("Invalid vectors version {v}")),
}
}
fn open_paragraphs_reader(version: u32, path: &Path) -> NodeResult<ParagraphsReaderPointer> {
match version {
2 => nucliadb_paragraphs2::reader::ParagraphReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as ParagraphsReaderPointer),
.map(|i| Box::new(i) as ParagraphsReaderPointer),
3 => nucliadb_paragraphs3::reader::ParagraphReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as ParagraphsReaderPointer),
.map(|i| Box::new(i) as ParagraphsReaderPointer),
v => Err(node_error!("Invalid paragraphs version {v}")),
}
}

fn open_texts_reader(version: u32, path: &Path) -> NodeResult<TextsReaderPointer> {
match version {
2 => nucliadb_texts2::reader::TextReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as TextsReaderPointer),
2 => nucliadb_texts2::reader::TextReaderService::open(path).map(|i| Box::new(i) as TextsReaderPointer),
v => Err(node_error!("Invalid text reader version {v}")),
}
}

fn open_relations_reader(version: u32, path: &Path) -> NodeResult<RelationsReaderPointer> {
match version {
2 => nucliadb_relations2::reader::RelationsReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as RelationsReaderPointer),
.map(|i| Box::new(i) as RelationsReaderPointer),
v => Err(node_error!("Invalid relations version {v}")),
}
}
Expand Down Expand Up @@ -140,12 +137,13 @@ impl Iterator for ShardFileChunkIterator {
pub struct ShardReader {
pub id: String,
pub metadata: ShardMetadata,
indexes: ShardIndexes,
root_path: PathBuf,
suffixed_root_path: String,
text_reader: TextsReaderPointer,
paragraph_reader: ParagraphsReaderPointer,
vector_reader: VectorsReaderPointer,
relation_reader: RelationsReaderPointer,
text_reader: RwLock<TextsReaderPointer>,
paragraph_reader: RwLock<ParagraphsReaderPointer>,
vector_reader: RwLock<VectorsReaderPointer>,
relation_reader: RwLock<RelationsReaderPointer>,
versions: Versions,
}

Expand Down Expand Up @@ -195,16 +193,12 @@ impl ShardReader {
pub fn get_info(&self) -> NodeResult<Shard> {
let span = tracing::Span::current();

let paragraphs = self.paragraph_reader.clone();
let vectors = self.vector_reader.clone();
let texts = self.text_reader.clone();

let info = info_span!(parent: &span, "text count");
let text_task = || run_with_telemetry(info, move || read_rw_lock(&texts).count());
let text_task = || run_with_telemetry(info, || read_rw_lock(&self.text_reader).count());
let info = info_span!(parent: &span, "paragraph count");
let paragraph_task = || run_with_telemetry(info, move || read_rw_lock(&paragraphs).count());
let paragraph_task = || run_with_telemetry(info, || read_rw_lock(&self.paragraph_reader).count());
let info = info_span!(parent: &span, "vector count");
let vector_task = || run_with_telemetry(info, move || read_rw_lock(&vectors).count());
let vector_task = || run_with_telemetry(info, || read_rw_lock(&self.vector_reader).count());

let mut text_result = Ok(0);
let mut paragraph_result = Ok(0);
Expand Down Expand Up @@ -298,12 +292,13 @@ impl ShardReader {
Ok(ShardReader {
id,
metadata,
root_path: shard_path.to_path_buf(),
suffixed_root_path,
text_reader: fields.unwrap(),
paragraph_reader: paragraphs.unwrap(),
vector_reader: vectors.unwrap(),
relation_reader: relations.unwrap(),
indexes,
root_path: shard_path.to_path_buf(),
text_reader: RwLock::new(fields.unwrap()),
paragraph_reader: RwLock::new(paragraphs.unwrap()),
vector_reader: RwLock::new(vectors.unwrap()),
relation_reader: RwLock::new(relations.unwrap()),
versions,
})
}
Expand Down Expand Up @@ -336,9 +331,6 @@ impl ShardReader {

let mut suggest_paragraphs = request.features.contains(&(SuggestFeatures::Paragraphs as i32));
let suggest_entities = request.features.contains(&(SuggestFeatures::Entities as i32));

let paragraphs_reader_service = self.paragraph_reader.clone();
let relations_reader_service = self.relation_reader.clone();
let prefixes = Self::split_suggest_query(request.body.clone(), MAX_SUGGEST_COMPOUND_WORDS);

// Prefilter to apply field label filters
Expand Down Expand Up @@ -371,7 +363,7 @@ impl ShardReader {
}

let suggest_paragraphs_task = suggest_paragraphs.then(|| {
let paragraph_task = move || read_rw_lock(&paragraphs_reader_service).suggest(&request);
let paragraph_task = move || read_rw_lock(&self.paragraph_reader).suggest(&request);
let info = info_span!(parent: &span, "paragraph suggest");
|| run_with_telemetry(info, paragraph_task)
});
Expand All @@ -392,9 +384,8 @@ impl ShardReader {
},
);

let responses = requests
.map(|request| read_rw_lock(&relations_reader_service).search(&request))
.collect::<Vec<_>>();
let responses: Vec<_> =
requests.map(|request| read_rw_lock(&self.relation_reader).search(&request)).collect();

let entities = responses
.into_iter()
Expand Down Expand Up @@ -466,34 +457,30 @@ impl ShardReader {
// Run the rest of the plan
let text_task = index_queries.texts_request.map(|mut request| {
request.id = search_id.clone();
let text_reader_service = self.text_reader.clone();
let info = info_span!(parent: &span, "text search");
let task = move || read_rw_lock(&text_reader_service).search(&request);
let task = move || read_rw_lock(&self.text_reader).search(&request);
|| run_with_telemetry(info, task)
});

let paragraph_task = index_queries.paragraphs_request.map(|mut request| {
request.id = search_id.clone();
let paragraphs_context = &index_queries.paragraphs_context;
let paragraph_reader_service = self.paragraph_reader.clone();
let info = info_span!(parent: &span, "paragraph search");
let task = move || read_rw_lock(&paragraph_reader_service).search(&request, paragraphs_context);
let task = move || read_rw_lock(&self.paragraph_reader).search(&request, paragraphs_context);
|| run_with_telemetry(info, task)
});

let vector_task = index_queries.vectors_request.map(|mut request| {
request.id = search_id.clone();
let vectors_context = &index_queries.vectors_context;
let vector_reader_service = self.vector_reader.clone();
let info = info_span!(parent: &span, "vector search");
let task = move || read_rw_lock(&vector_reader_service).search(&request, vectors_context);
let task = move || read_rw_lock(&self.vector_reader).search(&request, vectors_context);
|| run_with_telemetry(info, task)
});

let relation_task = index_queries.relations_request.map(|request| {
let relation_reader_service = self.relation_reader.clone();
let info = info_span!(parent: &span, "relations search");
let task = move || read_rw_lock(&relation_reader_service).search(&request);
let task = move || read_rw_lock(&self.relation_reader).search(&request);
|| run_with_telemetry(info, task)
});

Expand Down Expand Up @@ -529,18 +516,16 @@ impl ShardReader {
#[tracing::instrument(skip_all)]
pub fn paragraph_iterator(&self, request: StreamRequest) -> NodeResult<ParagraphIterator> {
let span = tracing::Span::current();
let paragraph_task_copy = Arc::clone(&self.paragraph_reader);
run_with_telemetry(info_span!(parent: &span, "paragraph iteration"), || {
read_rw_lock(&paragraph_task_copy).iterator(&request)
read_rw_lock(&self.paragraph_reader).iterator(&request)
})
}

#[tracing::instrument(skip_all)]
pub fn document_iterator(&self, request: StreamRequest) -> NodeResult<DocumentIterator> {
let span = tracing::Span::current();
let text_task_copy = Arc::clone(&self.text_reader);
run_with_telemetry(info_span!(parent: &span, "field iteration"), || {
read_rw_lock(&text_task_copy).iterator(&request)
read_rw_lock(&self.text_reader).iterator(&request)
})
}

Expand Down Expand Up @@ -592,39 +577,35 @@ impl ShardReader {
#[tracing::instrument(skip_all)]
pub fn paragraph_search(&self, search_request: ParagraphSearchRequest) -> NodeResult<ParagraphSearchResponse> {
let span = tracing::Span::current();
let paragraph_task_copy = Arc::clone(&self.paragraph_reader);

run_with_telemetry(info_span!(parent: &span, "paragraph reader search"), || {
read_rw_lock(&paragraph_task_copy).search(&search_request, &ParagraphsContext::default())
read_rw_lock(&self.paragraph_reader).search(&search_request, &ParagraphsContext::default())
})
}

#[tracing::instrument(skip_all)]
pub fn document_search(&self, search_request: DocumentSearchRequest) -> NodeResult<DocumentSearchResponse> {
let span = tracing::Span::current();
let text_task_copy = Arc::clone(&self.text_reader);

run_with_telemetry(info_span!(parent: &span, "field reader search"), || {
read_rw_lock(&text_task_copy).search(&search_request)
read_rw_lock(&self.text_reader).search(&search_request)
})
}

#[tracing::instrument(skip_all)]
pub fn vector_search(&self, search_request: VectorSearchRequest) -> NodeResult<VectorSearchResponse> {
let span = tracing::Span::current();
let vector_task_copy = Arc::clone(&self.vector_reader);

run_with_telemetry(info_span!(parent: &span, "vector reader search"), || {
read_rw_lock(&vector_task_copy).search(&search_request, &VectorsContext::default())
read_rw_lock(&self.vector_reader).search(&search_request, &VectorsContext::default())
})
}
#[tracing::instrument(skip_all)]
pub fn relation_search(&self, search_request: RelationSearchRequest) -> NodeResult<RelationSearchResponse> {
let span = tracing::Span::current();
let relation_task_copy = Arc::clone(&self.relation_reader);

run_with_telemetry(info_span!(parent: &span, "relation reader search"), || {
read_rw_lock(&relation_task_copy).search(&search_request)
read_rw_lock(&self.relation_reader).search(&search_request)
})
}

Expand All @@ -644,7 +625,12 @@ impl ShardReader {
}

pub fn update(&self) -> NodeResult<()> {
write_rw_lock(&self.vector_reader).update()
let version = self.versions.vectors;
let path = self.indexes.vectors_path();
let new_reader = open_vectors_reader(version, &path)?;
let mut writer = write_rw_lock(&self.vector_reader);
*writer = new_reader;
Ok(())
}
}

Expand Down
Loading
Loading