Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: mediator server exposed via module #13

Merged
merged 1 commit into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions affinidi-messaging-mediator/conf/mediator.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ ws_size_limit = "${WS_SIZE_LIMIT:10485760}"
### Default: redis://127.0.0.1/
database_url = "${REDIS_URL:redis://127.0.0.1/}"

### lua_scripts: path to lua scripts used in Redis
### Default: file://../affinidi-messaging-processor/redis-functions/atm-functions.lua
lua_scripts = "${MEDIATOR_LUA_SCRIPTS:file://../affinidi-messaging-processor/redis-functions/atm-functions.lua}"

### database_pool_size: Number of connections to the database
### Default: 10
database_pool_size = "${DATABASE_POOL_SIZE:10}"
Expand Down
37 changes: 0 additions & 37 deletions affinidi-messaging-mediator/src/common/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub struct ServerConfig {
#[derive(Debug, Serialize, Deserialize)]
pub struct DatabaseConfig {
pub database_url: String,
pub lua_scripts: String,
pub database_pool_size: String,
pub database_timeout: String,
pub max_message_size: String,
Expand Down Expand Up @@ -114,7 +113,6 @@ pub struct Config {
pub mediator_did: String,
pub mediator_secrets: AffinidiSecrets,
pub database_url: String,
pub lua_scripts: String,
pub database_pool_size: usize,
pub database_timeout: u32,
pub http_size_limit: u32,
Expand Down Expand Up @@ -192,7 +190,6 @@ impl Default for Config {
mediator_did: "".into(),
mediator_secrets: AffinidiSecrets::new(vec![]),
database_url: "redis://127.0.0.1/".into(),
lua_scripts: "".into(),
database_pool_size: 10,
database_timeout: 2,
max_message_size: 1048576,
Expand Down Expand Up @@ -243,7 +240,6 @@ impl TryFrom<ConfigRaw> for Config {
listen_address: raw.listen_address,
mediator_did: read_did_config(&raw.mediator_did, &aws_config).await?,
database_url: raw.database.database_url,
lua_scripts: read_lua_scripts(&raw.database.lua_scripts).await?,
database_pool_size: raw.database.database_pool_size.parse().unwrap_or(10),
database_timeout: raw.database.database_timeout.parse().unwrap_or(2),
max_message_size: raw.database.max_message_size.parse().unwrap_or(1048576),
Expand Down Expand Up @@ -369,39 +365,6 @@ async fn load_secrets(
))
}

async fn read_lua_scripts(lua_scripts: &str) -> Result<String, MediatorError> {
let parts: Vec<&str> = lua_scripts.split("://").collect();
if parts.len() != 2 {
return Err(MediatorError::ConfigError(
"NA".into(),
"Invalid `lua_scripts` format".into(),
));
}

info!(
"Loading lua_scripts method({}) path({})",
parts[0], parts[1]
);
let content: String = match parts[0] {
// "file" => read_file_lines(parts[1], false)?.concat(),
"file" => fs::read_to_string(parts[1]).map_err(|err| {
event!(Level::ERROR, "Could not open file({}). {}", parts[1], err);
MediatorError::ConfigError(
"NA".into(),
format!("Could not open file({}). {}", parts[1], err),
)
})?,
"inline" => parts[1].to_string(),
_ => {
return Err(MediatorError::ConfigError(
"NA".into(),
"Invalid `lua_scripts` format! Expecting file:// or inline:// ...".into(),
))
}
};
Ok(content)
}

/// Read the primary configuration file for the mediator
/// Returns a ConfigRaw struct, that still needs to be processed for additional information
/// and conversion to Config struct
Expand Down
4 changes: 3 additions & 1 deletion affinidi-messaging-mediator/src/database/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::common::{config::Config, errors::MediatorError};

use super::DatabaseHandler;

static LUA_SCRIPTS: &[u8] = include_bytes!("atm-functions.lua");

impl DatabaseHandler {
pub async fn new(config: &Config) -> Result<Self, MediatorError> {
// Creates initial pool Configuration from the redis database URL
Expand Down Expand Up @@ -86,7 +88,7 @@ impl DatabaseHandler {
let function_load: Result<String, deadpool_redis::redis::RedisError> =
deadpool_redis::redis::cmd("FUNCTION")
.arg("LOAD")
.arg(config.lua_scripts.clone())
.arg(LUA_SCRIPTS.clone())
.query_async(&mut conn)
.await;
match function_load {
Expand Down
1 change: 1 addition & 0 deletions affinidi-messaging-mediator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod database;
pub mod handlers;
pub mod messages;
pub mod resolvers;
pub mod server;
pub mod tasks;

#[derive(Clone)]
Expand Down
170 changes: 2 additions & 168 deletions affinidi-messaging-mediator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,172 +1,6 @@
use affinidi_did_resolver_cache_sdk::DIDCacheClient;
use affinidi_messaging_mediator::{
database::DatabaseHandler,
handlers::{application_routes, health_checker_handler},
init,
tasks::websocket_streaming::StreamingTask,
SharedData,
};
use axum::{routing::get, Router};
use axum_server::tls_rustls::RustlsConfig;
use http::Method;
use std::{env, net::SocketAddr};
use tower_http::limit::RequestBodyLimitLayer;
use tower_http::trace::{self, TraceLayer};
use tracing::{event, Level};
use tracing_subscriber::{filter, layer::SubscriberExt, reload, util::SubscriberInitExt};
use affinidi_messaging_mediator::{server::start};

#[tokio::main]
async fn main() {
// setup logging/tracing framework
let filter = filter::LevelFilter::INFO; // This can be changed in the config file!
let (filter, reload_handle) = reload::Layer::new(filter);
let ansi = env::var("LOCAL").is_ok();
tracing_subscriber::registry()
.with(filter)
.with(tracing_subscriber::fmt::layer().with_ansi(ansi))
.init();

if ansi {
event!(
Level::INFO,
r#" db ad88 ad88 88 88 88 88 88b d88 88 88"#
);
event!(
Level::INFO,
r#" d88b d8" d8" "" "" 88 "" 888b d888 88 "" ,d"#
);
event!(
Level::INFO,
r#" d8'`8b 88 88 88 88`8b d8'88 88 88"#
);
event!(
Level::INFO,
r#" d8' `8b MM88MMM MM88MMM 88 8b,dPPYba, 88 ,adPPYb,88 88 88 `8b d8' 88 ,adPPYba, ,adPPYb,88 88 ,adPPYYba, MM88MMM ,adPPYba, 8b,dPPYba,"#
);
event!(
Level::INFO,
r#" d8YaaaaY8b 88 88 88 88P' `"8a 88 a8" `Y88 88 88 `8b d8' 88 a8P_____88 a8" `Y88 88 "" `Y8 88 a8" "8a 88P' "Y8"#
);
event!(
Level::INFO,
r#" d8""""""""8b 88 88 88 88 88 88 8b 88 88 88 `8b d8' 88 8PP""""""" 8b 88 88 ,adPPPPP88 88 8b d8 88"#
);
event!(
Level::INFO,
r#" d8' `8b 88 88 88 88 88 88 "8a, ,d88 88 88 `888' 88 "8b, ,aa "8a, ,d88 88 88, ,88 88, "8a, ,a8" 88"#
);
event!(
Level::INFO,
r#" d8' `8b 88 88 88 88 88 88 `"8bbdP"Y8 88 88 `8' 88 `"Ybbd8"' `"8bbdP"Y8 88 `"8bbdP"Y8 "Y888 `"YbbdP"' 88"#
);
event!(Level::INFO, "");
}

event!(
Level::INFO,
"[Loading Affinidi Secure Messaging Mediator configuration]"
);

let config = init(Some(reload_handle))
.await
.expect("Couldn't initialize mediator!");

// Start setting up the database durability and handling
let database = match DatabaseHandler::new(&config).await {
Ok(db) => db,
Err(err) => {
event!(Level::ERROR, "Error opening database: {}", err);
event!(Level::ERROR, "Exiting...");
std::process::exit(1);
}
};

// Start the statistics thread
let _stats_database = database.clone(); // Clone the database handler for the statistics thread
tokio::spawn(async move {
affinidi_messaging_mediator::tasks::statistics::statistics(_stats_database)
.await
.expect("Error starting statistics thread");
});

// Start the streaming thread if enabled
let (streaming_task, _) = if config.streaming_enabled {
let _database = database.clone(); // Clone the database handler for the subscriber thread
let uuid = config.streaming_uuid.clone();
let (_task, _handle) = StreamingTask::new(_database.clone(), &uuid)
.await
.expect("Error starting streaming task");
(Some(_task), Some(_handle))
} else {
(None, None)
};

// Create the DID Resolver
let did_resolver = DIDCacheClient::new(config.did_resolver_config.clone())
.await
.unwrap();

// Create the shared application State
let shared_state = SharedData {
config: config.clone(),
service_start_timestamp: chrono::Utc::now(),
did_resolver,
database,
streaming_task,
};

// build our application routes
let app: Router = application_routes(&shared_state);

// Add middleware to all routes
let app = Router::new()
.merge(app)
.layer(
config
.cors_allow_origin
.allow_headers([http::header::CONTENT_TYPE])
.allow_methods([
Method::GET,
Method::POST,
Method::PUT,
Method::DELETE,
Method::PATCH,
]),
)
.layer(
TraceLayer::new_for_http()
.make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO))
.on_response(trace::DefaultOnResponse::new().level(Level::INFO)),
)
.layer(RequestBodyLimitLayer::new(config.http_size_limit as usize))
// Add the healthcheck route after the tracing so we don't fill up logs with healthchecks
.route(
"/atm/healthchecker",
get(health_checker_handler).with_state(shared_state),
);

if config.use_ssl {
event!(
Level::INFO,
"This mediator is using SSL/TLS for secure communication."
);
// configure certificate and private key used by https
// TODO: Build a proper TLS Config
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let ssl_config =
RustlsConfig::from_pem_file(config.ssl_certificate_file, config.ssl_key_file)
.await
.expect("bad certificate/key");

axum_server::bind_rustls(config.listen_address.parse().unwrap(), ssl_config)
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();
} else {
event!(Level::WARN, "**** WARNING: Running without SSL/TLS ****");
axum_server::bind(config.listen_address.parse().unwrap())
.serve(app.into_make_service_with_connect_info::<SocketAddr>())
.await
.unwrap();
}
return start().await
}
Loading
Loading