Skip to content

Commit

Permalink
Merge pull request #64 from iawia002/run-cli
Browse files Browse the repository at this point in the history
Support for running wasi:cli components
  • Loading branch information
iawia002 authored Apr 23, 2024
2 parents ed8e2e0 + 22b90fd commit 62ffbf6
Show file tree
Hide file tree
Showing 16 changed files with 672 additions and 621 deletions.
890 changes: 433 additions & 457 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ $ wacker run hello.wasm
$ wacker run time.wasm
```

Where `hello.wasm` is a simple WASM program that prints out `Hello World!` and exits, and `time.wasm` is a long-running program that constantly prints out the current time.
Where [hello.wasm](https://github.com/iawia002/wasi-examples/tree/main/hello) is a simple WASM program that prints out `Hello, world!` and exits, and [time.wasm](https://github.com/iawia002/wasi-examples/tree/main/time) is a long-running program that constantly prints out the current time.

Serve an HTTP WebAssembly program:
> You can find more WASI program examples at https://github.com/iawia002/wasi-examples.
Serve an [HTTP WebAssembly program](https://github.com/iawia002/wasi-examples/tree/main/http):

```
$ wacker serve hello_wasi_http.wasm --addr 127.0.0.1:8081
Expand All @@ -53,7 +55,7 @@ Fetch the logs:

```
$ wacker logs hello-w0AqXnf
Hello World!
Hello, world!
$ wacker logs -f --tail 5 time-xhQVmjU
current time: 2023-11-22 07:42:34
Expand Down
4 changes: 4 additions & 0 deletions wacker-cli/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub struct RunCommand {
/// Program file path
#[arg(required = true)]
path: String,
/// Arguments to pass to the WebAssembly module.
#[arg(trailing_var_arg = true)]
args: Vec<String>,
}

impl RunCommand {
Expand All @@ -16,6 +19,7 @@ impl RunCommand {
match client
.run(RunRequest {
path: self.path.to_string(),
args: self.args,
})
.await
{
Expand Down
10 changes: 3 additions & 7 deletions wacker-daemon/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use chrono::Local;
use clap::Parser;
use env_logger::{Builder, WriteStyle};
use log::{info, LevelFilter};
use std::fs::{create_dir, create_dir_all, remove_file};
use std::fs::{create_dir_all, remove_file};
use std::io::Write;
use tokio::net::UnixListener;
use tokio::signal;
Expand All @@ -27,13 +27,9 @@ impl WackerDaemon {
bail!("wackerd socket file exists, is wackerd already running?");
}

let parent_path = sock_path.parent().unwrap();
if !parent_path.exists() {
create_dir_all(parent_path)?;
}
let logs_dir = get_logs_dir()?;
if !logs_dir.exists() {
create_dir(logs_dir)?;
create_dir_all(logs_dir)?;
}

let uds = UnixListener::bind(sock_path)?;
Expand All @@ -58,7 +54,7 @@ impl WackerDaemon {

info!("server listening on {:?}", sock_path);
Ok(tonic::transport::Server::builder()
.add_service(new_service(db.clone(), logs_dir.clone()).await?)
.add_service(new_service(db.clone(), logs_dir).await?)
.serve_with_incoming_shutdown(uds_stream, async {
signal::ctrl_c().await.expect("failed to listen for event");
println!();
Expand Down
9 changes: 5 additions & 4 deletions wacker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ once_cell.workspace = true
ahash.workspace = true

dirs = "5.0.1"
wasi-common = { version = "19.0.0", features = ["tokio"] }
wasmtime = "19.0.0"
wasmtime-wasi = "19.0.0"
wasmtime-wasi-http = "19.0.0"
wasi-common = { version = "20.0.0", features = ["tokio"] }
wasmtime = "20.0.0"
wasmtime-wasi = "20.0.0"
wasmtime-wasi-http = "20.0.0"
wasmparser = "0.202.0"
cap-std = "3.0.0"
rand = "0.8.5"
tower = "0.4.13"
Expand Down
1 change: 1 addition & 0 deletions wacker/proto/wacker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ service Wacker {

message RunRequest {
string path = 1;
repeated string args = 2;
}

message ServeRequest {
Expand Down
27 changes: 10 additions & 17 deletions wacker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,28 @@ use anyhow::{anyhow, Result};
use once_cell::sync::OnceCell;
use std::path::PathBuf;

const MAIN_DIR: &str = ".wacker";

static MAIN_DIR: OnceCell<PathBuf> = OnceCell::new();
static SOCK_PATH: OnceCell<PathBuf> = OnceCell::new();
static LOGS_DIR: OnceCell<PathBuf> = OnceCell::new();
static DB_PATH: OnceCell<PathBuf> = OnceCell::new();

pub fn get_sock_path() -> Result<&'static PathBuf> {
SOCK_PATH.get_or_try_init(|| -> Result<PathBuf> {
fn get_main_dir() -> Result<&'static PathBuf> {
MAIN_DIR.get_or_try_init(|| -> Result<PathBuf> {
match dirs::home_dir() {
Some(home_dir) => Ok(home_dir.join(MAIN_DIR).join("wacker.sock")),
Some(home_dir) => Ok(home_dir.join(".wacker")),
None => Err(anyhow!("can't get home dir")),
}
})
}

pub fn get_sock_path() -> Result<&'static PathBuf> {
SOCK_PATH.get_or_try_init(|| -> Result<PathBuf> { Ok(get_main_dir()?.join("wacker.sock")) })
}

pub fn get_logs_dir() -> Result<&'static PathBuf> {
LOGS_DIR.get_or_try_init(|| -> Result<PathBuf> {
match dirs::home_dir() {
Some(home_dir) => Ok(home_dir.join(MAIN_DIR).join("logs")),
None => Err(anyhow!("can't get home dir")),
}
})
LOGS_DIR.get_or_try_init(|| -> Result<PathBuf> { Ok(get_main_dir()?.join("logs")) })
}

pub fn get_db_path() -> Result<&'static PathBuf> {
DB_PATH.get_or_try_init(|| -> Result<PathBuf> {
match dirs::home_dir() {
Some(home_dir) => Ok(home_dir.join(MAIN_DIR).join("db")),
None => Err(anyhow!("can't get home dir")),
}
})
DB_PATH.get_or_try_init(|| -> Result<PathBuf> { Ok(get_main_dir()?.join("db")) })
}
24 changes: 19 additions & 5 deletions wacker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@ mod proto {
}

use anyhow::Result;
use sled::Db;
use std::path::Path;
use tokio::net::UnixStream;
use tonic::codec::CompressionEncoding;
use tonic::transport::{Channel, Endpoint};
use tonic::{
codec::CompressionEncoding,
transport::{Channel, Endpoint},
};
use tower::service_fn;

pub use self::config::*;
Expand All @@ -24,13 +28,23 @@ pub const PROGRAM_STATUS_FINISHED: u32 = 1;
pub const PROGRAM_STATUS_ERROR: u32 = 2;
pub const PROGRAM_STATUS_STOPPED: u32 = 3;

pub async fn new_service<P: AsRef<Path>>(db: Db, logs_dir: P) -> Result<WackerServer<Server>> {
Ok(WackerServer::new(Server::new(db, logs_dir).await?)
.send_compressed(CompressionEncoding::Zstd)
.accept_compressed(CompressionEncoding::Zstd))
}

pub async fn new_client() -> Result<WackerClient<Channel>> {
let sock_path = get_sock_path()?;
new_client_with_path(get_sock_path()?).await
}

let channel = Endpoint::try_from("http://[::]:50051")?
pub async fn new_client_with_path<P: AsRef<Path>>(sock_path: P) -> Result<WackerClient<Channel>> {
let path = sock_path.as_ref().to_path_buf();
// We will ignore this uri because uds do not use it
let channel = Endpoint::try_from("unix://wacker")?
.connect_with_connector(service_fn(move |_| {
// Connect to a Uds socket
UnixStream::connect(sock_path)
UnixStream::connect(path.clone())
}))
.await?;

Expand Down
58 changes: 12 additions & 46 deletions wacker/src/runtime/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use crate::runtime::{Engine, ProgramMeta};
use anyhow::{anyhow, bail, Error, Result};
use crate::runtime::{
logs::LogStream,
{Engine, ProgramMeta},
};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use hyper::Request;
use std::fs::File;
use std::io::Write;
use std::sync::{
Expand All @@ -9,9 +13,7 @@ use std::sync::{
};
use wasmtime::component::{Component, InstancePre, Linker, ResourceTable};
use wasmtime::Store;
use wasmtime_wasi::{
bindings, HostOutputStream, StdoutStream, StreamError, StreamResult, Subscribe, WasiCtx, WasiCtxBuilder, WasiView,
};
use wasmtime_wasi::{bindings, WasiCtx, WasiCtxBuilder, WasiView};
use wasmtime_wasi_http::{
bindings::http::types as http_types, body::HyperOutgoingBody, hyper_response_error, io::TokioIo, WasiHttpCtx,
WasiHttpView,
Expand Down Expand Up @@ -152,9 +154,10 @@ impl ProxyHandler {
}
}

type Request = hyper::Request<hyper::body::Incoming>;

async fn handle_request(ProxyHandler(inner): ProxyHandler, req: Request) -> Result<hyper::Response<HyperOutgoingBody>> {
async fn handle_request(
ProxyHandler(inner): ProxyHandler,
req: Request<hyper::body::Incoming>,
) -> Result<hyper::Response<HyperOutgoingBody>> {
use http_body_util::BodyExt;

let (sender, receiver) = tokio::sync::oneshot::channel();
Expand Down Expand Up @@ -190,7 +193,7 @@ async fn handle_request(ProxyHandler(inner): ProxyHandler, req: Request) -> Resu
.map_err(|_| http_types::ErrorCode::HttpRequestUriInvalid)?
};

let req = hyper::Request::from_parts(parts, body.map_err(hyper_response_error).boxed());
let req = Request::from_parts(parts, body.map_err(hyper_response_error).boxed());

let mut stdout = inner.stdout.try_clone()?;
stdout.write_fmt(format_args!(
Expand Down Expand Up @@ -230,40 +233,3 @@ async fn handle_request(ProxyHandler(inner): ProxyHandler, req: Request) -> Resu
}
}
}

struct LogStream {
output: File,
}

impl StdoutStream for LogStream {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(LogStream {
output: self.output.try_clone().expect(""),
})
}

fn isatty(&self) -> bool {
false
}
}

impl HostOutputStream for LogStream {
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
self.output
.write_all(bytes.as_ref())
.map_err(|e| StreamError::LastOperationFailed(Error::from(e)))
}

fn flush(&mut self) -> StreamResult<()> {
Ok(())
}

fn check_write(&mut self) -> StreamResult<usize> {
Ok(1024 * 1024)
}
}

#[async_trait::async_trait]
impl Subscribe for LogStream {
async fn ready(&mut self) {}
}
44 changes: 44 additions & 0 deletions wacker/src/runtime/logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use anyhow::Error;
use async_trait::async_trait;
use std::fs::File;
use std::io::Write;
use wasmtime_wasi::{HostOutputStream, StdoutStream, StreamError, StreamResult, Subscribe};

pub struct LogStream {
pub output: File,
}

impl StdoutStream for LogStream {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(LogStream {
output: self.output.try_clone().expect(""),
})
}

fn isatty(&self) -> bool {
false
}
}

impl HostOutputStream for LogStream {
fn write(&mut self, bytes: bytes::Bytes) -> StreamResult<()> {
self.output
.write_all(bytes.as_ref())
.map_err(|e| StreamError::LastOperationFailed(Error::from(e)))
}

fn flush(&mut self) -> StreamResult<()> {
self.output
.flush()
.map_err(|e| StreamError::LastOperationFailed(Error::from(e)))
}

fn check_write(&mut self) -> StreamResult<usize> {
Ok(1024 * 1024)
}
}

#[async_trait]
impl Subscribe for LogStream {
async fn ready(&mut self) {}
}
2 changes: 2 additions & 0 deletions wacker/src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod http;
mod logs;
mod wasi;

use ahash::AHashMap;
Expand All @@ -13,6 +14,7 @@ pub struct ProgramMeta {
pub path: String,
pub program_type: u32,
pub addr: Option<String>,
pub args: Vec<String>,
}

#[async_trait]
Expand Down
Loading

0 comments on commit 62ffbf6

Please sign in to comment.