Skip to content

Commit

Permalink
Initial support sending logs to user
Browse files Browse the repository at this point in the history
when user send request to daemon, daemon also redirect plugin logs to user.

The `NipartNativePlugin:log()` is introduced allowing native plugin to
send logs to user.

Signed-off-by: Gris Ge <[email protected]>
  • Loading branch information
cathay4t committed Aug 27, 2024
1 parent 60bd424 commit 9edfe23
Show file tree
Hide file tree
Showing 20 changed files with 540 additions and 274 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/target
Cargo.lock
*.swp
tags
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ rust-version = "1.75"
[workspace.dependencies]
serde = { version = "1.0.144", features = ["derive"] }
env_logger = "0.9.0"
log = "0.4.17"
log = { version = "0.4.17", features = ["std"] }
serde_json = "1.0.87"
serde_yaml = "0.9.27"
uuid = { version = "1.6.1", default-features = false, features = ["std", "v7"] }
Expand Down
2 changes: 1 addition & 1 deletion src/cli/nipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn main() -> Result<(), CliError> {
.get_matches();

let mut log_builder = env_logger::Builder::new();
log_builder.filter(Some("nipart"), log::LevelFilter::Info);
log_builder.filter(Some("nipart"), log::LevelFilter::Debug);
log_builder.filter(None, log::LevelFilter::Debug);
log_builder.init();

Expand Down
66 changes: 54 additions & 12 deletions src/daemon/api_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex};

use nipart::{
ErrorKind, NipartConnection, NipartConnectionListener, NipartError,
NipartEvent, NipartEventAddress, NipartPluginEvent,
NipartEvent, NipartEventAddress, NipartPluginEvent, NipartUserEvent,
};

use tokio::sync::mpsc::{Receiver, Sender};
Expand Down Expand Up @@ -65,17 +65,7 @@ async fn api_thread(
if event.dst == NipartEventAddress::Daemon {
handle_daemon_event(event);
} else {
let tx = if let Ok(mut queue) = tracking_queue.lock() {
queue.remove(&event.uuid)
} else {None};
if let Some(tx) = tx {
if let Err(e) = tx.send(event.clone()).await {
log::warn!("Failed to reply event to user {e}") ;
}
} else {
log::warn!(
"Discarding event for disconnected user {event:?}");
}
send_reply_to_client(tracking_queue.clone(), event).await;
}
}
}
Expand All @@ -93,6 +83,28 @@ async fn handle_client(
tokio::select! {
Ok(mut event) = np_conn.recv::<NipartEvent>() => {
log::trace!("handle_client(): from user {event:?}");
if event.plugin != NipartPluginEvent::None {
log::debug!(
"handle_client(): discard invalid API request {event}");
let reply = NipartEvent::new_with_uuid(
event.uuid,
NipartUserEvent::Error(
NipartError::new(
ErrorKind::InvalidArgument,
format!("API request is not allowed to set \
plugin event, but got: {event}"))
),
NipartPluginEvent::None,
NipartEventAddress::Daemon,
NipartEventAddress::User,
event.timeout,
);
if let Err(e) = np_conn.send(&reply).await {
log::error!("{e}");
}
continue;
}

// Redirect user request to Commander
event.dst = NipartEventAddress::Commander;
if let Ok(mut queue) = tracking_queue.lock() {
Expand Down Expand Up @@ -159,3 +171,33 @@ fn handle_daemon_event(event: NipartEvent) {
log::warn!("API thread go unexpected daemon event {event}");
}
}

async fn send_reply_to_client(
tracking_queue: Arc<Mutex<BTreeMap<u128, Sender<NipartEvent>>>>,
event: NipartEvent,
) {
let tx = match tracking_queue.lock() {
Ok(mut queue) => {
// We cannot use `get_mut()` here because MutexGuard is not `Send`.
// hence cannot be used for await.
queue.remove(&event.uuid)
}
Err(e) => {
log::error!("BUG: api_thread() Failed to lock tracking_queue: {e}");
None
}
};

if let Some(tx) = tx {
if let Err(e) = tx.send(event.clone()).await {
log::warn!("Failed to reply event to user {e}");
}
if event.is_log() {
if let Ok(mut queue) = tracking_queue.lock() {
queue.insert(event.uuid, tx);
}
}
} else {
log::debug!("Discarding event for disconnected user {event:?}");
}
}
136 changes: 28 additions & 108 deletions src/daemon/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{DEFAULT_TIMEOUT, MPSC_CHANNLE_SIZE};
const PLUGIN_PREFIX: &str = "nipart_plugin_";
const QUERY_PLUGIN_RETRY: usize = 5;
const QUERY_PLUGIN_RETRY_INTERAL: u64 = 500; // milliseconds
const PLUGIN_DEFAULT_LOG_LEVEL: NipartLogLevel = NipartLogLevel::Debug;

pub(crate) type PluginConnections = HashMap<String, PluginConnection>;

Expand Down Expand Up @@ -98,10 +99,9 @@ pub(crate) struct Plugins {
impl Plugins {
pub(crate) fn insert(
&mut self,
name: &str,
roles: Vec<NipartRole>,
connection: PluginConnection,
plugin: (&str, Vec<NipartRole>, PluginConnection),
) {
let (name, roles, connection) = plugin;
self.roles.insert(name, roles);
self.connections.insert(name.to_string(), connection);
}
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Plugins {
.await
{
Ok((conn, roles)) => {
self.insert(&plugin_name, roles, conn);
self.insert((&plugin_name, roles, conn));
}
Err(e) => {
log::warn!(
Expand All @@ -159,31 +159,11 @@ impl Plugins {
}

async fn load_native_plugins(&mut self) -> Result<(), NipartError> {
self.insert(
"nispor",
NipartPluginNispor::roles(),
start_nispor_plugin().await?,
);
self.insert(
"mozim",
NipartPluginMozim::roles(),
start_mozim_plugin().await?,
);
self.insert(
"baize",
NipartPluginBaize::roles(),
start_baize_plugin().await?,
);
self.insert(
"sima",
NipartPluginSima::roles(),
start_sima_plugin().await?,
);
self.insert(
"smith",
NipartPluginSmith::roles(),
start_smith_plugin().await?,
);
self.insert(start_plugin::<NipartPluginNispor>().await?);
self.insert(start_plugin::<NipartPluginMozim>().await?);
self.insert(start_plugin::<NipartPluginBaize>().await?);
self.insert(start_plugin::<NipartPluginSima>().await?);
self.insert(start_plugin::<NipartPluginSmith>().await?);
Ok(())
}

Expand Down Expand Up @@ -384,88 +364,28 @@ async fn get_external_plugin_info(
}
}

async fn start_nispor_plugin() -> Result<PluginConnection, NipartError> {
let (nispor_to_switch_tx, nispor_to_switch_rx) =
async fn start_plugin<T>(
) -> Result<(&'static str, Vec<NipartRole>, PluginConnection), NipartError>
where
T: NipartNativePlugin,
{
let (plugin_to_switch_tx, plugin_to_switch_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);
let (switch_to_nispor_tx, switch_to_nispor_rx) =
let (switch_to_plugin_tx, switch_to_plugin_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);

let mut plugin =
NipartPluginNispor::init(nispor_to_switch_tx, switch_to_nispor_rx)
.await?;
let mut plugin = T::init(
PLUGIN_DEFAULT_LOG_LEVEL,
plugin_to_switch_tx,
switch_to_plugin_rx,
)
.await?;

tokio::spawn(async move { plugin.run().await });
log::info!("Native plugin nispor started");
Ok(PluginConnection::Mpsc((
switch_to_nispor_tx,
nispor_to_switch_rx,
)))
}

async fn start_mozim_plugin() -> Result<PluginConnection, NipartError> {
let (mozim_to_switch_tx, mozim_to_switch_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);
let (switch_to_mozim_tx, switch_to_mozim_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);

let mut mozim_plugin =
NipartPluginMozim::init(mozim_to_switch_tx, switch_to_mozim_rx).await?;

tokio::spawn(async move { mozim_plugin.run().await });
log::info!("Native plugin mozim started");
Ok(PluginConnection::Mpsc((
switch_to_mozim_tx,
mozim_to_switch_rx,
)))
}

async fn start_baize_plugin() -> Result<PluginConnection, NipartError> {
let (baize_to_switch_tx, baize_to_switch_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);
let (switch_to_baize_tx, switch_to_baize_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);

let mut baize_plugin =
NipartPluginBaize::init(baize_to_switch_tx, switch_to_baize_rx).await?;

tokio::spawn(async move { baize_plugin.run().await });
log::info!("Native plugin baize started");
Ok(PluginConnection::Mpsc((
switch_to_baize_tx,
baize_to_switch_rx,
)))
}

async fn start_smith_plugin() -> Result<PluginConnection, NipartError> {
let (smith_to_switch_tx, smith_to_switch_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);
let (switch_to_smith_tx, switch_to_smith_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);

let mut plugin =
NipartPluginSmith::init(smith_to_switch_tx, switch_to_smith_rx).await?;

tokio::spawn(async move { plugin.run().await });
log::info!("Native plugin smith started");
Ok(PluginConnection::Mpsc((
switch_to_smith_tx,
smith_to_switch_rx,
)))
}

async fn start_sima_plugin() -> Result<PluginConnection, NipartError> {
let (sima_to_switch_tx, sima_to_switch_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);
let (switch_to_sima_tx, switch_to_sima_rx) =
tokio::sync::mpsc::channel(MPSC_CHANNLE_SIZE);

let mut sima_plugin =
NipartPluginSima::init(sima_to_switch_tx, switch_to_sima_rx).await?;

tokio::spawn(async move { sima_plugin.run().await });
log::info!("Native plugin sima started");
Ok(PluginConnection::Mpsc((
switch_to_sima_tx,
sima_to_switch_rx,
)))
log::info!("Native plugin {} started", T::PLUGIN_NAME);
Ok((
T::PLUGIN_NAME,
T::roles(),
PluginConnection::Mpsc((switch_to_plugin_tx, plugin_to_switch_rx)),
))
}
8 changes: 7 additions & 1 deletion src/daemon/switch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn run_event_switch(
plugin_futures.push(plugin_conn.recv());
}

let event = tokio::select! {
let mut event = tokio::select! {
Some(Ok(event)) = plugin_futures.next() => {
log::trace!("run_event_switch(): from plugin {event:?}");
log::debug!("run_event_switch(): from plugin {event}");
Expand All @@ -70,6 +70,12 @@ async fn run_event_switch(
};
drop(plugin_futures);

// For log event, we redirect to user
if event.is_log() {
event.emit_log();
event.dst = NipartEventAddress::User;
}

if event.postpone_millis > 0 {
let t = event.postpone_millis;
postponed_events
Expand Down
37 changes: 26 additions & 11 deletions src/lib/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use serde::{Deserialize, Serialize};

use crate::{
NetworkCommit, NetworkCommitQueryOption, NetworkState, NipartApplyOption,
NipartError, NipartLogLevel, NipartPluginEvent, NipartPluginInfo,
NipartQueryOption, NipartRole,
NipartError, NipartLogEntry, NipartLogLevel, NipartPluginEvent,
NipartPluginInfo, NipartQueryOption, NipartRole,
};

#[derive(
Expand Down Expand Up @@ -38,15 +38,15 @@ pub enum NipartEventAddress {
impl std::fmt::Display for NipartEventAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::User => write!(f, "event_address.user"),
Self::Unicast(v) => write!(f, "event_address.{v}"),
Self::Daemon => write!(f, "event_address.daemon"),
Self::Commander => write!(f, "event_address.commander"),
Self::Dhcp => write!(f, "event_address.dhcp"),
Self::Track => write!(f, "event_address.track"),
Self::Group(v) => write!(f, "event_address.group:{v}"),
Self::AllPlugins => write!(f, "event_address.all_plugins"),
Self::Locker => write!(f, "event_address.locker"),
Self::User => write!(f, "user"),
Self::Unicast(v) => write!(f, "{v}"),
Self::Daemon => write!(f, "daemon"),
Self::Commander => write!(f, "commander"),
Self::Dhcp => write!(f, "dhcp"),
Self::Track => write!(f, "track"),
Self::Group(v) => write!(f, "group/{v}"),
Self::AllPlugins => write!(f, "all_plugins"),
Self::Locker => write!(f, "locker"),
}
}
}
Expand Down Expand Up @@ -137,6 +137,17 @@ impl NipartEvent {
Ok(self)
}
}

pub fn is_log(&self) -> bool {
matches!(self.user, NipartUserEvent::Log(_))
}

pub fn emit_log(&self) {
if let NipartUserEvent::Log(log_entry) = &self.user {
let log_source = format!("nipart.{}", self.src);
log_entry.emit_log(log_source.as_str())
}
}
}

impl From<NipartError> for NipartEvent {
Expand Down Expand Up @@ -174,6 +185,9 @@ pub enum NipartUserEvent {

QueryCommits(NetworkCommitQueryOption),
QueryCommitsReply(Box<Vec<NetworkCommit>>),

/// Plugin or daemon logs to user
Log(NipartLogEntry),
}

impl std::fmt::Display for NipartUserEvent {
Expand All @@ -199,6 +213,7 @@ impl std::fmt::Display for NipartUserEvent {
Self::ApplyNetStateReply => "user_event.apply_netstate_reply",
Self::QueryCommits(_) => "user_event.query_commits",
Self::QueryCommitsReply(_) => "user_event.query_commits_reply",
Self::Log(_) => "user_event.log",
}
)
}
Expand Down
Loading

0 comments on commit 9edfe23

Please sign in to comment.