Skip to content

Commit

Permalink
box readers + swap update
Browse files Browse the repository at this point in the history
  • Loading branch information
hermeGarcia committed May 7, 2024
1 parent 7e44af2 commit 91e93b8
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 142 deletions.
3 changes: 1 addition & 2 deletions nucliadb_core/src/paragraphs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::prelude::*;
use crate::protos::*;
use crate::query_language::BooleanExpression;
use crate::IndexFiles;

pub type ParagraphsReaderPointer = Arc<RwLock<dyn ParagraphReader>>;
pub type ParagraphsReaderPointer = Box<dyn ParagraphReader>;
pub type ParagraphsWriterPointer = Box<dyn ParagraphWriter>;
pub type ProtosRequest = ParagraphSearchRequest;
pub type ProtosResponse = ParagraphSearchResponse;
Expand Down
3 changes: 1 addition & 2 deletions nucliadb_core/src/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::prelude::*;
use crate::protos::*;
use crate::Channel;
use crate::IndexFiles;

pub type RelationsReaderPointer = Arc<RwLock<dyn RelationsReader>>;
pub type RelationsReaderPointer = Box<dyn RelationsReader>;
pub type RelationsWriterPointer = Box<dyn RelationsWriter>;
pub type ProtosRequest = RelationSearchRequest;
pub type ProtosResponse = RelationSearchResponse;
Expand Down
3 changes: 1 addition & 2 deletions nucliadb_core/src/texts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use crate::prelude::*;
use crate::protos::*;
use crate::query_planner::{PreFilterRequest, PreFilterResponse};
use crate::IndexFiles;

pub type TextsReaderPointer = Arc<RwLock<dyn FieldReader>>;
pub type TextsReaderPointer = Box<dyn FieldReader>;
pub type TextsWriterPointer = Box<dyn FieldWriter>;
pub type ProtosRequest = DocumentSearchRequest;
pub type ProtosResponse = DocumentSearchResponse;
Expand Down
5 changes: 1 addition & 4 deletions nucliadb_core/src/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

use uuid::Uuid;

Expand All @@ -31,7 +30,7 @@ use crate::query_language::BooleanExpression;
use crate::Channel;
use crate::IndexFiles;

pub type VectorsReaderPointer = Arc<RwLock<dyn VectorReader>>;
pub type VectorsReaderPointer = Box<dyn VectorReader>;
pub type VectorsWriterPointer = Box<dyn VectorWriter>;
pub type ProtosRequest = VectorSearchRequest;
pub type ProtosResponse = VectorSearchResponse;
Expand Down Expand Up @@ -84,8 +83,6 @@ pub trait VectorReader: std::fmt::Debug + Send + Sync {
fn search(&self, request: &ProtosRequest, context: &VectorsContext) -> NodeResult<ProtosResponse>;
fn stored_ids(&self) -> NodeResult<Vec<String>>;
fn count(&self) -> NodeResult<usize>;

fn update(&mut self) -> NodeResult<()>;
}

pub trait VectorWriter: std::fmt::Debug + Send + Sync {
Expand Down
94 changes: 40 additions & 54 deletions nucliadb_node/src/shards/shard_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,43 +50,40 @@ use std::collections::HashSet;
use std::fs::{self, File};
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::sync::RwLock;

const MAX_SUGGEST_COMPOUND_WORDS: usize = 3;
const MIN_VIABLE_PREFIX_SUGGEST: usize = 1;
const CHUNK_SIZE: usize = 65535;

fn open_vectors_reader(version: u32, path: &Path) -> NodeResult<VectorsReaderPointer> {
match version {
1 => nucliadb_vectors::service::VectorReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as VectorsReaderPointer),
2 => nucliadb_vectors::service::VectorReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as VectorsReaderPointer),
1 => nucliadb_vectors::service::VectorReaderService::open(path).map(|i| Box::new(i) as VectorsReaderPointer),
2 => nucliadb_vectors::service::VectorReaderService::open(path).map(|i| Box::new(i) as VectorsReaderPointer),
v => Err(node_error!("Invalid vectors version {v}")),
}
}
fn open_paragraphs_reader(version: u32, path: &Path) -> NodeResult<ParagraphsReaderPointer> {
match version {
2 => nucliadb_paragraphs2::reader::ParagraphReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as ParagraphsReaderPointer),
.map(|i| Box::new(i) as ParagraphsReaderPointer),
3 => nucliadb_paragraphs3::reader::ParagraphReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as ParagraphsReaderPointer),
.map(|i| Box::new(i) as ParagraphsReaderPointer),
v => Err(node_error!("Invalid paragraphs version {v}")),
}
}

fn open_texts_reader(version: u32, path: &Path) -> NodeResult<TextsReaderPointer> {
match version {
2 => nucliadb_texts2::reader::TextReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as TextsReaderPointer),
2 => nucliadb_texts2::reader::TextReaderService::open(path).map(|i| Box::new(i) as TextsReaderPointer),
v => Err(node_error!("Invalid text reader version {v}")),
}
}

fn open_relations_reader(version: u32, path: &Path) -> NodeResult<RelationsReaderPointer> {
match version {
2 => nucliadb_relations2::reader::RelationsReaderService::open(path)
.map(|i| Arc::new(RwLock::new(i)) as RelationsReaderPointer),
.map(|i| Box::new(i) as RelationsReaderPointer),
v => Err(node_error!("Invalid relations version {v}")),
}
}
Expand Down Expand Up @@ -140,12 +137,13 @@ impl Iterator for ShardFileChunkIterator {
pub struct ShardReader {
pub id: String,
pub metadata: ShardMetadata,
indexes: ShardIndexes,
root_path: PathBuf,
suffixed_root_path: String,
text_reader: TextsReaderPointer,
paragraph_reader: ParagraphsReaderPointer,
vector_reader: VectorsReaderPointer,
relation_reader: RelationsReaderPointer,
text_reader: RwLock<TextsReaderPointer>,
paragraph_reader: RwLock<ParagraphsReaderPointer>,
vector_reader: RwLock<VectorsReaderPointer>,
relation_reader: RwLock<RelationsReaderPointer>,
versions: Versions,
}

Expand Down Expand Up @@ -195,16 +193,12 @@ impl ShardReader {
pub fn get_info(&self) -> NodeResult<Shard> {
let span = tracing::Span::current();

let paragraphs = self.paragraph_reader.clone();
let vectors = self.vector_reader.clone();
let texts = self.text_reader.clone();

let info = info_span!(parent: &span, "text count");
let text_task = || run_with_telemetry(info, move || read_rw_lock(&texts).count());
let text_task = || run_with_telemetry(info, || read_rw_lock(&self.text_reader).count());
let info = info_span!(parent: &span, "paragraph count");
let paragraph_task = || run_with_telemetry(info, move || read_rw_lock(&paragraphs).count());
let paragraph_task = || run_with_telemetry(info, || read_rw_lock(&self.paragraph_reader).count());
let info = info_span!(parent: &span, "vector count");
let vector_task = || run_with_telemetry(info, move || read_rw_lock(&vectors).count());
let vector_task = || run_with_telemetry(info, || read_rw_lock(&self.vector_reader).count());

let mut text_result = Ok(0);
let mut paragraph_result = Ok(0);
Expand Down Expand Up @@ -298,12 +292,13 @@ impl ShardReader {
Ok(ShardReader {
id,
metadata,
root_path: shard_path.to_path_buf(),
suffixed_root_path,
text_reader: fields.unwrap(),
paragraph_reader: paragraphs.unwrap(),
vector_reader: vectors.unwrap(),
relation_reader: relations.unwrap(),
indexes,
root_path: shard_path.to_path_buf(),
text_reader: RwLock::new(fields.unwrap()),
paragraph_reader: RwLock::new(paragraphs.unwrap()),
vector_reader: RwLock::new(vectors.unwrap()),
relation_reader: RwLock::new(relations.unwrap()),
versions,
})
}
Expand Down Expand Up @@ -336,9 +331,6 @@ impl ShardReader {

let mut suggest_paragraphs = request.features.contains(&(SuggestFeatures::Paragraphs as i32));
let suggest_entities = request.features.contains(&(SuggestFeatures::Entities as i32));

let paragraphs_reader_service = self.paragraph_reader.clone();
let relations_reader_service = self.relation_reader.clone();
let prefixes = Self::split_suggest_query(request.body.clone(), MAX_SUGGEST_COMPOUND_WORDS);

// Prefilter to apply field label filters
Expand Down Expand Up @@ -371,7 +363,7 @@ impl ShardReader {
}

let suggest_paragraphs_task = suggest_paragraphs.then(|| {
let paragraph_task = move || read_rw_lock(&paragraphs_reader_service).suggest(&request);
let paragraph_task = move || read_rw_lock(&self.paragraph_reader).suggest(&request);
let info = info_span!(parent: &span, "paragraph suggest");
|| run_with_telemetry(info, paragraph_task)
});
Expand All @@ -392,9 +384,8 @@ impl ShardReader {
},
);

let responses = requests
.map(|request| read_rw_lock(&relations_reader_service).search(&request))
.collect::<Vec<_>>();
let responses: Vec<_> =
requests.map(|request| read_rw_lock(&self.relation_reader).search(&request)).collect();

let entities = responses
.into_iter()
Expand Down Expand Up @@ -466,34 +457,30 @@ impl ShardReader {
// Run the rest of the plan
let text_task = index_queries.texts_request.map(|mut request| {
request.id = search_id.clone();
let text_reader_service = self.text_reader.clone();
let info = info_span!(parent: &span, "text search");
let task = move || read_rw_lock(&text_reader_service).search(&request);
let task = move || read_rw_lock(&self.text_reader).search(&request);
|| run_with_telemetry(info, task)
});

let paragraph_task = index_queries.paragraphs_request.map(|mut request| {
request.id = search_id.clone();
let paragraphs_context = &index_queries.paragraphs_context;
let paragraph_reader_service = self.paragraph_reader.clone();
let info = info_span!(parent: &span, "paragraph search");
let task = move || read_rw_lock(&paragraph_reader_service).search(&request, paragraphs_context);
let task = move || read_rw_lock(&self.paragraph_reader).search(&request, paragraphs_context);
|| run_with_telemetry(info, task)
});

let vector_task = index_queries.vectors_request.map(|mut request| {
request.id = search_id.clone();
let vectors_context = &index_queries.vectors_context;
let vector_reader_service = self.vector_reader.clone();
let info = info_span!(parent: &span, "vector search");
let task = move || read_rw_lock(&vector_reader_service).search(&request, vectors_context);
let task = move || read_rw_lock(&self.vector_reader).search(&request, vectors_context);
|| run_with_telemetry(info, task)
});

let relation_task = index_queries.relations_request.map(|request| {
let relation_reader_service = self.relation_reader.clone();
let info = info_span!(parent: &span, "relations search");
let task = move || read_rw_lock(&relation_reader_service).search(&request);
let task = move || read_rw_lock(&self.relation_reader).search(&request);
|| run_with_telemetry(info, task)
});

Expand Down Expand Up @@ -529,18 +516,16 @@ impl ShardReader {
#[tracing::instrument(skip_all)]
pub fn paragraph_iterator(&self, request: StreamRequest) -> NodeResult<ParagraphIterator> {
let span = tracing::Span::current();
let paragraph_task_copy = Arc::clone(&self.paragraph_reader);
run_with_telemetry(info_span!(parent: &span, "paragraph iteration"), || {
read_rw_lock(&paragraph_task_copy).iterator(&request)
read_rw_lock(&self.paragraph_reader).iterator(&request)
})
}

#[tracing::instrument(skip_all)]
pub fn document_iterator(&self, request: StreamRequest) -> NodeResult<DocumentIterator> {
let span = tracing::Span::current();
let text_task_copy = Arc::clone(&self.text_reader);
run_with_telemetry(info_span!(parent: &span, "field iteration"), || {
read_rw_lock(&text_task_copy).iterator(&request)
read_rw_lock(&self.text_reader).iterator(&request)
})
}

Expand Down Expand Up @@ -592,39 +577,35 @@ impl ShardReader {
#[tracing::instrument(skip_all)]
pub fn paragraph_search(&self, search_request: ParagraphSearchRequest) -> NodeResult<ParagraphSearchResponse> {
let span = tracing::Span::current();
let paragraph_task_copy = Arc::clone(&self.paragraph_reader);

run_with_telemetry(info_span!(parent: &span, "paragraph reader search"), || {
read_rw_lock(&paragraph_task_copy).search(&search_request, &ParagraphsContext::default())
read_rw_lock(&self.paragraph_reader).search(&search_request, &ParagraphsContext::default())
})
}

#[tracing::instrument(skip_all)]
pub fn document_search(&self, search_request: DocumentSearchRequest) -> NodeResult<DocumentSearchResponse> {
let span = tracing::Span::current();
let text_task_copy = Arc::clone(&self.text_reader);

run_with_telemetry(info_span!(parent: &span, "field reader search"), || {
read_rw_lock(&text_task_copy).search(&search_request)
read_rw_lock(&self.text_reader).search(&search_request)
})
}

#[tracing::instrument(skip_all)]
pub fn vector_search(&self, search_request: VectorSearchRequest) -> NodeResult<VectorSearchResponse> {
let span = tracing::Span::current();
let vector_task_copy = Arc::clone(&self.vector_reader);

run_with_telemetry(info_span!(parent: &span, "vector reader search"), || {
read_rw_lock(&vector_task_copy).search(&search_request, &VectorsContext::default())
read_rw_lock(&self.vector_reader).search(&search_request, &VectorsContext::default())
})
}
#[tracing::instrument(skip_all)]
pub fn relation_search(&self, search_request: RelationSearchRequest) -> NodeResult<RelationSearchResponse> {
let span = tracing::Span::current();
let relation_task_copy = Arc::clone(&self.relation_reader);

run_with_telemetry(info_span!(parent: &span, "relation reader search"), || {
read_rw_lock(&relation_task_copy).search(&search_request)
read_rw_lock(&self.relation_reader).search(&search_request)
})
}

Expand All @@ -644,7 +625,12 @@ impl ShardReader {
}

pub fn update(&self) -> NodeResult<()> {
write_rw_lock(&self.vector_reader).update()
let version = self.versions.vectors;
let path = self.indexes.vectors_path();
let new_reader = open_vectors_reader(version, &path)?;
let mut writer = write_rw_lock(&self.vector_reader);
*writer = new_reader;
Ok(())
}
}

Expand Down
Loading

0 comments on commit 91e93b8

Please sign in to comment.