Skip to content

Commit

Permalink
rusty
Browse files Browse the repository at this point in the history
  • Loading branch information
bcherry committed Oct 18, 2024
1 parent 3e0ca0e commit b7c748a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 52 deletions.
24 changes: 24 additions & 0 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,30 @@ pub struct ChatMessage {
pub generated: Option<bool>,
}

#[derive(Debug, Clone)]
pub struct RpcRequest {
pub destination_identity: String,
pub id: String,
pub method: String,
pub payload: String,
pub response_timeout_ms: u32,
pub version: u32,
}

#[derive(Debug, Clone)]
pub struct RpcResponse {
destination_identity: String,
request_id: String,
payload: Option<String>,
error: Option<proto::RpcError>,
}

#[derive(Debug, Clone)]
pub struct RpcAck {
destination_identity: String,
request_id: String,
}

#[derive(Debug, Clone)]
pub struct RoomOptions {
pub auto_subscribe: bool,
Expand Down
86 changes: 34 additions & 52 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ use std::{
};

use super::{ConnectionQuality, ParticipantInner, ParticipantKind};
use crate::room::proto::RpcError as RpcError_Proto;
use crate::{
e2ee::EncryptionType,
options::{self, compute_video_encodings, video_layers_from_encodings, TrackPublishOptions},
prelude::*,
room::participant::rpc::{RpcError, RpcErrorCode, MAX_PAYLOAD_BYTES},
rtc_engine::{EngineError, RtcEngine},
ChatMessage, DataPacket, SipDTMF, Transcription,
ChatMessage, DataPacket, RpcAck, RpcRequest, RpcResponse, SipDTMF, Transcription,
};
use chrono::Utc;
use futures_util::Future;
Expand Down Expand Up @@ -86,16 +85,6 @@ struct LocalInfo {
rpc_state: Mutex<RpcState>,
}

#[derive(Debug, Clone)]
pub struct PublishRpcRequestOptions {
pub destination_identity: String,
pub id: String,
pub method: String,
pub payload: String,
pub response_timeout_ms: u32,
pub version: u32,
}

#[derive(Clone)]
pub struct LocalParticipant {
inner: Arc<ParticipantInner>,
Expand Down Expand Up @@ -528,19 +517,19 @@ impl LocalParticipant {
.map_err(Into::into)
}

async fn publish_rpc_request(&self, options: PublishRpcRequestOptions) -> RoomResult<()> {
let destination_identities: Vec<String> = vec![options.destination_identity];
let rpc_request = proto::RpcRequest {
id: options.id,
method: options.method,
payload: options.payload,
response_timeout_ms: options.response_timeout_ms,
version: options.version,
async fn publish_rpc_request(&self, rpc_request: RpcRequest) -> RoomResult<()> {
let destination_identities = vec![rpc_request.destination_identity];
let rpc_request_message = proto::RpcRequest {
id: rpc_request.id,
method: rpc_request.method,
payload: rpc_request.payload,
response_timeout_ms: rpc_request.response_timeout_ms,
version: rpc_request.version,
..Default::default()
};

let data = proto::DataPacket {
value: Some(proto::data_packet::Value::RpcRequest(rpc_request)),
value: Some(proto::data_packet::Value::RpcRequest(rpc_request_message)),
destination_identities,
..Default::default()
};
Expand All @@ -552,30 +541,23 @@ impl LocalParticipant {
.map_err(Into::into)
}

async fn publish_rpc_response(
&self,
destination_identity: String,
request_id: String,
payload: Option<String>,
error: Option<RpcError_Proto>,
) -> RoomResult<()> {
let destination_identities: Vec<String> =
[destination_identity].into_iter().map(Into::into).collect();
let rpc_response = proto::RpcResponse {
request_id: request_id,
value: Some(match error {
async fn publish_rpc_response(&self, rpc_response: RpcResponse) -> RoomResult<()> {
let destination_identities = vec![rpc_response.destination_identity];
let rpc_response_message = proto::RpcResponse {
request_id: rpc_response.request_id,
value: Some(match rpc_response.error {
Some(error) => proto::rpc_response::Value::Error(proto::RpcError {
code: error.code,
message: error.message,
data: error.data,
}),
None => proto::rpc_response::Value::Payload(payload.unwrap()),
None => proto::rpc_response::Value::Payload(rpc_response.payload.unwrap()),
}),
..Default::default()
};

let data = proto::DataPacket {
value: Some(proto::data_packet::Value::RpcResponse(rpc_response)),
value: Some(proto::data_packet::Value::RpcResponse(rpc_response_message)),
destination_identities: destination_identities.clone(),
..Default::default()
};
Expand All @@ -587,17 +569,12 @@ impl LocalParticipant {
.map_err(Into::into)
}

async fn publish_rpc_ack(
&self,
destination_identity: String,
request_id: String,
) -> RoomResult<()> {
let destination_identities: Vec<String> =
[destination_identity].into_iter().map(Into::into).collect();
let rpc_ack = proto::RpcAck { request_id: request_id, ..Default::default() };
async fn publish_rpc_ack(&self, rpc_ack: RpcAck) -> RoomResult<()> {
let destination_identities = vec![rpc_ack.destination_identity];
let rpc_ack_message = proto::RpcAck { request_id: rpc_ack.request_id, ..Default::default() };

let data = proto::DataPacket {
value: Some(proto::data_packet::Value::RpcAck(rpc_ack)),
value: Some(proto::data_packet::Value::RpcAck(rpc_ack_message)),
destination_identities: destination_identities.clone(),
..Default::default()
};
Expand Down Expand Up @@ -690,7 +667,7 @@ impl LocalParticipant {
let (response_tx, response_rx) = oneshot::channel();

match self
.publish_rpc_request(PublishRpcRequestOptions {
.publish_rpc_request(RpcRequest {
destination_identity: destination_identity.clone(),
id: id.clone(),
method: method.clone(),
Expand Down Expand Up @@ -782,7 +759,7 @@ impl LocalParticipant {
&self,
request_id: String,
payload: Option<String>,
error: Option<RpcError_Proto>,
error: Option<proto::RpcError>,
) {
let mut rpc_state = self.local.rpc_state.lock();
if let Some(tx) = rpc_state.pending_responses.remove(&request_id) {
Expand All @@ -803,7 +780,10 @@ impl LocalParticipant {
payload: String,
response_timeout_ms: u32,
) {
if let Err(e) = self.publish_rpc_ack(caller_identity.to_string(), request_id.clone()).await
if let Err(e) = self.publish_rpc_ack(RpcAck {
destination_identity: caller_identity.to_string(),
request_id: request_id.clone(),
}).await
{
log::error!("Failed to publish RPC ACK: {:?}", e);
}
Expand Down Expand Up @@ -854,14 +834,16 @@ impl LocalParticipant {

if let Err(e) = self
.publish_rpc_response(
caller_identity_2.to_string(),
request_id_2,
payload,
error.map(|e| e.to_proto()),
RpcResponse {
destination_identity: caller_identity_2.to_string(),
request_id: request_id_2,
payload,
error: error.map(|e| e.to_proto()),
}
)
.await
{
log::error!("Failed to publish RPC response: {:?}", e);
}
}
}
}

0 comments on commit b7c748a

Please sign in to comment.