Skip to content

Commit

Permalink
chore: add extra logging for by-slice queries (#114)
Browse files Browse the repository at this point in the history
* chore: add extra logging for by-slice queries

* update log name to be more similar to other log prefixes

* set log levels, warn on failure

---------

Co-authored-by: Patrik Nordwall <[email protected]>
  • Loading branch information
pvlugter and patriknw authored Dec 12, 2024
1 parent 1ee98ab commit 8bb8e6d
Showing 1 changed file with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import scala.jdk.FutureConverters._
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.event.Logging
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import akka.stream.Attributes
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.QueryRequest
import software.amazon.awssdk.services.dynamodb.model.QueryResponse

/**
* INTERNAL API
Expand All @@ -48,6 +51,8 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
s"$bySliceWithMetaProjectionExpression, $EventPayload"
}

private val logging = Logging(system.classicSystem, this.getClass.getName)

def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
Expand Down Expand Up @@ -206,16 +211,37 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest

val publisher = client.queryPaginator(req)

def getTimestamp(item: JMap[String, AttributeValue]): Instant =
InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong)

val logName = s"[$entityType] itemsBySlice [$slice] [${if (backtracking) "backtracking" else "query"}]"

def logQueryResponse: QueryResponse => String = response => {
if (response.hasItems && !response.items.isEmpty) {
val items = response.items
val count = items.size
val first = getTimestamp(items.get(0))
val last = getTimestamp(items.get(items.size - 1))
val scanned = response.scannedCount
val hasMore = response.hasLastEvaluatedKey && !response.lastEvaluatedKey.isEmpty
s"query response page with [$count] events between [$first - $last] (scanned [$scanned], has more [$hasMore])"
} else "empty query response page"
}

Source
.fromPublisher(publisher)
// note that this is not logging each item, only the QueryResponse
.log(logName, logQueryResponse)(logging)
.withAttributes(Attributes
.logLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.WarningLevel))
.mapConcat(_.items.iterator.asScala)
.take(settings.querySettings.bufferSize)
.map { item =>
if (backtracking) {
SerializedJournalItem(
persistenceId = item.get(Pid).s(),
seqNr = item.get(SeqNr).n().toLong,
writeTimestamp = InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong),
writeTimestamp = getTimestamp(item),
readTimestamp = InstantFactory.now(),
payload = None, // lazy loaded for backtracking
serId = item.get(EventSerId).n().toInt,
Expand Down

0 comments on commit 8bb8e6d

Please sign in to comment.