From 1bffdc8bba2e784ad65bbf679b742e0ae406e9d2 Mon Sep 17 00:00:00 2001 From: Herme <44120086+hermeGarcia@users.noreply.github.com> Date: Thu, 25 Jan 2024 15:13:16 +0100 Subject: [PATCH] safe tantivy replication (#1755) * safe tantivy replication * fix test compilation * taking into account ignored segments * documentation * review * fix typo --- Cargo.lock | 2 + nucliadb_core/Cargo.toml | 6 +- nucliadb_core/src/lib.rs | 10 +- nucliadb_core/src/tantivy_replica.rs | 242 +++++++++++++++++++++++ nucliadb_node/src/replication/service.rs | 110 ++++++++--- nucliadb_node/src/shards/shard_writer.rs | 32 ++- nucliadb_paragraphs/src/writer.rs | 42 +--- nucliadb_relations/src/service/writer.rs | 6 +- nucliadb_relations2/src/writer.rs | 43 +--- nucliadb_texts/src/writer.rs | 43 +--- nucliadb_texts2/src/writer.rs | 43 +--- nucliadb_vectors/src/service/writer.rs | 30 +-- 12 files changed, 405 insertions(+), 204 deletions(-) create mode 100644 nucliadb_core/src/tantivy_replica.rs diff --git a/Cargo.lock b/Cargo.lock index f7c1072ac4..ac7ec6910d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1696,6 +1696,8 @@ dependencies = [ "prost-types", "rayon", "serde", + "serde_json", + "tantivy 0.17.0 (git+https://github.com/nuclia/tantivy.git?branch=original-17)", "tempfile", "thiserror", "tokio", diff --git a/nucliadb_core/Cargo.toml b/nucliadb_core/Cargo.toml index 04e0fe9c25..78268ac059 100644 --- a/nucliadb_core/Cargo.toml +++ b/nucliadb_core/Cargo.toml @@ -24,8 +24,10 @@ tonic = "0.7" hyper = "0.14.26" tower = "0.4.13" tokio = "1.32.0" -tokio-metrics = { version = "0.3.0", features = ["rt"]} +tokio-metrics = { version = "0.3.0", features = ["rt"] } dashmap = "5.4.0" +tantivy = { git = "https://github.com/nuclia/tantivy.git", branch = "original-17" } +serde_json = "1.0.111" [dev-dependencies] -tokio = { version = "1.32.0", features = ["rt-multi-thread"]} +tokio = { version = "1.32.0", features = ["rt-multi-thread"] } diff --git a/nucliadb_core/src/lib.rs b/nucliadb_core/src/lib.rs index 5fd5e3a142..508b040050 100644 --- a/nucliadb_core/src/lib.rs +++ b/nucliadb_core/src/lib.rs @@ -23,6 +23,7 @@ pub mod metrics; pub mod paragraphs; pub mod query_planner; pub mod relations; +pub mod tantivy_replica; pub mod texts; pub mod vectors; @@ -58,6 +59,8 @@ use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; pub use anyhow::{anyhow as node_error, Context, Error}; use nucliadb_protos::noderesources::{Resource, ResourceId}; use serde::{Deserialize, Serialize}; + +use crate::tantivy_replica::TantivyReplicaState; pub type NodeResult = anyhow::Result; #[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, Default)] @@ -133,11 +136,16 @@ pub fn encapsulate_writer(writer: T) -> Arc> { } #[derive(Debug)] -pub struct IndexFiles { +pub struct RawReplicaState { pub metadata_files: HashMap>, pub files: Vec, } +pub enum IndexFiles { + Tantivy(TantivyReplicaState), + Other(RawReplicaState), +} + pub trait WriterChild: std::fmt::Debug + Send + Sync { fn set_resource(&mut self, resource: &Resource) -> NodeResult<()>; fn delete_resource(&mut self, resource_id: &ResourceId) -> NodeResult<()>; diff --git a/nucliadb_core/src/tantivy_replica.rs b/nucliadb_core/src/tantivy_replica.rs new file mode 100644 index 0000000000..b65a70b6c4 --- /dev/null +++ b/nucliadb_core/src/tantivy_replica.rs @@ -0,0 +1,242 @@ +// Copyright (C) 2021 Bosutech XXI S.L. +// +// nucliadb is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at info@nuclia.com. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . +// + +//! The following code works because of the Tantivy version used in this project. Inner Tantivy +//! files that are not meant to be used are used, this is basically a hack for implementing safe +//! replication at this moment in time. It should not be seen as the way of doing this, but as a way +//! last resort. +//! Ideally we should find a way of implementing safe replication inside Tantivy. + +use std::path::{Path, PathBuf}; + +use serde::Serialize; +use serde_json::Value; +use tantivy::schema::Schema; +use tantivy::{Index, IndexSettings, LeasedItem, Opstamp, Result, Searcher, SegmentId}; + +pub type Json = Value; + +/// Using a [`Searcher`] as a guard ensures us that until the +/// value is dropped the files it has access to are not modified. +pub struct ReplicationGuard(LeasedItem); + +pub struct TantivyReplicaState { + pub files: Vec, + pub guard: ReplicationGuard, + pub metadata_path: PathBuf, + pub index_metadata: Json, +} +impl TantivyReplicaState { + pub fn metadata_as_bytes(&self) -> Vec { + serde_json::to_vec(&self.index_metadata).unwrap() + } +} + +#[derive(Serialize)] +struct SegmentDeletes { + num_deleted_docs: u32, + opstamp: u64, +} + +#[derive(Serialize)] +struct SegmentSafeMetadata { + segment_id: SegmentId, + max_doc: u32, + deletes: Option, +} + +#[derive(Serialize)] +struct SafeMetadata { + index_settings: IndexSettings, + segments: Vec, + schema: Schema, + opstamp: Opstamp, + #[serde(skip_serializing_if = "Option::is_none")] + payload: Option, +} + +/// Needed by [`compute_safe_replica_state`] +#[derive(Clone, Copy)] +pub struct ReplicationParameters<'a> { + /// Where the index to be replicated is located on disk + pub path: &'a Path, + /// IDs of the segments already present at the replica. + pub on_replica: &'a [String], +} + +pub fn compute_safe_replica_state( + params: ReplicationParameters, + index: &Index, +) -> Result { + let searcher = index.reader()?.searcher(); + let index_metadata = index.load_metas()?; + let mut segment_files = vec![]; + let mut safe_metadata = SafeMetadata { + index_settings: index_metadata.index_settings, + schema: index_metadata.schema, + opstamp: index_metadata.opstamp, + payload: index_metadata.payload, + segments: vec![], + }; + for segment in searcher.segment_readers() { + let segment_id = segment.segment_id(); + let raw_segment_id = segment_id.uuid_string(); + let max_doc = segment.max_doc(); + let num_deleted_docs = segment.num_deleted_docs(); + let delete_opstamp = segment.delete_opstamp(); + let deletes = delete_opstamp.map(|opstamp| SegmentDeletes { + opstamp, + num_deleted_docs, + }); + + safe_metadata.segments.push(SegmentSafeMetadata { + deletes, + max_doc, + segment_id, + }); + + let deletes = + delete_opstamp.map(|stamp| PathBuf::from(format!("{raw_segment_id}.{stamp}.del"))); + + if let Some(deletes) = deletes { + segment_files.push(deletes); + } + + if params.on_replica.contains(&raw_segment_id) { + // If this segment is found on the replica we just + // send the deletes file. The rest is immutable so its + // already there. + continue; + } + + let postings = PathBuf::from(format!("{raw_segment_id}.idx")); + let positions = PathBuf::from(format!("{raw_segment_id}.pos")); + let terms = PathBuf::from(format!("{raw_segment_id}.term")); + let store = PathBuf::from(format!("{raw_segment_id}.store")); + let fast_fields = PathBuf::from(format!("{raw_segment_id}.fast")); + let field_norms = PathBuf::from(format!("{raw_segment_id}.fieldnorm")); + + if params.path.join(&postings).exists() { + segment_files.push(postings); + } + if params.path.join(&positions).exists() { + segment_files.push(positions); + } + if params.path.join(&terms).exists() { + segment_files.push(terms); + } + if params.path.join(&store).exists() { + segment_files.push(store); + } + if params.path.join(&fast_fields).exists() { + segment_files.push(fast_fields); + } + if params.path.join(&field_norms).exists() { + segment_files.push(field_norms); + } + } + + Ok(TantivyReplicaState { + files: segment_files, + guard: ReplicationGuard(searcher), + metadata_path: PathBuf::from("meta.json"), + index_metadata: serde_json::to_value(safe_metadata)?, + }) +} + +#[cfg(test)] +mod tests { + use std::fs::File; + + use tantivy::doc; + use tantivy::schema::{Field, Schema, STORED, TEXT}; + + use super::*; + + struct TestingSchema { + text_field: Field, + schema: Schema, + } + + fn define_schema() -> TestingSchema { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("text", TEXT | STORED); + let schema = schema_builder.build(); + TestingSchema { text_field, schema } + } + + #[test] + fn metadata_file_validation() { + let workspace = tempfile::tempdir().unwrap(); + let testing_schema = define_schema(); + let index = Index::create_in_dir(workspace.path(), testing_schema.schema).unwrap(); + let mut writer = index.writer_with_num_threads(1, 6_000_000).unwrap(); + writer + .add_document(doc!(testing_schema.text_field => "HI IM TEXT")) + .unwrap(); + writer.commit().unwrap(); + + let replica_params = ReplicationParameters { + path: workspace.path(), + on_replica: &[], + }; + let metadata_as_json = serde_json::to_value(index.load_metas().unwrap()).unwrap(); + let safe_replica_state = compute_safe_replica_state(replica_params, &index).unwrap(); + assert_eq!(safe_replica_state.index_metadata, metadata_as_json); + } + + #[test] + fn recreate_using_safe_state() { + let workspace = tempfile::tempdir().unwrap(); + let testing_schema = define_schema(); + let index = Index::create_in_dir(&workspace, testing_schema.schema).unwrap(); + let mut writer = index.writer_with_num_threads(1, 6_000_000).unwrap(); + writer + .add_document(doc!(testing_schema.text_field => "ONE")) + .unwrap(); + writer.commit().unwrap(); + + let replica_params = ReplicationParameters { + path: workspace.path(), + on_replica: &[], + }; + let safe_state = compute_safe_replica_state(replica_params, &index).unwrap(); + + let replica_workspace = tempfile::tempdir().unwrap(); + let mut metadata_file = File::create(replica_workspace.path().join("meta.json")).unwrap(); + serde_json::to_writer(&mut metadata_file, &safe_state.index_metadata).unwrap(); + + for file_in_replica in &safe_state.files { + let source = workspace.path().join(file_in_replica); + let target = replica_workspace.path().join(file_in_replica); + std::fs::copy(source, target).unwrap(); + } + + let replica_index = Index::open_in_dir(replica_workspace.path()).unwrap(); + let reader = replica_index.reader().unwrap(); + let searcher = reader.searcher(); + let mut collector = tantivy::collector::DocSetCollector; + let docs = searcher + .search(&tantivy::query::AllQuery, &mut collector) + .unwrap(); + + assert_eq!(docs.len(), 1); + } +} diff --git a/nucliadb_node/src/replication/service.rs b/nucliadb_node/src/replication/service.rs index 3401daae5c..4a82403953 100644 --- a/nucliadb_node/src/replication/service.rs +++ b/nucliadb_node/src/replication/service.rs @@ -21,8 +21,9 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; +use nucliadb_core::tantivy_replica::TantivyReplicaState; use nucliadb_core::tracing::{debug, error, info, warn}; -use nucliadb_core::{node_error, NodeResult}; +use nucliadb_core::{node_error, IndexFiles, NodeResult, RawReplicaState}; use nucliadb_protos::{noderesources, replication}; use tokio::fs::File; use tokio::io::AsyncReadExt; @@ -117,6 +118,67 @@ async fn stream_data( Ok(()) } +async fn replicate_from_raw( + replica_state: RawReplicaState, + chunk_size: u64, + shard_path: &Path, + generation_id: &str, + _index_prefix: PathBuf, + sender: &tokio::sync::mpsc::Sender>, +) -> NodeResult<()> { + for segment_file in replica_state.files { + stream_file( + chunk_size, + shard_path, + generation_id, + &PathBuf::from(segment_file), + sender, + ) + .await?; + } + for (metadata_file, data) in replica_state.metadata_files { + stream_data( + shard_path, + generation_id, + &PathBuf::from(metadata_file), + data, + sender, + ) + .await?; + } + Ok(()) +} + +async fn replicate_from_tantivy( + replica_state: TantivyReplicaState, + chunk_size: u64, + shard_path: &Path, + generation_id: &str, + index_prefix: PathBuf, + sender: &tokio::sync::mpsc::Sender>, +) -> NodeResult<()> { + for segment_file in &replica_state.files { + stream_file( + chunk_size, + shard_path, + generation_id, + &index_prefix.join(segment_file), + sender, + ) + .await?; + } + + stream_data( + shard_path, + generation_id, + &index_prefix.join(&replica_state.metadata_path), + replica_state.metadata_as_bytes(), + sender, + ) + .await?; + Ok(()) +} + async fn replica_shard( shard: Arc, ignored_segement_ids: HashMap>, @@ -135,27 +197,31 @@ async fn replica_shard( tokio::task::spawn_blocking(move || sshard.get_shard_files(&ignored_segement_ids)) .await??; - for segment_files in shard_files { - for segment_file in segment_files.files { - stream_file( - chunk_size, - &shard_path, - generation_id, - &PathBuf::from(segment_file), - &sender, - ) - .await?; - } - for (metadata_file, data) in segment_files.metadata_files { - stream_data( - &shard_path, - generation_id, - &PathBuf::from(metadata_file), - data, - &sender, - ) - .await?; - } + for (prefix, segment_files) in shard_files { + match segment_files { + IndexFiles::Other(raw_replica) => { + replicate_from_raw( + raw_replica, + chunk_size, + &shard_path, + generation_id, + prefix, + &sender, + ) + .await? + } + IndexFiles::Tantivy(tantivy_replica) => { + replicate_from_tantivy( + tantivy_replica, + chunk_size, + &shard_path, + generation_id, + prefix, + &sender, + ) + .await? + } + }; } // top level additional files diff --git a/nucliadb_node/src/shards/shard_writer.rs b/nucliadb_node/src/shards/shard_writer.rs index 33d601a7a5..e57ea73d03 100644 --- a/nucliadb_node/src/shards/shard_writer.rs +++ b/nucliadb_node/src/shards/shard_writer.rs @@ -504,27 +504,21 @@ impl ShardWriter { pub fn get_shard_files( &self, ignored_segement_ids: &HashMap>, - ) -> NodeResult> { + ) -> NodeResult> { let mut files = Vec::new(); let _lock = self.write_lock.lock().expect("Poisoned write lock"); // need to make sure more writes don't happen while we are reading - - files.push( - paragraph_read(&self.paragraph_writer) - .get_index_files(ignored_segement_ids.get("paragraph").unwrap_or(&Vec::new()))?, - ); - files.push( - text_read(&self.text_writer) - .get_index_files(ignored_segement_ids.get("text").unwrap_or(&Vec::new()))?, - ); - files.push( - vector_read(&self.vector_writer) - .get_index_files(ignored_segement_ids.get("vector").unwrap_or(&Vec::new()))?, - ); - files.push( - relation_read(&self.relation_writer) - .get_index_files(ignored_segement_ids.get("relation").unwrap_or(&Vec::new()))?, - ); - + let paragraph_files = paragraph_read(&self.paragraph_writer) + .get_index_files(ignored_segement_ids.get("paragraph").unwrap_or(&Vec::new()))?; + let text_files = text_read(&self.text_writer) + .get_index_files(ignored_segement_ids.get("text").unwrap_or(&Vec::new()))?; + let vector_files = vector_read(&self.vector_writer) + .get_index_files(ignored_segement_ids.get("vector").unwrap_or(&Vec::new()))?; + let relation_files = relation_read(&self.relation_writer) + .get_index_files(ignored_segement_ids.get("relation").unwrap_or(&Vec::new()))?; + files.push((PathBuf::from(PARAGRAPHS_DIR), paragraph_files)); + files.push((PathBuf::from(TEXTS_DIR), text_files)); + files.push((PathBuf::from(VECTORS_DIR), vector_files)); + files.push((PathBuf::from(RELATIONS_DIR), relation_files)); Ok(files) } } diff --git a/nucliadb_paragraphs/src/writer.rs b/nucliadb_paragraphs/src/writer.rs index 7ed66e51c0..b8462d6be0 100644 --- a/nucliadb_paragraphs/src/writer.rs +++ b/nucliadb_paragraphs/src/writer.rs @@ -27,7 +27,7 @@ use nucliadb_core::protos::prost::Message; use nucliadb_core::protos::resource::ResourceStatus; use nucliadb_core::protos::{Resource, ResourceId}; use nucliadb_core::tracing::{self, *}; -use nucliadb_core::IndexFiles; +use nucliadb_core::{tantivy_replica, IndexFiles}; use nucliadb_procs::measure; use regex::Regex; use tantivy::collector::Count; @@ -153,40 +153,12 @@ impl WriterChild for ParagraphWriterService { } fn get_index_files(&self, ignored_segment_ids: &[String]) -> NodeResult { - // Should be called along with a lock at a higher level to be safe - let mut meta_files = HashMap::new(); - let path = self.config.path.join("meta.json"); - if !path.exists() { - return Ok(IndexFiles { - metadata_files: meta_files, - files: Vec::new(), - }); - } - meta_files.insert("paragraph/meta.json".to_string(), fs::read(path)?); - - let mut files = Vec::new(); - - for segment_meta in self.index.searchable_segment_metas()? { - if ignored_segment_ids.contains(&segment_meta.id().uuid_string()) { - continue; - } - for seg_file in segment_meta.list_files() { - files.push(format!("paragraph/{}", seg_file.to_string_lossy())); - } - } - - if files.is_empty() { - // exit with no changes - return Ok(IndexFiles { - metadata_files: HashMap::new(), - files, - }); - } - - Ok(IndexFiles { - metadata_files: meta_files, - files, - }) + let params = tantivy_replica::ReplicationParameters { + path: &self.config.path, + on_replica: ignored_segment_ids, + }; + let safe_state = tantivy_replica::compute_safe_replica_state(params, &self.index)?; + Ok(IndexFiles::Tantivy(safe_state)) } } diff --git a/nucliadb_relations/src/service/writer.rs b/nucliadb_relations/src/service/writer.rs index 81bd1eee2a..4574c78ecb 100644 --- a/nucliadb_relations/src/service/writer.rs +++ b/nucliadb_relations/src/service/writer.rs @@ -25,7 +25,7 @@ use nucliadb_core::prelude::*; use nucliadb_core::protos::resource::ResourceStatus; use nucliadb_core::protos::{Resource, ResourceId}; use nucliadb_core::tracing::{self, *}; -use nucliadb_core::IndexFiles; +use nucliadb_core::{IndexFiles, RawReplicaState}; use nucliadb_procs::measure; use super::utils::*; @@ -153,10 +153,10 @@ impl WriterChild for RelationsWriterService { fn get_index_files(&self, _ignored_segment_ids: &[String]) -> NodeResult { // not implemented, not supported right now - Ok(IndexFiles { + Ok(IndexFiles::Other(RawReplicaState { metadata_files: HashMap::new(), files: Vec::new(), - }) + })) } } diff --git a/nucliadb_relations2/src/writer.rs b/nucliadb_relations2/src/writer.rs index 7f4faf7350..e48e02e4aa 100644 --- a/nucliadb_relations2/src/writer.rs +++ b/nucliadb_relations2/src/writer.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . // -use std::collections::HashMap; use std::fmt::Debug; use std::fs; use std::path::Path; @@ -28,7 +27,7 @@ use nucliadb_core::protos::prost::Message; use nucliadb_core::protos::resource::ResourceStatus; use nucliadb_core::protos::{Resource, ResourceId}; use nucliadb_core::tracing::{self, *}; -use nucliadb_core::IndexFiles; +use nucliadb_core::{tantivy_replica, IndexFiles}; use nucliadb_procs::measure; use tantivy::collector::Count; use tantivy::query::AllQuery; @@ -111,40 +110,12 @@ impl WriterChild for RelationsWriterService { } fn get_index_files(&self, ignored_segment_ids: &[String]) -> NodeResult { - // Should be called along with a lock at a higher level to be safe - let mut meta_files = HashMap::new(); - let path = self.config.path.join("meta.json"); - if !path.exists() { - return Ok(IndexFiles { - metadata_files: meta_files, - files: Vec::new(), - }); - } - meta_files.insert("text/meta.json".to_string(), fs::read(path)?); - - let mut files = Vec::new(); - - for segment_meta in self.index.searchable_segment_metas()? { - if ignored_segment_ids.contains(&segment_meta.id().uuid_string()) { - continue; - } - for seg_file in segment_meta.list_files() { - files.push(format!("text/{}", seg_file.to_string_lossy())); - } - } - - if files.is_empty() { - // exit with no changes - return Ok(IndexFiles { - metadata_files: HashMap::new(), - files, - }); - } - - Ok(IndexFiles { - metadata_files: meta_files, - files, - }) + let params = tantivy_replica::ReplicationParameters { + path: &self.config.path, + on_replica: ignored_segment_ids, + }; + let safe_state = tantivy_replica::compute_safe_replica_state(params, &self.index)?; + Ok(IndexFiles::Tantivy(safe_state)) } } diff --git a/nucliadb_texts/src/writer.rs b/nucliadb_texts/src/writer.rs index c8ff727b4e..68483e5ff2 100644 --- a/nucliadb_texts/src/writer.rs +++ b/nucliadb_texts/src/writer.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . // -use std::collections::HashMap; use std::fmt::Debug; use std::fs; use std::path::Path; @@ -28,7 +27,7 @@ use nucliadb_core::prelude::*; use nucliadb_core::protos::resource::ResourceStatus; use nucliadb_core::protos::{Resource, ResourceId}; use nucliadb_core::tracing::{self, *}; -use nucliadb_core::IndexFiles; +use nucliadb_core::{tantivy_replica, IndexFiles}; use nucliadb_procs::measure; use tantivy::collector::Count; use tantivy::query::AllQuery; @@ -154,40 +153,12 @@ impl WriterChild for TextWriterService { } fn get_index_files(&self, ignored_segment_ids: &[String]) -> NodeResult { - // Should be called along with a lock at a higher level to be safe - let mut meta_files = HashMap::new(); - let path = self.config.path.join("meta.json"); - if !path.exists() { - return Ok(IndexFiles { - metadata_files: meta_files, - files: Vec::new(), - }); - } - meta_files.insert("text/meta.json".to_string(), fs::read(path)?); - - let mut files = Vec::new(); - - for segment_meta in self.index.searchable_segment_metas()? { - if ignored_segment_ids.contains(&segment_meta.id().uuid_string()) { - continue; - } - for seg_file in segment_meta.list_files() { - files.push(format!("text/{}", seg_file.to_string_lossy())); - } - } - - if files.is_empty() { - // exit with no changes - return Ok(IndexFiles { - metadata_files: HashMap::new(), - files, - }); - } - - Ok(IndexFiles { - metadata_files: meta_files, - files, - }) + let params = tantivy_replica::ReplicationParameters { + path: &self.config.path, + on_replica: ignored_segment_ids, + }; + let safe_state = tantivy_replica::compute_safe_replica_state(params, &self.index)?; + Ok(IndexFiles::Tantivy(safe_state)) } } diff --git a/nucliadb_texts2/src/writer.rs b/nucliadb_texts2/src/writer.rs index 05403db72a..95e8f13674 100644 --- a/nucliadb_texts2/src/writer.rs +++ b/nucliadb_texts2/src/writer.rs @@ -18,7 +18,6 @@ // along with this program. If not, see . // -use std::collections::HashMap; use std::fmt::Debug; use std::fs; use std::path::Path; @@ -28,7 +27,7 @@ use nucliadb_core::prelude::*; use nucliadb_core::protos::resource::ResourceStatus; use nucliadb_core::protos::{Resource, ResourceId}; use nucliadb_core::tracing::{self, *}; -use nucliadb_core::IndexFiles; +use nucliadb_core::{tantivy_replica, IndexFiles}; use nucliadb_procs::measure; use tantivy::collector::Count; use tantivy::query::AllQuery; @@ -153,40 +152,12 @@ impl WriterChild for TextWriterService { } fn get_index_files(&self, ignored_segment_ids: &[String]) -> NodeResult { - // Should be called along with a lock at a higher level to be safe - let mut meta_files = HashMap::new(); - let path = self.config.path.join("meta.json"); - if !path.exists() { - return Ok(IndexFiles { - metadata_files: meta_files, - files: Vec::new(), - }); - } - meta_files.insert("text/meta.json".to_string(), fs::read(path)?); - - let mut files = Vec::new(); - - for segment_meta in self.index.searchable_segment_metas()? { - if ignored_segment_ids.contains(&segment_meta.id().uuid_string()) { - continue; - } - for seg_file in segment_meta.list_files() { - files.push(format!("text/{}", seg_file.to_string_lossy())); - } - } - - if files.is_empty() { - // exit with no changes - return Ok(IndexFiles { - metadata_files: HashMap::new(), - files, - }); - } - - Ok(IndexFiles { - metadata_files: meta_files, - files, - }) + let params = tantivy_replica::ReplicationParameters { + path: &self.config.path, + on_replica: ignored_segment_ids, + }; + let safe_state = tantivy_replica::compute_safe_replica_state(params, &self.index)?; + Ok(IndexFiles::Tantivy(safe_state)) } } diff --git a/nucliadb_vectors/src/service/writer.rs b/nucliadb_vectors/src/service/writer.rs index 3c8b279c2a..da14e63dc7 100644 --- a/nucliadb_vectors/src/service/writer.rs +++ b/nucliadb_vectors/src/service/writer.rs @@ -28,7 +28,7 @@ use nucliadb_core::protos::prost::Message; use nucliadb_core::protos::resource::ResourceStatus; use nucliadb_core::protos::{Resource, ResourceId, VectorSetId, VectorSimilarity}; use nucliadb_core::tracing::{self, *}; -use nucliadb_core::{metrics, Channel, IndexFiles}; +use nucliadb_core::{metrics, Channel, IndexFiles, RawReplicaState}; use nucliadb_procs::measure; use crate::data_point::{DataPoint, Elem, LabelDictionary}; @@ -347,12 +347,12 @@ impl WriterChild for VectorWriterService { fn get_index_files(&self, ignored_segment_ids: &[String]) -> NodeResult { // Should be called along with a lock at a higher level to be safe - let mut meta_files = HashMap::new(); - meta_files.insert( + let mut metadata_files = HashMap::new(); + metadata_files.insert( "vectors/state.bincode".to_string(), fs::read(self.config.path.join("state.bincode"))?, ); - meta_files.insert( + metadata_files.insert( "vectors/metadata.json".to_string(), fs::read(self.config.path.join("metadata.json"))?, ); @@ -377,7 +377,7 @@ impl WriterChild for VectorWriterService { let vectorsets = self.list_vectorsets()?; if !vectorsets.is_empty() { - meta_files.insert( + metadata_files.insert( "vectorset/state.bincode".to_string(), fs::read(self.config.vectorset.join("state.bincode"))?, ); @@ -399,11 +399,11 @@ impl WriterChild for VectorWriterService { files.push(format!("vectors/{}/fst/labels.idx", segment_id)); } } - meta_files.insert( + metadata_files.insert( format!("vectorset/{}/state.bincode", vs), fs::read(self.config.vectorset.join(format!("{}/state.bincode", vs)))?, ); - meta_files.insert( + metadata_files.insert( format!("vectorset/{}/metadata.json", vs), fs::read(self.config.vectorset.join(format!("{}/metadata.json", vs)))?, ); @@ -412,16 +412,16 @@ impl WriterChild for VectorWriterService { if files.is_empty() { // exit with no changes - return Ok(IndexFiles { + return Ok(IndexFiles::Other(RawReplicaState { metadata_files: HashMap::new(), files, - }); + })); } - Ok(IndexFiles { - metadata_files: meta_files, + Ok(IndexFiles::Other(RawReplicaState { + metadata_files, files, - }) + })) } } @@ -746,8 +746,10 @@ mod tests { let segments = writer.get_segment_ids().unwrap(); assert_eq!(segments.len(), 2); - let existing_segs: Vec = Vec::new(); - let index_files = writer.get_index_files(&existing_segs).unwrap(); + let existing_secs: Vec = Vec::new(); + let Ok(IndexFiles::Other(index_files)) = writer.get_index_files(&existing_secs) else { + panic!("Expected another outcome"); + }; let mut expected_files = Vec::new(); for segment in segments { expected_files.push(format!("vectors/{}/index.hnsw", segment));