Skip to content

Commit

Permalink
Implement version checking in IPC
Browse files Browse the repository at this point in the history
implement version checking; streamed IPC

streamed IPC will allow multiple requests per connection

add nonsense request

change inline struct to json macro

only check version if request actually fails

fix usage of inspect_err (MSRV 1.72.0; stabilized 1.76.0)

"nonsense request" -> "return error"

oneshot connections
  • Loading branch information
sodiboo authored and YaLTeR committed Apr 19, 2024
1 parent 0b93c46 commit c462763
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 53 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ anyhow = "1.0.81"
bitflags = "2.5.0"
clap = { version = "~4.4.18", features = ["derive"] }
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.115"
tracing = { version = "0.1.40", features = ["max_level_trace", "release_max_level_debug"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracy-client = { version = "0.17.0", default-features = false }
Expand Down Expand Up @@ -67,7 +68,7 @@ portable-atomic = { version = "1.6.0", default-features = false, features = ["fl
profiling = "1.0.15"
sd-notify = "0.4.1"
serde.workspace = true
serde_json = "1.0.115"
serde_json.workspace = true
smithay-drm-extras.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
Expand Down
1 change: 1 addition & 0 deletions niri-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository.workspace = true
[dependencies]
clap = { workspace = true, optional = true }
serde.workspace = true
serde_json.workspace = true

[features]
clap = ["dep:clap"]
11 changes: 9 additions & 2 deletions niri-ipc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ use std::str::FromStr;

use serde::{Deserialize, Serialize};

/// Name of the environment variable containing the niri IPC socket path.
pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET";
mod socket;

pub use socket::{NiriSocket, SOCKET_PATH_ENV};

/// Request from client to niri.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Request {
/// Always responds with an error. (For testing error handling)
ReturnError,
/// Request the version string for the running niri instance.
Version,
/// Request information about connected outputs.
Outputs,
/// Request information about the focused window.
Expand All @@ -35,6 +40,8 @@ pub type Reply = Result<Response, String>;
pub enum Response {
/// A request that does not need a response was handled successfully.
Handled,
/// The version string for the running niri instance.
Version(String),
/// Information about connected outputs.
///
/// Map from connector name to output info.
Expand Down
73 changes: 73 additions & 0 deletions niri-ipc/src/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::io::{self, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;

use serde_json::de::IoRead;
use serde_json::StreamDeserializer;

use crate::{Reply, Request};

/// Name of the environment variable containing the niri IPC socket path.
pub const SOCKET_PATH_ENV: &str = "NIRI_SOCKET";

/// A client for the niri IPC server.
///
/// This struct is used to communicate with the niri IPC server. It handles the socket connection
/// and serialization/deserialization of messages.
pub struct NiriSocket {
stream: UnixStream,
responses: StreamDeserializer<'static, IoRead<UnixStream>, Reply>,
}

impl TryFrom<UnixStream> for NiriSocket {
type Error = io::Error;
fn try_from(stream: UnixStream) -> io::Result<Self> {
let responses = serde_json::Deserializer::from_reader(stream.try_clone()?).into_iter();
Ok(Self { stream, responses })
}
}

impl NiriSocket {
/// Connects to the default niri IPC socket
///
/// This is equivalent to calling [Self::connect] with the value of the [SOCKET_PATH_ENV]
/// environment variable.
pub fn new() -> io::Result<Self> {
let socket_path = std::env::var_os(SOCKET_PATH_ENV).ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
format!("{SOCKET_PATH_ENV} is not set, are you running this within niri?"),
)
})?;
Self::connect(socket_path)
}

/// Connect to the socket at the given path
///
/// See also: [UnixStream::connect]
pub fn connect(path: impl AsRef<Path>) -> io::Result<Self> {
Self::try_from(UnixStream::connect(path.as_ref())?)
}

/// Handle a request to the niri IPC server
///
/// # Returns
/// Ok(Ok([Response](crate::Response))) corresponds to a successful response from the running
/// niri instance. Ok(Err([String])) corresponds to an error received from the running niri
/// instance. Err([std::io::Error]) corresponds to an error in the IPC communication.
pub fn send(mut self, request: Request) -> io::Result<Reply> {
let mut buf = serde_json::to_vec(&request).unwrap();
writeln!(buf).unwrap();
self.stream.write_all(&buf)?; // .context("error writing IPC request")?;
self.stream.flush()?;

if let Some(next) = self.responses.next() {
next.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
} else {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"no response from server",
))
}
}
}
4 changes: 4 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ pub enum Sub {

#[derive(Subcommand)]
pub enum Msg {
/// Print an error message.
Error,
/// Print the version string of the running niri instance.
Version,
/// List connected outputs.
Outputs,
/// Print information about the focused window.
Expand Down
119 changes: 84 additions & 35 deletions src/ipc/client.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,103 @@
use std::env;
use std::io::{Read, Write};
use std::net::Shutdown;
use std::os::unix::net::UnixStream;

use anyhow::{anyhow, bail, Context};
use niri_ipc::{LogicalOutput, Mode, Output, Reply, Request, Response};
use anyhow::{bail, Context};
use niri_ipc::{LogicalOutput, Mode, NiriSocket, Output, Request, Response};
use serde_json::json;

use crate::cli::Msg;
use crate::utils::version;

pub fn handle_msg(msg: Msg, json: bool) -> anyhow::Result<()> {
let socket_path = env::var_os(niri_ipc::SOCKET_PATH_ENV).with_context(|| {
format!(
"{} is not set, are you running this within niri?",
niri_ipc::SOCKET_PATH_ENV
)
})?;
let client = NiriSocket::new()
.context("a communication error occured while trying to initialize the socket")?;

let mut stream =
UnixStream::connect(socket_path).context("error connecting to {socket_path}")?;
// Default SIGPIPE so that our prints don't panic on stdout closing.
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
}

let request = match &msg {
Msg::Error => Request::ReturnError,
Msg::Version => Request::Version,
Msg::Outputs => Request::Outputs,
Msg::FocusedWindow => Request::FocusedWindow,
Msg::Action { action } => Request::Action(action.clone()),
};
let mut buf = serde_json::to_vec(&request).unwrap();
stream
.write_all(&buf)
.context("error writing IPC request")?;
stream
.shutdown(Shutdown::Write)
.context("error closing IPC stream for writing")?;

buf.clear();
stream
.read_to_end(&mut buf)
.context("error reading IPC response")?;
let reply = client
.send(request)
.context("a communication error occurred while sending request to niri")?;

let response = match reply {
Ok(r) => r,
Err(err_msg) => {
eprintln!("The compositor returned an error:");
eprintln!();
eprintln!("{err_msg}");

if matches!(msg, Msg::Version) {
eprintln!();
eprintln!("Note: unable to get the compositor's version.");
eprintln!("Did you forget to restart niri after an update?");
} else {
match NiriSocket::new().and_then(|client| client.send(Request::Version)) {
Ok(Ok(Response::Version(server_version))) => {
let my_version = version();
if my_version != server_version {
eprintln!();
eprintln!("Note: niri msg was invoked with a different version of niri than the running compositor.");
eprintln!("niri msg: {my_version}");
eprintln!("compositor: {server_version}");
eprintln!("Did you forget to restart niri after an update?");
}
}
Ok(Ok(_)) => {
// nonsensical response, do not add confusing context
}
Ok(Err(_)) => {
eprintln!();
eprintln!("Note: unable to get the compositor's version.");
eprintln!("Did you forget to restart niri after an update?");
}
Err(_) => {
// communication error, do not add irrelevant context
}
}
}

let reply: Reply = serde_json::from_slice(&buf).context("error parsing IPC reply")?;
return Ok(());
}
};

let response = reply
.map_err(|msg| anyhow!(msg))
.context("niri could not handle the request")?;
match msg {
Msg::Error => {
bail!("unexpected response: expected an error, got {response:?}");
}
Msg::Version => {
let Response::Version(server_version) = response else {
bail!("unexpected response: expected Version, got {response:?}");
};

// Default SIGPIPE so that our prints don't panic on stdout closing.
unsafe {
libc::signal(libc::SIGPIPE, libc::SIG_DFL);
}
if json {
println!(
"{}",
json!({
"cli": version(),
"compositor": server_version,
})
);
return Ok(());
}

match msg {
let client_version = version();

println!("niri msg is {client_version}");
println!("the compositor is {server_version}");
if client_version != server_version {
eprintln!();
eprintln!("These are different");
eprintln!("Did you forget to restart niri after an update?");
}
println!();
}
Msg::Outputs => {
let Response::Outputs(outputs) = response else {
bail!("unexpected response: expected Outputs, got {response:?}");
Expand Down
46 changes: 31 additions & 15 deletions src/ipc/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::io::Write;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
Expand All @@ -7,8 +8,8 @@ use anyhow::Context;
use calloop::io::Async;
use directories::BaseDirs;
use futures_util::io::{AsyncReadExt, BufReader};
use futures_util::{AsyncBufReadExt, AsyncWriteExt};
use niri_ipc::{Request, Response};
use futures_util::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
use niri_ipc::{Reply, Request, Response};
use smithay::desktop::Window;
use smithay::reexports::calloop::generic::Generic;
use smithay::reexports::calloop::{Interest, LoopHandle, Mode, PostAction};
Expand All @@ -18,6 +19,7 @@ use smithay::wayland::shell::xdg::XdgToplevelSurfaceData;

use crate::backend::IpcOutputMap;
use crate::niri::State;
use crate::utils::version;

pub struct IpcServer {
pub socket_path: PathBuf,
Expand Down Expand Up @@ -106,29 +108,43 @@ fn on_new_ipc_client(state: &mut State, stream: UnixStream) {

async fn handle_client(ctx: ClientCtx, stream: Async<'_, UnixStream>) -> anyhow::Result<()> {
let (read, mut write) = stream.split();
let mut buf = String::new();

// Read a single line to allow extensibility in the future to keep reading.
BufReader::new(read)
.read_line(&mut buf)
.await
.context("error reading request")?;
// note that we can't use the stream json deserializer here
// because the stream is asynchronous and the deserializer doesn't support that
// https://github.com/serde-rs/json/issues/575

let reply = process(&ctx, &buf).map_err(|err| {
let mut lines = BufReader::new(read).lines();

let line = match lines.next().await.unwrap_or(Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Unreachable; BufReader returned None but when the stream ends, the connection should be reset"))) {
Ok(line) => line,
Err(err) if err.kind() == io::ErrorKind::ConnectionReset => return Ok(()),
Err(err) => return Err(err).context("error reading line"),
};

let reply: Reply = serde_json::from_str(&line)
.map_err(|err| format!("error parsing request: {err}"))
.and_then(|req| process(&ctx, req));

if let Err(err) = &reply {
warn!("error processing IPC request: {err:?}");
err.to_string()
});
}

let buf = serde_json::to_vec(&reply).context("error formatting reply")?;
let mut buf = serde_json::to_vec(&reply).context("error formatting reply")?;
writeln!(buf).unwrap();
write.write_all(&buf).await.context("error writing reply")?;
write.flush().await.context("error flushing reply")?;

// We do not check for more lines at this moment.
// Dropping the stream will reset the connection before we read them.
// For now, a client should not be sending more than one request per connection.

Ok(())
}

fn process(ctx: &ClientCtx, buf: &str) -> anyhow::Result<Response> {
let request: Request = serde_json::from_str(buf).context("error parsing request")?;

fn process(ctx: &ClientCtx, request: Request) -> Reply {
let response = match request {
Request::ReturnError => return Err("client wanted an error".into()),
Request::Version => Response::Version(version()),
Request::Outputs => {
let ipc_outputs = ctx.ipc_outputs.lock().unwrap().clone();
Response::Outputs(ipc_outputs)
Expand Down

0 comments on commit c462763

Please sign in to comment.