Skip to content

Commit

Permalink
feat: Add prometheus metrics. (#540)
Browse files Browse the repository at this point in the history
* ref: Move stats config to a separate file.

* ref: Add an abstraction layer in front of statsd.

* ref: Move statsd code under metrics, make private.

* Add a prometheus interface at /metrics.

* feat: Add "healthy" and "recording" metrics.

* ref: Rename a function for clarity.
  • Loading branch information
bgrozev authored Mar 12, 2024
1 parent df5bfaa commit 03d63f3
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 142 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@
<version>1.2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.jitsi</groupId>
<artifactId>jicoco-metrics</artifactId>
<version>1.1-133-g768ef2e</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
55 changes: 19 additions & 36 deletions src/main/kotlin/org/jitsi/jibri/JibriManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.jitsi.jibri
import org.jitsi.jibri.config.Config
import org.jitsi.jibri.config.XmppCredentials
import org.jitsi.jibri.health.EnvironmentContext
import org.jitsi.jibri.metrics.JibriMetrics
import org.jitsi.jibri.selenium.CallParams
import org.jitsi.jibri.service.JibriService
import org.jitsi.jibri.service.JibriServiceStatusHandler
Expand All @@ -30,14 +31,6 @@ import org.jitsi.jibri.service.impl.SipGatewayJibriService
import org.jitsi.jibri.service.impl.SipGatewayServiceParams
import org.jitsi.jibri.service.impl.StreamingJibriService
import org.jitsi.jibri.service.impl.StreamingParams
import org.jitsi.jibri.statsd.ASPECT_BUSY
import org.jitsi.jibri.statsd.ASPECT_ERROR
import org.jitsi.jibri.statsd.ASPECT_START
import org.jitsi.jibri.statsd.ASPECT_STOP
import org.jitsi.jibri.statsd.JibriStatsDClient
import org.jitsi.jibri.statsd.TAG_SERVICE_LIVE_STREAM
import org.jitsi.jibri.statsd.TAG_SERVICE_RECORDING
import org.jitsi.jibri.statsd.TAG_SERVICE_SIP_GATEWAY
import org.jitsi.jibri.status.ComponentBusyStatus
import org.jitsi.jibri.status.ComponentHealthStatus
import org.jitsi.jibri.status.ComponentState
Expand Down Expand Up @@ -99,40 +92,23 @@ class JibriManager : StatusPublisher<Any>() {
private var pendingIdleFunc: () -> Unit = {}
private var serviceTimeoutTask: ScheduledFuture<*>? = null

private val enableStatsD: Boolean by config {
"JibriConfig::enableStatsD" { Config.legacyConfigSource.enabledStatsD!! }
"jibri.stats.enable-stats-d".from(Config.configSource)
}

private val statsdHost: String by config {
"jibri.stats.host".from(Config.configSource)
}

private val statsdPort: Int by config {
"jibri.stats.port".from(Config.configSource)
}

private val singleUseMode: Boolean by config {
"JibriConfig::singleUseMode" { Config.legacyConfigSource.singleUseMode!! }
"jibri.single-use-mode".from(Config.configSource)
}

val statsDClient: JibriStatsDClient? = if (enableStatsD) {
JibriStatsDClient(statsdHost, statsdPort)
} else {
null
}
val jibriMetrics = JibriMetrics()

/**
* Note: should only be called if the instance-wide lock is held (i.e. called from
* one of the synchronized methods)
* TODO: instead of the synchronized decorators, use a synchronized(this) block
* which we can also use here
*/
private fun throwIfBusy() {
private fun throwIfBusy(sinkType: RecordingSinkType) {
if (busy()) {
logger.info("Jibri is busy, can't start service")
statsDClient?.incrementCounter(ASPECT_BUSY, TAG_SERVICE_RECORDING)
jibriMetrics.requestWhileBusy(sinkType)
throw JibriBusyException()
}
}
Expand All @@ -148,7 +124,7 @@ class JibriManager : StatusPublisher<Any>() {
environmentContext: EnvironmentContext? = null,
serviceStatusHandler: JibriServiceStatusHandler? = null
) {
throwIfBusy()
throwIfBusy(RecordingSinkType.FILE)
logger.info("Starting a file recording with params: $fileRecordingRequestParams")
val service = FileRecordingJibriService(
FileRecordingParams(
Expand All @@ -158,7 +134,7 @@ class JibriManager : StatusPublisher<Any>() {
serviceParams.appData?.fileRecordingMetadata
)
)
statsDClient?.incrementCounter(ASPECT_START, TAG_SERVICE_RECORDING)
jibriMetrics.start(RecordingSinkType.FILE)
startService(service, serviceParams, environmentContext, serviceStatusHandler)
}

Expand All @@ -174,9 +150,9 @@ class JibriManager : StatusPublisher<Any>() {
serviceStatusHandler: JibriServiceStatusHandler? = null
) {
logger.info("Starting a stream with params: $serviceParams $streamingParams")
throwIfBusy()
throwIfBusy(RecordingSinkType.STREAM)
val service = StreamingJibriService(streamingParams)
statsDClient?.incrementCounter(ASPECT_START, TAG_SERVICE_LIVE_STREAM)
jibriMetrics.start(RecordingSinkType.STREAM)
startService(service, serviceParams, environmentContext, serviceStatusHandler)
}

Expand All @@ -188,15 +164,15 @@ class JibriManager : StatusPublisher<Any>() {
serviceStatusHandler: JibriServiceStatusHandler? = null
) {
logger.info("Starting a SIP gateway with params: $serviceParams $sipGatewayServiceParams")
throwIfBusy()
throwIfBusy(RecordingSinkType.GATEWAY)
val service = SipGatewayJibriService(
SipGatewayServiceParams(
sipGatewayServiceParams.callParams,
sipGatewayServiceParams.callLoginParams,
sipGatewayServiceParams.sipClientParams
)
)
statsDClient?.incrementCounter(ASPECT_START, TAG_SERVICE_SIP_GATEWAY)
jibriMetrics.start(RecordingSinkType.GATEWAY)
return startService(service, serviceParams, environmentContext, serviceStatusHandler)
}

Expand All @@ -219,7 +195,7 @@ class JibriManager : StatusPublisher<Any>() {
when (it) {
is ComponentState.Error -> {
if (it.error.scope == ErrorScope.SYSTEM) {
statsDClient?.incrementCounter(ASPECT_ERROR, JibriStatsDClient.getTagForService(jibriService))
jibriMetrics.error(jibriService.getSinkType())
publishStatus(ComponentHealthStatus.UNHEALTHY)
}
stopService()
Expand Down Expand Up @@ -270,7 +246,7 @@ class JibriManager : StatusPublisher<Any>() {
logger.info("No service active, ignoring stop")
return
}
statsDClient?.incrementCounter(ASPECT_STOP, JibriStatsDClient.getTagForService(currentService))
jibriMetrics.stop(currentService.getSinkType())
logger.info("Stopping the current service")
serviceTimeoutTask?.cancel(false)
// Note that this will block until the service is completely stopped
Expand Down Expand Up @@ -309,3 +285,10 @@ class JibriManager : StatusPublisher<Any>() {
}
}
}

/**
 * Maps a [JibriService] implementation to the [RecordingSinkType] used to label its metrics.
 *
 * @return the sink type corresponding to the concrete service class.
 * @throws IllegalArgumentException if the receiver is not one of the known service implementations.
 */
private fun JibriService.getSinkType(): RecordingSinkType = when (this) {
    is FileRecordingJibriService -> RecordingSinkType.FILE
    // Streaming services report STREAM (not GATEWAY), so stop/error metrics line up with the
    // RecordingSinkType.STREAM value recorded when a StreamingJibriService is started.
    is StreamingJibriService -> RecordingSinkType.STREAM
    is SipGatewayJibriService -> RecordingSinkType.GATEWAY
    // Name the receiver's runtime class; JibriService::class would always print the base type.
    else -> throw IllegalArgumentException("JibriService of unsupported type: ${this::class.java.name}")
}
6 changes: 4 additions & 2 deletions src/main/kotlin/org/jitsi/jibri/Main.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ fun main(args: Array<String>) {
jibriStatusManager.addStatusHandler {
webhookClient.updateStatus(it)
}
jibriStatusManager.addStatusHandler {
jibriManager.jibriMetrics.updateStatus(it)
}
webhookSubscribers.forEach(webhookClient::addSubscriber)
val statusUpdaterTask = TaskPools.recurringTasksPool.scheduleAtFixedRate(
1,
Expand Down Expand Up @@ -144,8 +147,7 @@ fun main(args: Array<String>) {
val xmppApi = XmppApi(
jibriManager = jibriManager,
xmppConfigs = xmppEnvironments,
jibriStatusManager = jibriStatusManager,
jibriManager.statsDClient
jibriStatusManager = jibriStatusManager
)
xmppApi.start()

Expand Down
30 changes: 30 additions & 0 deletions src/main/kotlin/org/jitsi/jibri/api/http/HttpApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.jitsi.jibri.api.http

import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.serialization.jackson.jackson
import io.ktor.server.application.Application
Expand All @@ -24,10 +25,12 @@ import io.ktor.server.application.install
import io.ktor.server.plugins.contentnegotiation.ContentNegotiation
import io.ktor.server.request.receive
import io.ktor.server.response.respond
import io.ktor.server.response.respondText
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.routing.route
import io.ktor.server.routing.routing
import io.prometheus.client.exporter.common.TextFormat
import jakarta.ws.rs.core.Response
import org.jitsi.jibri.FileRecordingRequestParams
import org.jitsi.jibri.JibriBusyException
Expand All @@ -36,6 +39,8 @@ import org.jitsi.jibri.RecordingSinkType
import org.jitsi.jibri.config.Config
import org.jitsi.jibri.config.XmppCredentials
import org.jitsi.jibri.health.JibriHealth
import org.jitsi.jibri.metrics.JibriMetricsContainer
import org.jitsi.jibri.metrics.StatsConfig
import org.jitsi.jibri.selenium.CallParams
import org.jitsi.jibri.service.JibriServiceStatusHandler
import org.jitsi.jibri.service.ServiceParams
Expand Down Expand Up @@ -130,6 +135,31 @@ class HttpApi(
call.respond(HttpStatusCode.OK)
}
}
// Expose the metrics endpoint only when StatsConfig.enablePrometheus is set.
if (StatsConfig.enablePrometheus) {
logger.info("Enabling prometheus interface at :$port/metrics")
// GET /metrics: the response format is chosen from the request's Accept header.
get("/metrics") {
// Only the beginning of the Accept header is inspected (startsWith); a missing header
// or any other value falls through to the JSON branch below.
val accept = call.request.headers["Accept"]
when {
// Accept starts with "application/openmetrics-text": respond with the OpenMetrics 1.0.0 content type.
accept?.startsWith("application/openmetrics-text") == true ->
call.respondText(
JibriMetricsContainer.getPrometheusMetrics(TextFormat.CONTENT_TYPE_OPENMETRICS_100),
contentType = ContentType.parse(TextFormat.CONTENT_TYPE_OPENMETRICS_100)
)

// Accept starts with "text/plain": respond with the Prometheus text format (version 0.0.4).
accept?.startsWith("text/plain") == true ->
call.respondText(
JibriMetricsContainer.getPrometheusMetrics(TextFormat.CONTENT_TYPE_004),
contentType = ContentType.parse(TextFormat.CONTENT_TYPE_004)
)

// Default: respond with JibriMetricsContainer.jsonString as application/json.
else ->
call.respondText(
JibriMetricsContainer.jsonString,
contentType = ContentType.parse("application/json")
)
}
}
}
}
}

Expand Down
23 changes: 7 additions & 16 deletions src/main/kotlin/org/jitsi/jibri/api/xmpp/XmppApi.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ import org.jitsi.jibri.service.impl.SipGatewayServiceParams
import org.jitsi.jibri.service.impl.StreamingParams
import org.jitsi.jibri.service.impl.YOUTUBE_URL
import org.jitsi.jibri.sipgateway.SipClientParams
import org.jitsi.jibri.statsd.JibriStatsDClient
import org.jitsi.jibri.statsd.STOPPED_ON_XMPP_CLOSED
import org.jitsi.jibri.statsd.XMPP_CLOSED
import org.jitsi.jibri.statsd.XMPP_CLOSED_ON_ERROR
import org.jitsi.jibri.statsd.XMPP_CONNECTED
import org.jitsi.jibri.statsd.XMPP_PING_FAILED
import org.jitsi.jibri.statsd.XMPP_RECONNECTING
import org.jitsi.jibri.statsd.XMPP_RECONNECTION_FAILED
import org.jitsi.jibri.status.ComponentState
import org.jitsi.jibri.status.JibriStatus
import org.jitsi.jibri.status.JibriStatusManager
Expand Down Expand Up @@ -78,22 +70,21 @@ class XmppApi(
private val jibriManager: JibriManager,
private val xmppConfigs: List<XmppEnvironmentConfig>,
private val jibriStatusManager: JibriStatusManager,
private val statsDClient: JibriStatsDClient? = null
) : IQListener {
private val logger = createLogger()

private val connectionStateListener = object : ConnectionStateListener {
override fun connected(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_CONNECTED, mucClient.tags())
jibriManager.jibriMetrics.xmppConnected(mucClient.tags())
}
override fun reconnecting(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_RECONNECTING, mucClient.tags())
jibriManager.jibriMetrics.xmppReconnecting(mucClient.tags())
}
override fun reconnectionFailed(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_RECONNECTION_FAILED, mucClient.tags())
jibriManager.jibriMetrics.xmppReconnectionFailed(mucClient.tags())
}
override fun pingFailed(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_PING_FAILED, mucClient.tags())
jibriManager.jibriMetrics.xmppPingFailed(mucClient.tags())
}

/**
Expand All @@ -102,7 +93,7 @@ class XmppApi(
* recording is stopped.
*/
override fun closed(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_CLOSED, mucClient.tags())
jibriManager.jibriMetrics.xmppClosed(mucClient.tags())
maybeStop(mucClient)
}

Expand All @@ -112,7 +103,7 @@ class XmppApi(
* recording is stopped.
*/
override fun closedOnError(mucClient: MucClient) {
statsDClient?.incrementCounter(XMPP_CLOSED_ON_ERROR, mucClient.tags())
jibriManager.jibriMetrics.xmppClosedOnError(mucClient.tags())
maybeStop(mucClient)
}

Expand All @@ -121,7 +112,7 @@ class XmppApi(
val environmentContext = createEnvironmentContext(xmppEnvironment, mucClient)
if (jibriManager.currentEnvironmentContext == environmentContext) {
logger.warn("XMPP disconnected, stopping.")
statsDClient?.incrementCounter(STOPPED_ON_XMPP_CLOSED, mucClient.tags())
jibriManager.jibriMetrics.stoppedOnXmppClosed(mucClient.tags())
jibriManager.stopService()
}
}
Expand Down
Loading

0 comments on commit 03d63f3

Please sign in to comment.