Skip to content

Commit

Permalink
Check RPC version
Browse files Browse the repository at this point in the history
  • Loading branch information
bcherry committed Oct 28, 2024
1 parent 1fbcb2c commit b8c3cdc
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 20 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ impl RoomSession {
method,
payload,
response_timeout_ms,
version,
)
.await;
}
Expand Down
43 changes: 24 additions & 19 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ impl LocalParticipant {
method: String,
payload: String,
response_timeout_ms: u32,
version: u32,
) {
if let Err(e) = self
.publish_rpc_ack(RpcAck {
Expand All @@ -797,32 +798,36 @@ impl LocalParticipant {
log::error!("Failed to publish RPC ACK: {:?}", e);
}

let handler = self.local.rpc_state.lock().handlers.get(&method).cloned();

let caller_identity_2 = caller_identity.clone();
let request_id_2 = request_id.clone();

let response = match handler {
Some(handler) => {
match tokio::task::spawn(async move {
handler(
request_id.clone(),
caller_identity.clone(),
payload.clone(),
Duration::from_millis(response_timeout_ms as u64),
)
let response = if version != 1 {
Err(RpcError::built_in(RpcErrorCode::UnsupportedVersion, None))
} else {
let handler = self.local.rpc_state.lock().handlers.get(&method).cloned();

match handler {
Some(handler) => {
match tokio::task::spawn(async move {
handler(
request_id.clone(),
caller_identity.clone(),
payload.clone(),
Duration::from_millis(response_timeout_ms as u64),
)
.await
})
.await
})
.await
{
Ok(result) => result,
Err(e) => {
log::error!("RPC method handler returned an error: {:?}", e);
Err(RpcError::built_in(RpcErrorCode::ApplicationError, None))
{
Ok(result) => result,
Err(e) => {
log::error!("RPC method handler returned an error: {:?}", e);
Err(RpcError::built_in(RpcErrorCode::ApplicationError, None))
}
}
}
None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)),
}
None => Err(RpcError::built_in(RpcErrorCode::UnsupportedMethod, None)),
};

let (payload, error) = match response {
Expand Down
2 changes: 2 additions & 0 deletions livekit/src/room/participant/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub enum RpcErrorCode {
RecipientNotFound = 1401,
RequestPayloadTooLarge = 1402,
UnsupportedServer = 1403,
UnsupportedVersion = 1404,
}

impl RpcErrorCode {
Expand All @@ -76,6 +77,7 @@ impl RpcErrorCode {
Self::RecipientNotFound => "Recipient not found",
Self::RequestPayloadTooLarge => "Request payload too large",
Self::UnsupportedServer => "RPC not supported by server",
Self::UnsupportedVersion => "Unsupported RPC version",
}
}
}
Expand Down

0 comments on commit b8c3cdc

Please sign in to comment.