Skip to content

Commit

Permalink
feat: Export audio to a recorder over a WS.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrozev committed Oct 24, 2024
1 parent 7245c6a commit d8c07c0
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ class AudioLevelReader(

if (!silence) stats.nonSilence(AudioLevelHeaderExtension.getVad(ext))
if (silence && forwardedSilencePackets > forwardedSilencePacketsLimit) {
packetInfo.shouldDiscard = true
// packetInfo.shouldDiscard = true
stats.discardedSilence()
} else if (this@AudioLevelReader.forceMute) {
packetInfo.shouldDiscard = true
// packetInfo.shouldDiscard = true
stats.discardedForceMute()
} else {
forwardedSilencePackets = if (silence) forwardedSilencePackets + 1 else 0
audioLevelListener?.let { listener ->
if (listener.onLevelReceived(audioRtpPacket.ssrc, (127 - level).toPositiveLong())) {
packetInfo.shouldDiscard = true
// packetInfo.shouldDiscard = true
stats.discardedRanking()
}
}
Expand Down
4 changes: 4 additions & 0 deletions jvb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>jicoco-config</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jicoco-mediajson</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jicoco-metrics</artifactId>
Expand Down
19 changes: 18 additions & 1 deletion jvb/src/main/java/org/jitsi/videobridge/Conference.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.jitsi.utils.logging2.*;
import org.jitsi.utils.queue.*;
import org.jitsi.videobridge.colibri2.*;
import org.jitsi.videobridge.export.*;
import org.jitsi.videobridge.message.*;
import org.jitsi.videobridge.metrics.*;
import org.jitsi.videobridge.relay.*;
Expand All @@ -40,7 +41,6 @@
import org.json.simple.*;
import org.jxmpp.jid.*;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
Expand Down Expand Up @@ -180,6 +180,9 @@ public long getLocalVideoSsrc()
@Nullable
private final String meetingId;

@NotNull
private final Exporter exporter = new Exporter();

/**
* A regex pattern to trim UUIDs to just their first 8 hex characters.
*/
Expand Down Expand Up @@ -599,6 +602,7 @@ void expire()
logger.debug(() -> "Expiring endpoints.");
getEndpoints().forEach(AbstractEndpoint::expire);
getRelays().forEach(Relay::expire);
exporter.stop();
speechActivity.expire();

updateStatisticsOnExpire();
Expand Down Expand Up @@ -1118,6 +1122,14 @@ private void sendOut(PacketInfo packetInfo)
prevHandler = relay;
}
}
if (exporter.wants(packetInfo))
{
if (prevHandler != null)
{
prevHandler.send(packetInfo.clone());
}
prevHandler = exporter;
}

if (prevHandler != null)
{
Expand All @@ -1130,6 +1142,11 @@ private void sendOut(PacketInfo packetInfo)
}
}

public void setConnects(List<Connect> exports)
{
exporter.setConnects(exports);
}

public boolean hasRelays()
{
return !relaysById.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.jitsi.videobridge;

import org.jetbrains.annotations.*;
import org.jitsi.nlj.*;

public interface PotentialPacketHandler
Expand All @@ -26,11 +27,11 @@ public interface PotentialPacketHandler
* @param packet the RTP/RTCP packet
* @return true if this handler wants the given packet, false otherwise
*/
boolean wants(PacketInfo packet);
boolean wants(@NotNull PacketInfo packet);

/**
* Send the given RTP/RTCP 'packet' (which came from 'source')
* @param packet the RTP/RTCP packet
*/
void send(PacketInfo packet);
void send(@NotNull PacketInfo packet);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class Colibri2ConferenceHandler(
for (e in conferenceModifyIQ.endpoints) {
responseBuilder.addEndpoint(handleColibri2Endpoint(e, ignoreUnknownEndpoints))
}
conferenceModifyIQ.connects?.let { conference.setConnects(it.getConnects()) }
for (r in conferenceModifyIQ.relays) {
if (!RelayConfig.config.enabled) {
throw IqProcessingException(Condition.feature_not_implemented, "Octo is disabled in configuration.")
Expand Down
108 changes: 108 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/export/Exporter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright @ 2024 - Present, 8x8 Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.export

import org.eclipse.jetty.websocket.api.WebSocketAdapter
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest
import org.eclipse.jetty.websocket.client.WebSocketClient
import org.jitsi.nlj.PacketInfo
import org.jitsi.nlj.rtp.AudioRtpPacket
import org.jitsi.nlj.util.PacketInfoQueue
import org.jitsi.utils.logging2.createLogger
import org.jitsi.videobridge.PotentialPacketHandler
import org.jitsi.videobridge.colibri2.FeatureNotImplementedException
import org.jitsi.videobridge.util.ByteBufferPool
import org.jitsi.videobridge.util.TaskPools
import org.jitsi.videobridge.websocket.config.WebsocketServiceConfig
import org.jitsi.xmpp.extensions.colibri2.Connect

class Exporter : PotentialPacketHandler {
val logger = createLogger()
var started = false
val queue = PacketInfoQueue(
"${javaClass.simpleName}-packet-queue",
TaskPools.IO_POOL,
this::doHandlePacket,
128
)

private var wsNotConnectedErrors = 0
private fun logWsNotConnectedError(): Boolean = (wsNotConnectedErrors++ % 1000) == 0
private val encoder = MediaJsonEncoder {
if (recorderWebSocket.isConnected) {
recorderWebSocket.remote?.sendString(it.toJson())
?: logger.info("Websocket is connected, but remote is null")
} else if (logWsNotConnectedError()) {
logger.info("Can not send packet, websocket is not connected (count=$wsNotConnectedErrors).")
}
}
private var recorderWebSocket = WebSocketAdapter()

fun setConnects(exports: List<Connect>) {
when {
started && exports.isNotEmpty() -> throw FeatureNotImplementedException("Changing exports once enabled.")
exports.isEmpty() -> stop()
exports.size > 1 -> throw FeatureNotImplementedException("Multiple exports")
exports[0].video -> throw FeatureNotImplementedException("Video")
else -> start(exports[0])
}
}

private fun doHandlePacket(packet: PacketInfo): Boolean {
if (started) {
encoder.encode(packet.packetAs(), packet.endpointId!!)
}
ByteBufferPool.returnBuffer(packet.packet.buffer)
return true
}

override fun wants(packet: PacketInfo): Boolean = started && packet.packet is AudioRtpPacket

override fun send(packet: PacketInfo) {
if (started) {
queue.add(packet)
} else {
ByteBufferPool.returnBuffer(packet.packet.buffer)
}
}

fun stop() {
started = false
logger.info("Stopping.")
recorderWebSocket.session?.close(org.eclipse.jetty.websocket.core.CloseStatus.SHUTDOWN, "closing")
}

fun start(connect: Connect) {
if (connect.video) throw FeatureNotImplementedException("Video")
if (connect.protocol != Connect.Protocols.MEDIAJSON) {
throw FeatureNotImplementedException("Protocol ${connect.protocol}")
}
if (connect.type != Connect.Types.RECORDER) {
throw FeatureNotImplementedException("Type ${connect.type}")
}

logger.info("Starting with url=${connect.url}")
webSocketClient.connect(recorderWebSocket, connect.url, ClientUpgradeRequest())
started = true
}

companion object {
val webSocketClient = WebSocketClient().apply {
idleTimeout = WebsocketServiceConfig.config.idleTimeout
start()
}
}
}
102 changes: 102 additions & 0 deletions jvb/src/main/kotlin/org/jitsi/videobridge/export/MediaJsonEncoder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright @ 2024 - Present, 8x8 Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jitsi.videobridge.export

import org.jitsi.mediajson.CustomParameters
import org.jitsi.mediajson.Event
import org.jitsi.mediajson.Media
import org.jitsi.mediajson.MediaEvent
import org.jitsi.mediajson.MediaFormat
import org.jitsi.mediajson.Start
import org.jitsi.mediajson.StartEvent
import org.jitsi.nlj.rtp.AudioRtpPacket
import org.jitsi.nlj.util.Rfc3711IndexTracker
import org.jitsi.rtp.rtp.RtpPacket
import org.jitsi.utils.logging2.createLogger
import java.time.Clock
import java.time.Duration
import kotlin.io.encoding.Base64
import kotlin.io.encoding.ExperimentalEncodingApi

/**
* Encodes the media in a conference into a mediajson format. Maintains state for each SSRC in order to maintain a
* common space for timestamps.
*
* Note we're using a common clock with a rate of 48000 for all SSRCs (that's equivalent to the RTP timestamp for opus).
*/
class MediaJsonEncoder(
/** Encoded mediajson events are sent to this function */
val handleEvent: (Event) -> Unit
) {
val logger = createLogger()
val ref = Clock.systemUTC().instant()

private data class SsrcState(
val ssrc: Long,
val initialRtpTs: Long,
// Offset of this SSRC since the start time in RTP units
val offset: Long
)

private val ssrcsStarted = mutableSetOf<SsrcState>()
var seq = 0

fun encode(p: AudioRtpPacket, epId: String) = synchronized(ssrcsStarted) {
if (ssrcsStarted.none { it.ssrc == p.ssrc }) {
// This is a new SSRC, save it and produce a StartEvent
val state = SsrcState(
p.ssrc,
p.timestamp,
(Duration.between(ref, Clock.systemUTC().instant()).toNanos() * 48.0e-6).toLong()
)
ssrcsStarted.add(state)
val startEvent = StartEvent(
++seq,
Start(
"$epId-${p.ssrc}",
MediaFormat(
"opus",
48000,
2
),
CustomParameters(endpointId = epId)
)
)
handleEvent(startEvent)
}

seq++
handleEvent(p.encodeAsJson(epId))
}

@OptIn(ExperimentalEncodingApi::class)
private fun RtpPacket.encodeAsJson(epId: String): Event {
val ssrcState = ssrcsStarted.find { it.ssrc == this.ssrc }!!
val elapsedRtpTime = this.timestamp - ssrcState.initialRtpTs
val ts = elapsedRtpTime + ssrcState.offset
val indexTracker = Rfc3711IndexTracker()
val p = MediaEvent(
seq,
media = Media(
"$epId-${this.ssrc}",
indexTracker.update(this.sequenceNumber),
ts,
Base64.encode(this.buffer, this.payloadOffset, this.payloadOffset + this.payloadLength)
)
)
return p
}
}
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<kotest.version>5.9.1</kotest.version>
<junit.version>5.10.2</junit.version>
<jitsi.utils.version>1.0-133-g6af1020</jitsi.utils.version>
<jicoco.version>1.1-143-g175c44b</jicoco.version>
<jicoco.version>1.1-144-ga2c5ec1</jicoco.version>
<mockk.version>1.13.11</mockk.version>
<ktlint-maven-plugin.version>3.2.0</ktlint-maven-plugin.version>
<maven-shade-plugin.version>3.6.0</maven-shade-plugin.version>
Expand Down Expand Up @@ -93,6 +93,11 @@
<artifactId>jicoco-metrics</artifactId>
<version>${jicoco.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jicoco-mediajson</artifactId>
<version>${jicoco.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jitsi-utils</artifactId>
Expand All @@ -111,7 +116,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jitsi-xmpp-extensions</artifactId>
<version>1.0-81-g3816e5a</version>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down

0 comments on commit d8c07c0

Please sign in to comment.