Skip to content

Commit

Permalink
Merge pull request #2040 from bendk/blocking-task-queue-without-speci…
Browse files Browse the repository at this point in the history
…alized-code

Blocking task queue without specialized code
  • Loading branch information
bendk authored Mar 23, 2024
2 parents 6b09f11 + 1fbfde3 commit 17482fa
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 132 deletions.
28 changes: 28 additions & 0 deletions examples/async-api-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
This crate is a toy build an async API client, with some parts implemented in Rust and some parts
implemented in the foreign language. Each side makes async calls across the FFI.

The motivation is to show how to build an async-based Rust library, using a foreign async executor to drive the futures.
Note that the Rust code does not start any threads of its own, nor does it use startup an async runtime like tokio.
Instead, it awaits async calls to the foreign code and the foreign executor manages the threads.

There are two basic ways the Rust code in this crate awaits the foreign code:

## API calls

API calls are the simple case.
Rust awaits an HTTP call to the foreign side, then uses `serde` to parse the JSON into a structured response.
As long as the Rust code is "non-blocking" this system should work fine.
Note: there is not a strict definition for "non-blocking", but typically it means not performing IO and not executing a long-running CPU operation.

## Blocking tasks

The more difficult case is a blocking Rust call.
The example from this crate is reading the API credentials from disk.
The `tasks.rs` module and the foreign implementations of the `TaskRunner` interface are an experiment to show how this can be accomplished using async callback methods.

The code works, but is a bit clunky.
For example requiring that the task closure is `'static` creates some extra work for the `load_credentials` function.
It also requires an extra `Mutex` and `Arc`.

The UniFFI team is looking for ways to simplify this process by handling it natively in UniFFI, see https://github.com/mozilla/uniffi-rs/pull/1837.
If you are writing Rust code that needs to make async blocking calls, please tell us about your use case which will help us develop the feature.
76 changes: 76 additions & 0 deletions examples/async-api-client/src/api_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use crate::{run_task, ApiError, Result, TaskRunner};
use std::sync::Arc;

#[async_trait::async_trait]
pub trait HttpClient: Send + Sync {
async fn fetch(&self, url: String, credentials: String) -> Result<String>;
}

impl From<serde_json::Error> for ApiError {
fn from(e: serde_json::Error) -> Self {
Self::Json {
reason: e.to_string(),
}
}
}

#[derive(Debug, serde::Deserialize)]
pub struct Issue {
pub url: String,
pub title: String,
pub state: IssueState,
}

#[derive(Debug, serde::Deserialize)]
pub enum IssueState {
#[serde(rename = "open")]
Open,
#[serde(rename = "closed")]
Closed,
}

pub struct ApiClient {
http_client: Arc<dyn HttpClient>,
task_runner: Arc<dyn TaskRunner>,
}

impl ApiClient {
// Pretend this is a blocking call that needs to load the credentials from disk/network
fn load_credentials_sync(&self) -> String {
String::from("username:password")
}

async fn load_credentials(self: Arc<Self>) -> String {
let self_cloned = Arc::clone(&self);
run_task(&self.task_runner, move || {
self_cloned.load_credentials_sync()
})
.await
}
}

impl ApiClient {
pub fn new(http_client: Arc<dyn HttpClient>, task_runner: Arc<dyn TaskRunner>) -> Self {
Self {
http_client,
task_runner,
}
}

pub async fn get_issue(
self: Arc<Self>,
owner: String,
repository: String,
issue_number: u32,
) -> Result<Issue> {
let credentials = self.clone().load_credentials().await;
let url =
format!("https://api.github.com/repos/{owner}/{repository}/issues/{issue_number}");
let body = self.http_client.fetch(url, credentials).await?;
Ok(serde_json::from_str(&body)?)
}
}
17 changes: 15 additions & 2 deletions examples/async-api-client/src/async-api-client.udl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,20 @@ interface ApiError {
[Trait, WithForeign]
interface HttpClient {
[Throws=ApiError, Async]
string fetch(string url); // fetch an URL and return the body
string fetch(string url, string credentials); // fetch an URL and return the body
};

// Run Rust tasks in a thread pool.
// Implemented by the foreign bindings
[Trait, WithForeign]
interface TaskRunner {
[Async]
void run_task(RustTask task);
};

[Trait]
interface RustTask {
void execute();
};

dictionary Issue {
Expand All @@ -29,7 +42,7 @@ enum IssueState {

// Implemented by the Rust code
interface ApiClient {
constructor(HttpClient http_client);
constructor(HttpClient http_client, TaskRunner task_runner);

[Throws=ApiError, Async]
Issue get_issue(string owner, string repository, u32 issue_number);
Expand Down
131 changes: 7 additions & 124 deletions examples/async-api-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::sync::Arc;
mod api_client;
mod tasks;
mod test_data;

pub use api_client::{ApiClient, HttpClient, Issue, IssueState};
pub use tasks::{run_task, RustTask, TaskRunner};
pub use test_data::test_response_data;

#[derive(Debug, thiserror::Error)]
pub enum ApiError {
Expand All @@ -16,127 +22,4 @@ pub enum ApiError {

pub type Result<T> = std::result::Result<T, ApiError>;

#[async_trait::async_trait]
pub trait HttpClient: Send + Sync {
async fn fetch(&self, url: String) -> Result<String>;
}

#[derive(Debug, serde::Deserialize)]
pub struct Issue {
url: String,
title: String,
state: IssueState,
}

#[derive(Debug, serde::Deserialize)]
pub enum IssueState {
#[serde(rename = "open")]
Open,
#[serde(rename = "closed")]
Closed,
}

pub struct ApiClient {
http_client: Arc<dyn HttpClient>,
}

impl ApiClient {
pub fn new(http_client: Arc<dyn HttpClient>) -> Self {
Self { http_client }
}

pub async fn get_issue(
&self,
owner: String,
repository: String,
issue_number: u32,
) -> Result<Issue> {
let url =
format!("https://api.github.com/repos/{owner}/{repository}/issues/{issue_number}");
let body = self.http_client.fetch(url).await?;
Ok(serde_json::from_str(&body)?)
}
}

impl From<serde_json::Error> for ApiError {
fn from(e: serde_json::Error) -> Self {
Self::Json {
reason: e.to_string(),
}
}
}

/// Sample data downloaded from a real github api call
///
/// The tests don't make real HTTP calls to avoid them failing because of network errors.
pub fn test_response_data() -> String {
String::from(
r#"{
"url": "https://api.github.com/repos/mozilla/uniffi-rs/issues/2017",
"repository_url": "https://api.github.com/repos/mozilla/uniffi-rs",
"labels_url": "https://api.github.com/repos/mozilla/uniffi-rs/issues/2017/labels{/name}",
"comments_url": "https://api.github.com/repos/mozilla/uniffi-rs/issues/2017/comments",
"events_url": "https://api.github.com/repos/mozilla/uniffi-rs/issues/2017/events",
"html_url": "https://github.com/mozilla/uniffi-rs/issues/2017",
"id": 2174982360,
"node_id": "I_kwDOECpYAM6Bo5jY",
"number": 2017,
"title": "Foreign-implemented async traits",
"user": {
"login": "bendk",
"id": 1012809,
"node_id": "MDQ6VXNlcjEwMTI4MDk=",
"avatar_url": "https://avatars.githubusercontent.com/u/1012809?v=4",
"gravatar_id": "",
"url": "https://api.github.com/users/bendk",
"html_url": "https://github.com/bendk",
"followers_url": "https://api.github.com/users/bendk/followers",
"following_url": "https://api.github.com/users/bendk/following{/other_user}",
"gists_url": "https://api.github.com/users/bendk/gists{/gist_id}",
"starred_url": "https://api.github.com/users/bendk/starred{/owner}{/repo}",
"subscriptions_url": "https://api.github.com/users/bendk/subscriptions",
"organizations_url": "https://api.github.com/users/bendk/orgs",
"repos_url": "https://api.github.com/users/bendk/repos",
"events_url": "https://api.github.com/users/bendk/events{/privacy}",
"received_events_url": "https://api.github.com/users/bendk/received_events",
"type": "User",
"site_admin": false
},
"labels": [
],
"state": "open",
"locked": false,
"assignee": null,
"assignees": [
],
"milestone": null,
"comments": 0,
"created_at": "2024-03-07T23:07:29Z",
"updated_at": "2024-03-07T23:07:29Z",
"closed_at": null,
"author_association": "CONTRIBUTOR",
"active_lock_reason": null,
"body": "We currently allow Rust code to implement async trait methods, but foreign implementations are not supported. We should extend support to allow for foreign code.\\r\\n\\r\\nI think this is a key feature for full async support. It allows Rust code to define an async method that depends on a foreign async method. This allows users to use async code without running a Rust async runtime, you can effectively piggyback on the foreign async runtime.",
"closed_by": null,
"reactions": {
"url": "https://api.github.com/repos/mozilla/uniffi-rs/issues/2017/reactions",
"total_count": 0,
"+1": 0,
"-1": 0,
"laugh": 0,
"hooray": 0,
"confused": 0,
"heart": 0,
"rocket": 0,
"eyes": 0
},
"timeline_url": "https://api.github.com/repos/mozilla/uniffi-rs/issues/2017/timeline",
"performed_via_github_app": null,
"state_reason": null
}"#,
)
}

uniffi::include_scaffolding!("async-api-client");
90 changes: 90 additions & 0 deletions examples/async-api-client/src/tasks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::sync::{Arc, Mutex};

#[async_trait::async_trait]
pub trait TaskRunner: Send + Sync {
async fn run_task(&self, task: Arc<dyn RustTask>);
}

pub trait RustTask: Send + Sync {
fn execute(&self);
}

pub async fn run_task<F, T>(runner: &Arc<dyn TaskRunner>, closure: F) -> T
where
F: FnOnce() -> T + Send + Sync + 'static,
T: Send + 'static,
{
let closure = Arc::new(TaskClosure::new(closure));
runner
.run_task(Arc::clone(&closure) as Arc<dyn RustTask>)
.await;
closure.take_result()
}

struct TaskClosure<F, T>
where
F: FnOnce() -> T + Send + Sync,
T: Send,
{
inner: Mutex<TaskClosureInner<F, T>>,
}

enum TaskClosureInner<F, T>
where
F: FnOnce() -> T + Send + Sync,
T: Send,
{
Pending(F),
Running,
Complete(T),
Finished,
}

impl<F, T> TaskClosure<F, T>
where
F: FnOnce() -> T + Send + Sync,
T: Send,
{
fn new(closure: F) -> Self {
Self {
inner: Mutex::new(TaskClosureInner::Pending(closure)),
}
}

fn take_result(&self) -> T {
let mut inner = self.inner.lock().unwrap();
match *inner {
TaskClosureInner::Pending(_) => panic!("Task never ran"),
TaskClosureInner::Running => panic!("Task still running"),
TaskClosureInner::Finished => panic!("Task already finished"),
TaskClosureInner::Complete(_) => (),
};
match std::mem::replace(&mut *inner, TaskClosureInner::Finished) {
TaskClosureInner::Complete(v) => v,
_ => unreachable!(),
}
}
}

impl<F, T> RustTask for TaskClosure<F, T>
where
F: FnOnce() -> T + Send + Sync,
T: Send,
{
fn execute(&self) {
let mut inner = self.inner.lock().unwrap();
match std::mem::replace(&mut *inner, TaskClosureInner::Running) {
TaskClosureInner::Pending(f) => {
let result = f();
*inner = TaskClosureInner::Complete(result)
}
TaskClosureInner::Running => panic!("Task already started"),
TaskClosureInner::Complete(_) => panic!("Task already executed"),
TaskClosureInner::Finished => panic!("Task already finished"),
}
}
}
Loading

0 comments on commit 17482fa

Please sign in to comment.