Skip to content

Commit

Permalink
Make ksni runtime-agnostic
Browse files Browse the repository at this point in the history
  • Loading branch information
iovxw committed Mar 3, 2024
1 parent 30effc1 commit 9bd67f3
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 20 deletions.
23 changes: 21 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,28 @@ keywords = ["systray", "linux", "gui"]
categories = ["api-bindings"]
license = "Unlicense"

[features]
default = ["tokio"]
tokio = ["dep:tokio", "zbus/tokio"]
async-io = [
"dep:async-io",
"dep:async-executor",
"dep:futures-lite",
"dep:futures-channel",
"dep:futures-util",
"zbus/async-io",
]

[dependencies]
thiserror = "1"
futures = { version = "0.3", default-features = false }
tokio = { version = "1", features = ["rt", "macros"] }
zbus = { version = "4", features = ["tokio"], default-features = false }
zbus = { version = "4", default-features = false }
serde = { version = "1", features = ["derive"] }

tokio = { version = "1", features = ["rt", "macros"], optional = true }

async-io = { version = "2", optional = true }
async-executor = { version = "1", optional = true }
futures-lite = { version = "2", optional = true }
futures-channel = { version = "0.3", optional = true }
futures-util = { version = "0.3", optional = true }
82 changes: 82 additions & 0 deletions src/compat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#[cfg(all(not(feature = "async-io"), not(feature = "tokio")))]
compile_error!(r#"Either "tokio" (default) or "async-io" must be enabled."#);

#[cfg(feature = "tokio")]
pub use tokio::select;

#[cfg(feature = "async-io")]
#[macro_export]
macro_rules! select {
($(Some($val:ident) = $exp:expr => $blk:block)*) => {
futures_util::select! {
$( $val = $exp => {
let Some($val) = $val else { continue };
$blk
} )*
}
};
}
#[cfg(feature = "async-io")]
pub use crate::select;

#[cfg(feature = "tokio")]
pub mod mpsc {
pub use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
}

#[cfg(feature = "async-io")]
pub mod mpsc {
use futures::StreamExt;

pub use futures_channel::mpsc::TrySendError as SendError;

pub fn unbounded_channel<T>() -> (UnboundedSender<T>, UnboundedReceiver<T>) {
let (tx, rx) = futures_channel::mpsc::unbounded();
(UnboundedSender(tx), UnboundedReceiver(rx))
}

pub struct UnboundedSender<T>(futures_channel::mpsc::UnboundedSender<T>);
impl<T> UnboundedSender<T> {
pub fn send(&self, value: T) -> Result<(), SendError<T>> {
self.0.unbounded_send(value)
}
}
impl<T> Clone for UnboundedSender<T> {
fn clone(&self) -> Self {
UnboundedSender(self.0.clone())
}
}

pub struct UnboundedReceiver<T>(futures_channel::mpsc::UnboundedReceiver<T>);
impl<T> UnboundedReceiver<T> {
pub fn recv(
&mut self,
) -> futures_util::stream::Next<'_, futures_channel::mpsc::UnboundedReceiver<T>> {
self.0.next()
}
}
}

#[cfg(feature = "tokio")]
pub mod oneshot {
pub use tokio::sync::oneshot::{channel, Receiver, Sender};
}

#[cfg(feature = "async-io")]
pub mod oneshot {
pub use futures_channel::oneshot::{channel, Receiver, Sender};
// use std::future::Future;
//
// pub use async_channel::Sender;
// pub fn channel<T>() -> (
// Sender<T>,
// Receiver<T>,
// ) {
// // The concurrent-queue that used by async-channel has
// // single-capacity optimization, performace is fine
// let (tx, rx) = async_channel::bounded(1);
// let rx = async move { rx.recv().await };
// (tx, rx)
// }
// pub type Receiver<T> = impl Future<Output = Result<T, async_channel::RecvError>>;
}
14 changes: 7 additions & 7 deletions src/dbus_interface.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use zbus::zvariant::{ObjectPath, OwnedValue, Type, Value};
use zbus::SignalContext;

use crate::compat::{mpsc, oneshot};
use crate::{Icon, ToolTip};

pub const SNI_PATH: &str = "/StatusNotifierItem";
Expand Down Expand Up @@ -73,12 +73,12 @@ pub enum SniProperty {
}

pub struct StatusNotifierItem {
sender: tokio::sync::mpsc::UnboundedSender<SniMessage>,
sender: mpsc::UnboundedSender<SniMessage>,
}

impl StatusNotifierItem {
pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<SniMessage>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
pub fn new() -> (Self, mpsc::UnboundedReceiver<SniMessage>) {
let (tx, rx) = mpsc::unbounded_channel();
(StatusNotifierItem { sender: tx }, rx)
}

Expand Down Expand Up @@ -263,12 +263,12 @@ pub enum DbusMenuProperty {
}

pub struct DbusMenu {
sender: tokio::sync::mpsc::UnboundedSender<DbusMenuMessage>,
sender: mpsc::UnboundedSender<DbusMenuMessage>,
}

impl DbusMenu {
pub fn new() -> (Self, tokio::sync::mpsc::UnboundedReceiver<DbusMenuMessage>) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
pub fn new() -> (Self, mpsc::UnboundedReceiver<DbusMenuMessage>) {
let (tx, rx) = mpsc::unbounded_channel();
(DbusMenu { sender: tx }, rx)
}

Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//!
//! See the [README.md](https://github.com/iovxw/ksni) for an example

mod compat;
mod dbus_interface;
pub mod menu;
mod service;
Expand All @@ -12,6 +13,8 @@ pub use menu::{MenuItem, TextDirection};
pub use service::{run_async, spawn};
pub use tray::{Category, Icon, Status, ToolTip};

use crate::compat::mpsc;

/// A system tray, implement this to create your tray
///
/// **NOTE**: On some system trays, [`Tray::id`] is a required property to avoid unexpected behaviors
Expand Down Expand Up @@ -168,7 +171,7 @@ pub enum ClientRequest<T> {

/// Handle to the tray
pub struct Handle<T> {
sender: tokio::sync::mpsc::UnboundedSender<ClientRequest<T>>,
sender: mpsc::UnboundedSender<ClientRequest<T>>,
}

impl<T> Handle<T> {
Expand Down
32 changes: 22 additions & 10 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use zbus::fdo::DBusProxy;
use zbus::zvariant::{OwnedValue, Str};
use zbus::Connection;

use crate::compat::mpsc;
use crate::dbus_interface::{
DbusMenu, DbusMenuMessage, DbusMenuProperty, LayoutItem, SniMessage, SniProperty,
StatusNotifierItem, StatusNotifierWatcherProxy, MENU_PATH, SNI_PATH,
};

use crate::compat::select;
use crate::menu;
use crate::tray;
use crate::{ClientRequest, Handle, Tray};
Expand All @@ -20,17 +22,27 @@ static COUNTER: AtomicUsize = AtomicUsize::new(1);

// TODO: don't use zbus result publicly(?)
pub fn spawn<T: Tray + Send + 'static>(tray: T) -> zbus::Result<Handle<T>> {
let (client_tx, client_rx) = tokio::sync::mpsc::unbounded_channel::<ClientRequest<T>>();
let (client_tx, client_rx) = mpsc::unbounded_channel::<ClientRequest<T>>();
std::thread::Builder::new()
.name("ksni-tokio".into())
.spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio::new_current_thread()");
rt.block_on(async move {
let _ = run_async(tray, client_rx).await;
});
#[cfg(feature = "tokio")]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("tokio::new_current_thread()");
rt.block_on(async move {
let _ = run_async(tray, client_rx).await;
});
}
#[cfg(feature = "async-io")]
{
let ex = async_executor::LocalExecutor::new();
futures_lite::future::block_on(ex.run(async move {
let _ = run_async(tray, client_rx).await;
}));
}
})
.map_err(|e| zbus::Error::Failure(e.to_string()))?;

Expand All @@ -39,7 +51,7 @@ pub fn spawn<T: Tray + Send + 'static>(tray: T) -> zbus::Result<Handle<T>> {

pub async fn run_async<T: Tray + Send + 'static>(
tray: T,
mut client_rx: tokio::sync::mpsc::UnboundedReceiver<ClientRequest<T>>,
mut client_rx: mpsc::UnboundedReceiver<ClientRequest<T>>,
) -> zbus::Result<()> {
let conn = Connection::session().await.unwrap();
let name = format!(
Expand Down Expand Up @@ -85,7 +97,7 @@ pub async fn run_async<T: Tray + Send + 'static>(
revision: 0,
};
loop {
tokio::select! {
select! {
Some(event) = name_changed_signal.next() => {
if let Ok(args) = event.args() {
match args.new_owner().as_ref() {
Expand Down

0 comments on commit 9bd67f3

Please sign in to comment.