Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
integrate new client
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Steiner committed Jan 17, 2021
1 parent b37c4b8 commit 475ee63
Show file tree
Hide file tree
Showing 22 changed files with 1,758 additions and 5 deletions.
405 changes: 404 additions & 1 deletion rust/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
# internals
"benches",
"examples",
"e2e",
]

[workspace.metadata]
Expand Down
2 changes: 2 additions & 0 deletions rust/e2e/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
**/*.log
**/*.pem
40 changes: 40 additions & 0 deletions rust/e2e/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[package]
name = "e2e"
version = "0.0.0"
authors = ["Xayn Engineering <[email protected]>"]
publish = false
edition = "2018"
description = "The Xayn Network project is building a privacy layer for machine learning so that AI projects can meet compliance such as GDPR and CCPA. The approach relies on Federated Learning as enabling technology that allows production AI applications to be fully privacy compliant."
readme = "../../README.md"
homepage = "https://xaynet.dev/"
repository = "https://github.com/xaynetwork/xaynet/"
license-file = "../../LICENSE"


[package.metadata]
# minimum supported rust version
msrv = "1.46.0"

[dependencies]
async-trait = "0.1.42"
anyhow = "1.0.38"
futures = "0.3.12"
serde = { version = "1.0.119", features = ["derive"] }
serde_yaml = "0.8.15"
tokio = { version = "0.2.24", features = ["full"] }
reqwest = { version = "0.10.10", default-features = false, features = ["json", "gzip", "stream"] }
xaynet-sdk = { path = "../xaynet-sdk", features = ["reqwest-client"] }
xaynet-core = { path = "../xaynet-core" }
xaynet-server = { path = "../xaynet-server", features = ["model-persistence"] }
tracing = "0.1.22"
tracing-subscriber = "0.2.15"
config = "0.10.1"
indicatif = "0.15.0"
influxdb = "0.3.0"
chrono = { version = "0.4.19" }
toml = "0.5"
kube = { version = "0.43.0" }
kube-runtime = { version = "0.43.0" }
k8s-openapi = { version = "0.9.0", default-features = false, features = ["v1_16"] }
serde_json = { version = "1.0.61" }
console = "0.13"
79 changes: 79 additions & 0 deletions rust/e2e/src/bin/test_case_1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use e2e::{
test_client::builder::{TestClientBuilder, TestClientBuilderSettings},
test_env::{utils, TestEnvironment, TestEnvironmentSettings},
};
use tokio::{
signal,
time::{timeout, Duration},
};
use tracing::info;
use xaynet_server::state_machine::phases::PhaseName;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let env_settings = TestEnvironmentSettings::from_file("src/bin/test_case_1")?;
let env = TestEnvironment::new(env_settings.clone()).await?;

tokio::select! {
res = timeout(Duration::from_secs(6000), run(env)) => {
res?
}
_ = signal::ctrl_c() => { Ok(()) }
}
}

async fn run(mut env: TestEnvironment) -> anyhow::Result<()> {
let k8s = env.get_k8s_client().await?;
k8s.deploy_with_config(env.get_env_settings().coordinator.config)
.await?;
let handle = k8s
.save_coordinator_logs("src/bin/test_case_1/coordinator.log")
.await?;

let _pfi_guard = k8s.port_forward_influxdb()?;
let _pfc_guard = if env.get_env_settings().api_client.certificates.is_none() {
Some(k8s.port_forward_coordinator().await?)
} else {
None
};

let mut api_client = env.get_api_client()?;
let influx_client = env.get_influx_client();

info!("wait until clients are ready");
let _ = tokio::join!(
utils::wait_until_coordinator_is_ready(&mut api_client),
utils::wait_until_influxdb_is_ready(&influx_client),
utils::wait_until_phase(&influx_client, PhaseName::Sum)
);

////////////////////////////////////////////////////////////////////////////////////////////////

let coordinator_settings = env.get_coordinator_settings()?;
let test_client_builder_settings =
TestClientBuilderSettings::from_coordinator_settings(coordinator_settings);

let mut test_client_builder = TestClientBuilder::new(test_client_builder_settings, api_client);

////////////////////////////////////////////////////////////////////////////////////////////////

for round in 0..10 {
info!("Round: {}", round);

let mut runner = test_client_builder.build_clients().await?;
info!("run sum clients...");
runner.run_sum_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Update).await;
info!("run update clients...");
runner.run_update_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Sum2).await;
info!("run sum2 clients...");
runner.run_sum2_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Sum).await;
}

////////////////////////////////////////////////////////////////////////////////////////////////

timeout(Duration::from_secs(10), handle).await???;
Ok(())
}
31 changes: 31 additions & 0 deletions rust/e2e/src/bin/test_case_1/Env.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
filter = "test_case=debug,e2e=debug,xaynet=info"

[k8s]
namespace = "development"
coordinator_pod_label = "app=coordinator"
coordinator_image = "xaynetwork/xaynet:development"
influxdb_pod_name = "influxdb-0"
redis_pod_name = "redis-master-0"
s3_pod_label = "minio"

[coordinator]
config = "src/bin/test_case_1/config.toml"

[influx]
url = "http://localhost:8086"
db = "metrics"

[redis]
url = "redis://localhost/"

[s3]
access_key = "minio"
secret_access_key = "minio123"
region = ["minio", "http://localhost:9000"]

[api_client]
address = "http://localhost:8081"
# tls
# address = "https://dev-coordinator.xayn.com"
# certificates = [ "src/bin/test_case_1/dev-coordinator-xayn-com.pem" ]
# identity =
47 changes: 47 additions & 0 deletions rust/e2e/src/bin/test_case_1/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[log]
filter = "xaynet=debug,http=warn,info"

[api]
bind_address = "0.0.0.0:8081"
tls_certificate = "/app/ssl/tls.pem"
tls_key = "/app/ssl/tls.key"

[pet]
min_sum_count = 1
min_update_count = 3
min_sum2_count = 1
max_sum_count = 100
max_update_count = 10000
max_sum2_count = 100
min_sum_time = 5
min_update_time = 10
min_sum2_time = 5
max_sum_time = 3600
max_update_time = 3600
max_sum2_time = 3600
sum = 0.5
update = 0.9

[mask]
group_type = "Prime"
data_type = "F32"
bound_type = "B0"
model_type = "M3"

[model]
length = 1

[metrics.influxdb]
url = "http://influxdb:8086"
db = "metrics"

[redis]
url = "redis://127.0.0.1/"

[s3]
access_key = "minio"
secret_access_key = "minio123"
region = ["minio", "http://minio:9000"]

[restore]
enable = false
6 changes: 6 additions & 0 deletions rust/e2e/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#[macro_use]
extern crate tracing;

pub mod test_client;
pub mod test_env;
pub mod utils;
129 changes: 129 additions & 0 deletions rust/e2e/src/test_client/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use super::{
runner::ClientRunner,
utils::{default_sum_client, default_update_client, Event, LocalModel},
};
use crate::utils::concurrent_futures::ConcurrentFutures;
use futures::future::BoxFuture;
use std::sync::Arc;
use tokio::sync::mpsc;
use xaynet_core::{
common::RoundParameters,
mask::{FromPrimitives, Model},
};
use xaynet_sdk::{client::Client as ApiClient, StateMachine, XaynetClient};

pub struct TestClientBuilderSettings {
number_of_sum: u64,
number_of_update: u64,
model_length: usize,
}

impl TestClientBuilderSettings {
pub fn from_coordinator_settings(
coordinator_settings: xaynet_server::settings::Settings,
) -> Self {
Self {
number_of_sum: coordinator_settings.pet.min_sum_count,
number_of_update: coordinator_settings.pet.min_update_count,
model_length: coordinator_settings.model.length,
}
}
}

pub struct TestClientBuilder {
settings: TestClientBuilderSettings,
api_client: ApiClient<reqwest::Client>,
model: Arc<Model>,
}

impl TestClientBuilder {
pub fn new(
settings: TestClientBuilderSettings,
api_client: ApiClient<reqwest::Client>,
) -> Self {
let model = Model::from_primitives(vec![1; settings.model_length].into_iter()).unwrap();
Self {
api_client,
settings,
model: Arc::new(model),
}
}

pub async fn build_sum_clients<
F: Fn(
&RoundParameters,
ApiClient<reqwest::Client>,
LocalModel,
) -> anyhow::Result<
BoxFuture<'static, BoxFuture<'static, (StateMachine, mpsc::Receiver<Event>)>>,
>,
>(
&mut self,
f: F,
) -> anyhow::Result<
ConcurrentFutures<
BoxFuture<'static, BoxFuture<'static, (StateMachine, mpsc::Receiver<Event>)>>,
>,
> {
let round_params = self.api_client.get_round_params().await?;
let mut sum_clients = ConcurrentFutures::<
BoxFuture<'static, BoxFuture<'static, (StateMachine, mpsc::Receiver<Event>)>>,
>::new(100);

for _ in 0..self.settings.number_of_sum {
let summer = f(
&round_params,
self.api_client.clone(),
LocalModel(self.model.clone()),
)?;
sum_clients.push(summer);
}

Ok(sum_clients)
}

pub async fn build_update_clients<
F: Fn(
&RoundParameters,
ApiClient<reqwest::Client>,
LocalModel,
) -> anyhow::Result<BoxFuture<'static, (StateMachine, mpsc::Receiver<Event>)>>,
>(
&mut self,
f: F,
) -> anyhow::Result<ConcurrentFutures<BoxFuture<'static, (StateMachine, mpsc::Receiver<Event>)>>>
{
let round_params = self.api_client.get_round_params().await?;
let mut update_clients = ConcurrentFutures::<
BoxFuture<'static, (StateMachine, mpsc::Receiver<Event>)>,
>::new(100);

for _ in 0..self.settings.number_of_update {
let updater = f(
&round_params,
self.api_client.clone(),
LocalModel(self.model.clone()),
)?;

update_clients.push(updater);
}

Ok(update_clients)
}

pub async fn build_clients(&mut self) -> anyhow::Result<ClientRunner> {
let sum_clients = self
.build_sum_clients(|round_params, api_client, model_store| {
default_sum_client(round_params, api_client, model_store)
})
.await?;

let update_clients = self
.build_update_clients(|round_params, api_client, model_store| {
default_update_client(round_params, api_client, model_store)
})
.await?;

Ok(ClientRunner::new(sum_clients, update_clients))
}
}
3 changes: 3 additions & 0 deletions rust/e2e/src/test_client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod builder;
pub mod runner;
pub mod utils;
Loading

0 comments on commit 475ee63

Please sign in to comment.