Skip to content

Commit

Permalink
Fix "send" not respecting order of pending messages
Browse files Browse the repository at this point in the history
When the data channel is not open yet data channel messages are queued
and then sent once opened. "onStateChange" is called from the WebRTC
signaling thread, while "send" can be called potentially from any
thread, so to send the data channel messages in the same order that they
were added new messages need to be enqueued until all the pending
messages have been sent. Otherwise, even if there is synchronization
already, it could happen that "onStateChange" was called but, before
getting the lock, "send" gets it and sends the new message before the
pending messages were sent.

Signed-off-by: Daniel Calviño Sánchez <[email protected]>
  • Loading branch information
danxuliu committed Dec 11, 2024
1 parent 58f46a9 commit 1ac2c41
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,19 @@ public synchronized void send(DataChannelMessage dataChannelMessage) {
}

DataChannel statusDataChannel = dataChannels.get("status");
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN) {
if (statusDataChannel == null || statusDataChannel.state() != DataChannel.State.OPEN ||
!pendingDataChannelMessages.isEmpty()) {
Log.d(TAG, "Queuing data channel message (" + dataChannelMessage + ") " + sessionId);

pendingDataChannelMessages.add(dataChannelMessage);

return;
}

sendWithoutQueuing(statusDataChannel, dataChannelMessage);
}

private void sendWithoutQueuing(DataChannel statusDataChannel, DataChannelMessage dataChannelMessage) {
try {
Log.d(TAG, "Sending data channel message (" + dataChannelMessage + ") " + sessionId);

Expand Down Expand Up @@ -423,7 +428,7 @@ public void onStateChange() {

if (dataChannel.state() == DataChannel.State.OPEN && "status".equals(dataChannelLabel)) {
for (DataChannelMessage dataChannelMessage : pendingDataChannelMessages) {
send(dataChannelMessage);
sendWithoutQueuing(dataChannel, dataChannelMessage);
}
pendingDataChannelMessages.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.mockito.Mockito.atLeast
import org.mockito.Mockito.atMostOnce
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.doNothing
import org.mockito.Mockito.inOrder
import org.mockito.Mockito.never
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
Expand Down Expand Up @@ -287,8 +288,10 @@ class PeerConnectionWrapperTest {
throw exceptionOnStateChange!!
}

val inOrder = inOrder(mockedStatusDataChannel)

for (j in 1..dataChannelMessageCount) {
Mockito.verify(mockedStatusDataChannel).send(
inOrder.verify(mockedStatusDataChannel).send(
argThat(MatchesDataChannelMessage(DataChannelMessage("the-message-type-$j")))
)
}
Expand Down

0 comments on commit 1ac2c41

Please sign in to comment.