diff --git a/rust/Cargo.lock b/rust/Cargo.lock index cb6be6025..e3e6638f2 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1,5 +1,21 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" +dependencies = [ + "lazy_static", + "regex", +] + +[[package]] +name = "adler" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" + [[package]] name = "aead" version = "0.3.2" @@ -99,6 +115,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73" +[[package]] +name = "array_tool" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f8cb5d814eb646a863c4f24978cff2880c4be96ad8cde2c0f0678732902e271" + [[package]] name = "arrayvec" version = "0.5.2" @@ -116,6 +138,19 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-compression" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72c1f1154e234325b50864a349b9c8e56939e266a4c307c0f159812df2f9537" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite 0.2.4", + "tokio", +] + [[package]] name = "async-dup" version = "1.2.2" @@ -580,6 +615,21 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "console" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cc80946b3480f421c2f17ed1cb841753a371c7c5104f51d507e13f532c856aa" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "regex", + "terminal_size", + "unicode-width", + "winapi", +] + [[package]] name = "const_fn" version = "0.4.5" @@ -897,12 +947,45 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "88d7ed2934d741c6b37e33e3832298e8850b53fd2d2bea03873375596c7cea4e" +[[package]] +name = "e2e" +version = "0.0.0" +dependencies = [ + "anyhow", + "async-trait", + "chrono", + "config", + "console", + "futures", + "indicatif", + "influxdb", + "k8s-openapi", + "kube", + "reqwest", + "serde 1.0.123", + "serde_json", + "serde_yaml", + "tokio", + "toml", + "tracing", + "tracing-subscriber", + "xaynet-core", + "xaynet-sdk", + "xaynet-server", +] + [[package]] name = "either" version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.26" @@ -937,6 +1020,19 @@ dependencies = [ "syn", ] +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "event-listener" version = "2.5.1" @@ -993,6 +1089,18 @@ dependencies = [ "log", ] +[[package]] +name = "flate2" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0" +dependencies = [ + "cfg-if 1.0.0", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "float-cmp" version = "0.8.0" @@ -1386,6 +1494,15 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + [[package]] name = "hyper" version = "0.14.2" @@ -1425,6 +1542,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite 0.2.4", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1465,6 +1594,18 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "indicatif" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7baab56125e25686df467fe470785512329883aab42696d661247aca2a2896e4" +dependencies = [ + "console", + "lazy_static", + "number_prefix", + "regex", +] + [[package]] name = "infer" version = "0.2.3" @@ -1481,6 +1622,8 @@ dependencies = [ "futures", "lazy_static", "regex", + "serde 1.0.123", + "serde_json", "surf", "thiserror", ] @@ -1567,6 +1710,68 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "jsonpath_lib" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61352ec23883402b7d30b3313c16cbabefb8907361c4eb669d990cbb87ceee5a" +dependencies = [ + "array_tool", + "env_logger", + "log", + "serde 1.0.123", + "serde_json", +] + +[[package]] +name = "k8s-openapi" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcc1f973542059e6d5a6d63de6a9539d0ec784f82b2327f3c1915d33200bc6a4" +dependencies = [ + "base64 0.13.0", + "bytes", + "chrono", + "serde 1.0.123", + "serde-value", + "serde_json", +] + +[[package]] +name = "kube" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459b1593a538fd0bb77e7133fdd3158c55e183affd0926337fa873986a741e0b" +dependencies = [ + "Inflector", + "base64 0.13.0", + "bytes", + "chrono", + "dirs-next", + "either", + "futures", + "http", + "hyper", + "hyper-timeout", + "hyper-tls", + "jsonpath_lib", + "k8s-openapi", + "log", + "openssl", + "pem", + "pin-project 1.0.4", + "serde 1.0.123", + "serde_json", + "serde_yaml", + "static_assertions", + "thiserror", + "tokio", + "tokio-native-tls", + "tokio-util", + "tower", + "url", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -1715,6 +1920,16 @@ dependencies = [ "unicase", ] +[[package]] +name = "miniz_oxide" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f2d26ec3309788e423cfbf68ad1800f061638098d76a83681af979dc4eda19d" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.7.7" @@ -1957,6 +2172,12 @@ dependencies = [ "syn", ] +[[package]] +name = "number_prefix" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" + [[package]] name = "once_cell" version = "1.5.2" @@ -2014,6 +2235,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dacdec97876ef3ede8c50efc429220641a0b11ba0048b4b0c357bccbc47c5204" +dependencies = [ + "num-traits 0.2.14", +] + [[package]] name = "parking" version = "2.0.0" @@ -2051,6 +2281,17 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5d65c4d95931acda4498f675e332fcbdc9a06705cd07086c510e9b6009cd1c1" +[[package]] +name = "pem" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c220d01f863d13d96ca82359d1e81e64a7c6bf0637bcde7b2349630addf0c6" +dependencies = [ + "base64 0.13.0", + "once_cell", + "regex", +] + [[package]] name = "percent-encoding" version = "2.1.0" @@ -2473,6 +2714,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd281b1030aa675fb90aa994d07187645bb3c8fc756ca766e7c3070b439de9de" dependencies = [ + "async-compression", "base64 0.13.0", "bytes", "encoding_rs", @@ -2491,9 +2733,11 @@ dependencies = [ "pin-project-lite 0.2.4", "rustls", "serde 1.0.123", + "serde_json", "serde_urlencoded", "tokio", "tokio-rustls", + "tokio-util", "url", "wasm-bindgen", "wasm-bindgen-futures", @@ -2745,6 +2989,16 @@ dependencies = [ "serde 0.8.23", ] +[[package]] +name = "serde-value" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" +dependencies = [ + "ordered-float", + "serde 1.0.123", +] + [[package]] name = "serde_cbor" version = "0.11.1" @@ -2772,6 +3026,7 @@ version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea1c6153794552ea7cf7cf63b1231a25de00ec90db326ba6264440fa08e31486" dependencies = [ + "indexmap", "itoa", "ryu", "serde 1.0.123", @@ -2821,6 +3076,18 @@ dependencies = [ "serde 1.0.123", ] +[[package]] +name = "serde_yaml" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdd2af560da3c1fdc02cb80965289254fc35dff869810061e2d8290ee48848ae" +dependencies = [ + "dtoa", + "linked-hash-map 0.5.4", + "serde 1.0.123", + "yaml-rust", +] + [[package]] name = "serial_test" version = "0.5.1" @@ -3110,6 +3377,25 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "terminal_size" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86ca8ced750734db02076f44132d802af0b33b09942331f4459dde8636fd2406" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -3235,12 +3521,23 @@ dependencies = [ "mio", "num_cpus", "once_cell", + "parking_lot", "pin-project-lite 0.2.4", "signal-hook-registry", "tokio-macros", "winapi", ] +[[package]] +name = "tokio-io-timeout" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" +dependencies = [ + "pin-project-lite 0.2.4", + "tokio", +] + [[package]] name = "tokio-macros" version = "1.1.0" @@ -3342,6 +3639,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fd7b451959622e21de79261673d658a0944b835012c58c51878ea55957fb51a" dependencies = [ "futures-core", + "futures-util", "pin-project 1.0.4", "tokio", "tokio-stream", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 111313f0a..1c9694c80 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -10,6 +10,7 @@ members = [ # internals "benches", "examples", + "e2e", ] [workspace.metadata] diff --git a/rust/e2e/.gitignore b/rust/e2e/.gitignore new file mode 100644 index 000000000..86c6455a5 --- /dev/null +++ b/rust/e2e/.gitignore @@ -0,0 +1,2 @@ +**/*.log +**/*.pem diff --git a/rust/e2e/Cargo.toml b/rust/e2e/Cargo.toml new file mode 100644 index 000000000..62195d12d --- /dev/null +++ b/rust/e2e/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "e2e" +version = "0.0.0" +authors = ["Xayn Engineering "] +edition = "2018" +description = "End-to-end tests for Xaynet" +readme = "../../README.md" +homepage = "https://xaynet.dev/" +repository = "https://github.com/xaynetwork/xaynet/" +keywords = ["xaynet", "e2e"] +categories = ["testing"] +license-file = "../../LICENSE" +publish = false + +[dependencies] +anyhow = "1.0.38" +async-trait = "0.1.42" +chrono = { version = "0.4.19" } +config = "0.10.1" +console = "0.14" +futures = "0.3.12" +indicatif = "0.15.0" +influxdb = { version = "0.3.0", default-features = false, features = ["h1-client", "use-serde"] } +kube = "0.50.0" +k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_19"] } +reqwest = { version = "0.11.0", default-features = false, features = ["json", "gzip", "stream"] } +serde = { version = "1.0.123", features = ["derive"] } +serde_json = { version = "1.0.62" } +serde_yaml = "0.8.16" +tokio = { version = "1.2.0", features = ["full"] } +toml = "0.5" +tracing = "0.1.23" +tracing-subscriber = "0.2.15" +xaynet-core = { path = "../xaynet-core" } +xaynet-sdk = { path = "../xaynet-sdk", features = ["reqwest-client"] } +xaynet-server = { path = "../xaynet-server", features = ["model-persistence"] } diff --git a/rust/e2e/src/bin/test_case_1.rs b/rust/e2e/src/bin/test_case_1.rs new file mode 100644 index 000000000..4246e0f06 --- /dev/null +++ b/rust/e2e/src/bin/test_case_1.rs @@ -0,0 +1,78 @@ +use e2e::{ + test_client::builder::{TestClientBuilder, TestClientBuilderSettings}, + test_env::{utils, TestEnvironment, TestEnvironmentSettings}, +}; +use tokio::{ + signal, + time::{timeout, Duration}, +}; +use tracing::info; +use xaynet_server::state_machine::phases::PhaseName; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let env_settings = TestEnvironmentSettings::from_file("src/bin/test_case_1")?; + let env = TestEnvironment::new(env_settings.clone()).await?; + + tokio::select! { + res = timeout(Duration::from_secs(6000), run(env)) => { + res? + } + _ = signal::ctrl_c() => { Ok(()) } + } +} + +async fn run(mut env: TestEnvironment) -> anyhow::Result<()> { + let k8s = env.get_k8s_client().await?; + k8s.deploy_with_image_and_config(env.get_env_settings().coordinator.config) + .await?; + let handle = k8s + .save_coordinator_logs("src/bin/test_case_1/coordinator.log") + .await?; + + let _pfi_guard = k8s.port_forward_influxdb()?; + let _pfc_guard = if env.get_env_settings().api_client.certificates.is_none() { + Some(k8s.port_forward_coordinator().await?) + } else { + None + }; + + let mut api_client = env.get_api_client()?; + let mut influx_client = env.get_influx_client(); + + info!("wait until clients are ready"); + let _ = tokio::join!( + utils::wait_until_client_is_ready(&mut api_client), + utils::wait_until_client_is_ready(&mut influx_client), + ); + utils::wait_until_phase(&influx_client, PhaseName::Sum).await; + + //////////////////////////////////////////////////////////////////////////////////////////////// + + let coordinator_settings = env.get_coordinator_settings()?; + let test_client_builder_settings = TestClientBuilderSettings::from(coordinator_settings); + + let mut test_client_builder = TestClientBuilder::new(test_client_builder_settings, api_client); + + //////////////////////////////////////////////////////////////////////////////////////////////// + + for round in 0..10 { + info!("Round: {}", round); + + let mut runner = test_client_builder.build_clients().await?; + info!("run sum clients..."); + runner.run_sum_clients().await?; + utils::wait_until_phase(&influx_client, PhaseName::Update).await; + info!("run update clients..."); + runner.run_update_clients().await?; + utils::wait_until_phase(&influx_client, PhaseName::Sum2).await; + info!("run sum2 clients..."); + runner.run_sum2_clients().await?; + utils::wait_until_phase(&influx_client, PhaseName::Sum).await; + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + + timeout(Duration::from_secs(10), handle).await???; + Ok(()) +} diff --git a/rust/e2e/src/bin/test_case_1/Env.toml b/rust/e2e/src/bin/test_case_1/Env.toml new file mode 100644 index 000000000..6e5f1b2be --- /dev/null +++ b/rust/e2e/src/bin/test_case_1/Env.toml @@ -0,0 +1,31 @@ +filter = "test_case=debug,e2e=debug,xaynet=info" + +[k8s] +namespace = "xaynet" +coordinator_pod_label = "app=coordinator" +coordinator_image = "xaynetwork/xaynet:development" +influxdb_pod_name = "influxdb-0" +redis_pod_name = "redis-master-0" +s3_pod_label = "minio" + +[coordinator] +config = "src/bin/test_case_1/config.toml" + +[influx] +url = "http://localhost:8086" +db = "metrics" + +[redis] +url = "redis://localhost/" + +[s3] +access_key = "minio" +secret_access_key = "minio123" +region = ["minio", "http://localhost:9000"] + +[api_client] +address = "http://localhost:8081" +# tls +# address = "https://dev-coordinator.xayn.com" +# certificates = [ "src/bin/test_case_1/dev-coordinator-xayn-com.pem" ] +# identity = diff --git a/rust/e2e/src/bin/test_case_1/config.toml b/rust/e2e/src/bin/test_case_1/config.toml new file mode 100644 index 000000000..eb9b9f1e9 --- /dev/null +++ b/rust/e2e/src/bin/test_case_1/config.toml @@ -0,0 +1,45 @@ +[log] +filter = "xaynet=debug,http=warn,info" + +[api] +bind_address = "0.0.0.0:8081" +tls_certificate = "/app/ssl/tls.pem" +tls_key = "/app/ssl/tls.key" + +[pet.sum] +prob = 0.5 +count = { min = 10, max = 100 } +time = { min = 5, max = 3600 } + +[pet.update] +prob = 0.9 +count = { min = 3, max = 10000 } +time = { min = 10, max = 3600 } + +[pet.sum2] +count = { min = 5, max = 100 } +time = { min = 5, max = 3600 } + +[mask] +group_type = "Prime" +data_type = "F32" +bound_type = "B0" +model_type = "M3" + +[model] +length = 1 + +[metrics.influxdb] +url = "http://influxdb:8086" +db = "metrics" + +[redis] +url = "redis://127.0.0.1/" + +[s3] +access_key = "minio" +secret_access_key = "minio123" +region = ["minio", "http://minio:9000"] + +[restore] +enable = false diff --git a/rust/e2e/src/lib.rs b/rust/e2e/src/lib.rs new file mode 100644 index 000000000..a9a668f5e --- /dev/null +++ b/rust/e2e/src/lib.rs @@ -0,0 +1,3 @@ +pub mod test_client; +pub mod test_env; +pub mod utils; diff --git a/rust/e2e/src/test_client/builder.rs b/rust/e2e/src/test_client/builder.rs new file mode 100644 index 000000000..3cf657198 --- /dev/null +++ b/rust/e2e/src/test_client/builder.rs @@ -0,0 +1,118 @@ +use std::sync::Arc; + +use anyhow::bail; +use xaynet_core::{ + crypto::SigningKeyPair, + mask::{FromPrimitives, Model}, +}; +use xaynet_sdk::{client::Client as ApiClient, XaynetClient}; +use xaynet_server::settings::Settings as CoordinatorSettings; + +use super::{ + runner::ClientRunner, + utils::{default_sum_client, default_update_client, generate_client, ClientType, LocalModel}, +}; +use crate::utils::concurrent_futures::ConcurrentFutures; + +pub struct TestClientBuilderSettings { + number_of_sum: u64, + number_of_update: u64, + number_of_sum2: u64, + model_length: usize, +} + +impl TestClientBuilderSettings { + pub fn new( + number_of_sum: u64, + number_of_update: u64, + number_of_sum2: u64, + model_length: usize, + ) -> Self { + Self { + number_of_sum, + number_of_update, + number_of_sum2, + model_length, + } + } +} + +impl From for TestClientBuilderSettings { + fn from(settings: CoordinatorSettings) -> Self { + Self { + number_of_sum: settings.pet.sum.count.min, + number_of_update: settings.pet.update.count.min, + number_of_sum2: settings.pet.sum2.count.min, + model_length: settings.model.length, + } + } +} + +pub struct TestClientBuilder { + settings: TestClientBuilderSettings, + api_client: ApiClient, + model: Arc, +} + +impl TestClientBuilder { + pub fn new( + settings: TestClientBuilderSettings, + api_client: ApiClient, + ) -> Self { + let model = Model::from_primitives(vec![1; settings.model_length].into_iter()).unwrap(); + Self { + api_client, + settings, + model: Arc::new(model), + } + } + + pub async fn build_client( + &mut self, + r#type: &ClientType, + func: F, + ) -> anyhow::Result> + where + F: Fn(SigningKeyPair, ApiClient, LocalModel) -> R, + R: Send + 'static + futures::Future, + ::Output: Send + 'static, + { + let round_params = self.api_client.get_round_params().await?; + let mut clients = ConcurrentFutures::::new(100); + + let number_of_clients = match r#type { + ClientType::Sum => self.settings.number_of_sum, + ClientType::Update => self.settings.number_of_update, + _ => bail!("client type is not supported"), + }; + + for _ in 0..number_of_clients { + let key_pair = generate_client(r#type, &round_params); + let client = func( + key_pair, + self.api_client.clone(), + LocalModel(self.model.clone()), + ); + + clients.push(client); + } + + Ok(clients) + } + + pub async fn build_clients(&mut self) -> anyhow::Result { + let sum_clients = self + .build_client(&ClientType::Sum, default_sum_client) + .await?; + + let update_clients = self + .build_client(&ClientType::Update, default_update_client) + .await?; + + Ok(ClientRunner::new( + sum_clients, + update_clients, + self.settings.number_of_sum2, + )) + } +} diff --git a/rust/e2e/src/test_client/mod.rs b/rust/e2e/src/test_client/mod.rs new file mode 100644 index 000000000..8226245e5 --- /dev/null +++ b/rust/e2e/src/test_client/mod.rs @@ -0,0 +1,3 @@ +pub mod builder; +pub mod runner; +pub mod utils; diff --git a/rust/e2e/src/test_client/runner.rs b/rust/e2e/src/test_client/runner.rs new file mode 100644 index 000000000..163618828 --- /dev/null +++ b/rust/e2e/src/test_client/runner.rs @@ -0,0 +1,84 @@ +#![allow(clippy::type_complexity)] +use anyhow::anyhow; +use futures::{future::BoxFuture, StreamExt}; +use tokio::sync::mpsc; +use xaynet_sdk::StateMachine; + +use super::utils::Event; +use crate::utils::concurrent_futures::ConcurrentFutures; + +pub struct ClientRunner { + sum_clients: Option< + ConcurrentFutures< + BoxFuture<'static, BoxFuture<'static, (StateMachine, mpsc::Receiver)>>, + >, + >, + update_clients: + Option)>>>, + sum2_clients: + Option)>>>, + sum2_count: u64, +} + +impl ClientRunner { + pub fn new( + sum_clients: ConcurrentFutures< + BoxFuture<'static, BoxFuture<'static, (StateMachine, mpsc::Receiver)>>, + >, + update_clients: ConcurrentFutures< + BoxFuture<'static, (StateMachine, mpsc::Receiver)>, + >, + sum2_count: u64, + ) -> Self { + Self { + sum_clients: Some(sum_clients), + update_clients: Some(update_clients), + sum2_clients: None, + sum2_count, + } + } + + pub async fn run_sum_clients(&mut self) -> anyhow::Result<()> { + let mut sum2_clients = ConcurrentFutures::< + BoxFuture<'static, (StateMachine, mpsc::Receiver)>, + >::new(100); + + let mut sum_clients = self + .sum_clients + .take() + .ok_or_else(|| anyhow!("No sum clients available"))?; + + let mut summer2 = 0; + while let Some(sum_client) = sum_clients.next().await { + if summer2 < self.sum2_count { + sum2_clients.push(sum_client?); + summer2 += 1; + } + } + + self.sum2_clients = Some(sum2_clients); + + Ok(()) + } + + pub async fn run_update_clients(&mut self) -> anyhow::Result<()> { + let mut update_clients = self + .update_clients + .take() + .ok_or_else(|| anyhow!("No update clients available"))?; + + while update_clients.next().await.is_some() {} + + Ok(()) + } + pub async fn run_sum2_clients(&mut self) -> anyhow::Result<()> { + let mut sum2_clients = self + .sum2_clients + .take() + .ok_or_else(|| anyhow!("No sum2 clients available"))?; + + while sum2_clients.next().await.is_some() {} + + Ok(()) + } +} diff --git a/rust/e2e/src/test_client/utils.rs b/rust/e2e/src/test_client/utils.rs new file mode 100644 index 000000000..43467d15c --- /dev/null +++ b/rust/e2e/src/test_client/utils.rs @@ -0,0 +1,206 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::future::{BoxFuture, FutureExt}; +use tokio::sync::mpsc; +use tracing::warn; +use xaynet_core::{ + common::RoundParameters, + crypto::{ByteObject, Signature, SigningKeyPair}, + mask::Model, + ParticipantSecretKey, +}; +use xaynet_sdk::{ + client::Client as ApiClient, + settings::PetSettings, + ModelStore, + Notify, + StateMachine, + TransitionOutcome, +}; + +#[derive(Ord, PartialOrd, Eq, PartialEq)] +pub enum ClientType { + Awaiting, + Sum, + Update, +} + +pub fn generate_client(r#type: &ClientType, round_params: &RoundParameters) -> SigningKeyPair { + loop { + let (client_type, key_pair) = new_client(&round_params); + if client_type == *r#type { + break key_pair; + } + } +} + +fn new_client(round_params: &RoundParameters) -> (ClientType, SigningKeyPair) { + let key_pair = SigningKeyPair::generate(); + let role = determine_role( + key_pair.secret.clone(), + round_params.seed.as_slice(), + round_params.sum, + round_params.update, + ); + (role, key_pair) +} + +pub fn determine_role( + secret_key: ParticipantSecretKey, + round_seed: &[u8], + round_sum: f64, + round_update: f64, +) -> ClientType { + let (sum_signature, update_signature) = compute_signatures(secret_key, round_seed); + if sum_signature.is_eligible(round_sum) { + ClientType::Sum + } else if update_signature.is_eligible(round_update) { + ClientType::Update + } else { + ClientType::Awaiting + } +} + +/// Compute the sum and update signatures for the given round seed. +fn compute_signatures( + secret_key: ParticipantSecretKey, + round_seed: &[u8], +) -> (Signature, Signature) { + ( + secret_key.sign_detached(&[round_seed, b"sum"].concat()), + secret_key.sign_detached(&[round_seed, b"update"].concat()), + ) +} + +pub fn default_sum_client( + key_pair: SigningKeyPair, + api_client: ApiClient, + model_store: LocalModel, +) -> BoxFuture<'static, BoxFuture<'static, (StateMachine, mpsc::Receiver)>> { + let (event_tx, mut event_rx) = mpsc::channel::(2); + + let mut sum_client = StateMachine::new( + PetSettings::new(key_pair), + api_client, + model_store, + Notifier(event_tx), + ); + + #[allow(clippy::async_yields_async)] + Box::pin(async move { + // Idle event + let _ = event_rx.recv().now_or_never(); + + for _ in 0..2 { + sum_client = match sum_client.transition().await { + TransitionOutcome::Pending(s) => s, + TransitionOutcome::Complete(s) => s, + }; + } + + // NewRound event + let _ = event_rx.recv().now_or_never(); + + for _ in 0..4 { + sum_client = match sum_client.transition().await { + TransitionOutcome::Pending(s) => s, + TransitionOutcome::Complete(s) => s, + }; + } + + Box::pin(async { + loop { + sum_client = match sum_client.transition().await { + TransitionOutcome::Pending(s) => s, + TransitionOutcome::Complete(s) => s, + }; + if let Some(Some(Event::Idle)) | Some(Some(Event::NewRound)) = + event_rx.recv().now_or_never() + { + break; + } + } + + (sum_client, event_rx) + }) as BoxFuture<'static, (StateMachine, mpsc::Receiver)> + }) +} + +pub fn default_update_client( + key_pair: SigningKeyPair, + api_client: ApiClient, + model_store: LocalModel, +) -> BoxFuture<'static, (StateMachine, mpsc::Receiver)> { + let (event_tx, mut event_rx) = mpsc::channel::(2); + + let mut update_client = StateMachine::new( + PetSettings::new(key_pair), + api_client, + model_store, + Notifier(event_tx), + ); + + Box::pin(async move { + // Idle event + let _ = event_rx.recv().now_or_never(); + + for _ in 0..2 { + update_client = match update_client.transition().await { + TransitionOutcome::Pending(s) => s, + TransitionOutcome::Complete(s) => s, + }; + } + + // NewRound event + let _ = event_rx.recv().now_or_never(); + + loop { + update_client = match update_client.transition().await { + TransitionOutcome::Pending(s) => s, + TransitionOutcome::Complete(s) => s, + }; + if let Some(Some(Event::Idle)) | Some(Some(Event::NewRound)) = + event_rx.recv().now_or_never() + { + break; + } + } + + (update_client, event_rx) + }) +} + +pub enum Event { + Idle, + NewRound, +} + +#[derive(Clone)] +pub struct Notifier(pub mpsc::Sender); + +impl Notify for Notifier { + fn idle(&mut self) { + if let Err(e) = self.0.try_send(Event::Idle) { + warn!("failed to notify participant: {}", e); + } + } + + fn new_round(&mut self) { + if let Err(e) = self.0.try_send(Event::NewRound) { + warn!("failed to notify participant: {}", e); + } + } +} + +pub struct LocalModel(pub Arc); + +#[async_trait] +impl ModelStore for LocalModel { + type Model = Arc; + type Error = std::convert::Infallible; + + async fn load_model(&mut self) -> Result, Self::Error> { + Ok(Some(self.0.clone())) + } +} diff --git a/rust/e2e/src/test_env/environment.rs b/rust/e2e/src/test_env/environment.rs new file mode 100644 index 000000000..173a6656c --- /dev/null +++ b/rust/e2e/src/test_env/environment.rs @@ -0,0 +1,143 @@ +use std::path::PathBuf; + +use tracing_subscriber::{fmt::Formatter, reload::Handle, EnvFilter, FmtSubscriber}; +use xaynet_sdk::client::Client as ApiClient; +use xaynet_server::{ + settings::Settings, + storage::{coordinator_storage::redis, model_storage::s3}, +}; + +use super::{influx::InfluxClient, k8s::K8sClient, TestEnvironmentSettings}; + +#[allow(dead_code)] +pub struct TestEnvironment { + k8s_client: Option, + api_client: Option>, + influx_client: Option, + redis_client: Option, + s3_client: Option, + filter_handle: Handle, + settings: TestEnvironmentSettings, +} + +impl TestEnvironment { + pub async fn new(settings: TestEnvironmentSettings) -> anyhow::Result { + let filter_handle = Self::init_tracing(&settings.filter); + + Ok(Self { + k8s_client: None, + api_client: None, + influx_client: None, + redis_client: None, + s3_client: None, + filter_handle, + settings, + }) + } + + fn init_tracing(filter: impl Into) -> Handle { + let fmt_subscriber = FmtSubscriber::builder() + .with_env_filter(filter) + .with_ansi(true) + .with_filter_reloading(); + let filter_handle = fmt_subscriber.reload_handle(); + fmt_subscriber.init(); + filter_handle + } + + #[allow(dead_code)] + fn reload_filter(&self, filter: impl Into) { + let _ = self.filter_handle.reload(filter); + } + + async fn init_k8s_client(&mut self) -> anyhow::Result<()> { + self.k8s_client = Some(K8sClient::new(self.settings.k8s.clone()).await?); + Ok(()) + } + + fn init_api_client(&mut self) -> anyhow::Result<()> { + // let certificates = self + // .settings + // .api_client + // .certificates + // .as_ref() + // .map(ApiClient::certificates_from) + // .transpose()?; + + // let identity = self + // .settings + // .api_client + // .identity + // .as_ref() + // .map(ApiClient::identity_from) + // .transpose()?; + + let http_client = reqwest::ClientBuilder::new().build().unwrap(); + let api_client = ApiClient::new(http_client, &self.settings.api_client.address)?; + self.api_client = Some(api_client); + Ok(()) + } + + fn init_influx_client(&mut self) { + self.influx_client = Some(InfluxClient::new(self.settings.influx.clone())); + } + + async fn init_redis_client(&mut self) -> anyhow::Result<()> { + self.redis_client = Some(redis::Client::new(self.settings.redis.url.clone()).await?); + Ok(()) + } + + pub fn init_s3_client(&mut self) -> anyhow::Result<()> { + self.s3_client = Some(s3::Client::new(self.settings.s3.clone())?); + Ok(()) + } + + pub async fn get_k8s_client(&mut self) -> anyhow::Result { + if self.k8s_client.is_none() { + self.init_k8s_client().await?; + } + + Ok(self.k8s_client.clone().unwrap()) + } + + pub fn get_api_client(&mut self) -> anyhow::Result> { + if self.api_client.is_none() { + self.init_api_client()?; + } + + Ok(self.api_client.clone().unwrap()) + } + + pub fn get_influx_client(&mut self) -> InfluxClient { + if self.influx_client.is_none() { + self.init_influx_client(); + } + + self.influx_client.clone().unwrap() + } + + pub async fn get_redis_client(&mut self) -> anyhow::Result { + if self.redis_client.is_none() { + self.init_redis_client().await?; + } + + Ok(self.redis_client.clone().unwrap()) + } + + pub async fn get_s3_client(&mut self) -> anyhow::Result { + if self.s3_client.is_none() { + self.init_s3_client()?; + } + + Ok(self.s3_client.clone().unwrap()) + } + + pub fn get_env_settings(&self) -> TestEnvironmentSettings { + self.settings.clone() + } + + pub fn get_coordinator_settings(&self) -> anyhow::Result { + let settings = Settings::new(PathBuf::from(&self.settings.coordinator.config))?; + Ok(settings) + } +} diff --git a/rust/e2e/src/test_env/influx.rs b/rust/e2e/src/test_env/influx.rs new file mode 100644 index 000000000..063335211 --- /dev/null +++ b/rust/e2e/src/test_env/influx.rs @@ -0,0 +1,66 @@ +use anyhow::{anyhow, bail}; +use chrono::{DateTime, Utc}; +use influxdb::{integrations::serde_integration::DatabaseQueryResult, Client, Query}; +use xaynet_server::{settings::InfluxSettings, state_machine::phases::PhaseName}; + +#[derive(Clone)] +pub struct InfluxClient { + client: Client, +} + +impl InfluxClient { + pub fn new(settings: InfluxSettings) -> Self { + Self { + client: Client::new(settings.url, settings.db), + } + } + + pub async fn ping(&self) -> anyhow::Result<()> { + self.client.ping().await.map_err(|err| anyhow!(err))?; + Ok(()) + } + + pub async fn get_current_phase(&self) -> anyhow::Result { + let read_query = Query::raw_read_query("SELECT LAST(value) FROM phase GROUP BY *"); + let read_result = self + .client + .json_query(read_query) + .await + .map_err(|err| anyhow!(err))?; + deserialize_phase(read_result) + } +} + +fn deserialize_phase(mut read_result: DatabaseQueryResult) -> anyhow::Result { + let phase = read_result + .deserialize_next::() + .map_err(|err| anyhow!("no phase: {}", err))? + .series + .first() + .ok_or_else(|| anyhow!("no phase"))? + .values + .first() + .ok_or_else(|| anyhow!("no phase"))? + .last; + Ok(from_u8(phase)?) +} + +fn from_u8(phase: u8) -> anyhow::Result { + let phase_name = match phase { + 0 => PhaseName::Idle, + 1 => PhaseName::Sum, + 2 => PhaseName::Update, + 3 => PhaseName::Sum2, + 4 => PhaseName::Unmask, + 5 => PhaseName::Failure, + 6 => PhaseName::Shutdown, + _ => bail!("unknown phase"), + }; + Ok(phase_name) +} + +#[derive(Debug, serde::Deserialize)] +pub struct PhaseReading { + time: DateTime, + last: u8, +} diff --git a/rust/e2e/src/test_env/k8s.rs b/rust/e2e/src/test_env/k8s.rs new file mode 100644 index 000000000..b57d7c036 --- /dev/null +++ b/rust/e2e/src/test_env/k8s.rs @@ -0,0 +1,328 @@ +use std::{path::Path, process::Stdio}; + +use anyhow::anyhow; +use console::strip_ansi_codes; +use futures::{StreamExt, TryStreamExt}; +use k8s_openapi::{ + api::core::v1::{ConfigMap, Pod}, + apimachinery::pkg::apis::meta::v1::Time, +}; +use kube::{ + api::{Api, DeleteParams, ListParams, LogParams, Meta, Patch, PatchParams, WatchEvent}, + Client, +}; +use serde_json::json; +use tokio::{ + fs, + fs::File, + io::AsyncWriteExt, + process::{Child, Command}, + task::JoinHandle, +}; +use tracing::{error, info}; + +use super::K8sSettings; + +#[derive(Clone)] +pub struct K8sClient { + settings: K8sSettings, + pod_api: Api, + config_map_api: Api, +} + +impl K8sClient { + pub async fn new(settings: K8sSettings) -> anyhow::Result { + let client = Client::try_default().await?; + + let pod_api: Api = Api::namespaced(client.clone(), &settings.namespace); + let config_map_api: Api = Api::namespaced(client, &settings.namespace); + + Ok(Self { + settings, + pod_api, + config_map_api, + }) + } + + pub async fn deploy_with_config(&self, path: impl AsRef) -> anyhow::Result<()> { + let pod = self.find_pod(&self.settings.coordinator_pod_label).await?; + let (pod_name, config_map_name) = Self::reveal_pod_and_config_map_name(&pod)?; + let config_content = fs::read_to_string(path).await?; + self.patch_config_map(&config_map_name, config_content) + .await?; + self.restart_pod(&pod_name, &self.settings.coordinator_pod_label) + .await?; + Ok(()) + } + + pub async fn deploy_with_image(&self) -> anyhow::Result<()> { + let pod = self.find_pod(&self.settings.coordinator_pod_label).await?; + self.patch_image( + &PodSpec::name(&pod), + &self.settings.coordinator_pod_label, + &self.settings.coordinator_image, + ) + .await?; + Ok(()) + } + + pub async fn deploy_with_image_and_config(&self, path: impl AsRef) -> anyhow::Result<()> { + let pod = self.find_pod(&self.settings.coordinator_pod_label).await?; + let (pod_name, config_map_name) = Self::reveal_pod_and_config_map_name(&pod)?; + let config_content = fs::read_to_string(path).await?; + self.patch_config_map(&config_map_name, config_content) + .await?; + self.patch_image( + &pod_name, + &self.settings.coordinator_pod_label, + &self.settings.coordinator_image, + ) + .await?; + Ok(()) + } + + pub async fn restart_coordinator(&self) -> anyhow::Result<()> { + let pod = self.find_pod(&self.settings.coordinator_pod_label).await?; + self.restart_pod(&PodSpec::name(&pod), &self.settings.coordinator_pod_label) + .await?; + Ok(()) + } + + pub async fn kill_influxdb(&self) -> anyhow::Result<()> { + self.delete_pod(&self.settings.influxdb_pod_name).await + } + + pub async fn kill_redis(&self) -> anyhow::Result<()> { + self.delete_pod(&self.settings.redis_pod_name).await + } + + pub async fn kill_s3(&self) -> anyhow::Result<()> { + let pod = self.find_pod(&self.settings.s3_pod_label).await?; + self.delete_pod(&PodSpec::name(&pod)).await + } + + pub async fn port_forward_coordinator(&self) -> anyhow::Result { + let pod = self.find_pod(&self.settings.coordinator_pod_label).await?; + Self::new_port_forward(&PodSpec::name(&pod), "8081:8081") + } + + pub fn port_forward_influxdb(&self) -> anyhow::Result { + Self::new_port_forward(&self.settings.influxdb_pod_name, "8086:8086") + } + + pub fn port_forward_redis(&self) -> anyhow::Result { + Self::new_port_forward(&self.settings.redis_pod_name, "6379:6379") + } + + pub async fn port_forward_s3(&self) -> anyhow::Result { + let pod = self.find_pod(&self.settings.s3_pod_label).await?; + Self::new_port_forward(&PodSpec::name(&pod), "9000:9000") + } + + pub async fn save_coordinator_logs( + &self, + path: &str, + ) -> anyhow::Result>> { + let pod = self.find_pod(&self.settings.coordinator_pod_label).await?; + info!("writing {} log into: {}", PodSpec::name(&pod), path); + + let lp = LogParams { + follow: true, + ..Default::default() + }; + let mut logs = self + .pod_api + .log_stream(&PodSpec::name(&pod), &lp) + .await? + .boxed(); + let path = path.to_string(); + + let handle = tokio::spawn(async move { + let mut file = File::create(path).await?; + while let Some(line) = logs.try_next().await? { + let log = &String::from_utf8_lossy(&line); + file.write_all(strip_ansi_codes(log).as_bytes()).await?; + } + Ok::<(), anyhow::Error>(()) + }); + + Ok(handle) + } +} + +impl K8sClient { + async fn find_pod(&self, label: &str) -> anyhow::Result { + info!("searching for pod with label: {}", label); + + let lp = ListParams::default().labels(label); + let pods = self.pod_api.list(&lp).await?; + + let mut pods = pods.items; + pods.sort_by(|a, b| PodSpec::start_time(&b).cmp(&PodSpec::start_time(&a))); + let found_pod = pods + .into_iter() + .find(PodSpec::is_running) + .ok_or_else(|| anyhow!("cannot find pod with label: {}", label))?; + + Ok(found_pod) + } + + async fn patch_config_map( + &self, + config_map_name: &str, + config_content: String, + ) -> anyhow::Result<()> { + info!("patching config map: {}", config_map_name); + + let config_patch = json!( + { + "data": { + "config.toml": config_content + } + } + ); + + self.config_map_api + .patch( + config_map_name, + &PatchParams::default(), + &Patch::Strategic(&config_patch), + ) + .await?; + info!("patched config map: {}", config_map_name); + Ok(()) + } + + async fn restart_pod(&self, pod_name: &str, label: &str) -> anyhow::Result { + info!("restarting pod: {}", pod_name); + self.delete_pod(pod_name).await?; + + let new_pod_name = self.wait_until_restarted(label).await?; + + info!("new pod id: {}", new_pod_name); + Ok(new_pod_name) + } + + async fn delete_pod(&self, pod_name: &str) -> anyhow::Result<()> { + let dp = DeleteParams::default(); + self.pod_api.delete(pod_name, &dp).await?.map_left(|pdel| { + info!("deleting pod: {}", PodSpec::name(&pdel)); + }); + + Ok(()) + } + + async fn wait_until_restarted(&self, label: &str) -> anyhow::Result { + let lp = ListParams::default().labels(label); + let mut stream = self.pod_api.watch(&lp, "0").await?.boxed(); + + loop { + if let Some(status) = stream.try_next().await? { + match status { + WatchEvent::Added(o) => info!("added {}", PodSpec::name(&o)), + WatchEvent::Modified(o) => { + let s = o.status.as_ref().expect("status exists on pod"); + let phase = s.phase.clone().unwrap_or_default(); + info!("modified: {} with phase: {}", PodSpec::name(&o), phase); + if phase == "Running" { + break Ok(PodSpec::name(&o)); + } + } + WatchEvent::Deleted(o) => info!("deleted {}", Meta::name(&o)), + WatchEvent::Error(e) => error!("error {}", e), + _ => {} + } + } + } + } + + async fn patch_image( + &self, + pod_name: &str, + label: &str, + image: &str, + ) -> anyhow::Result { + info!("patching image of pod: {}", pod_name); + let image_patch = json!( + { + "spec": { + "containers": [ + { + "name": "coordinator", + "image": image + } + ] + } + } + ); + + self.pod_api + .patch( + pod_name, + &PatchParams::default(), + &Patch::Strategic(&image_patch), + ) + .await?; + + let new_pod_name = self.wait_until_restarted(label).await?; + + info!("Patched pod: {}", new_pod_name); + Ok(new_pod_name) + } + + fn reveal_pod_and_config_map_name(pod: &Pod) -> anyhow::Result<(String, String)> { + let pod_name = PodSpec::name(pod); + let config_map_name = + PodSpec::config_map_name(pod).ok_or_else(|| anyhow!("cannot find config map name"))?; + info!( + "pod name: {}, config map name: {}", + pod_name, config_map_name + ); + Ok((pod_name, config_map_name)) + } + + fn new_port_forward(pod_name: &str, port_mapping: &str) -> anyhow::Result { + info!( + "new port forward for pod: {} with port mapping: {}", + pod_name, port_mapping + ); + let handle = Command::new("kubectl") + .arg("port-forward") + .arg(pod_name) + .arg(port_mapping) + .kill_on_drop(true) + .stdout(Stdio::null()) + .spawn()?; + Ok(handle) + } +} + +struct PodSpec; +impl PodSpec { + fn config_map_name(pod: &Pod) -> Option { + pod.spec + .as_ref()? + .volumes + .as_ref()? + .get(1)? + .config_map + .as_ref()? + .name + .clone() + } + + fn name(pod: &Pod) -> String { + Meta::name(pod) + } + + fn is_running(pod: &Pod) -> bool { + let s = pod.status.as_ref().expect("status exists on pod"); + let phase = s.phase.clone().unwrap_or_default(); + phase == "Running" + } + + fn start_time(pod: &Pod) -> Time { + let s = pod.status.as_ref().expect("status exists on pod"); + s.start_time.as_ref().expect("start time").clone() + } +} diff --git a/rust/e2e/src/test_env/mod.rs b/rust/e2e/src/test_env/mod.rs new file mode 100644 index 000000000..e78375ce4 --- /dev/null +++ b/rust/e2e/src/test_env/mod.rs @@ -0,0 +1,63 @@ +pub mod environment; +pub mod influx; +pub mod k8s; +pub mod utils; + +use std::path::PathBuf; + +use anyhow::anyhow; +use config::Config; +use serde::Deserialize; +use xaynet_server::settings::{InfluxSettings, RedisSettings, S3Settings}; + +pub use self::environment::TestEnvironment; + +#[derive(Deserialize, Clone)] +pub struct TestEnvironmentSettings { + pub filter: String, + pub k8s: K8sSettings, + pub coordinator: CoordinatorSettings, + pub influx: InfluxSettings, + pub redis: RedisSettings, + pub s3: S3Settings, + pub api_client: ApiClientSettings, +} + +impl TestEnvironmentSettings { + pub fn from_file(path: &str) -> anyhow::Result { + let mut path = PathBuf::from(path); + path.push("Env.toml"); + let settings: TestEnvironmentSettings = Self::load(path)?; + Ok(settings) + } + + fn load(path: PathBuf) -> anyhow::Result { + let mut config = Config::new(); + config.merge(config::File::from(path))?; + config + .try_into() + .map_err(|e| anyhow!("config error: {}", e)) + } +} + +#[derive(Deserialize, Clone)] +pub struct K8sSettings { + pub namespace: String, + pub coordinator_pod_label: String, + pub coordinator_image: String, + pub influxdb_pod_name: String, + pub redis_pod_name: String, + pub s3_pod_label: String, +} + +#[derive(Deserialize, Clone)] +pub struct CoordinatorSettings { + pub config: String, +} + +#[derive(Deserialize, Clone)] +pub struct ApiClientSettings { + pub address: String, + pub certificates: Option>, + pub identity: Option, +} diff --git a/rust/e2e/src/test_env/utils.rs b/rust/e2e/src/test_env/utils.rs new file mode 100644 index 000000000..4dd04e16b --- /dev/null +++ b/rust/e2e/src/test_env/utils.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; +use tokio::time::{interval, Duration}; +use xaynet_sdk::{client::Client as HttpApiClient, XaynetClient}; +use xaynet_server::state_machine::phases::PhaseName; + +use super::influx::InfluxClient; +use crate::utils::terminal::spinner; + +#[async_trait] +pub trait IsClientReady { + async fn is_ready(&mut self) -> bool; +} + +pub async fn wait_until_client_is_ready(client: &mut C) { + let mut interval = interval(Duration::from_millis(500)); + while !(client.is_ready().await) { + interval.tick().await; + } +} + +#[async_trait] +impl IsClientReady for InfluxClient { + async fn is_ready(&mut self) -> bool { + self.ping().await.is_ok() + } +} + +#[async_trait] +impl IsClientReady for HttpApiClient { + async fn is_ready(&mut self) -> bool { + self.get_round_params().await.is_ok() + } +} + +pub async fn wait_until_phase(client: &InfluxClient, phase: PhaseName) { + let spinner = spinner(&format!("Wait for phase {:?}", phase), ""); + let mut interval = interval(Duration::from_millis(500)); + loop { + let current_phase = client.get_current_phase().await; + match current_phase { + Ok(current_phase) => { + if current_phase == phase { + break; + } else { + spinner.set_message(&format!("current phase: {:?}", current_phase)); + } + } + Err(err) => spinner.set_message(&format!("No phase metrics available: {:?}", err)), + } + interval.tick().await; + } + spinner.finish_with_message("Ok"); +} diff --git a/rust/xaynet-sdk/src/utils/concurrent_futures.rs b/rust/e2e/src/utils/concurrent_futures.rs similarity index 99% rename from rust/xaynet-sdk/src/utils/concurrent_futures.rs rename to rust/e2e/src/utils/concurrent_futures.rs index 429c34b80..06f86d785 100644 --- a/rust/xaynet-sdk/src/utils/concurrent_futures.rs +++ b/rust/e2e/src/utils/concurrent_futures.rs @@ -1,5 +1,3 @@ -#![allow(dead_code)] - use std::{ collections::VecDeque, pin::Pin, diff --git a/rust/e2e/src/utils/mod.rs b/rust/e2e/src/utils/mod.rs new file mode 100644 index 000000000..7390198af --- /dev/null +++ b/rust/e2e/src/utils/mod.rs @@ -0,0 +1,2 @@ +pub mod concurrent_futures; +pub mod terminal; diff --git a/rust/e2e/src/utils/terminal.rs b/rust/e2e/src/utils/terminal.rs new file mode 100644 index 000000000..99d5a09f9 --- /dev/null +++ b/rust/e2e/src/utils/terminal.rs @@ -0,0 +1,22 @@ +use indicatif::ProgressStyle; + +pub fn spinner(prefix: &str, msg: &str) -> indicatif::ProgressBar { + let pb = indicatif::ProgressBar::new_spinner(); + pb.enable_steady_tick(120); + pb.set_style( + ProgressStyle::default_spinner() + .tick_strings(&[ + "▹▹▹▹▹", + "▸▹▹▹▹", + "▹▸▹▹▹", + "▹▹▸▹▹", + "▹▹▹▸▹", + "▹▹▹▹▸", + "▪▪▪▪▪", + ]) + .template("{prefix:<25.bold.dim} {spinner:.blue} [{elapsed_precise}] {msg}"), + ); + pb.set_message(msg); + pb.set_prefix(prefix); + pb +} diff --git a/rust/xaynet-sdk/src/lib.rs b/rust/xaynet-sdk/src/lib.rs index 2a5fb4240..02c2e502f 100644 --- a/rust/xaynet-sdk/src/lib.rs +++ b/rust/xaynet-sdk/src/lib.rs @@ -211,7 +211,6 @@ mod message_encoder; pub mod settings; mod state_machine; mod traits; -pub(crate) mod utils; pub(crate) use self::message_encoder::MessageEncoder; pub use self::traits::{ModelStore, Notify, XaynetClient}; diff --git a/rust/xaynet-sdk/src/utils/mod.rs b/rust/xaynet-sdk/src/utils/mod.rs deleted file mode 100644 index 7be6bff5f..000000000 --- a/rust/xaynet-sdk/src/utils/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -// TODO: move to the e2e package -pub mod concurrent_futures; diff --git a/rust/xaynet-server/src/settings/mod.rs b/rust/xaynet-server/src/settings/mod.rs index b7828a3b9..b287b3b48 100644 --- a/rust/xaynet-server/src/settings/mod.rs +++ b/rust/xaynet-server/src/settings/mod.rs @@ -601,7 +601,7 @@ pub struct MetricsSettings { pub influxdb: InfluxSettings, } -#[derive(Debug, Deserialize, Validate)] +#[derive(Debug, Deserialize, Validate, Clone)] /// InfluxDB settings. pub struct InfluxSettings { #[validate(url)] @@ -638,7 +638,7 @@ pub struct InfluxSettings { pub db: String, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] /// Redis settings. pub struct RedisSettings { /// The URL where Redis is running. diff --git a/rust/xaynet-server/src/settings/s3.rs b/rust/xaynet-server/src/settings/s3.rs index 950b8843b..8e761ab45 100644 --- a/rust/xaynet-server/src/settings/s3.rs +++ b/rust/xaynet-server/src/settings/s3.rs @@ -10,7 +10,7 @@ use serde::{ }; use validator::{Validate, ValidationError}; -#[derive(Debug, Validate, Deserialize)] +#[derive(Debug, Validate, Deserialize, Clone)] /// S3 settings. pub struct S3Settings { /// The [access key ID](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html). @@ -83,7 +83,7 @@ pub struct S3Settings { pub buckets: S3BucketsSettings, } -#[derive(Debug, Validate, Deserialize)] +#[derive(Debug, Validate, Deserialize, Clone)] /// S3 buckets settings. pub struct S3BucketsSettings { /// The bucket name in which the global models are stored.