Skip to content

Commit

Permalink
Send state also through signaling messages
Browse files Browse the repository at this point in the history
The speaking state is still sent only through data channels, as it is
not currently handled by other clients when sent through signaling
messages.

Signed-off-by: Daniel Calviño Sánchez <[email protected]>
  • Loading branch information
danxuliu committed Dec 23, 2024
1 parent 7cfc8b9 commit d094dc5
Show file tree
Hide file tree
Showing 6 changed files with 425 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
package com.nextcloud.talk.call;

import com.nextcloud.talk.models.json.signaling.DataChannelMessage;
import com.nextcloud.talk.models.json.signaling.NCMessagePayload;
import com.nextcloud.talk.models.json.signaling.NCSignalingMessage;

import java.util.Objects;

Expand Down Expand Up @@ -48,6 +50,7 @@ public void onChange() {
audioEnabled = localCallParticipantModel.isAudioEnabled();

messageSender.sendToAll(getDataChannelMessageForAudioState());
messageSender.sendToAll(getSignalingMessageForAudioState());
}

if (!Objects.equals(speaking, localCallParticipantModel.isSpeaking())) {
Expand All @@ -60,6 +63,7 @@ public void onChange() {
videoEnabled = localCallParticipantModel.isVideoEnabled();

messageSender.sendToAll(getDataChannelMessageForVideoState());
messageSender.sendToAll(getSignalingMessageForVideoState());
}
}
}
Expand Down Expand Up @@ -106,4 +110,61 @@ protected DataChannelMessage getDataChannelMessageForVideoState() {

return new DataChannelMessage(type);
}

/**
* Returns a signaling message with the common fields set (type and room type).
*
* @param type the type of the signaling message
* @return the signaling message
*/
private NCSignalingMessage createBaseSignalingMessage(String type) {
NCSignalingMessage ncSignalingMessage = new NCSignalingMessage();
// "roomType" is not really relevant without a peer or when referring to the whole participant, but it is
// nevertheless expected in the message. As most of the signaling messages currently sent to all participants
// are related to audio/video state "video" is used as the room type.
ncSignalingMessage.setRoomType("video");
ncSignalingMessage.setType(type);

return ncSignalingMessage;
}

/**
* Returns a signaling message to notify current audio state.
*
* @return the signaling message
*/
protected NCSignalingMessage getSignalingMessageForAudioState() {
String type = "mute";
if (localCallParticipantModel.isAudioEnabled() != null && localCallParticipantModel.isAudioEnabled()) {
type = "unmute";
}

NCSignalingMessage ncSignalingMessage = createBaseSignalingMessage(type);

NCMessagePayload ncMessagePayload = new NCMessagePayload();
ncMessagePayload.setName("audio");
ncSignalingMessage.setPayload(ncMessagePayload);

return ncSignalingMessage;
}

/**
* Returns a signaling message to notify current video state.
*
* @return the signaling message
*/
protected NCSignalingMessage getSignalingMessageForVideoState() {
String type = "mute";
if (localCallParticipantModel.isVideoEnabled() != null && localCallParticipantModel.isVideoEnabled()) {
type = "unmute";
}

NCSignalingMessage ncSignalingMessage = createBaseSignalingMessage(type);

NCMessagePayload ncMessagePayload = new NCMessagePayload();
ncMessagePayload.setName("video");
ncSignalingMessage.setPayload(ncMessagePayload);

return ncSignalingMessage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/
package com.nextcloud.talk.call;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
Expand All @@ -26,11 +28,30 @@
* initial state when the local participant joins the call, as all the remote participants joined from the point of
* view of the local participant). If the state was already being sent the sending is restarted with each new
* participant that joins.
* <p>
* Similarly, in the case of signaling messages it is not possible either to know when the remote participants have
* "seen" the local participant and thus are ready to handle signaling messages about the state. However, in the case
* of signaling messages it is possible to send them to a specific participant, so the initial state is sent several
* times with an increasing delay directly to the participant that was added. Moreover, if the participant is removed
* the state is no longer directly sent.
* <p>
* In any case, note that the state is sent only when the remote participant joins the call. Even in case of
* temporary disconnections the normal state updates sent when the state changes are expected to be received by the
* other participant, as signaling messages are sent through a WebSocket and are therefore reliable. Moreover, even
* if the WebSocket is restarted and the connection resumed (rather than joining with a new session ID) the messages
* would be also received, as in that case they would be queued until the WebSocket is connected again.
* <p>
* Data channel messages, on the other hand, could be lost if the remote participant restarts the peer receiver
* connection (although they would be received in case of temporary disconnections, as data channels use a reliable
* transport by default). Therefore, as the speaking state is sent only through data channels, updates of the speaking
* state could be not received by remote participants.
*/
public class LocalStateBroadcasterMcu extends LocalStateBroadcaster {

private final MessageSender messageSender;

private final Map<String, Disposable> sendStateWithRepetitionByParticipant = new HashMap<>();

private Disposable sendStateWithRepetition;

public LocalStateBroadcasterMcu(LocalCallParticipantModel localCallParticipantModel,
Expand All @@ -46,6 +67,10 @@ public void destroy() {
if (sendStateWithRepetition != null) {
sendStateWithRepetition.dispose();
}

for (Disposable sendStateWithRepetitionForParticipant: sendStateWithRepetitionByParticipant.values()) {
sendStateWithRepetitionForParticipant.dispose();
}
}

@Override
Expand All @@ -58,15 +83,36 @@ public void handleCallParticipantAdded(CallParticipantModel callParticipantModel
.fromArray(new Integer[]{0, 1, 2, 4, 8, 16})
.concatMap(i -> Observable.just(i).delay(i, TimeUnit.SECONDS, Schedulers.io()))
.subscribe(value -> sendState());

String sessionId = callParticipantModel.getSessionId();
Disposable sendStateWithRepetitionForParticipant = sendStateWithRepetitionByParticipant.get(sessionId);
if (sendStateWithRepetitionForParticipant != null) {
sendStateWithRepetitionForParticipant.dispose();
}

sendStateWithRepetitionByParticipant.put(sessionId, Observable
.fromArray(new Integer[]{0, 1, 2, 4, 8, 16})
.concatMap(i -> Observable.just(i).delay(i, TimeUnit.SECONDS, Schedulers.io()))
.subscribe(value -> sendState(sessionId)));
}

@Override
public void handleCallParticipantRemoved(CallParticipantModel callParticipantModel) {
String sessionId = callParticipantModel.getSessionId();
Disposable sendStateWithRepetitionForParticipant = sendStateWithRepetitionByParticipant.get(sessionId);
if (sendStateWithRepetitionForParticipant != null) {
sendStateWithRepetitionForParticipant.dispose();
}
}

private void sendState() {
messageSender.sendToAll(getDataChannelMessageForAudioState());
messageSender.sendToAll(getDataChannelMessageForSpeakingState());
messageSender.sendToAll(getDataChannelMessageForVideoState());
}

private void sendState(String sessionId) {
messageSender.send(getSignalingMessageForAudioState(), sessionId);
messageSender.send(getSignalingMessageForVideoState(), sessionId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
* after a temporary disconnection; data channels use a reliable transport by default, so even if the state changes
* while the connection is temporarily interrupted the normal state update messages should be received by the other
* participant once the connection is restored.
* <p>
* Nevertheless, in case of a failed connection and an ICE restart it is unclear whether the data channel messages
* would be received or not (as the data channel transport may be the one that failed and needs to be restarted).
* However, the state (except the speaking state) is also sent through signaling messages, which need to be
* explicitly fetched from the internal signaling server, so even in case of a failed connection they will be
* eventually received once the remote participant connects again.
*/
public class LocalStateBroadcasterNoMcu extends LocalStateBroadcaster {

Expand Down Expand Up @@ -115,5 +121,8 @@ private void sendState(String sessionId) {
messageSender.send(getDataChannelMessageForAudioState(), sessionId);
messageSender.send(getDataChannelMessageForSpeakingState(), sessionId);
messageSender.send(getDataChannelMessageForVideoState(), sessionId);

messageSender.send(getSignalingMessageForAudioState(), sessionId);
messageSender.send(getSignalingMessageForVideoState(), sessionId);
}
}
Loading

0 comments on commit d094dc5

Please sign in to comment.