From b43f1610c09458021a81b0bf504f4d3c83fda1db Mon Sep 17 00:00:00 2001 From: Robert Kwolek Date: Tue, 10 Sep 2024 10:23:15 +0200 Subject: [PATCH] feat: lua scripts embedded in mediator --- .../conf/mediator.toml | 4 - .../src/common/config.rs | 37 ---- .../src/database}/atm-functions.lua | 0 .../src/database/handlers.rs | 4 +- affinidi-messaging-mediator/src/lib.rs | 1 + affinidi-messaging-mediator/src/main.rs | 170 +---------------- affinidi-messaging-mediator/src/server.rs | 172 ++++++++++++++++++ 7 files changed, 178 insertions(+), 210 deletions(-) rename {affinidi-messaging-processor/redis-functions => affinidi-messaging-mediator/src/database}/atm-functions.lua (100%) create mode 100644 affinidi-messaging-mediator/src/server.rs diff --git a/affinidi-messaging-mediator/conf/mediator.toml b/affinidi-messaging-mediator/conf/mediator.toml index 49d9bf5..0115998 100644 --- a/affinidi-messaging-mediator/conf/mediator.toml +++ b/affinidi-messaging-mediator/conf/mediator.toml @@ -40,10 +40,6 @@ ws_size_limit = "${WS_SIZE_LIMIT:10485760}" ### Default: redis://127.0.0.1/ database_url = "${REDIS_URL:redis://127.0.0.1/}" -### lua_scripts: path to lua scripts used in Redis -### Default: file://../affinidi-messaging-processor/redis-functions/atm-functions.lua -lua_scripts = "${MEDIATOR_LUA_SCRIPTS:file://../affinidi-messaging-processor/redis-functions/atm-functions.lua}" - ### database_pool_size: Number of connections to the database ### Default: 10 database_pool_size = "${DATABASE_POOL_SIZE:10}" diff --git a/affinidi-messaging-mediator/src/common/config.rs b/affinidi-messaging-mediator/src/common/config.rs index fffe72f..c0bee1c 100644 --- a/affinidi-messaging-mediator/src/common/config.rs +++ b/affinidi-messaging-mediator/src/common/config.rs @@ -32,7 +32,6 @@ pub struct ServerConfig { #[derive(Debug, Serialize, Deserialize)] pub struct DatabaseConfig { pub database_url: String, - pub lua_scripts: String, pub database_pool_size: String, pub database_timeout: String, pub max_message_size: String, @@ -114,7 +113,6 @@ pub struct Config { pub mediator_did: String, pub mediator_secrets: AffinidiSecrets, pub database_url: String, - pub lua_scripts: String, pub database_pool_size: usize, pub database_timeout: u32, pub http_size_limit: u32, @@ -192,7 +190,6 @@ impl Default for Config { mediator_did: "".into(), mediator_secrets: AffinidiSecrets::new(vec![]), database_url: "redis://127.0.0.1/".into(), - lua_scripts: "".into(), database_pool_size: 10, database_timeout: 2, max_message_size: 1048576, @@ -243,7 +240,6 @@ impl TryFrom for Config { listen_address: raw.listen_address, mediator_did: read_did_config(&raw.mediator_did, &aws_config).await?, database_url: raw.database.database_url, - lua_scripts: read_lua_scripts(&raw.database.lua_scripts).await?, database_pool_size: raw.database.database_pool_size.parse().unwrap_or(10), database_timeout: raw.database.database_timeout.parse().unwrap_or(2), max_message_size: raw.database.max_message_size.parse().unwrap_or(1048576), @@ -369,39 +365,6 @@ async fn load_secrets( )) } -async fn read_lua_scripts(lua_scripts: &str) -> Result { - let parts: Vec<&str> = lua_scripts.split("://").collect(); - if parts.len() != 2 { - return Err(MediatorError::ConfigError( - "NA".into(), - "Invalid `lua_scripts` format".into(), - )); - } - - info!( - "Loading lua_scripts method({}) path({})", - parts[0], parts[1] - ); - let content: String = match parts[0] { - // "file" => read_file_lines(parts[1], false)?.concat(), - "file" => fs::read_to_string(parts[1]).map_err(|err| { - event!(Level::ERROR, "Could not open file({}). {}", parts[1], err); - MediatorError::ConfigError( - "NA".into(), - format!("Could not open file({}). {}", parts[1], err), - ) - })?, - "inline" => parts[1].to_string(), - _ => { - return Err(MediatorError::ConfigError( - "NA".into(), - "Invalid `lua_scripts` format! Expecting file:// or inline:// ...".into(), - )) - } - }; - Ok(content) -} - /// Read the primary configuration file for the mediator /// Returns a ConfigRaw struct, that still needs to be processed for additional information /// and conversion to Config struct diff --git a/affinidi-messaging-processor/redis-functions/atm-functions.lua b/affinidi-messaging-mediator/src/database/atm-functions.lua similarity index 100% rename from affinidi-messaging-processor/redis-functions/atm-functions.lua rename to affinidi-messaging-mediator/src/database/atm-functions.lua diff --git a/affinidi-messaging-mediator/src/database/handlers.rs b/affinidi-messaging-mediator/src/database/handlers.rs index ad010f3..cce8216 100644 --- a/affinidi-messaging-mediator/src/database/handlers.rs +++ b/affinidi-messaging-mediator/src/database/handlers.rs @@ -8,6 +8,8 @@ use crate::common::{config::Config, errors::MediatorError}; use super::DatabaseHandler; +static LUA_SCRIPTS: &[u8] = include_bytes!("atm-functions.lua"); + impl DatabaseHandler { pub async fn new(config: &Config) -> Result { // Creates initial pool Configuration from the redis database URL @@ -86,7 +88,7 @@ impl DatabaseHandler { let function_load: Result = deadpool_redis::redis::cmd("FUNCTION") .arg("LOAD") - .arg(config.lua_scripts.clone()) + .arg(LUA_SCRIPTS.clone()) .query_async(&mut conn) .await; match function_load { diff --git a/affinidi-messaging-mediator/src/lib.rs b/affinidi-messaging-mediator/src/lib.rs index e4e9a84..92d7910 100644 --- a/affinidi-messaging-mediator/src/lib.rs +++ b/affinidi-messaging-mediator/src/lib.rs @@ -22,6 +22,7 @@ pub mod database; pub mod handlers; pub mod messages; pub mod resolvers; +pub mod server; pub mod tasks; #[derive(Clone)] diff --git a/affinidi-messaging-mediator/src/main.rs b/affinidi-messaging-mediator/src/main.rs index e1b9090..6b9c49a 100644 --- a/affinidi-messaging-mediator/src/main.rs +++ b/affinidi-messaging-mediator/src/main.rs @@ -1,172 +1,6 @@ -use affinidi_did_resolver_cache_sdk::DIDCacheClient; -use affinidi_messaging_mediator::{ - database::DatabaseHandler, - handlers::{application_routes, health_checker_handler}, - init, - tasks::websocket_streaming::StreamingTask, - SharedData, -}; -use axum::{routing::get, Router}; -use axum_server::tls_rustls::RustlsConfig; -use http::Method; -use std::{env, net::SocketAddr}; -use tower_http::limit::RequestBodyLimitLayer; -use tower_http::trace::{self, TraceLayer}; -use tracing::{event, Level}; -use tracing_subscriber::{filter, layer::SubscriberExt, reload, util::SubscriberInitExt}; +use affinidi_messaging_mediator::{server::start}; #[tokio::main] async fn main() { - // setup logging/tracing framework - let filter = filter::LevelFilter::INFO; // This can be changed in the config file! - let (filter, reload_handle) = reload::Layer::new(filter); - let ansi = env::var("LOCAL").is_ok(); - tracing_subscriber::registry() - .with(filter) - .with(tracing_subscriber::fmt::layer().with_ansi(ansi)) - .init(); - - if ansi { - event!( - Level::INFO, - r#" db ad88 ad88 88 88 88 88 88b d88 88 88"# - ); - event!( - Level::INFO, - r#" d88b d8" d8" "" "" 88 "" 888b d888 88 "" ,d"# - ); - event!( - Level::INFO, - r#" d8'`8b 88 88 88 88`8b d8'88 88 88"# - ); - event!( - Level::INFO, - r#" d8' `8b MM88MMM MM88MMM 88 8b,dPPYba, 88 ,adPPYb,88 88 88 `8b d8' 88 ,adPPYba, ,adPPYb,88 88 ,adPPYYba, MM88MMM ,adPPYba, 8b,dPPYba,"# - ); - event!( - Level::INFO, - r#" d8YaaaaY8b 88 88 88 88P' `"8a 88 a8" `Y88 88 88 `8b d8' 88 a8P_____88 a8" `Y88 88 "" `Y8 88 a8" "8a 88P' "Y8"# - ); - event!( - Level::INFO, - r#" d8""""""""8b 88 88 88 88 88 88 8b 88 88 88 `8b d8' 88 8PP""""""" 8b 88 88 ,adPPPPP88 88 8b d8 88"# - ); - event!( - Level::INFO, - r#" d8' `8b 88 88 88 88 88 88 "8a, ,d88 88 88 `888' 88 "8b, ,aa "8a, ,d88 88 88, ,88 88, "8a, ,a8" 88"# - ); - event!( - Level::INFO, - r#" d8' `8b 88 88 88 88 88 88 `"8bbdP"Y8 88 88 `8' 88 `"Ybbd8"' `"8bbdP"Y8 88 `"8bbdP"Y8 "Y888 `"YbbdP"' 88"# - ); - event!(Level::INFO, ""); - } - - event!( - Level::INFO, - "[Loading Affinidi Secure Messaging Mediator configuration]" - ); - - let config = init(Some(reload_handle)) - .await - .expect("Couldn't initialize mediator!"); - - // Start setting up the database durability and handling - let database = match DatabaseHandler::new(&config).await { - Ok(db) => db, - Err(err) => { - event!(Level::ERROR, "Error opening database: {}", err); - event!(Level::ERROR, "Exiting..."); - std::process::exit(1); - } - }; - - // Start the statistics thread - let _stats_database = database.clone(); // Clone the database handler for the statistics thread - tokio::spawn(async move { - affinidi_messaging_mediator::tasks::statistics::statistics(_stats_database) - .await - .expect("Error starting statistics thread"); - }); - - // Start the streaming thread if enabled - let (streaming_task, _) = if config.streaming_enabled { - let _database = database.clone(); // Clone the database handler for the subscriber thread - let uuid = config.streaming_uuid.clone(); - let (_task, _handle) = StreamingTask::new(_database.clone(), &uuid) - .await - .expect("Error starting streaming task"); - (Some(_task), Some(_handle)) - } else { - (None, None) - }; - - // Create the DID Resolver - let did_resolver = DIDCacheClient::new(config.did_resolver_config.clone()) - .await - .unwrap(); - - // Create the shared application State - let shared_state = SharedData { - config: config.clone(), - service_start_timestamp: chrono::Utc::now(), - did_resolver, - database, - streaming_task, - }; - - // build our application routes - let app: Router = application_routes(&shared_state); - - // Add middleware to all routes - let app = Router::new() - .merge(app) - .layer( - config - .cors_allow_origin - .allow_headers([http::header::CONTENT_TYPE]) - .allow_methods([ - Method::GET, - Method::POST, - Method::PUT, - Method::DELETE, - Method::PATCH, - ]), - ) - .layer( - TraceLayer::new_for_http() - .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO)) - .on_response(trace::DefaultOnResponse::new().level(Level::INFO)), - ) - .layer(RequestBodyLimitLayer::new(config.http_size_limit as usize)) - // Add the healthcheck route after the tracing so we don't fill up logs with healthchecks - .route( - "/atm/healthchecker", - get(health_checker_handler).with_state(shared_state), - ); - - if config.use_ssl { - event!( - Level::INFO, - "This mediator is using SSL/TLS for secure communication." - ); - // configure certificate and private key used by https - // TODO: Build a proper TLS Config - let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); - let ssl_config = - RustlsConfig::from_pem_file(config.ssl_certificate_file, config.ssl_key_file) - .await - .expect("bad certificate/key"); - - axum_server::bind_rustls(config.listen_address.parse().unwrap(), ssl_config) - .serve(app.into_make_service_with_connect_info::()) - .await - .unwrap(); - } else { - event!(Level::WARN, "**** WARNING: Running without SSL/TLS ****"); - axum_server::bind(config.listen_address.parse().unwrap()) - .serve(app.into_make_service_with_connect_info::()) - .await - .unwrap(); - } + return start().await } diff --git a/affinidi-messaging-mediator/src/server.rs b/affinidi-messaging-mediator/src/server.rs new file mode 100644 index 0000000..26d17c5 --- /dev/null +++ b/affinidi-messaging-mediator/src/server.rs @@ -0,0 +1,172 @@ +use affinidi_did_resolver_cache_sdk::DIDCacheClient; +use crate::{ + database::DatabaseHandler, + handlers::{application_routes, health_checker_handler}, + init, + tasks::websocket_streaming::StreamingTask, + SharedData, + tasks::statistics::statistics, +}; +use axum::{routing::get, Router}; +use axum_server::tls_rustls::RustlsConfig; +use http::Method; +use std::{env, net::SocketAddr}; +use tower_http::limit::RequestBodyLimitLayer; +use tower_http::trace::{self, TraceLayer}; +use tracing::{event, Level}; +use tracing_subscriber::{filter, layer::SubscriberExt, reload, util::SubscriberInitExt}; + +pub async fn start() { + // setup logging/tracing framework + let filter = filter::LevelFilter::INFO; // This can be changed in the config file! + let (filter, reload_handle) = reload::Layer::new(filter); + let ansi = env::var("LOCAL").is_ok(); + tracing_subscriber::registry() + .with(filter) + .with(tracing_subscriber::fmt::layer().with_ansi(ansi)) + .init(); + + if ansi { + event!( + Level::INFO, + r#" db ad88 ad88 88 88 88 88 88b d88 88 88"# + ); + event!( + Level::INFO, + r#" d88b d8" d8" "" "" 88 "" 888b d888 88 "" ,d"# + ); + event!( + Level::INFO, + r#" d8'`8b 88 88 88 88`8b d8'88 88 88"# + ); + event!( + Level::INFO, + r#" d8' `8b MM88MMM MM88MMM 88 8b,dPPYba, 88 ,adPPYb,88 88 88 `8b d8' 88 ,adPPYba, ,adPPYb,88 88 ,adPPYYba, MM88MMM ,adPPYba, 8b,dPPYba,"# + ); + event!( + Level::INFO, + r#" d8YaaaaY8b 88 88 88 88P' `"8a 88 a8" `Y88 88 88 `8b d8' 88 a8P_____88 a8" `Y88 88 "" `Y8 88 a8" "8a 88P' "Y8"# + ); + event!( + Level::INFO, + r#" d8""""""""8b 88 88 88 88 88 88 8b 88 88 88 `8b d8' 88 8PP""""""" 8b 88 88 ,adPPPPP88 88 8b d8 88"# + ); + event!( + Level::INFO, + r#" d8' `8b 88 88 88 88 88 88 "8a, ,d88 88 88 `888' 88 "8b, ,aa "8a, ,d88 88 88, ,88 88, "8a, ,a8" 88"# + ); + event!( + Level::INFO, + r#" d8' `8b 88 88 88 88 88 88 `"8bbdP"Y8 88 88 `8' 88 `"Ybbd8"' `"8bbdP"Y8 88 `"8bbdP"Y8 "Y888 `"YbbdP"' 88"# + ); + event!(Level::INFO, ""); + } + + event!( + Level::INFO, + "[Loading Affinidi Secure Messaging Mediator configuration]" + ); + + let config = init(Some(reload_handle)) + .await + .expect("Couldn't initialize mediator!"); + + // Start setting up the database durability and handling + let database = match DatabaseHandler::new(&config).await { + Ok(db) => db, + Err(err) => { + event!(Level::ERROR, "Error opening database: {}", err); + event!(Level::ERROR, "Exiting..."); + std::process::exit(1); + } + }; + + // Start the statistics thread + let _stats_database = database.clone(); // Clone the database handler for the statistics thread + tokio::spawn(async move { + statistics(_stats_database) + .await + .expect("Error starting statistics thread"); + }); + + // Start the streaming thread if enabled + let (streaming_task, _) = if config.streaming_enabled { + let _database = database.clone(); // Clone the database handler for the subscriber thread + let uuid = config.streaming_uuid.clone(); + let (_task, _handle) = StreamingTask::new(_database.clone(), &uuid) + .await + .expect("Error starting streaming task"); + (Some(_task), Some(_handle)) + } else { + (None, None) + }; + + // Create the DID Resolver + let did_resolver = DIDCacheClient::new(config.did_resolver_config.clone()) + .await + .unwrap(); + + // Create the shared application State + let shared_state = SharedData { + config: config.clone(), + service_start_timestamp: chrono::Utc::now(), + did_resolver, + database, + streaming_task, + }; + + // build our application routes + let app: Router = application_routes(&shared_state); + + // Add middleware to all routes + let app = Router::new() + .merge(app) + .layer( + config + .cors_allow_origin + .allow_headers([http::header::CONTENT_TYPE]) + .allow_methods([ + Method::GET, + Method::POST, + Method::PUT, + Method::DELETE, + Method::PATCH, + ]), + ) + .layer( + TraceLayer::new_for_http() + .make_span_with(trace::DefaultMakeSpan::new().level(Level::INFO)) + .on_response(trace::DefaultOnResponse::new().level(Level::INFO)), + ) + .layer(RequestBodyLimitLayer::new(config.http_size_limit as usize)) + // Add the healthcheck route after the tracing so we don't fill up logs with healthchecks + .route( + "/atm/healthchecker", + get(health_checker_handler).with_state(shared_state), + ); + + if config.use_ssl { + event!( + Level::INFO, + "This mediator is using SSL/TLS for secure communication." + ); + // configure certificate and private key used by https + // TODO: Build a proper TLS Config + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let ssl_config = + RustlsConfig::from_pem_file(config.ssl_certificate_file, config.ssl_key_file) + .await + .expect("bad certificate/key"); + + axum_server::bind_rustls(config.listen_address.parse().unwrap(), ssl_config) + .serve(app.into_make_service_with_connect_info::()) + .await + .unwrap(); + } else { + event!(Level::WARN, "**** WARNING: Running without SSL/TLS ****"); + axum_server::bind(config.listen_address.parse().unwrap()) + .serve(app.into_make_service_with_connect_info::()) + .await + .unwrap(); + } +}