From 4b324d284b45500987dae6d82baa23e6d59954d0 Mon Sep 17 00:00:00 2001 From: Javier Torres Date: Wed, 25 Sep 2024 12:17:28 +0200 Subject: [PATCH] Cleaner initial replication (#2492) --- nucliadb_node/src/replication/replicator.rs | 111 +++++++++++++------- nucliadb_node/tests/test_replication.rs | 25 ++++- 2 files changed, 94 insertions(+), 42 deletions(-) diff --git a/nucliadb_node/src/replication/replicator.rs b/nucliadb_node/src/replication/replicator.rs index 82415de193..e1515fa231 100644 --- a/nucliadb_node/src/replication/replicator.rs +++ b/nucliadb_node/src/replication/replicator.rs @@ -17,40 +17,82 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::HashMap; -use std::fs; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; - use futures::Future; use nucliadb_core::metrics::replication as replication_metrics; use nucliadb_core::tracing::{debug, error, info, warn}; use nucliadb_core::{metrics, Error, NodeResult}; use nucliadb_protos::prelude::EmptyQuery; use nucliadb_protos::replication; -use nucliadb_vectors::config::VectorConfig; +use std::collections::HashMap; +use std::fs; +use std::path::PathBuf; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::sync::Semaphore; use tokio::time::Duration; // Import the Future trait use tonic::Request; use crate::cache::ShardWriterCache; -use crate::disk_structure::VERSION_FILE; +use crate::disk_structure; use crate::replication::health::ReplicationHealthManager; use crate::settings::Settings; -use crate::shards::indexes::DEFAULT_VECTORS_INDEX_NAME; -use crate::shards::writer::{NewShard, ShardWriter}; +use crate::shards::writer::ShardWriter; use crate::utils::{list_shards, set_primary_node_id}; +pub enum ShardStub { + Shard(Arc), + New { + id: String, + path: PathBuf, + cache: Arc, + }, +} + +impl ShardStub { + fn get_shard_segments(&self) -> NodeResult>> { + if let ShardStub::Shard(s) = self { + s.get_shard_segments() + } else { + Ok(HashMap::new()) + } + } + + fn path(&self) -> &PathBuf { + match self { + ShardStub::Shard(shard) => &shard.path, + ShardStub::New { + path, + .. + } => path, + } + } + + fn get_open_shard(self) -> NodeResult> { + match self { + ShardStub::Shard(shard) => Ok(shard), + ShardStub::New { + id, + cache, + .. + } => cache.get(&id), + } + } +} + pub async fn replicate_shard( shard_state: replication::PrimaryShardReplicationState, mut client: replication::replication_service_client::ReplicationServiceClient, - shard: Arc, + shard: ShardStub, ) -> NodeResult<()> { + let metrics = metrics::get_metrics(); + // do not allow gc while replicating - let _gc_lock = shard.gc_lock.lock().await; + let mut _gc_lock = None; + if let ShardStub::Shard(ref s) = shard { + _gc_lock = Some(s.gc_lock.lock().await); + } - let metrics = metrics::get_metrics(); let existing_segment_ids = shard .get_shard_segments()? .iter() @@ -63,6 +105,7 @@ pub async fn replicate_shard( ) }) .collect(); + let mut stream = client .replicate_shard(Request::new(replication::ReplicateShardRequest { shard_id: shard_state.shard_id.clone(), @@ -72,8 +115,7 @@ pub async fn replicate_shard( .await? .into_inner(); - let shard_path = shard.path.clone(); - let replicate_work_path = shard_path.join("replication"); + let replicate_work_path = shard.path().join("replication"); // create replication work path if not exists if !replicate_work_path.exists() { std::fs::create_dir_all(&replicate_work_path)?; @@ -109,7 +151,7 @@ pub async fn replicate_shard( // close file drop(file); - let dest_filepath = shard_path.join(filepath.clone().unwrap()); + let dest_filepath = shard.path().join(filepath.clone().unwrap()); // check if path exists if dest_filepath.exists() { std::fs::remove_file(dest_filepath.clone())?; @@ -131,6 +173,9 @@ pub async fn replicate_shard( drop(file); drop(_gc_lock); + // Open the shard if replicating for the first time + let shard = shard.get_open_shard()?; + if let Some(gen_id) = generation_id { // After successful sync, set the generation id shard.metadata.set_generation_id(gen_id); @@ -259,35 +304,21 @@ pub async fn connect_to_primary_and_replicate( let shard_id = shard_state.shard_id.clone(); let shard_lookup; - if existing_shards.contains(&shard_id) { + let shard_path = disk_structure::shard_path_by_id(&settings.shards_path(), &shard_id); + let shard = if existing_shards.contains(&shard_id) { let shard_cache_clone = shard_cache.clone(); let shard_id_clone = shard_id.clone(); shard_lookup = tokio::task::spawn_blocking(move || shard_cache_clone.get(&shard_id_clone)).await?; + ShardStub::Shard(shard_lookup?) } else { - let shard_cache_clone = Arc::clone(&shard_cache); - - info!("Creating shard to replicate: {shard_id}"); - let payload = NewShard { - kbid: shard_state.kbid.clone(), - shard_id: shard_state.shard_id.clone(), - // Create the default vectorset with a default config, it - // will get overwritten on first sync - vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]), - }; - let shard_create = tokio::task::spawn_blocking(move || shard_cache_clone.create(payload)).await?; - if let Ok(ref shard) = shard_create { - // We want to avoid shard readers to open this until it's replicated, so it doesn't end up opening the - // wrong version of the code for an index. So we delete one the metadata file containing the version - // which will make the opening of the reader to fail with shard not found (it can fall back to the primary). - // This file will be created once the shard is replicated, with the proper index versions - std::fs::remove_file(shard.path.join(VERSION_FILE))?; - } else { - warn!("Failed to create shard: {:?}", shard_create); - continue; + std::fs::create_dir(&shard_path)?; + ShardStub::New { + id: shard_id.clone(), + path: shard_path.clone(), + cache: shard_cache.clone(), } - shard_lookup = shard_create; - } - let shard = shard_lookup?; + }; + let mut current_gen_id = "UNKNOWN".to_string(); if let Some(metadata) = shard_cache.get_metadata(shard_id.clone()) { current_gen_id = metadata.get_generation_id().unwrap_or("UNSET_SECONDARY".to_string()); @@ -298,7 +329,7 @@ pub async fn connect_to_primary_and_replicate( shard_id, shard_state.generation_id, current_gen_id ); - let replicate_work_path = shard.path.join("replication"); + let replicate_work_path = shard_path.join("replication"); if replicate_work_path.exists() { // clear out replication directory before we start in case there is anything // left behind from a former failed sync diff --git a/nucliadb_node/tests/test_replication.rs b/nucliadb_node/tests/test_replication.rs index 2a6bdbfb5c..ddf37044bc 100644 --- a/nucliadb_node/tests/test_replication.rs +++ b/nucliadb_node/tests/test_replication.rs @@ -26,7 +26,7 @@ use std::sync::Arc; use common::{resources, NodeFixture, TestNodeReader, TestNodeWriter}; use nucliadb_core::protos::{ op_status, IndexParagraphs, NewShardRequest, NewVectorSetRequest, SearchRequest, SearchResponse, ShardId, - VectorSetId, + VectorIndexConfig, VectorSetId, }; use nucliadb_node::replication::health::ReplicationHealthManager; use rstest::*; @@ -84,6 +84,18 @@ async fn test_search_replicated_data() -> Result<(), Box> primary_shard.upgrade().unwrap().metadata.get_generation_id(), secondary_shard.upgrade().unwrap().metadata.get_generation_id() ); + println!("PRIMARY {:?}", primary_shard.upgrade().unwrap().metadata.get_generation_id()); + + // Test a second change is replicated + create_test_resources(&mut writer, shard.id.clone(), None).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + + // Validate generation id is the same + assert_eq!( + primary_shard.upgrade().unwrap().metadata.get_generation_id(), + secondary_shard.upgrade().unwrap().metadata.get_generation_id() + ); + println!("PRIMARY {:?}", primary_shard.upgrade().unwrap().metadata.get_generation_id()); // Test deleting shard deletes it from secondary delete_shard(&mut writer, shard.id.clone()).await; @@ -106,7 +118,16 @@ struct ShardDetails { } async fn create_shard(writer: &mut TestNodeWriter) -> ShardDetails { - let request = Request::new(NewShardRequest::default()); + let request = Request::new(NewShardRequest { + vectorsets_configs: HashMap::from([( + "multilingual".to_string(), + VectorIndexConfig { + vector_dimension: Some(3), + ..Default::default() + }, + )]), + ..Default::default() + }); let new_shard_response = writer.new_shard(request).await.expect("Unable to create new shard"); let shard_id = &new_shard_response.get_ref().id; create_test_resources(writer, shard_id.clone(), None).await;