Skip to content

Commit

Permalink
EventsBySliceBacktrackingSpec
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed May 29, 2024
1 parent 61c8a5b commit 6f7aa69
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 4 deletions.
2 changes: 1 addition & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ akka.persistence.dynamodb {
}

# In-memory buffer holding events when reading from database.
buffer-size = 1000
buffer-size = 100
}
}
// #query-settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package akka.persistence.dynamodb.internal

import java.time.Instant
import java.util.{ HashMap => JHashMap }

import scala.concurrent.ExecutionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest

val entityTypeSlice = s"$entityType-$slice"

// FIXME we could look into using response.lastEvaluatedKey and use that as exclusiveStartKey in query,
// instead of the timestamp for subsequent queries. Not sure how that works with GSI where the
// sort key isn't unique (same timestamp). If DynamoDB can keep track of the exact offset and
// not emit duplicates would not need the seen Map and that filter.
// Well, we still need it for the first query because we want the external offset to be TimestampOffset
// and that can include seen Map.

val expressionAttributeValues =
Map(
":entityTypeSlice" -> AttributeValue.fromS(entityTypeSlice),
Expand All @@ -102,6 +109,10 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
.keyConditionExpression(s"$EntityTypeSlice = :entityTypeSlice AND $Timestamp BETWEEN :from AND :to")
.filterExpression(s"attribute_not_exists($Deleted)")
.expressionAttributeValues(expressionAttributeValues)
// Limit won't limit the number of results you get with the paginator.
// It only limits the number of results in each page
// Limit is ignored by local DynamoDB.
.limit(settings.querySettings.bufferSize)
.build()

// FIXME for backtracking we don't need all attributes, can be filtered with builder.attributesToGet
Expand Down
15 changes: 15 additions & 0 deletions core/src/test/scala/akka/persistence/dynamodb/TestData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@

package akka.persistence.dynamodb

import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

import scala.annotation.tailrec

import akka.actor.typed.ActorSystem
import akka.persistence.Persistence
import akka.persistence.typed.PersistenceId

object TestData {
Expand All @@ -18,11 +23,21 @@ trait TestData {
import TestData.pidCounter
import TestData.entityTypeCounter

def typedSystem: ActorSystem[_]

private lazy val persistenceExt = Persistence(typedSystem)

def nextPid(): String = s"p-${pidCounter.incrementAndGet()}"

def nextEntityType(): String = s"TestEntity-${entityTypeCounter.incrementAndGet()}"

// Build a PersistenceId for the given entity type using the shared pid counter,
// so each call yields a unique entity id.
def nextPersistenceId(entityType: String): PersistenceId = {
  val entityId = pidCounter.incrementAndGet().toString
  PersistenceId.of(entityType, entityId)
}

/**
 * Generate random persistence ids until one hashes to the requested slice.
 * The iterator is infinite, so `find` always terminates with a match
 * (assuming the slice is reachable for this entity type).
 */
final def randomPersistenceIdForSlice(entityType: String, slice: Int): PersistenceId =
  Iterator
    .continually(PersistenceId.of(entityType, UUID.randomUUID().toString))
    .find(pid => persistenceExt.sliceForPersistenceId(pid.id) == slice)
    .get

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.dynamodb

import java.time.Instant
import java.util.concurrent.CompletionException

import scala.concurrent.Await
Expand All @@ -18,11 +19,25 @@ import scala.util.control.NonFatal
import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.Persistence
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.dynamodb.internal.JournalAttributes
import akka.persistence.dynamodb.internal.JournalAttributes.EntityTypeSlice
import akka.persistence.dynamodb.internal.JournalAttributes.EventPayload
import akka.persistence.dynamodb.internal.JournalAttributes.EventSerId
import akka.persistence.dynamodb.internal.JournalAttributes.EventSerManifest
import akka.persistence.dynamodb.internal.JournalAttributes.Pid
import akka.persistence.dynamodb.internal.JournalAttributes.SeqNr
import akka.persistence.dynamodb.internal.JournalAttributes.Timestamp
import akka.persistence.dynamodb.internal.JournalAttributes.Writer
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition
import software.amazon.awssdk.services.dynamodb.model.CreateGlobalSecondaryIndexAction
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest
Expand All @@ -32,9 +47,9 @@ import software.amazon.awssdk.services.dynamodb.model.KeyType
import software.amazon.awssdk.services.dynamodb.model.Projection
import software.amazon.awssdk.services.dynamodb.model.ProjectionType
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType
import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest

trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>

Expand All @@ -49,6 +64,8 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>

lazy val client: DynamoDbAsyncClient = ClientProvider(typedSystem).clientFor(testConfigPath + ".client")

private lazy val log = LoggerFactory.getLogger(getClass)

override protected def beforeAll(): Unit = {
try {
Await.result(createJournalTable(), 10.seconds)
Expand Down Expand Up @@ -115,4 +132,32 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
}
}

// Insert a journal item directly into the table, bypassing the journal plugin,
// so tests can control the event timestamp exactly.
def writeEvent(slice: Int, persistenceId: PersistenceId, seqNr: Long, timestamp: Instant, event: String): Unit = {
  import java.util.{ HashMap => JHashMap }
  import JournalAttributes._

  log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, seqNr, event, timestamp)

  val serializer = SerializationExtension(typedSystem).serializerFor(classOf[String])
  val payload = SdkBytes.fromByteArray(serializer.toBinary(event))
  // the journal stores timestamps with microsecond resolution
  val timestampMicros = InstantFactory.toEpochMicros(timestamp)

  val item = new JHashMap[String, AttributeValue]
  item.put(Pid, AttributeValue.fromS(persistenceId.id))
  item.put(SeqNr, AttributeValue.fromN(seqNr.toString))
  item.put(EntityTypeSlice, AttributeValue.fromS(s"${persistenceId.entityTypeHint}-$slice"))
  item.put(Timestamp, AttributeValue.fromN(timestampMicros.toString))
  item.put(EventSerId, AttributeValue.fromN(serializer.identifier.toString))
  item.put(EventSerManifest, AttributeValue.fromS(""))
  item.put(EventPayload, AttributeValue.fromB(payload))
  item.put(Writer, AttributeValue.fromS(""))

  val request = PutItemRequest
    .builder()
    .tableName(settings.journalTable)
    .item(item)
    .build()

  // blocking is fine here: test-only helper
  Await.result(client.putItem(request).asScala, 10.seconds)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.query

import scala.concurrent.duration._

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorSystem
import akka.persistence.dynamodb.TestConfig
import akka.persistence.dynamodb.TestData
import akka.persistence.dynamodb.TestDbLifecycle
import akka.persistence.dynamodb.internal.EnvelopeOrigin
import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
import akka.persistence.query.NoOffset
import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.typed.PersistenceId
import akka.serialization.SerializationExtension
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory

object EventsBySliceBacktrackingSpec {
  // deliberately small buffer so that paging/backtracking kicks in with few events
  private val BufferSize = 10

  private val queryOverrides = ConfigFactory.parseString(s"""
    akka.persistence.dynamodb.journal.publish-events = off
    akka.persistence.dynamodb.query {
      refresh-interval = 1s
      buffer-size = $BufferSize
    }
    """)

  private val config = queryOverrides.withFallback(TestConfig.config)
}

/**
 * Verifies the backtracking behavior of `eventsBySlices`: events written with
 * earlier timestamps than already-emitted events are picked up by the periodic
 * backtracking queries, which by design emit duplicates and envelopes without
 * event payloads.
 */
class EventsBySliceBacktrackingSpec
    extends ScalaTestWithActorTestKit(EventsBySliceBacktrackingSpec.config)
    with AnyWordSpecLike
    with TestDbLifecycle
    with TestData
    with LogCapturing {

  override def typedSystem: ActorSystem[_] = system

  private val query = PersistenceQuery(testKit.system)
    .readJournalFor[DynamoDBReadJournal](DynamoDBReadJournal.Identifier)
  private val stringSerializer = SerializationExtension(system).serializerFor(classOf[String])
  private val log = LoggerFactory.getLogger(getClass)

  "eventsBySlices backtracking" should {

    "find old events with earlier timestamp" in {
      // this scenario is handled by the backtracking query
      val entityType = nextEntityType()
      val pid1 = nextPersistenceId(entityType)
      val slice = query.sliceForPersistenceId(pid1.id)
      // second pid on the same slice so both are covered by one query
      val pid2 = randomPersistenceIdForSlice(entityType, slice)
      val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)

      // don't let behind-current-time be a reason for not finding events
      val startTime = InstantFactory.now().minusSeconds(10 * 60)

      writeEvent(slice, pid1, 1L, startTime, "e1-1")
      writeEvent(slice, pid1, 2L, startTime.plusMillis(1), "e1-2")

      val result: TestSubscriber.Probe[EventEnvelope[String]] =
        query
          .eventsBySlices[String](entityType, slice, slice, NoOffset)
          .runWith(sinkProbe)
          .request(100)

      val env1 = result.expectNext()
      env1.persistenceId shouldBe pid1.id
      env1.sequenceNr shouldBe 1L
      env1.eventOption shouldBe Some("e1-1")
      env1.source shouldBe EnvelopeOrigin.SourceQuery

      val env2 = result.expectNext()
      env2.persistenceId shouldBe pid1.id
      env2.sequenceNr shouldBe 2L
      env2.eventOption shouldBe Some("e1-2")
      env2.source shouldBe EnvelopeOrigin.SourceQuery

      // first backtracking query kicks in immediately after the first normal query has finished
      // and it also emits duplicates (by design)
      val env3 = result.expectNext()
      env3.persistenceId shouldBe pid1.id
      env3.sequenceNr shouldBe 1L
      env3.source shouldBe EnvelopeOrigin.SourceBacktracking
      // event payload isn't included in backtracking results
      env3.eventOption shouldBe None
      // but it can be lazy loaded
      // FIXME query.loadEnvelope[String](env3.persistenceId, env3.sequenceNr).futureValue.eventOption shouldBe Some("e1-1")
      // backtracking up to (and equal to) the same offset
      val env4 = result.expectNext()
      env4.persistenceId shouldBe pid1.id
      env4.sequenceNr shouldBe 2L
      env4.eventOption shouldBe None

      result.expectNoMessage(100.millis) // not e1-2

      writeEvent(slice, pid1, 3L, startTime.plusMillis(3), "e1-3")
      val env5 = result.expectNext()
      env5.persistenceId shouldBe pid1.id
      env5.sequenceNr shouldBe 3L

      // before e1-3 so it will not be found by the normal query
      writeEvent(slice, pid2, 1L, startTime.plusMillis(2), "e2-1")

      // no backtracking yet
      result.expectNoMessage(settings.querySettings.refreshInterval + 100.millis)

      // after 1/2 of the backtracking window, to kick off a backtracking query
      writeEvent(
        slice,
        pid1,
        4L,
        startTime.plusMillis(settings.querySettings.backtrackingWindow.toMillis / 2).plusMillis(4),
        "e1-4")
      val env6 = result.expectNext()
      env6.persistenceId shouldBe pid1.id
      env6.sequenceNr shouldBe 4L

      // backtracking finds it, and it also emits duplicates (by design)
      // e1-1 and e1-2 were already handled by previous backtracking query
      val env7 = result.expectNext()
      env7.persistenceId shouldBe pid2.id
      env7.sequenceNr shouldBe 1L

      val env8 = result.expectNext()
      env8.persistenceId shouldBe pid1.id
      env8.sequenceNr shouldBe 3L

      val env9 = result.expectNext()
      env9.persistenceId shouldBe pid1.id
      env9.sequenceNr shouldBe 4L

      result.cancel()
    }

    "emit from backtracking after first normal query" in {
      val entityType = nextEntityType()
      val pid1 = nextPersistenceId(entityType)
      val slice = query.sliceForPersistenceId(pid1.id)
      val pid2 = randomPersistenceIdForSlice(entityType, slice)
      // use the non-deprecated TestSink factory, consistent with the first test
      val sinkProbe = TestSink[EventEnvelope[String]]()(system.classicSystem)

      // don't let behind-current-time be a reason for not finding events
      val startTime = InstantFactory.now().minusSeconds(10 * 60)

      writeEvent(slice, pid1, 1L, startTime, "e1-1")
      writeEvent(slice, pid1, 2L, startTime.plusMillis(2), "e1-2")
      writeEvent(slice, pid1, 3L, startTime.plusMillis(4), "e1-3")

      def startQuery(offset: Offset): TestSubscriber.Probe[EventEnvelope[String]] =
        query
          .eventsBySlices[String](entityType, slice, slice, offset)
          .runWith(sinkProbe)
          .request(100)

      // assert pid/seqNr/payload of the envelope and return its offset for resuming
      def expect(env: EventEnvelope[String], pid: PersistenceId, seqNr: Long, eventOption: Option[String]): Offset = {
        env.persistenceId shouldBe pid.id
        env.sequenceNr shouldBe seqNr
        env.eventOption shouldBe eventOption
        env.offset
      }

      val result1 = startQuery(NoOffset)
      expect(result1.expectNext(), pid1, 1L, Some("e1-1"))
      expect(result1.expectNext(), pid1, 2L, Some("e1-2"))
      expect(result1.expectNext(), pid1, 3L, Some("e1-3"))

      // first backtracking query kicks in immediately after the first normal query has finished
      // and it also emits duplicates (by design)
      expect(result1.expectNext(), pid1, 1L, None)
      expect(result1.expectNext(), pid1, 2L, None)
      val offset1 = expect(result1.expectNext(), pid1, 3L, None)
      result1.cancel()

      // write delayed events from pid2
      writeEvent(slice, pid2, 1L, startTime.plusMillis(1), "e2-1")
      writeEvent(slice, pid2, 2L, startTime.plusMillis(3), "e2-2")
      writeEvent(slice, pid2, 3L, startTime.plusMillis(5), "e2-3")

      val result2 = startQuery(offset1)
      // backtracking interleaves both pids in timestamp order, without payloads
      expect(result2.expectNext(), pid1, 1L, None)
      expect(result2.expectNext(), pid2, 1L, None)
      expect(result2.expectNext(), pid1, 2L, None)
      expect(result2.expectNext(), pid2, 2L, None)
      expect(result2.expectNext(), pid1, 3L, None)
      // from normal query
      expect(result2.expectNext(), pid2, 3L, Some("e2-3"))

      result2.cancel()
    }

  }

}
Loading

0 comments on commit 6f7aa69

Please sign in to comment.