Skip to content

Commit

Permalink
wip - refactoring interop and runtime 1/2
Browse files Browse the repository at this point in the history
  • Loading branch information
aspect committed Nov 18, 2023
1 parent 9a49472 commit b738d08
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 120 deletions.
2 changes: 1 addition & 1 deletion core/src/imports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub use crate::events::{ApplicationEventsChannel, Events};
pub use crate::collection::Collection;
pub use crate::core::Core;
pub use crate::interop;
pub use crate::interop::{spawn, spawn_with_result, Interop, Payload};
pub use crate::interop::{spawn, spawn_with_result, Interop, Payload, Service};
pub use crate::modules;
pub use crate::modules::{Module, ModuleCaps, ModuleStyle, ModuleT};
pub use crate::network::Network;
Expand Down
71 changes: 55 additions & 16 deletions core/src/interop/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,33 @@ cfg_if! {
}
}

pub mod runtime;
pub mod services;

use runtime::Runtime;
use services::*;

pub mod payload;
pub use payload::Payload;

#[async_trait]
pub trait Service: Sync + Send {
async fn spawn(self: Arc<Self>) -> Result<()>;
async fn join(self: Arc<Self>) -> Result<()>;
fn terminate(self: Arc<Self>);
// --
async fn attach_rpc(self: Arc<Self>, _rpc_api: Arc<dyn RpcApi>) -> Result<()> {
Ok(())
}
async fn detach_rpc(self: Arc<Self>) -> Result<()> {
Ok(())
}
}

pub struct Inner {
application_events: ApplicationEventsChannel,
// services: Mutex<Vec<Arc<dyn Service + Send + Sync + 'static>>>,
services: Mutex<Vec<Arc<dyn Service>>>,
kaspa: Arc<KaspaService>,
peer_monitor: Arc<PeerMonitorService>,
metrics_service: Arc<MetricsService>,
runtime: Runtime,
application_events: ApplicationEventsChannel,
egui_ctx: egui::Context,
is_running: Arc<AtomicBool>,
start_time: std::time::Instant,
Expand All @@ -46,15 +58,21 @@ impl Interop {
settings,
));
let metrics_service = Arc::new(MetricsService::new(application_events.clone(), settings));
let runtime = Runtime::new(&[kaspa.clone(), peer_monitor.clone(), metrics_service.clone()]);
// let runtime = Runtime::new(&[kaspa.clone(), peer_monitor.clone(), metrics_service.clone()]);

let services: Mutex<Vec<Arc<dyn Service>>> = Mutex::new(vec![
kaspa.clone(),
peer_monitor.clone(),
metrics_service.clone(),
]);

let interop = Self {
inner: Arc::new(Inner {
services,
application_events,
kaspa,
peer_monitor,
metrics_service,
runtime,
egui_ctx: egui_ctx.clone(),
is_running: Arc::new(AtomicBool::new(false)),
start_time: std::time::Instant::now(),
Expand All @@ -70,27 +88,48 @@ impl Interop {
self.inner.start_time.elapsed()
}

/// Get a reference to the interop runtime.
pub fn runtime(&self) -> &Runtime {
&self.inner.runtime
pub fn start_services(&self) {
let services = self.services();
for service in services {
spawn(async move { service.spawn().await });
}
}

pub fn services(&self) -> Vec<Arc<dyn Service>> {
self.inner.services.lock().unwrap().clone()
}

pub fn stop_services(&self) {
self.services()
.into_iter()
.for_each(|service| service.terminate());
}

pub async fn join_services(&self) {
let futures = self
.services()
.into_iter()
.map(|service| service.join())
.collect::<Vec<_>>();
join_all(futures).await;
}

pub fn services(&self) -> Vec<Arc<dyn runtime::Service + Send + Sync + 'static>> {
self.inner.runtime.services()
pub fn drop(&self) {
register_global(None);
}

/// Start the interop runtime.
// / Start the interop runtime.
pub fn start(&self) {
self.inner.is_running.store(true, Ordering::SeqCst);
self.runtime().start();
self.start_services();
}

/// Shutdown interop runtime.
pub async fn shutdown(&self) {
if self.inner.is_running.load(Ordering::SeqCst) {
self.inner.is_running.store(false, Ordering::SeqCst);
self.runtime().shutdown();
self.runtime().join().await;
self.stop_services();
self.join_services().await;
register_global(None);
}
}
Expand Down
100 changes: 0 additions & 100 deletions core/src/interop/runtime.rs

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/interop/services/kaspa/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::time::Duration;

use crate::imports::*;
use crate::interop::runtime::Service;
use crate::interop::Service;
pub use futures::{future::FutureExt, select, Future};
// use kaspa_metrics::{Metric, Metrics, MetricsSnapshot};
#[allow(unused_imports)]
Expand Down
2 changes: 1 addition & 1 deletion core/src/interop/services/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// use std::time::Duration;

use crate::imports::*;
use crate::interop::runtime::Service;
use crate::interop::Service;
pub use futures::{future::FutureExt, select, Future};
use kaspa_metrics::{Metric, Metrics, MetricsSnapshot};
#[allow(unused_imports)]
Expand Down
1 change: 0 additions & 1 deletion core/src/interop/services/peers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// use std::time::Duration;

use crate::imports::*;
use crate::interop::runtime::Service;
pub use futures::{future::FutureExt, select, Future};
use kaspa_rpc_core::RpcPeerInfo;
// use kaspa_metrics::{Metric, Metrics, MetricsSnapshot};
Expand Down

0 comments on commit b738d08

Please sign in to comment.