Skip to content

Commit

Permalink
Cleaner initial replication (#2492)
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino authored Sep 25, 2024
1 parent 3b21024 commit 4b324d2
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 42 deletions.
111 changes: 71 additions & 40 deletions nucliadb_node/src/replication/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,82 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::fs;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use futures::Future;
use nucliadb_core::metrics::replication as replication_metrics;
use nucliadb_core::tracing::{debug, error, info, warn};
use nucliadb_core::{metrics, Error, NodeResult};
use nucliadb_protos::prelude::EmptyQuery;
use nucliadb_protos::replication;
use nucliadb_vectors::config::VectorConfig;
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::Semaphore;
use tokio::time::Duration; // Import the Future trait
use tonic::Request;

use crate::cache::ShardWriterCache;
use crate::disk_structure::VERSION_FILE;
use crate::disk_structure;
use crate::replication::health::ReplicationHealthManager;
use crate::settings::Settings;
use crate::shards::indexes::DEFAULT_VECTORS_INDEX_NAME;
use crate::shards::writer::{NewShard, ShardWriter};
use crate::shards::writer::ShardWriter;
use crate::utils::{list_shards, set_primary_node_id};

pub enum ShardStub {
Shard(Arc<ShardWriter>),
New {
id: String,
path: PathBuf,
cache: Arc<ShardWriterCache>,
},
}

impl ShardStub {
fn get_shard_segments(&self) -> NodeResult<HashMap<String, Vec<String>>> {
if let ShardStub::Shard(s) = self {
s.get_shard_segments()
} else {
Ok(HashMap::new())
}
}

fn path(&self) -> &PathBuf {
match self {
ShardStub::Shard(shard) => &shard.path,
ShardStub::New {
path,
..
} => path,
}
}

fn get_open_shard(self) -> NodeResult<Arc<ShardWriter>> {
match self {
ShardStub::Shard(shard) => Ok(shard),
ShardStub::New {
id,
cache,
..
} => cache.get(&id),
}
}
}

pub async fn replicate_shard(
shard_state: replication::PrimaryShardReplicationState,
mut client: replication::replication_service_client::ReplicationServiceClient<tonic::transport::Channel>,
shard: Arc<ShardWriter>,
shard: ShardStub,
) -> NodeResult<()> {
let metrics = metrics::get_metrics();

// do not allow gc while replicating
let _gc_lock = shard.gc_lock.lock().await;
let mut _gc_lock = None;
if let ShardStub::Shard(ref s) = shard {
_gc_lock = Some(s.gc_lock.lock().await);
}

let metrics = metrics::get_metrics();
let existing_segment_ids = shard
.get_shard_segments()?
.iter()
Expand All @@ -63,6 +105,7 @@ pub async fn replicate_shard(
)
})
.collect();

let mut stream = client
.replicate_shard(Request::new(replication::ReplicateShardRequest {
shard_id: shard_state.shard_id.clone(),
Expand All @@ -72,8 +115,7 @@ pub async fn replicate_shard(
.await?
.into_inner();

let shard_path = shard.path.clone();
let replicate_work_path = shard_path.join("replication");
let replicate_work_path = shard.path().join("replication");
// create replication work path if not exists
if !replicate_work_path.exists() {
std::fs::create_dir_all(&replicate_work_path)?;
Expand Down Expand Up @@ -109,7 +151,7 @@ pub async fn replicate_shard(
// close file
drop(file);

let dest_filepath = shard_path.join(filepath.clone().unwrap());
let dest_filepath = shard.path().join(filepath.clone().unwrap());
// check if path exists
if dest_filepath.exists() {
std::fs::remove_file(dest_filepath.clone())?;
Expand All @@ -131,6 +173,9 @@ pub async fn replicate_shard(
drop(file);
drop(_gc_lock);

// Open the shard if replicating for the first time
let shard = shard.get_open_shard()?;

if let Some(gen_id) = generation_id {
// After successful sync, set the generation id
shard.metadata.set_generation_id(gen_id);
Expand Down Expand Up @@ -259,35 +304,21 @@ pub async fn connect_to_primary_and_replicate(

let shard_id = shard_state.shard_id.clone();
let shard_lookup;
if existing_shards.contains(&shard_id) {
let shard_path = disk_structure::shard_path_by_id(&settings.shards_path(), &shard_id);
let shard = if existing_shards.contains(&shard_id) {
let shard_cache_clone = shard_cache.clone();
let shard_id_clone = shard_id.clone();
shard_lookup = tokio::task::spawn_blocking(move || shard_cache_clone.get(&shard_id_clone)).await?;
ShardStub::Shard(shard_lookup?)
} else {
let shard_cache_clone = Arc::clone(&shard_cache);

info!("Creating shard to replicate: {shard_id}");
let payload = NewShard {
kbid: shard_state.kbid.clone(),
shard_id: shard_state.shard_id.clone(),
// Create the default vectorset with a default config, it
// will get overwritten on first sync
vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]),
};
let shard_create = tokio::task::spawn_blocking(move || shard_cache_clone.create(payload)).await?;
if let Ok(ref shard) = shard_create {
// We want to avoid shard readers to open this until it's replicated, so it doesn't end up opening the
// wrong version of the code for an index. So we delete one the metadata file containing the version
// which will make the opening of the reader to fail with shard not found (it can fall back to the primary).
// This file will be created once the shard is replicated, with the proper index versions
std::fs::remove_file(shard.path.join(VERSION_FILE))?;
} else {
warn!("Failed to create shard: {:?}", shard_create);
continue;
std::fs::create_dir(&shard_path)?;
ShardStub::New {
id: shard_id.clone(),
path: shard_path.clone(),
cache: shard_cache.clone(),
}
shard_lookup = shard_create;
}
let shard = shard_lookup?;
};

let mut current_gen_id = "UNKNOWN".to_string();
if let Some(metadata) = shard_cache.get_metadata(shard_id.clone()) {
current_gen_id = metadata.get_generation_id().unwrap_or("UNSET_SECONDARY".to_string());
Expand All @@ -298,7 +329,7 @@ pub async fn connect_to_primary_and_replicate(
shard_id, shard_state.generation_id, current_gen_id
);

let replicate_work_path = shard.path.join("replication");
let replicate_work_path = shard_path.join("replication");
if replicate_work_path.exists() {
// clear out replication directory before we start in case there is anything
// left behind from a former failed sync
Expand Down
25 changes: 23 additions & 2 deletions nucliadb_node/tests/test_replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::sync::Arc;
use common::{resources, NodeFixture, TestNodeReader, TestNodeWriter};
use nucliadb_core::protos::{
op_status, IndexParagraphs, NewShardRequest, NewVectorSetRequest, SearchRequest, SearchResponse, ShardId,
VectorSetId,
VectorIndexConfig, VectorSetId,
};
use nucliadb_node::replication::health::ReplicationHealthManager;
use rstest::*;
Expand Down Expand Up @@ -84,6 +84,18 @@ async fn test_search_replicated_data() -> Result<(), Box<dyn std::error::Error>>
primary_shard.upgrade().unwrap().metadata.get_generation_id(),
secondary_shard.upgrade().unwrap().metadata.get_generation_id()
);
println!("PRIMARY {:?}", primary_shard.upgrade().unwrap().metadata.get_generation_id());

// Test a second change is replicated
create_test_resources(&mut writer, shard.id.clone(), None).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

// Validate generation id is the same
assert_eq!(
primary_shard.upgrade().unwrap().metadata.get_generation_id(),
secondary_shard.upgrade().unwrap().metadata.get_generation_id()
);
println!("PRIMARY {:?}", primary_shard.upgrade().unwrap().metadata.get_generation_id());

// Test deleting shard deletes it from secondary
delete_shard(&mut writer, shard.id.clone()).await;
Expand All @@ -106,7 +118,16 @@ struct ShardDetails {
}

async fn create_shard(writer: &mut TestNodeWriter) -> ShardDetails {
let request = Request::new(NewShardRequest::default());
let request = Request::new(NewShardRequest {
vectorsets_configs: HashMap::from([(
"multilingual".to_string(),
VectorIndexConfig {
vector_dimension: Some(3),
..Default::default()
},
)]),
..Default::default()
});
let new_shard_response = writer.new_shard(request).await.expect("Unable to create new shard");
let shard_id = &new_shard_response.get_ref().id;
create_test_resources(writer, shard_id.clone(), None).await;
Expand Down

0 comments on commit 4b324d2

Please sign in to comment.