Skip to content

Commit

Permalink
safe tantivy replication
Browse files Browse the repository at this point in the history
  • Loading branch information
hermeGarcia committed Jan 22, 2024
1 parent ae24033 commit 670faa5
Show file tree
Hide file tree
Showing 12 changed files with 363 additions and 206 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions nucliadb_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ tonic = "0.7"
hyper = "0.14.26"
tower = "0.4.13"
tokio = "1.32.0"
tokio-metrics = { version = "0.3.0", features = ["rt"]}
tokio-metrics = { version = "0.3.0", features = ["rt"] }
dashmap = "5.4.0"
tantivy = { git = "https://github.com/nuclia/tantivy.git", branch = "original-17" }
serde_json = "1.0.111"

[dev-dependencies]
tokio = { version = "1.32.0", features = ["rt-multi-thread"]}
tokio = { version = "1.32.0", features = ["rt-multi-thread"] }
10 changes: 9 additions & 1 deletion nucliadb_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod metrics;
pub mod paragraphs;
pub mod query_planner;
pub mod relations;
pub mod tantivy_replica;
pub mod texts;
pub mod vectors;

Expand Down Expand Up @@ -58,6 +59,8 @@ use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
pub use anyhow::{anyhow as node_error, Context, Error};
use nucliadb_protos::noderesources::{Resource, ResourceId};
use serde::{Deserialize, Serialize};

use crate::tantivy_replica::TantivyReplicaState;
pub type NodeResult<O> = anyhow::Result<O>;

#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -133,11 +136,16 @@ pub fn encapsulate_writer<T>(writer: T) -> Arc<RwLock<T>> {
}

#[derive(Debug)]
pub struct IndexFiles {
pub struct RawReplicaState {
pub metadata_files: HashMap<String, Vec<u8>>,
pub files: Vec<String>,
}

pub enum IndexFiles {
Tantivy(TantivyReplicaState),
Other(RawReplicaState),
}

pub trait WriterChild: std::fmt::Debug + Send + Sync {
fn set_resource(&mut self, resource: &Resource) -> NodeResult<()>;
fn delete_resource(&mut self, resource_id: &ResourceId) -> NodeResult<()>;
Expand Down
212 changes: 212 additions & 0 deletions nucliadb_core/src/tantivy_replica.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright (C) 2021 Bosutech XXI S.L.
//
// nucliadb is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//

//! The following code works because of the Tantivy version used in this project. Inner Tantivy
//! files that are not meant to be used are used, this is basically a hack for implementing safe
//! replication at this moment in time. It should not be seen as the way of doing this, but as a way
//! last resort.
//! Ideally we should find a way of implementing safe replication inside Tantivy.

use std::path::{Path, PathBuf};

use serde::Serialize;
use serde_json::Value;
use tantivy::schema::Schema;
use tantivy::{Index, IndexSettings, LeasedItem, Opstamp, Result, Searcher, SegmentId};

pub type Json = Value;

/// Using a [`Searcher`] as a guard ensures us that until the
/// value is dropped the files it has access to are not modified.
pub struct ReplicationGuard(LeasedItem<Searcher>);

pub struct TantivyReplicaState {
pub files: Vec<PathBuf>,
pub guard: ReplicationGuard,
pub metadata_path: PathBuf,
pub index_metadata: Json,
}
impl TantivyReplicaState {
pub fn metadata_as_bytes(&self) -> Vec<u8> {
serde_json::to_vec(&self.index_metadata).unwrap()
}
}

#[derive(Serialize)]
struct SegmentDeletes {
num_deleted_docs: u32,
opstamp: u64,
}

#[derive(Serialize)]
struct SegmentSafeMetadata {
segment_id: SegmentId,
max_doc: u32,
deletes: Option<SegmentDeletes>,
}

#[derive(Serialize)]
struct SafeMetadata {
index_settings: IndexSettings,
segments: Vec<SegmentSafeMetadata>,
schema: Schema,
opstamp: Opstamp,
#[serde(skip_serializing_if = "Option::is_none")]
payload: Option<String>,
}

pub fn compute_safe_replica_state(path: &Path, index: &Index) -> Result<TantivyReplicaState> {
let searcher = index.reader()?.searcher();
let index_metadata = index.load_metas()?;
let mut segment_files = vec![];
let mut safe_metadata = SafeMetadata {
index_settings: index_metadata.index_settings,
schema: index_metadata.schema,
opstamp: index_metadata.opstamp,
payload: index_metadata.payload,
segments: vec![],
};
for segment in searcher.segment_readers() {
let segment_id = segment.segment_id();
let raw_segment_id = segment_id.uuid_string();
let max_doc = segment.max_doc();
let num_deleted_docs = segment.num_deleted_docs();
let delete_opstamp = segment.delete_opstamp();
let deletes = delete_opstamp.map(|opstamp| SegmentDeletes {
opstamp,
num_deleted_docs,
});

safe_metadata.segments.push(SegmentSafeMetadata {
deletes,
max_doc,
segment_id,
});

let postings = PathBuf::from(format!("{raw_segment_id}.idx"));
let positions = PathBuf::from(format!("{raw_segment_id}.pos"));
let terms = PathBuf::from(format!("{raw_segment_id}.term"));
let store = PathBuf::from(format!("{raw_segment_id}.store"));
let fast_fields = PathBuf::from(format!("{raw_segment_id}.fast"));
let field_norms = PathBuf::from(format!("{raw_segment_id}.fieldnorm"));
let deletes = delete_opstamp
.map(|stamp| PathBuf::from(format!("{raw_segment_id}.{stamp}.fieldnorm")));

if path.join(&postings).exists() {
segment_files.push(postings);
}
if path.join(&positions).exists() {
segment_files.push(positions);
}
if path.join(&terms).exists() {
segment_files.push(terms);
}
if path.join(&store).exists() {
segment_files.push(store);
}
if path.join(&fast_fields).exists() {
segment_files.push(fast_fields);
}
if path.join(&field_norms).exists() {
segment_files.push(field_norms);
}
if let Some(deletes) = deletes {
segment_files.push(deletes);
}
}

Ok(TantivyReplicaState {
files: segment_files,
guard: ReplicationGuard(searcher),
metadata_path: PathBuf::from("meta.json"),
index_metadata: serde_json::to_value(safe_metadata)?,
})
}

#[cfg(test)]
mod tests {
use std::fs::File;

use tantivy::doc;
use tantivy::schema::{Field, Schema, STORED, TEXT};

use super::*;

struct TestingSchema {
text_field: Field,
schema: Schema,
}

fn define_schema() -> TestingSchema {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT | STORED);
let schema = schema_builder.build();
TestingSchema { text_field, schema }
}

#[test]
fn metadata_file_validation() {
let workspace = tempfile::tempdir().unwrap();
let testing_schema = define_schema();
let index = Index::create_in_dir(workspace.path(), testing_schema.schema).unwrap();
let mut writer = index.writer_with_num_threads(1, 6_000_000).unwrap();
writer
.add_document(doc!(testing_schema.text_field => "HI IM TEXT"))
.unwrap();
writer.commit().unwrap();

let metadata_as_json = serde_json::to_value(index.load_metas().unwrap()).unwrap();
let safe_replica_state = compute_safe_replica_state(workspace.path(), &index).unwrap();
assert_eq!(safe_replica_state.index_metadata, metadata_as_json);
}

#[test]
fn recreate_using_safe_state() {
let workspace = tempfile::tempdir().unwrap();
let testing_schema = define_schema();
let index = Index::create_in_dir(&workspace, testing_schema.schema).unwrap();
let mut writer = index.writer_with_num_threads(1, 6_000_000).unwrap();
writer
.add_document(doc!(testing_schema.text_field => "ONE"))
.unwrap();
writer.commit().unwrap();

let replica_workspace = tempfile::tempdir().unwrap();
let safe_state = compute_safe_replica_state(workspace.path(), &index).unwrap();
let mut metadata_file = File::create(replica_workspace.path().join("meta.json")).unwrap();
serde_json::to_writer(&mut metadata_file, &safe_state.index_metadata).unwrap();

for file_in_replica in &safe_state.files {
let source = workspace.path().join(file_in_replica);
let target = replica_workspace.path().join(file_in_replica);
std::fs::copy(source, target).unwrap();
}

let replica_index = Index::open_in_dir(replica_workspace.path()).unwrap();
let reader = replica_index.reader().unwrap();
let searcher = reader.searcher();
let mut collector = tantivy::collector::DocSetCollector;
let docs = searcher
.search(&tantivy::query::AllQuery, &mut collector)
.unwrap();

assert_eq!(docs.len(), 1);
}
}
Loading

0 comments on commit 670faa5

Please sign in to comment.