Skip to content


feat/fix: skip backtracking when far behind, accept pubsub events aft…
Browse files Browse the repository at this point in the history
…er idle (#96)

* feat/fix: skip backtracking when far behind, accept pubsub events after idle
* docker-compose fix and actually request what we're expecting
* pubsub backtracking spec
  • Loading branch information
leviramsey authored Nov 15, 2024
1 parent c33f655 commit 600a2a0
Show file tree
Hide file tree
Showing 7 changed files with 516 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.dynamodb.internal

import java.time.Clock
import java.time.Instant
import java.time.{ Duration => JDuration }

Expand All @@ -26,7 +27,22 @@ import org.slf4j.Logger

object QueryState {
val empty: QueryState =
QueryState(TimestampOffset.Zero, 0, 0, 0, 0, backtrackingCount = 0, TimestampOffset.Zero, 0, 0)
latest = TimestampOffset.Zero,
itemCount = 0,
itemCountSinceBacktracking = 0,
queryCount = 0,
idleCount = 0,
backtrackingCount = 0,
latestBacktracking = TimestampOffset.Zero,
latestBacktrackingSeenCount = 0,
backtrackingExpectFiltered = 0,
previous = TimestampOffset.Zero,
previousBacktracking = TimestampOffset.Zero,
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)

final case class QueryState(
Expand All @@ -38,17 +54,26 @@ import org.slf4j.Logger
backtrackingCount: Int,
latestBacktracking: TimestampOffset,
latestBacktrackingSeenCount: Int,
backtrackingExpectFiltered: Int) {
backtrackingExpectFiltered: Int,
previous: TimestampOffset,
previousBacktracking: TimestampOffset,
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {

def backtracking: Boolean = backtrackingCount > 0

def currentOffset: TimestampOffset =
if (backtracking) latestBacktracking
else latest

def nextQueryFromTimestamp: Instant =
if (backtracking) latestBacktracking.timestamp
else latest.timestamp
def nextQueryFromTimestamp(backtrackingWindow: JDuration): Instant =
if (backtracking) {
if (latest.timestamp.minus(backtrackingWindow).isAfter(latestBacktracking.timestamp))
else latestBacktracking.timestamp
} else latest.timestamp

def nextQueryToTimestamp: Option[Instant] = {
if (backtracking) Some(latest.timestamp)
Expand Down Expand Up @@ -81,15 +106,18 @@ import org.slf4j.Logger
dao: BySliceQuery.Dao[Item],
createEnvelope: (TimestampOffset, Item) => Envelope,
extractOffset: Envelope => TimestampOffset,
createHeartbeat: Instant => Option[Envelope],
clock: Clock,
settings: DynamoDBSettings,
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
import TimestampOffset.toTimestampOffset

private val backtrackingWindow = JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
private val firstBacktrackingQueryWindow =
private val backtrackingBehindCurrentTime =
private val firstBacktrackingQueryWindow =

def currentBySlice(
logPrefix: String,
Expand All @@ -99,15 +127,17 @@ import org.slf4j.Logger
filterEventsBeforeSnapshots: (String, Long, String) => Boolean = (_, _, _) => true): Source[Envelope, NotUsed] = {
val initialOffset = toTimestampOffset(offset)

def nextOffset(state: QueryState, envelope: Envelope): QueryState =
state.copy(latest = extractOffset(envelope), itemCount = state.itemCount + 1)
def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
if (EnvelopeOrigin.isHeartbeatEvent(envelope)) state
else state.copy(latest = extractOffset(envelope), itemCount = state.itemCount + 1)

def nextQuery(state: QueryState, endTimestamp: Instant): (QueryState, Option[Source[Envelope, NotUsed]]) = {
// Note that we can't know how many events with the same timestamp that are filtered out
// so continue until itemCount is 0. That means an extra query at the end to make sure there are no
// more to fetch.
if (state.queryCount == 0L || state.itemCount > 0) {
val newState = state.copy(itemCount = 0, queryCount = state.queryCount + 1)
val newState = state.copy(itemCount = 0, queryCount = state.queryCount + 1, previous = state.latest)

val toTimestamp = newState.nextQueryToTimestamp match {
case Some(t) =>
Expand Down Expand Up @@ -176,41 +206,45 @@ import org.slf4j.Logger
log.debug("Starting {} query from slice [{}], from time [{}].", logPrefix, slice, initialOffset.timestamp)

def nextOffset(state: QueryState, envelope: Envelope): QueryState = {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")
if (EnvelopeOrigin.isHeartbeatEvent(envelope)) state
else {
val offset = extractOffset(envelope)
if (state.backtracking) {
if (offset.timestamp.isBefore(state.latestBacktracking.timestamp))
throw new IllegalArgumentException(
s"Unexpected offset [$offset] before latestBacktracking [${state.latestBacktracking}].")

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1
else 1

val newSeenCount =
if (offset.timestamp == state.latestBacktracking.timestamp) state.latestBacktrackingSeenCount + 1 else 1

latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
itemCount = state.itemCount + 1)
latestBacktracking = offset,
latestBacktrackingSeenCount = newSeenCount,
itemCount = state.itemCount + 1)

} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")
} else {
if (offset.timestamp.isBefore(state.latest.timestamp))
throw new IllegalArgumentException(s"Unexpected offset [$offset] before latest [${state.latest}].")

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",

if (log.isDebugEnabled()) {
if (state.latestBacktracking.seen.nonEmpty &&
"{} next offset is outside the backtracking window, latestBacktracking: [{}], offset: [{}]",
state.copy(latest = offset, itemCount = state.itemCount + 1)

state.copy(latest = offset, itemCount = state.itemCount + 1)

def delayNextQuery(state: QueryState): Option[FiniteDuration] = {
if (switchFromBacktracking(state)) {
// switch from from backtracking immediately
// switch from backtracking immediately
} else {
val delay = ContinuousQuery.adjustNextDelay(
Expand All @@ -236,20 +270,44 @@ import org.slf4j.Logger
state.backtracking && state.itemCount < settings.querySettings.bufferSize - state.backtrackingExpectFiltered

def switchToBacktracking(state: QueryState, newIdleCount: Long): Boolean = {
// Note that when starting the query with offset = NoOffset, it will try to switch to
// backtracking immediately after the first normal query because
// between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow

val qSettings = settings.querySettings

def disableBacktrackingWhenFarBehindCurrentWallClockTime: Boolean = {
val aheadOfInitial =
initialOffset == TimestampOffset.Zero || state.latestBacktracking.timestamp.isAfter(initialOffset.timestamp)

val previousTimestamp =
if (state.previous == TimestampOffset.Zero) state.latest.timestamp
else state.previous.timestamp

aheadOfInitial && previousTimestamp.isBefore(clock.instant().minus(firstBacktrackingQueryWindow))

qSettings.backtrackingEnabled &&
!state.backtracking &&
state.latest != TimestampOffset.Zero &&
!disableBacktrackingWhenFarBehindCurrentWallClockTime &&
(newIdleCount >= 5 || // FIXME config?
state.itemCountSinceBacktracking + state.itemCount >= qSettings.bufferSize * 3 ||
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.itemCount == 0) state.idleCount + 1 else 0
val newState =
if (settings.querySettings.backtrackingEnabled && !state.backtracking && state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 ||
state.itemCountSinceBacktracking + state.itemCount >= settings.querySettings.bufferSize * 3 ||
.between(state.latestBacktracking.timestamp, state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
// FIXME config for newIdleCount >= 5 and maybe something like `newIdleCount % 5 == 0`

// Note that when starting the query with offset = NoOffset it will switch to backtracking immediately after
// the first normal query because between(latestBacktracking.timestamp, latest.timestamp) > halfBacktrackingWindow
// start tracking query wall clock for heartbeats after initial backtracking query
val newQueryWallClock =
if (state.latestBacktracking != TimestampOffset.Zero) clock.instant()
else Instant.EPOCH

val newState =
if (switchToBacktracking(state, newIdleCount)) {
// switching to backtracking
val fromOffset =
if (state.latestBacktracking == TimestampOffset.Zero)
Expand All @@ -264,28 +322,34 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 1,
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else if (switchFromBacktracking(state)) {
// switch from backtracking
// switching from backtracking
itemCount = 0,
itemCountSinceBacktracking = 0,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = 0)
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
} else {
// continue
// continuing
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
itemCount = 0,
itemCountSinceBacktracking = state.itemCountSinceBacktracking + state.itemCount,
queryCount = state.queryCount + 1,
idleCount = newIdleCount,
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount)
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)

val fromTimestamp = newState.nextQueryFromTimestamp
val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
val toTimestamp = {
val behindCurrentTime =
if (newState.backtracking) settings.querySettings.backtrackingBehindCurrentTime
Expand Down Expand Up @@ -320,7 +384,11 @@ import org.slf4j.Logger
else s"Found [${state.itemCount}] items in previous query.")

newState ->
val newStateWithPrevious =
if (newState.backtracking) newState.copy(previousBacktracking = newState.latestBacktracking)
else newState.copy(previous = newState.latest)

newStateWithPrevious ->
.itemsBySlice(entityType, slice, fromTimestamp, toTimestamp, backtracking = newState.backtracking)
Expand All @@ -330,12 +398,30 @@ import org.slf4j.Logger

def heartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
// use wall clock to measure duration since start, up to idle backtracking limit
val timestamp =
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))

} else None

val nextHeartbeat: QueryState => Option[Envelope] =
if (settings.journalPublishEvents) heartbeat else _ => None

val currentTimestamp = // Can we use DDB as a timestamp source?
val currentWallClock = clock.instant()

ContinuousQuery[QueryState, Envelope](
initialState = QueryState.empty.copy(latest = initialOffset),
initialState = QueryState.empty
.copy(latest = initialOffset, startTimestamp = currentTimestamp, startWallClock = currentWallClock),
updateState = nextOffset,
delayNextQuery = delayNextQuery,
nextQuery = nextQuery,
beforeQuery = _ => None)
beforeQuery = _ => None,
heartbeat = nextHeartbeat)

private def deserializeAndAddOffset(timestampOffset: TimestampOffset): Flow[Item, Envelope, NotUsed] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ private[dynamodb] object ContinuousQuery {
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]] = (_: S) => None): Source[T, NotUsed] =
Source.fromGraph(new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery))
beforeQuery: S => Option[Future[S]] = (_: S) => None,
heartbeat: S => Option[T] = (_: S) => None): Source[T, NotUsed] =
new ContinuousQuery[S, T](initialState, updateState, delayNextQuery, nextQuery, beforeQuery, heartbeat))

private case object NextQuery

Expand Down Expand Up @@ -69,7 +71,8 @@ final private[dynamodb] class ContinuousQuery[S, T](
updateState: (S, T) => S,
delayNextQuery: S => Option[FiniteDuration],
nextQuery: S => (S, Option[Source[T, NotUsed]]),
beforeQuery: S => Option[Future[S]])
beforeQuery: S => Option[Future[S]],
heartbeat: S => Option[T])
extends GraphStage[SourceShape[T]] {
import ContinuousQuery._

Expand Down Expand Up @@ -151,8 +154,14 @@ final private[dynamodb] class ContinuousQuery[S, T](

val sourceWithHeartbeat = heartbeat(newState) match {
case None => source
case Some(h) => Source.single(h).concat(source)

val graph = Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import akka.persistence.query.typed.EventEnvelope
val SourceBacktracking = "BT"
val SourcePubSub = "PS"
val SourceSnapshot = "SN"
val SourceHeartbeat = "HB"

def fromQuery(env: EventEnvelope[_]): Boolean =
env.source == SourceQuery
Expand All @@ -32,6 +33,15 @@ import akka.persistence.query.typed.EventEnvelope
def fromSnapshot(env: EventEnvelope[_]): Boolean =
env.source == SourceSnapshot

def fromHeartbeat(env: EventEnvelope[_]): Boolean =
env.source == SourceHeartbeat

def isHeartbeatEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => fromHeartbeat(e)
case _ => false

def isFilteredEvent(env: Any): Boolean =
env match {
case e: EventEnvelope[_] => e.filtered
Expand Down

0 comments on commit 600a2a0

Please sign in to comment.