Skip to content

Commit

Permalink
Remove channels rust (#2457) (#2459)
Browse files Browse the repository at this point in the history
  • Loading branch information
javitonino authored Sep 13, 2024
1 parent 0df80c1 commit 6215bb8
Show file tree
Hide file tree
Showing 24 changed files with 61 additions and 220 deletions.
27 changes: 0 additions & 27 deletions nucliadb_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,10 @@ use std::fs::File;
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};

pub use anyhow::{anyhow as node_error, Context, Error};
use nucliadb_protos::utils::ReleaseChannel;
use serde::{Deserialize, Serialize};

use crate::tantivy_replica::TantivyReplicaState;
pub type NodeResult<O> = anyhow::Result<O>;

#[derive(Clone, Copy, Serialize, Deserialize, Debug, PartialEq, Eq, Default)]
pub enum Channel {
#[default]
STABLE,
EXPERIMENTAL,
}

impl From<ReleaseChannel> for Channel {
fn from(value: ReleaseChannel) -> Self {
match value {
ReleaseChannel::Experimental => Channel::EXPERIMENTAL,
ReleaseChannel::Stable => Channel::STABLE,
}
}
}

impl From<Channel> for ReleaseChannel {
fn from(value: Channel) -> Self {
match value {
Channel::STABLE => ReleaseChannel::Stable,
Channel::EXPERIMENTAL => ReleaseChannel::Experimental,
}
}
}

#[derive(Debug, Default)]
pub struct RawReplicaState {
pub metadata_files: HashMap<String, Vec<u8>>,
Expand Down
2 changes: 0 additions & 2 deletions nucliadb_core/src/relations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::path::PathBuf;

use crate::prelude::*;
use crate::protos::*;
use crate::Channel;
use crate::IndexFiles;

pub type RelationsReaderPointer = Box<dyn RelationsReader>;
Expand All @@ -32,7 +31,6 @@ pub type ProtosResponse = RelationSearchResponse;
#[derive(Clone)]
pub struct RelationConfig {
pub path: PathBuf,
pub channel: Channel,
}

pub trait RelationsReader: std::fmt::Debug + Send + Sync {
Expand Down
2 changes: 0 additions & 2 deletions nucliadb_core/src/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use uuid::Uuid;
use crate::metrics::vectors::MergeSource;
use crate::prelude::*;
use crate::query_language::BooleanExpression;
use crate::Channel;
use crate::IndexFiles;
use nucliadb_protos::noderesources;

Expand All @@ -54,7 +53,6 @@ pub struct MergeContext {
pub struct VectorIndexConfig {
pub similarity: utils::VectorSimilarity,
pub path: PathBuf,
pub channel: Channel,
pub shard_id: String,
pub normalize_vectors: bool,
}
Expand Down
3 changes: 0 additions & 3 deletions nucliadb_node/src/cache/writer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ mod tests {
use std::time::Duration;

use crossbeam_utils::thread::scope;
use nucliadb_core::Channel;
use nucliadb_vectors::config::VectorConfig;
use tempfile::tempdir;

Expand Down Expand Up @@ -266,7 +265,6 @@ mod tests {
.create(NewShard {
kbid: "kbid".to_string(),
shard_id: shard_id_0.clone(),
channel: Channel::EXPERIMENTAL,
vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]),
})
.unwrap();
Expand Down Expand Up @@ -312,7 +310,6 @@ mod tests {
.create(NewShard {
kbid: "kbid".to_string(),
shard_id: shard_id_0.clone(),
channel: Channel::EXPERIMENTAL,
vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]),
})
.unwrap();
Expand Down
4 changes: 1 addition & 3 deletions nucliadb_node/src/grpc/grpc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use nucliadb_core::protos::{
ShardId, ShardIds, VectorSetId, VectorSetList,
};
use nucliadb_core::tracing::{self, Span, *};
use nucliadb_core::{Channel, NodeResult};
use nucliadb_core::NodeResult;
use nucliadb_vectors::config::VectorConfig;
use object_store::path::Path;
use std::collections::HashMap;
Expand Down Expand Up @@ -122,7 +122,6 @@ impl NodeWriter for NodeWriterGRPCDriver {
let request = request.into_inner();
let kbid = request.kbid.clone();
let shard_id = uuid::Uuid::new_v4().to_string();
let channel = Channel::from(request.release_channel());

#[allow(deprecated)]
let vector_configs = if !request.vectorsets_configs.is_empty() {
Expand Down Expand Up @@ -160,7 +159,6 @@ impl NodeWriter for NodeWriterGRPCDriver {
shards.create(NewShard {
kbid,
shard_id,
channel,
vector_configs,
})
})
Expand Down
1 change: 0 additions & 1 deletion nucliadb_node/src/replication/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ pub async fn connect_to_primary_and_replicate(
let payload = NewShard {
kbid: shard_state.kbid.clone(),
shard_id: shard_state.shard_id.clone(),
channel: shard_state.release_channel().into(),
// Create the default vectorset with a default config, it
// will get overwritten on first sync
vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]),
Expand Down
2 changes: 0 additions & 2 deletions nucliadb_node/src/replication/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use nucliadb_core::protos::ReleaseChannel;
use nucliadb_core::tantivy_replica::TantivyReplicaState;
use nucliadb_core::tracing::{debug, error, info, warn};
use nucliadb_core::{IndexFiles, NodeResult, RawReplicaState};
Expand Down Expand Up @@ -217,7 +216,6 @@ impl replication::replication_service_server::ReplicationService for Replication
shard_id,
generation_id: gen_id,
kbid: metadata.kbid(),
release_channel: ReleaseChannel::from(metadata.channel()).into(),
..Default::default()
});
}
Expand Down
20 changes: 4 additions & 16 deletions nucliadb_node/src/shards/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use nucliadb_core::{node_error, protos, Channel, NodeResult};
use nucliadb_core::{node_error, protos, NodeResult};
use serde::*;

use crate::disk_structure;
Expand Down Expand Up @@ -73,15 +73,13 @@ impl From<Similarity> for protos::VectorSimilarity {
pub struct ShardMetadataFile {
pub kbid: String,
pub id: String,
pub channel: Channel,
}

#[derive(Default, Debug)]
pub struct ShardMetadata {
shard_path: PathBuf,
id: String,
kbid: String,
channel: Channel,

// A generation id is a way to track if a shard has changed.
// A new id means that something in the shard has changed.
Expand All @@ -103,17 +101,15 @@ impl ShardMetadata {
shard_path,
kbid: metadata.kbid,
id: metadata.id,
channel: metadata.channel,
generation_id: RwLock::new(None),
})
}

pub fn new(shard_path: PathBuf, id: String, kbid: String, channel: Channel) -> ShardMetadata {
pub fn new(shard_path: PathBuf, id: String, kbid: String) -> ShardMetadata {
ShardMetadata {
shard_path,
kbid,
id,
channel,
generation_id: RwLock::new(None),
}
}
Expand All @@ -133,7 +129,6 @@ impl ShardMetadata {
&ShardMetadataFile {
kbid: self.kbid.clone(),
id: self.id.clone(),
channel: self.channel,
},
)?;
writer.flush()?;
Expand All @@ -153,10 +148,6 @@ impl ShardMetadata {
self.kbid.clone()
}

pub fn channel(&self) -> Channel {
self.channel
}

pub fn id(&self) -> String {
self.id.clone()
}
Expand Down Expand Up @@ -247,13 +238,11 @@ mod test {
#[test]
fn create() {
let dir = TempDir::new().unwrap();
let meta =
ShardMetadata::new(dir.path().to_path_buf(), "ID".to_string(), "KB".to_string(), Channel::EXPERIMENTAL);
let meta = ShardMetadata::new(dir.path().to_path_buf(), "ID".to_string(), "KB".to_string());
meta.serialize_metadata().unwrap();
let meta_disk = ShardMetadata::open(dir.path().to_path_buf()).unwrap();
assert_eq!(meta.kbid, meta_disk.kbid);
assert_eq!(meta.id, meta_disk.id);
assert_eq!(meta.channel, meta_disk.channel);
}

#[test]
Expand Down Expand Up @@ -281,8 +270,7 @@ mod test {
#[test]
fn test_cache_generation_id() {
let dir = TempDir::new().unwrap();
let meta =
ShardMetadata::new(dir.path().to_path_buf(), "ID".to_string(), "KB".to_string(), Channel::EXPERIMENTAL);
let meta = ShardMetadata::new(dir.path().to_path_buf(), "ID".to_string(), "KB".to_string());
let gen_id = meta.get_generation_id();
assert_eq!(gen_id, meta.get_generation_id());
// assert!(meta.generation_id.read().unwrap().is_none());
Expand Down
2 changes: 1 addition & 1 deletion nucliadb_node/src/shards/shard_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ impl ShardReader {
Ok(Shard {
metadata: Some(protos::ShardMetadata {
kbid: self.metadata.kbid(),
release_channel: self.metadata.channel() as i32,
release_channel: 0,
}),
shard_id: self.id.clone(),
// naming issue here, this is not number of resource
Expand Down
7 changes: 2 additions & 5 deletions nucliadb_node/src/shards/shard_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

use nucliadb_core::paragraphs::*;
use nucliadb_core::prelude::*;
use nucliadb_core::protos::shard_created::{DocumentService, ParagraphService, RelationService, VectorService};
use nucliadb_core::protos::{Resource, ResourceId};
use nucliadb_core::relations::*;
use nucliadb_core::texts::*;
use nucliadb_core::tracing::{self, *};
use nucliadb_core::vectors::*;
use nucliadb_core::{paragraphs::*, Channel};
use nucliadb_core::{thread, IndexFiles};
use nucliadb_procs::measure;
use nucliadb_vectors::config::VectorConfig;
Expand Down Expand Up @@ -62,7 +62,6 @@ struct ShardWriterIndexes {
pub struct NewShard {
pub kbid: String,
pub shard_id: String,
pub channel: Channel,
pub vector_configs: HashMap<String, VectorConfig>,
}

Expand Down Expand Up @@ -114,7 +113,7 @@ impl ShardWriter {

let shard_id = new.shard_id;
let shard_path = disk_structure::shard_path_by_id(shards_path, &shard_id);
let metadata = Arc::new(ShardMetadata::new(shard_path.clone(), shard_id.clone(), new.kbid, new.channel));
let metadata = Arc::new(ShardMetadata::new(shard_path.clone(), shard_id.clone(), new.kbid));
let mut indexes = ShardIndexes::new(&shard_path);

std::fs::create_dir(&shard_path)?;
Expand Down Expand Up @@ -160,7 +159,6 @@ impl ShardWriter {

let rsc = RelationConfig {
path: indexes.relations_path(),
channel: new.channel,
};
let relation_task = || Some(nucliadb_relations2::writer::RelationsWriterService::create(rsc));
let info = info_span!(parent: &span, "relation start");
Expand Down Expand Up @@ -259,7 +257,6 @@ impl ShardWriter {

let rsc = RelationConfig {
path: indexes.relations_path(),
channel: metadata.channel(),
};
let info = info_span!(parent: &span, "Open relations index writer");
let relation_task = || Some(open_relations_writer(versions.relations, &rsc));
Expand Down
3 changes: 1 addition & 2 deletions nucliadb_node/src/shards/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::collections::HashMap;
use std::time::SystemTime;

use nucliadb_core::protos::prost_types::Timestamp;
use nucliadb_core::{Channel, NodeResult};
use nucliadb_core::NodeResult;
use nucliadb_protos::nodereader;
use nucliadb_protos::noderesources;
use nucliadb_vectors::config::{Similarity, VectorConfig};
Expand All @@ -45,7 +45,6 @@ fn test_vectorsets() -> NodeResult<()> {
crate::shards::writer::NewShard {
kbid: kbid.clone(),
shard_id: shard_id.clone(),
channel: Channel::default(),
vector_configs: HashMap::from([(DEFAULT_VECTORS_INDEX_NAME.to_string(), VectorConfig::default())]),
},
&shards_path,
Expand Down
15 changes: 4 additions & 11 deletions nucliadb_node/tests/test_date_range_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use nucliadb_core::protos as nucliadb_protos;
use nucliadb_protos::prost_types::Timestamp;
use nucliadb_protos::resource::ResourceStatus;
use nucliadb_protos::{
IndexMetadata, IndexParagraph, IndexParagraphs, NewShardRequest, ReleaseChannel, Resource, ResourceId,
SearchRequest, TextInformation, Timestamps, VectorSentence, VectorsetSentences,
IndexMetadata, IndexParagraph, IndexParagraphs, NewShardRequest, Resource, ResourceId, SearchRequest,
TextInformation, Timestamps, VectorSentence, VectorsetSentences,
};
use rstest::*;
use tonic::Request;
Expand Down Expand Up @@ -109,21 +109,14 @@ async fn populate(writer: &mut TestNodeWriter, shard_id: String, metadata: Index

#[rstest]
#[tokio::test]
async fn test_date_range_search(
#[values(ReleaseChannel::Stable, ReleaseChannel::Experimental)] release_channel: ReleaseChannel,
) -> Result<(), Box<dyn std::error::Error>> {
async fn test_date_range_search() -> Result<(), Box<dyn std::error::Error>> {
let base_time = Timestamp::default();
let mut fixture = NodeFixture::new();
fixture.with_writer().await?.with_reader().await?;
let mut writer = fixture.writer_client();
let mut reader = fixture.reader_client();

let new_shard_response = writer
.new_shard(Request::new(NewShardRequest {
release_channel: release_channel.into(),
..Default::default()
}))
.await?;
let new_shard_response = writer.new_shard(Request::new(NewShardRequest::default())).await?;
let shard_id = &new_shard_response.get_ref().id;

let metadata = IndexMetadata {
Expand Down
17 changes: 5 additions & 12 deletions nucliadb_node/tests/test_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,20 @@ mod common;
use std::collections::HashMap;

use common::{resources, NodeFixture, TestNodeWriter};
use nucliadb_core::protos::{
op_status, DownloadShardFileRequest, GetShardFilesRequest, NewShardRequest, ReleaseChannel,
};
use nucliadb_core::protos::{op_status, DownloadShardFileRequest, GetShardFilesRequest, NewShardRequest};
use rstest::*;
use serde_json::value::Value;
use tonic::Request;

#[rstest]
#[tokio::test]
async fn test_download_shard(
#[values(ReleaseChannel::Stable, ReleaseChannel::Experimental)] release_channel: ReleaseChannel,
) -> Result<(), Box<dyn std::error::Error>> {
async fn test_download_shard() -> Result<(), Box<dyn std::error::Error>> {
let mut fixture = NodeFixture::new();
fixture.with_writer().await?.with_reader().await?;
let mut writer = fixture.writer_client();
let mut reader = fixture.reader_client();

let shard = create_shard(&mut writer, release_channel).await;
let shard = create_shard(&mut writer).await;

// lets download it, first we get the list of files
let response = reader
Expand Down Expand Up @@ -82,11 +78,8 @@ struct ShardDetails {
id: String,
}

async fn create_shard(writer: &mut TestNodeWriter, release_channel: ReleaseChannel) -> ShardDetails {
let request = Request::new(NewShardRequest {
release_channel: release_channel.into(),
..Default::default()
});
async fn create_shard(writer: &mut TestNodeWriter) -> ShardDetails {
let request = Request::new(NewShardRequest::default());
let new_shard_response = writer.new_shard(request).await.expect("Unable to create new shard");
let shard_id = &new_shard_response.get_ref().id;
create_test_resources(writer, shard_id.clone()).await;
Expand Down
Loading

0 comments on commit 6215bb8

Please sign in to comment.