Skip to content

Commit

Permalink
Add more metrics
Browse files Browse the repository at this point in the history
* Latest synced height per chain
* Number of fungible tokens
* Number of non-fungible tokens
* Number of events
  • Loading branch information
tdroxler committed Aug 27, 2024
1 parent cdfcc66 commit 152f1f6
Show file tree
Hide file tree
Showing 10 changed files with 853 additions and 254 deletions.
857 changes: 613 additions & 244 deletions app/src/main/resources/grafana/explorer-backend-overview.json

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion app/src/main/scala/org/alephium/explorer/ExplorerState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import com.typesafe.scalalogging.StrictLogging
import slick.basic.DatabaseConfig
import slick.jdbc.PostgresProfile

import org.alephium.explorer.cache.{BlockCache, TransactionCache}
import org.alephium.explorer.cache.{BlockCache, MetricCache, TransactionCache}
import org.alephium.explorer.config.{BootMode, ExplorerConfig}
import org.alephium.explorer.persistence.Database
import org.alephium.explorer.service._
Expand Down Expand Up @@ -64,6 +64,12 @@ sealed trait ExplorerState extends Service with StrictLogging {
directCliqueAccess = config.directCliqueAccess
)

implicit lazy val metricCache: MetricCache =
new MetricCache(
database,
config.cacheRowCountReloadPeriod
)

override def startSelfOnce(): Future[Unit] = {
Future.unit
}
Expand All @@ -77,6 +83,7 @@ sealed trait ExplorerState extends Service with StrictLogging {
override def subServices: ArraySeq[Service] = {
val writeOnlyServices =
ArraySeq(
metricCache,
transactionCache,
blockFlowClient,
database
Expand Down
4 changes: 3 additions & 1 deletion app/src/main/scala/org/alephium/explorer/SyncServices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import sttp.model.Uri

import org.alephium.api.model.{ChainParams, PeerAddress}
import org.alephium.explorer.RichAVector._
import org.alephium.explorer.cache.BlockCache
import org.alephium.explorer.cache.{BlockCache, MetricCache}
import org.alephium.explorer.config.{BootMode, ExplorerConfig}
import org.alephium.explorer.error.ExplorerError._
import org.alephium.explorer.service._
Expand All @@ -47,6 +47,7 @@ object SyncServices extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
blockCache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting
): Future[Unit] =
config.bootMode match {
Expand Down Expand Up @@ -86,6 +87,7 @@ object SyncServices extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
blockCache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting
): Future[Unit] =
Future.fromTry {
Expand Down
27 changes: 24 additions & 3 deletions app/src/main/scala/org/alephium/explorer/cache/BlockCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.concurrent.duration._
import scala.jdk.FutureConverters._

import com.github.benmanes.caffeine.cache.{AsyncCacheLoader, Caffeine}
import io.prometheus.metrics.core.metrics.Gauge
import slick.basic.DatabaseConfig
import slick.jdbc.PostgresProfile
import slick.jdbc.PostgresProfile.api._
Expand Down Expand Up @@ -60,7 +61,7 @@ object BlockCache {
ec: ExecutionContext,
dc: DatabaseConfig[PostgresProfile]
): BlockCache = {
val groupConfig: GroupConfig = groupSetting.groupConfig
implicit val groupConfig: GroupConfig = groupSetting.groupConfig

/*
* `Option.get` is used to avoid unnecessary memory allocations.
Expand Down Expand Up @@ -125,6 +126,16 @@ object BlockCache {
)
}

val latestBlocksSynced: Gauge = Gauge
.builder()
.name(
"alephimum_explorer_backend_latest_blocks_synced"
)
.help(
"Latest blocks synced per chainindex"
)
.labelNames("chain_from", "chain_to")
.register()
}

/** Cache used by Block queries.
Expand All @@ -135,7 +146,13 @@ class BlockCache(
blockTimes: CaffeineAsyncCache[ChainIndex, Duration],
rowCount: AsyncReloadingCache[Int],
latestBlocks: CaffeineAsyncCache[ChainIndex, LatestBlock]
) {
)(implicit groupConfig: GroupConfig) {

private val latestBlocksSyncedLabeled =
groupConfig.cliqueChainIndexes.map(chainIndex =>
BlockCache.latestBlocksSynced
.labelValues(chainIndex.from.value.toString, chainIndex.to.value.toString)
)

/** Operations on `blockTimes` cache */
def getAllBlockTimes(chainIndexes: Iterable[ChainIndex])(implicit
Expand All @@ -150,8 +167,12 @@ class BlockCache(
): Future[ArraySeq[(ChainIndex, LatestBlock)]] =
latestBlocks.getAll(groupSetting.chainIndexes)

def putLatestBlock(chainIndex: ChainIndex, block: LatestBlock): Unit =
def putLatestBlock(chainIndex: ChainIndex, block: LatestBlock): Unit = {
latestBlocks.put(chainIndex, block)
latestBlocksSyncedLabeled
.get(chainIndex.flattenIndex)
.foreach(_.set(block.height.value.toDouble))
}

def getMainChainBlockCount(): Int =
rowCount.get()
Expand Down
113 changes: 113 additions & 0 deletions app/src/main/scala/org/alephium/explorer/cache/MetricCache.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.cache

import scala.collection.immutable.ArraySeq
import scala.concurrent.{ExecutionContext,Future}
import scala.concurrent.duration._

import io.prometheus.metrics.core.metrics.Gauge

import org.alephium.explorer.persistence.Database
import org.alephium.explorer.persistence.DBRunner._
import org.alephium.explorer.persistence.queries.{EventQueries,TokenQueries}
import org.alephium.util.Service

class MetricCache(database:Database, reloadPeriod: FiniteDuration)(implicit val executionContext:ExecutionContext) extends Service {

private val fungibleCount: AsyncReloadingCache[Int] ={
AsyncReloadingCache(0, reloadPeriod) { _ =>
run(TokenQueries.countFungibles())(database.databaseConfig).map { count =>
MetricCache.fungibleCountGauge.set(count.toDouble)
count
}
}
}

private val nftCount: AsyncReloadingCache[Int] ={
AsyncReloadingCache(0, reloadPeriod) { _ =>
run(TokenQueries.countNFT())(database.databaseConfig).map { count =>
MetricCache.nftCountGauge.set(count.toDouble)
count
}
}
}

private val eventCount: AsyncReloadingCache[Int] ={
AsyncReloadingCache(0, reloadPeriod) { _ =>
run(EventQueries.countEvents())(database.databaseConfig).map { count =>
MetricCache.eventCountGauge.set(count.toDouble)
count
}
}
}

def reloadTokenCountIfOverdue():Unit = {
val _= fungibleCount.get()
val _= nftCount.get()
val _= eventCount.get()
}

def reloadEventCountIfOverdue():Unit = {
val _= fungibleCount.get()
val _= nftCount.get()
val _= eventCount.get()
}

override def startSelfOnce(): Future[Unit] = {
for {
_<- fungibleCount.expireAndReloadFuture().map(_ => ())
_<- nftCount.expireAndReloadFuture().map(_ => ())
_<- eventCount.expireAndReloadFuture().map(_ => ())
} yield ()
}

override def stopSelfOnce(): Future[Unit] = {
Future.unit
}

override def subServices: ArraySeq[Service] = ArraySeq(database)
}
object MetricCache {
val fungibleCountGauge: Gauge = Gauge
.builder()
.name(
"alephimum_explorer_backend_fungible_count"
).help(
"Number of fungible tokens in the system"
)
.register()

val nftCountGauge: Gauge = Gauge
.builder()
.name(
"alephimum_explorer_backend_nft_count"
).help(
"Number of NFT in the system"
)
.register()

val eventCountGauge: Gauge = Gauge
.builder()
.name(
"alephimum_explorer_backend_event_count"
).help(
"Number of events in the system"
)
.register()

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.alephium.explorer.persistence.queries

import scala.concurrent.ExecutionContext

import slick.jdbc.{PositionedParameters, SetParameter, SQLActionBuilder}
import slick.jdbc.PostgresProfile.api._

Expand Down Expand Up @@ -98,4 +100,13 @@ object EventQueries {
.paginate(pagination)
.asASE[EventEntity](eventGetResult)
}

def countEvents()(implicit
ec: ExecutionContext
): DBActionR[Int] = {
sql"""
SELECT count(*)
FROM events
""".asAS[Int].exactlyOne
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,4 +356,24 @@ object TokenQueries extends StrictLogging {
WHERE token = $token
"""
}

def countFungibles()(implicit
ec: ExecutionContext
): DBActionR[Int] = {
sql"""
SELECT count(*)
FROM token_info
WHERE interface_id = '0001'
""".asAS[Int].exactlyOne
}

def countNFT()(implicit
ec: ExecutionContext
): DBActionR[Int] = {
sql"""
SELECT count(*)
FROM token_info
WHERE interface_id = '0003' OR interface_id = '000301'
""".asAS[Int].exactlyOne
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import sttp.model.Uri

import org.alephium.explorer.{foldFutures, GroupSetting}
import org.alephium.explorer.api.model.Height
import org.alephium.explorer.cache.BlockCache
import org.alephium.explorer.cache.{BlockCache, MetricCache}
import org.alephium.explorer.error.ExplorerError.BlocksInDifferentChains
import org.alephium.explorer.persistence.DBRunner._
import org.alephium.explorer.persistence.dao.BlockDao
Expand Down Expand Up @@ -73,6 +73,7 @@ case object BlockFlowSyncService extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
cache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting,
scheduler: Scheduler
): Future[Unit] =
Expand All @@ -88,6 +89,7 @@ case object BlockFlowSyncService extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
cache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting
): Future[Unit] = {
if (initialBackStepDone.get()) {
Expand All @@ -105,6 +107,7 @@ case object BlockFlowSyncService extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
cache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting
): Future[Unit] = {
logger.debug("Start syncing")
Expand Down Expand Up @@ -144,6 +147,7 @@ case object BlockFlowSyncService extends StrictLogging {
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
cache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting
): Future[Int] = {
blockFlowClient.fetchBlocks(from, to, uri).flatMap { multiChain =>
Expand Down Expand Up @@ -355,18 +359,33 @@ case object BlockFlowSyncService extends StrictLogging {
}
}

private def reloadMetricCache(blocks: ArraySeq[BlockEntityWithEvents])(implicit
metricCache: MetricCache
): Unit = {
if (blocks.map(_.block).exists(_.outputs.exists(_.tokens.map(_.nonEmpty).getOrElse(false)))) {
metricCache.reloadTokenCountIfOverdue()
}
if (blocks.flatMap(_.events).nonEmpty) {
metricCache.reloadEventCountIfOverdue()
}
}

private def insertBlocks(blocksWithEvents: ArraySeq[BlockEntityWithEvents])(implicit
ec: ExecutionContext,
dc: DatabaseConfig[PostgresProfile],
blockFlowClient: BlockFlowClient,
cache: BlockCache,
metricCache: MetricCache,
groupSetting: GroupSetting
): Future[Int] = {
if (blocksWithEvents.nonEmpty) {
for {
_ <- foldFutures(blocksWithEvents)(insertWithEvents)
_ <- BlockDao.updateLatestBlock(blocksWithEvents.last.block)
} yield blocksWithEvents.size
} yield {
reloadMetricCache(blocksWithEvents)
blocksWithEvents.size
}
} else {
Future.successful(0)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2018 The Alephium Authors
// This file is part of the alephium project.
//
// The library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the library. If not, see <http://www.gnu.org/licenses/>.

package org.alephium.explorer.cache

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

import org.alephium.explorer.persistence.Database

object TestMetricCache {

def apply(database: Database)(implicit
ec: ExecutionContext
): MetricCache =
new MetricCache(
database= database,
reloadPeriod = 1.second
)

}

Loading

0 comments on commit 152f1f6

Please sign in to comment.