Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC updates #476

Merged
merged 7 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

120 changes: 61 additions & 59 deletions examples/rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,71 +78,73 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

async fn register_receiver_methods(greeters_room: &Arc<Room>, math_genius_room: &Arc<Room>) {
greeters_room.local_participant().register_rpc_method(
"arrival".to_string(),
|_, caller_identity, payload, _| {
Box::pin(async move {
println!(
"[{}] [Greeter] Oh {} arrived and said \"{}\"",
elapsed_time(),
caller_identity,
payload
);
sleep(Duration::from_secs(2)).await;
Ok("Welcome and have a wonderful day!".to_string())
})
},
);

math_genius_room.local_participant().register_rpc_method("square-root".to_string(), |_, caller_identity, payload, response_timeout_ms| {
greeters_room.local_participant().register_rpc_method("arrival".to_string(), |data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&payload).unwrap();
let number = json_data["number"].as_f64().unwrap();
println!(
"[{}] [Math Genius] I guess {} wants the square root of {}. I've only got {} seconds to respond but I think I can pull it off.",
"[{}] [Greeter] Oh {} arrived and said \"{}\"",
elapsed_time(),
caller_identity,
number,
response_timeout_ms.as_secs()
data.caller_identity,
data.payload
);

println!("[{}] [Math Genius] *doing math*…", elapsed_time());
sleep(Duration::from_secs(2)).await;

let result = number.sqrt();
println!("[{}] [Math Genius] Aha! It's {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
Ok("Welcome and have a wonderful day!".to_string())
})
});

math_genius_room.local_participant().register_rpc_method(
"divide".to_string(),
|_, caller_identity, payload, _| {
"square-root".to_string(),
|data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&payload).unwrap();
let dividend = json_data["dividend"].as_i64().unwrap();
let divisor = json_data["divisor"].as_i64().unwrap();
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let number = json_data["number"].as_f64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to divide {} by {}.",
"[{}] [Math Genius] I guess {} wants the square root of {}. I've only got {} seconds to respond but I think I can pull it off.",
elapsed_time(),
caller_identity,
dividend,
divisor
data.caller_identity,
number,
data.response_timeout.as_secs()
);

let result = dividend / divisor;
println!("[{}] [Math Genius] The result is {}", elapsed_time(), result);
println!("[{}] [Math Genius] *doing math*…", elapsed_time());
sleep(Duration::from_secs(2)).await;

let result = number.sqrt();
println!("[{}] [Math Genius] Aha! It's {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
})
},
);

math_genius_room.local_participant().register_rpc_method("divide".to_string(), |data| {
Box::pin(async move {
let json_data: Value = serde_json::from_str(&data.payload).unwrap();
let dividend = json_data["dividend"].as_i64().unwrap();
let divisor = json_data["divisor"].as_i64().unwrap();
println!(
"[{}] [Math Genius] {} wants me to divide {} by {}.",
elapsed_time(),
data.caller_identity,
dividend,
divisor
);

let result = dividend / divisor;
println!("[{}] [Math Genius] The result is {}", elapsed_time(), result);
Ok(json!({"result": result}).to_string())
})
});
}

async fn perform_greeting(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Error>> {
println!("[{}] Letting the greeter know that I've arrived", elapsed_time());
match room
.local_participant()
.perform_rpc("greeter".to_string(), "arrival".to_string(), "Hello".to_string(), None)
.perform_rpc(PerformRpcParams {
destination_identity: "greeter".to_string(),
method: "arrival".to_string(),
payload: "Hello".to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -157,12 +159,12 @@ async fn perform_square_root(room: &Arc<Room>) -> Result<(), Box<dyn std::error:
println!("[{}] What's the square root of 16?", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"square-root".to_string(),
json!({"number": 16}).to_string(),
None,
)
.perform_rpc(PerformRpcParams {
destination_identity: "math-genius".to_string(),
method: "square-root".to_string(),
payload: json!({"number": 16}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -180,12 +182,12 @@ async fn perform_quantum_hypergeometric_series(
println!("[{}] What's the quantum hypergeometric series of 42?", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"quantum-hypergeometric-series".to_string(),
json!({"number": 42}).to_string(),
None,
)
.perform_rpc(PerformRpcParams {
destination_identity: "math-genius".to_string(),
method: "quantum-hypergeometric-series".to_string(),
payload: json!({"number": 42}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand All @@ -207,12 +209,12 @@ async fn perform_division(room: &Arc<Room>) -> Result<(), Box<dyn std::error::Er
println!("[{}] Let's try dividing 5 by 0", elapsed_time());
match room
.local_participant()
.perform_rpc(
"math-genius".to_string(),
"divide".to_string(),
json!({"dividend": 5, "divisor": 0}).to_string(),
None,
)
.perform_rpc(PerformRpcParams {
destination_identity: "math-genius".to_string(),
method: "divide".to_string(),
payload: json!({"dividend": 5, "divisor": 0}).to_string(),
..Default::default()
})
.await
{
Ok(response) => {
Expand Down
67 changes: 30 additions & 37 deletions livekit-ffi/src/server/participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ impl FfiParticipant {

let handle = server.async_runtime.spawn(async move {
let result = local
.perform_rpc(
request.destination_identity.to_string(),
request.method,
request.payload,
request.response_timeout_ms,
)
.perform_rpc(PerformRpcParams {
destination_identity: request.destination_identity.to_string(),
method: request.method,
payload: request.payload,
response_timeout: request
.response_timeout_ms
.map(|ms| Duration::from_millis(ms as u64))
.unwrap_or(PerformRpcParams::default().response_timeout),
})
.await;

let callback = proto::PerformRpcCallback {
Expand Down Expand Up @@ -91,34 +94,27 @@ impl FfiParticipant {

let local_participant_handle = self.handle.clone();
let room: Arc<RoomInner> = self.room.clone();
local.register_rpc_method(
method.clone(),
move |request_id, caller_identity, payload, response_timeout| {
Box::pin({
let room = room.clone();
let method = method.clone();
async move {
forward_rpc_method_invocation(
server,
room,
local_participant_handle,
method,
request_id,
caller_identity,
payload,
response_timeout,
)
.await
}
})
},
);
local.register_rpc_method(method.clone(), move |data| {
Box::pin({
let room = room.clone();
let method = method.clone();
async move {
forward_rpc_method_invocation(
server,
room,
local_participant_handle,
method,
data,
)
.await
}
})
});
Ok(proto::RegisterRpcMethodResponse {})
}

pub fn unregister_rpc_method(
&self,
server: &'static FfiServer,
request: proto::UnregisterRpcMethodRequest,
) -> FfiResult<proto::UnregisterRpcMethodResponse> {
let local = match &self.participant {
Expand All @@ -139,10 +135,7 @@ async fn forward_rpc_method_invocation(
room: Arc<RoomInner>,
local_participant_handle: FfiHandleId,
method: String,
request_id: String,
caller_identity: ParticipantIdentity,
payload: String,
response_timeout: Duration,
data: RpcInvocationData,
) -> Result<String, RpcError> {
let (tx, rx) = oneshot::channel();
let invocation_id = server.next_id();
Expand All @@ -152,10 +145,10 @@ async fn forward_rpc_method_invocation(
local_participant_handle: local_participant_handle as u64,
invocation_id,
method,
request_id,
caller_identity: caller_identity.into(),
payload,
response_timeout_ms: response_timeout.as_millis() as u32,
request_id: data.request_id,
caller_identity: data.caller_identity.into(),
payload: data.payload,
response_timeout_ms: data.response_timeout.as_millis() as u32,
},
));

Expand Down
2 changes: 1 addition & 1 deletion livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ fn on_unregister_rpc_method(
) -> FfiResult<proto::UnregisterRpcMethodResponse> {
let ffi_participant =
server.retrieve_handle::<FfiParticipant>(request.local_participant_handle)?.clone();
return ffi_participant.unregister_rpc_method(server, request);
return ffi_participant.unregister_rpc_method(request);
}

fn on_rpc_method_invocation_response(
Expand Down
3 changes: 2 additions & 1 deletion livekit/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
pub use crate::{
id::*,
participant::{
ConnectionQuality, LocalParticipant, Participant, RemoteParticipant, RpcError, RpcErrorCode,
ConnectionQuality, LocalParticipant, Participant, PerformRpcParams, RemoteParticipant,
RpcError, RpcErrorCode, RpcInvocationData,
},
publication::{LocalTrackPublication, RemoteTrackPublication, TrackPublication},
track::{
Expand Down
7 changes: 4 additions & 3 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ pub struct RpcRequest {
pub id: String,
pub method: String,
pub payload: String,
pub response_timeout_ms: u32,
pub response_timeout: Duration,
pub version: u32,
}

Expand Down Expand Up @@ -689,7 +689,7 @@ impl RoomSession {
request_id,
method,
payload,
response_timeout_ms,
response_timeout,
version,
} => {
if caller_identity.is_none() {
Expand All @@ -702,7 +702,8 @@ impl RoomSession {
request_id,
method,
payload,
response_timeout_ms,
response_timeout,
version,
)
.await;
}
Expand Down
Loading
Loading