Skip to content

Commit

Permalink
fix: don't set idle when backtracking, check for forward behind lates…
Browse files Browse the repository at this point in the history
…t backtracking (#104)

* fix: don't set idle when backtracking, check for forward behind latest backtracking

* use separate idle counter

* debug log

* 2 idleCountBeforeHeartbeat

---------

Co-authored-by: Patrik Nordwall <[email protected]>
  • Loading branch information
leviramsey and patriknw authored Nov 28, 2024
1 parent 2fe1f50 commit 6f61cbe
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ import org.slf4j.Logger
startTimestamp = Instant.EPOCH,
startWallClock = Instant.EPOCH,
currentQueryWallClock = Instant.EPOCH,
previousQueryWallClock = Instant.EPOCH)
previousQueryWallClock = Instant.EPOCH,
idleCountBeforeHeartbeat = 0)
}

final case class QueryState(
Expand All @@ -60,7 +61,8 @@ import org.slf4j.Logger
startTimestamp: Instant,
startWallClock: Instant,
currentQueryWallClock: Instant,
previousQueryWallClock: Instant) {
previousQueryWallClock: Instant,
idleCountBeforeHeartbeat: Long) {

def backtracking: Boolean = backtrackingCount > 0

Expand Down Expand Up @@ -301,6 +303,10 @@ import org.slf4j.Logger

def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, NotUsed]]) = {
val newIdleCount = if (state.itemCount == 0) state.idleCount + 1 else 0
val newIdleCountBeforeHeartbeat =
if (state.backtracking) state.idleCountBeforeHeartbeat
else if (state.itemCount == 0) state.idleCountBeforeHeartbeat + 1
else 0
// start tracking query wall clock for heartbeats after initial backtracking query
val newQueryWallClock =
if (state.latestBacktracking != TimestampOffset.Zero) clock.instant()
Expand All @@ -324,7 +330,8 @@ import org.slf4j.Logger
latestBacktracking = fromOffset,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else if (switchFromBacktracking(state)) {
// switching from backtracking
state.copy(
Expand All @@ -334,7 +341,8 @@ import org.slf4j.Logger
idleCount = newIdleCount,
backtrackingCount = 0,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
} else {
// continuing
val newBacktrackingCount = if (state.backtracking) state.backtrackingCount + 1 else 0
Expand All @@ -346,7 +354,8 @@ import org.slf4j.Logger
backtrackingCount = newBacktrackingCount,
backtrackingExpectFiltered = state.latestBacktrackingSeenCount,
currentQueryWallClock = newQueryWallClock,
previousQueryWallClock = state.currentQueryWallClock)
previousQueryWallClock = state.currentQueryWallClock,
idleCountBeforeHeartbeat = newIdleCountBeforeHeartbeat)
}

val fromTimestamp = newState.nextQueryFromTimestamp(backtrackingWindow)
Expand Down Expand Up @@ -399,7 +408,7 @@ import org.slf4j.Logger
}

def heartbeat(state: QueryState): Option[Envelope] = {
if (state.idleCount >= 1 && state.previousQueryWallClock != Instant.EPOCH) {
if (state.idleCountBeforeHeartbeat >= 2 && state.previousQueryWallClock != Instant.EPOCH) {
// use wall clock to measure duration since start, up to idle backtracking limit
val timestamp = state.startTimestamp.plus(
JDuration.between(state.startWallClock, state.previousQueryWallClock.minus(backtrackingBehindCurrentTime)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,12 +572,24 @@ final class DynamoDBReadJournal(system: ExtendedActorSystem, config: Config, cfg
case None => Instant.EPOCH
}
env => {
val slice = persistenceExt.sliceForPersistenceId(env.persistenceId)
env.offset match {
case t: TimestampOffset =>
if (EnvelopeOrigin.fromQuery(env)) {
if (log.isDebugEnabled()) {
val l = latestBacktracking(slice)
if (l.isAfter(t.timestamp))
log.debug(
"event from query for persistenceId [{}] seqNr [{}] " +
s"timestamp [{}] was before last event from backtracking or heartbeat [{}].",
env.persistenceId,
env.sequenceNr,
t.timestamp,
l)
}

env :: Nil
} else {
val slice = persistenceExt.sliceForPersistenceId(env.persistenceId)
if (EnvelopeOrigin.fromBacktracking(env)) {
latestBacktrackingPerSlice = latestBacktrackingPerSlice.updated(slice, t.timestamp)
env :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object EventsBySlicePubSubBacktrackingSpec {
akka.persistence.dynamodb {
journal.publish-events = on
query {
refresh-interval = 1 s
refresh-interval = 300 ms
# Ensure pubsub arrives first
behind-current-time = 2s
Expand Down

0 comments on commit 6f61cbe

Please sign in to comment.