From b738d084d07feb665e8728dd5a288fbb8d4eabf9 Mon Sep 17 00:00:00 2001 From: Anton Yemelyanov Date: Sat, 18 Nov 2023 02:39:14 +0200 Subject: [PATCH] wip - refactoring interop and runtime 1/2 --- core/src/imports.rs | 2 +- core/src/interop/mod.rs | 71 ++++++++++++++---- core/src/interop/runtime.rs | 100 ------------------------- core/src/interop/services/kaspa/mod.rs | 2 +- core/src/interop/services/metrics.rs | 2 +- core/src/interop/services/peers.rs | 1 - 6 files changed, 58 insertions(+), 120 deletions(-) delete mode 100644 core/src/interop/runtime.rs diff --git a/core/src/imports.rs b/core/src/imports.rs index 1ae5c10..d17690c 100644 --- a/core/src/imports.rs +++ b/core/src/imports.rs @@ -69,7 +69,7 @@ pub use crate::events::{ApplicationEventsChannel, Events}; pub use crate::collection::Collection; pub use crate::core::Core; pub use crate::interop; -pub use crate::interop::{spawn, spawn_with_result, Interop, Payload}; +pub use crate::interop::{spawn, spawn_with_result, Interop, Payload, Service}; pub use crate::modules; pub use crate::modules::{Module, ModuleCaps, ModuleStyle, ModuleT}; pub use crate::network::Network; diff --git a/core/src/interop/mod.rs b/core/src/interop/mod.rs index db21b1b..560bb01 100644 --- a/core/src/interop/mod.rs +++ b/core/src/interop/mod.rs @@ -9,21 +9,33 @@ cfg_if! { } } -pub mod runtime; pub mod services; - -use runtime::Runtime; use services::*; pub mod payload; pub use payload::Payload; +#[async_trait] +pub trait Service: Sync + Send { + async fn spawn(self: Arc) -> Result<()>; + async fn join(self: Arc) -> Result<()>; + fn terminate(self: Arc); + // -- + async fn attach_rpc(self: Arc, _rpc_api: Arc) -> Result<()> { + Ok(()) + } + async fn detach_rpc(self: Arc) -> Result<()> { + Ok(()) + } +} + pub struct Inner { - application_events: ApplicationEventsChannel, + // services: Mutex>>, + services: Mutex>>, kaspa: Arc, peer_monitor: Arc, metrics_service: Arc, - runtime: Runtime, + application_events: ApplicationEventsChannel, egui_ctx: egui::Context, is_running: Arc, start_time: std::time::Instant, @@ -46,15 +58,21 @@ impl Interop { settings, )); let metrics_service = Arc::new(MetricsService::new(application_events.clone(), settings)); - let runtime = Runtime::new(&[kaspa.clone(), peer_monitor.clone(), metrics_service.clone()]); + // let runtime = Runtime::new(&[kaspa.clone(), peer_monitor.clone(), metrics_service.clone()]); + + let services: Mutex>> = Mutex::new(vec![ + kaspa.clone(), + peer_monitor.clone(), + metrics_service.clone(), + ]); let interop = Self { inner: Arc::new(Inner { + services, application_events, kaspa, peer_monitor, metrics_service, - runtime, egui_ctx: egui_ctx.clone(), is_running: Arc::new(AtomicBool::new(false)), start_time: std::time::Instant::now(), @@ -70,27 +88,48 @@ impl Interop { self.inner.start_time.elapsed() } - /// Get a reference to the interop runtime. - pub fn runtime(&self) -> &Runtime { - &self.inner.runtime + pub fn start_services(&self) { + let services = self.services(); + for service in services { + spawn(async move { service.spawn().await }); + } + } + + pub fn services(&self) -> Vec> { + self.inner.services.lock().unwrap().clone() + } + + pub fn stop_services(&self) { + self.services() + .into_iter() + .for_each(|service| service.terminate()); + } + + pub async fn join_services(&self) { + let futures = self + .services() + .into_iter() + .map(|service| service.join()) + .collect::>(); + join_all(futures).await; } - pub fn services(&self) -> Vec> { - self.inner.runtime.services() + pub fn drop(&self) { + register_global(None); } - /// Start the interop runtime. + // / Start the interop runtime. pub fn start(&self) { self.inner.is_running.store(true, Ordering::SeqCst); - self.runtime().start(); + self.start_services(); } /// Shutdown interop runtime. pub async fn shutdown(&self) { if self.inner.is_running.load(Ordering::SeqCst) { self.inner.is_running.store(false, Ordering::SeqCst); - self.runtime().shutdown(); - self.runtime().join().await; + self.stop_services(); + self.join_services().await; register_global(None); } } diff --git a/core/src/interop/runtime.rs b/core/src/interop/runtime.rs deleted file mode 100644 index 93dd943..0000000 --- a/core/src/interop/runtime.rs +++ /dev/null @@ -1,100 +0,0 @@ -use crate::imports::*; - -#[async_trait] -pub trait Service: Sync + Send { - async fn spawn(self: Arc) -> Result<()>; - async fn join(self: Arc) -> Result<()>; - fn terminate(self: Arc); - // -- - async fn attach_rpc(self: Arc, _rpc_api: Arc) -> Result<()> { - Ok(()) - } - async fn detach_rpc(self: Arc) -> Result<()> { - Ok(()) - } -} - -pub struct Inner { - services: Mutex>>, - is_running: Arc, -} - -#[derive(Clone)] -pub struct Runtime { - inner: Arc, -} - -impl Default for Runtime { - fn default() -> Self { - Runtime::new(&[]) - } -} - -impl Runtime { - pub fn new(services: &[Arc]) -> Self { - let runtime = Self { - inner: Arc::new(Inner { - services: Mutex::new(services.to_vec()), - is_running: Arc::new(AtomicBool::new(false)), - }), - }; - - register_global(Some(runtime.clone())); - - runtime - } - - pub fn register_service(&self, service: Arc) { - self.inner.services.lock().unwrap().push(service); - } - - pub fn start(&self) { - self.inner.is_running.store(true, Ordering::SeqCst); - let services = self.services(); - for service in services { - spawn(async move { service.spawn().await }); - } - } - - pub fn services(&self) -> Vec> { - self.inner.services.lock().unwrap().clone() - } - - pub fn shutdown(&self) { - self.services() - .into_iter() - .for_each(|service| service.terminate()); - } - - pub async fn join(&self) { - let futures = self - .services() - .into_iter() - .map(|service| service.join()) - .collect::>(); - join_all(futures).await; - self.inner.is_running.store(false, Ordering::SeqCst); - } - - pub fn drop(&self) { - register_global(None); - } -} - -static mut RUNTIME: Option = None; - -fn _runtime() -> &'static Runtime { - unsafe { - if let Some(runtime) = &RUNTIME { - runtime - } else { - panic!("runtime not initialized") - } - } -} - -fn register_global(runtime: Option) { - unsafe { - RUNTIME = runtime; - } -} diff --git a/core/src/interop/services/kaspa/mod.rs b/core/src/interop/services/kaspa/mod.rs index 31a43c2..3430c87 100644 --- a/core/src/interop/services/kaspa/mod.rs +++ b/core/src/interop/services/kaspa/mod.rs @@ -1,7 +1,7 @@ use std::time::Duration; use crate::imports::*; -use crate::interop::runtime::Service; +use crate::interop::Service; pub use futures::{future::FutureExt, select, Future}; // use kaspa_metrics::{Metric, Metrics, MetricsSnapshot}; #[allow(unused_imports)] diff --git a/core/src/interop/services/metrics.rs b/core/src/interop/services/metrics.rs index ae5333e..4af7378 100644 --- a/core/src/interop/services/metrics.rs +++ b/core/src/interop/services/metrics.rs @@ -1,7 +1,7 @@ // use std::time::Duration; use crate::imports::*; -use crate::interop::runtime::Service; +use crate::interop::Service; pub use futures::{future::FutureExt, select, Future}; use kaspa_metrics::{Metric, Metrics, MetricsSnapshot}; #[allow(unused_imports)] diff --git a/core/src/interop/services/peers.rs b/core/src/interop/services/peers.rs index 0dc2cb4..fc0c7e3 100644 --- a/core/src/interop/services/peers.rs +++ b/core/src/interop/services/peers.rs @@ -1,7 +1,6 @@ // use std::time::Duration; use crate::imports::*; -use crate::interop::runtime::Service; pub use futures::{future::FutureExt, select, Future}; use kaspa_rpc_core::RpcPeerInfo; // use kaspa_metrics::{Metric, Metrics, MetricsSnapshot};