feat: support time to live for projection offsets (#73)
pvlugter authored Aug 16, 2024
1 parent beb29bc commit 8b890c1
Showing 8 changed files with 321 additions and 16 deletions.
@@ -267,7 +267,7 @@ final class EventSourcedEntityTimeToLiveSettings(config: Config) {
val snapshotTimeToLive: Option[FiniteDuration] = ConfigHelpers.optDuration(config, "snapshot-time-to-live")
}

private[dynamodb] object ConfigHelpers {
private[akka] object ConfigHelpers {
def optString(config: Config, path: String): Option[String] = {
if (config.hasPath(path)) {
val value = config.getString(path)
@@ -290,7 +290,7 @@ private[dynamodb] object ConfigHelpers {
}
}

private[dynamodb] object WildcardMap {
private[akka] object WildcardMap {
def apply[V](elements: Seq[(String, V)], default: V): WildcardMap[V] = {
val (wildcards, exact) = elements.partition { case (key, _) => hasWildcard(key) }
val prefixes = wildcards.map { case (key, value) => dropWildcard(key) -> value }
@@ -302,7 +302,9 @@ private[dynamodb] object WildcardMap {
private def dropWildcard(key: String): String = key.dropRight(1)
}

private[dynamodb] final class WildcardMap[V](exact: Map[String, V], prefixes: Map[String, V], default: V) {
private[akka] final class WildcardMap[V](exact: Map[String, V], prefixes: Map[String, V], default: V) {
import WildcardMap._

def isEmpty: Boolean = exact.isEmpty && prefixes.isEmpty

def get(key: String): V = {
@@ -313,4 +315,9 @@ private[dynamodb] final class WildcardMap[V](exact: Map[String, V], prefixes: Ma
.orElse(prefixes.collectFirst { case (k, v) if key.startsWith(k) => v })
.getOrElse(default)
}

def updated(key: String, value: V): WildcardMap[V] = {
if (hasWildcard(key)) new WildcardMap(exact, prefixes.updated(dropWildcard(key), value), default)
else new WildcardMap(exact.updated(key, value), prefixes, default)
}
}
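
With the visibility widened from `private[dynamodb]` to `private[akka]`, the projection module can reuse `WildcardMap` for per-projection lookups. A rough sketch of the matching semantics, for illustration only (this is internal API; exact keys are assumed to take precedence over `*` prefixes):

```scala
import akka.persistence.dynamodb.WildcardMap

// keys ending in `*` are treated as prefixes, everything else as exact keys
val map = WildcardMap(Seq("some-projection" -> 7, "projection-*" -> 14), default = 0)

map.get("some-projection") // 7  (exact match)
map.get("projection-1")    // 14 (prefix match on "projection-")
map.get("other")           // 0  (falls back to the default)

// `updated` routes wildcard keys into the prefix map and plain keys into the exact map
map.updated("another-*", 30).get("another-thing") // 30
```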
22 changes: 22 additions & 0 deletions docs/src/main/paradox/projection.md
@@ -338,3 +338,25 @@ Java
Scala
: @@snip [projection settings](/docs/src/test/scala/projection/docs/scaladsl/ProjectionDocExample.scala) { #projection-settings }

## Time to Live (TTL)

Offsets are never deleted by default. To have offsets deleted for inactive projections, an expiration timestamp can be
set. DynamoDB's [Time to Live (TTL)][ttl] feature can then be enabled to automatically delete items after they have
expired. A new expiration timestamp is set each time the offset for a particular projection slice or persistence id
is updated.

The TTL attribute to use for the timestamp offset table is named `expiry`.

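As a minimal sketch of enabling the TTL feature on the timestamp offset table with the AWS SDK v2, assuming a
`DynamoDbAsyncClient` and your configured table name are available (the helper name `enableOffsetTtl` is hypothetical):

```scala
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.TimeToLiveSpecification
import software.amazon.awssdk.services.dynamodb.model.UpdateTimeToLiveRequest

// the table name must match your configured timestamp offset table
def enableOffsetTtl(client: DynamoDbAsyncClient, timestampOffsetTable: String) =
  client.updateTimeToLive(
    UpdateTimeToLiveRequest
      .builder()
      .tableName(timestampOffsetTable)
      .timeToLiveSpecification(
        TimeToLiveSpecification
          .builder()
          .attributeName("expiry") // the TTL attribute used by the offset store
          .enabled(true)
          .build())
      .build())
```
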
Time-to-live settings are configured per projection. The projection name can also be matched by prefix, using a `*`
at the end of the key. For example, offsets can be configured to expire in 7 days for a particular projection, and in
14 days for all projection names that start with a particular prefix:

@@ snip [offset time-to-live](/docs/src/test/scala/projection/docs/config/ProjectionTimeToLiveSettingsDocExample.scala) { #time-to-live type=conf }

### Time to Live reference configuration

The following can be overridden in your `application.conf` for the time-to-live specific settings:

@@snip [reference.conf](/projection/src/main/resources/reference.conf) { #time-to-live-settings }

[ttl]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html
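
The same TTL can also be set programmatically with the settings API introduced here; a sketch, where the `base`
settings value is assumed to come from your existing projection configuration:

```scala
import scala.concurrent.duration._

import akka.projection.dynamodb.DynamoDBProjectionSettings
import akka.projection.dynamodb.ProjectionTimeToLiveSettings
import akka.projection.dynamodb.TimeToLiveSettings

def withOffsetTtl(base: DynamoDBProjectionSettings): DynamoDBProjectionSettings = {
  val ttl = TimeToLiveSettings.defaults
    .withProjection(
      "some-projection",
      ProjectionTimeToLiveSettings.defaults.withOffsetTimeToLive(7.days))
    .withProjection(
      "projection-*", // prefix match, same as in configuration
      ProjectionTimeToLiveSettings.defaults.withOffsetTimeToLive(14.days))
  base.withTimeToLiveSettings(ttl)
}
```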
58 changes: 58 additions & 0 deletions docs/src/test/scala/projection/docs/config/ProjectionTimeToLiveSettingsDocExample.scala
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package projection.docs.config

import scala.concurrent.duration._

import akka.projection.dynamodb.DynamoDBProjectionSettings
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

object ProjectionTimeToLiveSettingsDocExample {

val ttlConfig: Config = ConfigFactory.load(ConfigFactory.parseString("""
//#time-to-live
akka.projection.dynamodb.time-to-live {
projections {
"some-projection" {
offset-time-to-live = 7 days
}
"projection-*" {
offset-time-to-live = 14 days
}
}
}
//#time-to-live
"""))
}

class ProjectionTimeToLiveSettingsDocExample extends AnyWordSpec with Matchers {
import ProjectionTimeToLiveSettingsDocExample._

def dynamoDBProjectionSettings(config: Config): DynamoDBProjectionSettings =
DynamoDBProjectionSettings(config.getConfig("akka.projection.dynamodb"))

"Projection Time to Live (TTL) docs" should {

"have example of setting offset-time-to-live" in {
val settings = dynamoDBProjectionSettings(ttlConfig)

val someTtlSettings = settings.timeToLiveSettings.projections.get("some-projection")
someTtlSettings.offsetTimeToLive shouldBe Some(7.days)

val ttlSettings1 = settings.timeToLiveSettings.projections.get("projection-1")
ttlSettings1.offsetTimeToLive shouldBe Some(14.days)

val ttlSettings2 = settings.timeToLiveSettings.projections.get("projection-2")
ttlSettings2.offsetTimeToLive shouldBe Some(14.days)

val defaultTtlSettings = settings.timeToLiveSettings.projections.get("other-projection")
defaultTtlSettings.offsetTimeToLive shouldBe None
}

}
}
26 changes: 26 additions & 0 deletions projection/src/main/resources/reference.conf
@@ -36,3 +36,29 @@ akka.projection.dynamodb {
warn-about-filtered-events-in-flow = true
}
//#projection-config

//#time-to-live-settings
akka.projection.dynamodb {
# Time to Live (TTL) settings
time-to-live {
projection-defaults {
# Set a time-to-live duration on all offsets when they are updated.
# Disabled when set to `off` or `none`.
offset-time-to-live = off
}

# Time-to-live settings per projection name.
# See `projection-defaults` for possible settings and default values.
# Prefix matching is supported by using * at the end of a projection name.
projections {
# Example configuration:
# "some-projection" {
# offset-time-to-live = 7 days
# }
# "projection-*" {
# offset-time-to-live = 14 days
# }
}
}
}
//#time-to-live-settings
@@ -7,10 +7,14 @@ package akka.projection.dynamodb
import java.time.{ Duration => JDuration }

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

import akka.actor.typed.ActorSystem
import akka.persistence.dynamodb.ConfigHelpers
import akka.persistence.dynamodb.WildcardMap
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config
import com.typesafe.config.ConfigObject

object DynamoDBProjectionSettings {

@@ -39,7 +43,8 @@ object DynamoDBProjectionSettings {
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"))
offsetBatchSize = config.getInt("offset-store.offset-batch-size"),
timeToLiveSettings = TimeToLiveSettings(config.getConfig("time-to-live")))
}

/**
@@ -57,7 +62,8 @@ final class DynamoDBProjectionSettings private (
val keepNumberOfEntries: Int,
val evictInterval: JDuration,
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int) {
val offsetBatchSize: Int,
val timeToLiveSettings: TimeToLiveSettings) {

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
copy(timestampOffsetTable = timestampOffsetTable)
@@ -86,23 +92,98 @@ final class DynamoDBProjectionSettings private (
def withOffsetBatchSize(offsetBatchSize: Int): DynamoDBProjectionSettings =
copy(offsetBatchSize = offsetBatchSize)

def withTimeToLiveSettings(timeToLiveSettings: TimeToLiveSettings): DynamoDBProjectionSettings =
copy(timeToLiveSettings = timeToLiveSettings)

private def copy(
timestampOffsetTable: String = timestampOffsetTable,
useClient: String = useClient,
timeWindow: JDuration = timeWindow,
keepNumberOfEntries: Int = keepNumberOfEntries,
evictInterval: JDuration = evictInterval,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize) =
offsetBatchSize: Int = offsetBatchSize,
timeToLiveSettings: TimeToLiveSettings = timeToLiveSettings) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
timeWindow,
keepNumberOfEntries,
evictInterval,
warnAboutFilteredEventsInFlow,
offsetBatchSize)
offsetBatchSize,
timeToLiveSettings)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $keepNumberOfEntries, $evictInterval, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
}

object TimeToLiveSettings {
val defaults: TimeToLiveSettings =
new TimeToLiveSettings(projections = WildcardMap(Seq.empty, ProjectionTimeToLiveSettings.defaults))

/**
* Scala API: Create from configuration corresponding to `akka.projection.dynamodb.time-to-live`.
*/
def apply(config: Config): TimeToLiveSettings = {
val projections: WildcardMap[ProjectionTimeToLiveSettings] = {
val defaults = config.getConfig("projection-defaults")
val defaultSettings = ProjectionTimeToLiveSettings(defaults)
val entries = config.getConfig("projections").root.entrySet.asScala
val perEntitySettings = entries.toSeq.map { entry =>
(entry.getKey, entry.getValue) match {
case (key: String, value: ConfigObject) =>
val settings = ProjectionTimeToLiveSettings(value.toConfig.withFallback(defaults))
key -> settings
}
}
WildcardMap(perEntitySettings, defaultSettings)
}
new TimeToLiveSettings(projections)
}

/**
* Java API: Create from configuration corresponding to `akka.projection.dynamodb.time-to-live`.
*/
def create(config: Config): TimeToLiveSettings = apply(config)
}

final class TimeToLiveSettings private (val projections: WildcardMap[ProjectionTimeToLiveSettings]) {

def withProjection(name: String, settings: ProjectionTimeToLiveSettings): TimeToLiveSettings =
copy(projections = projections.updated(name, settings))

private def copy(projections: WildcardMap[ProjectionTimeToLiveSettings] = projections): TimeToLiveSettings =
new TimeToLiveSettings(projections)
}

object ProjectionTimeToLiveSettings {
val defaults: ProjectionTimeToLiveSettings =
new ProjectionTimeToLiveSettings(offsetTimeToLive = None)

/**
* Scala API: Create from configuration corresponding to `akka.projection.dynamodb.time-to-live.projections`.
*/
def apply(config: Config): ProjectionTimeToLiveSettings =
new ProjectionTimeToLiveSettings(offsetTimeToLive = ConfigHelpers.optDuration(config, "offset-time-to-live"))

/**
* Java API: Create from configuration corresponding to `akka.projection.dynamodb.time-to-live.projections`.
*/
def create(config: Config): ProjectionTimeToLiveSettings = apply(config)
}

final class ProjectionTimeToLiveSettings private (val offsetTimeToLive: Option[FiniteDuration]) {

def withOffsetTimeToLive(offsetTimeToLive: FiniteDuration): ProjectionTimeToLiveSettings =
copy(offsetTimeToLive = Some(offsetTimeToLive))

def withOffsetTimeToLive(offsetTimeToLive: JDuration): ProjectionTimeToLiveSettings =
copy(offsetTimeToLive = Some(offsetTimeToLive.asScala))

def withNoOffsetTimeToLive(): ProjectionTimeToLiveSettings =
copy(offsetTimeToLive = None)

private def copy(offsetTimeToLive: Option[FiniteDuration] = offsetTimeToLive): ProjectionTimeToLiveSettings =
new ProjectionTimeToLiveSettings(offsetTimeToLive)
}
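
Per-name entries under `projections` fall back to `projection-defaults`; a small sketch of how `TimeToLiveSettings(config)` resolves a hypothetical config:

```scala
import akka.projection.dynamodb.TimeToLiveSettings
import com.typesafe.config.ConfigFactory

// config corresponding to `akka.projection.dynamodb.time-to-live` (values here are made up)
val ttlSettings = TimeToLiveSettings(ConfigFactory.parseString("""
  projection-defaults.offset-time-to-live = 30 days
  projections {
    "short-lived-*".offset-time-to-live = 1 day
  }
  """))

ttlSettings.projections.get("short-lived-1").offsetTimeToLive    // 1 day, via the "short-lived-" prefix
ttlSettings.projections.get("other-projection").offsetTimeToLive // 30 days, from projection-defaults
```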
@@ -4,6 +4,7 @@

package akka.projection.dynamodb.internal

import java.time.Instant
import java.util.Collections
import java.util.concurrent.CompletionException
import java.util.{ HashMap => JHashMap }
@@ -53,6 +54,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
val Timestamp = "ts"
val Seen = "seen"
val Paused = "paused"
val Expiry = "expiry"

val timestampBySlicePid = AttributeValue.fromS("_")
val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
@@ -72,6 +74,8 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
import OffsetStoreDao.MaxTransactItems
import system.executionContext

private val timeToLiveSettings = settings.timeToLiveSettings.projections.get(projectionId.name)

private def nameSlice(slice: Int): String = s"${projectionId.name}-$slice"

def loadTimestampOffset(slice: Int): Future[Option[TimestampOffset]] = {
@@ -111,6 +115,10 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._

val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive =>
Instant.now().plusSeconds(timeToLive.toSeconds)
}

def writeBatch(offsetsBatch: IndexedSeq[(Int, TimestampOffset)]): Future[Done] = {
val writeItems =
offsetsBatch.map { case (slice, offset) =>
@@ -132,6 +140,10 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
}
attributes.put(Seen, AttributeValue.fromM(seen))

expiry.foreach { timestamp =>
attributes.put(Expiry, AttributeValue.fromN(timestamp.getEpochSecond.toString))
}

WriteRequest.builder
.putRequest(
PutRequest
@@ -178,6 +190,10 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
}

def storeSequenceNumbers(records: IndexedSeq[Record]): Future[Done] = {
val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive =>
Instant.now().plusSeconds(timeToLive.toSeconds)
}

def writeBatch(recordsBatch: IndexedSeq[Record]): Future[Done] = {
val writeItems =
recordsBatch
@@ -186,7 +202,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
.putRequest(
PutRequest
.builder()
.item(sequenceNumberAttributes(record))
.item(sequenceNumberAttributes(record, expiry))
.build())
.build()
}
@@ -262,13 +278,17 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
s"Too many transactional write items. Total limit is [${MaxTransactItems}], attempting to store " +
s"[${writeItems.size}] write items and [${records.size}] sequence numbers.")

val expiry = timeToLiveSettings.offsetTimeToLive.map { timeToLive =>
Instant.now().plusSeconds(timeToLive.toSeconds)
}

val writeSequenceNumbers = records.map { record =>
TransactWriteItem.builder
.put(
Put
.builder()
.tableName(settings.timestampOffsetTable)
.item(sequenceNumberAttributes(record))
.item(sequenceNumberAttributes(record, expiry))
.build())
.build()
}
@@ -301,7 +321,7 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest

}

private def sequenceNumberAttributes(record: Record): JHashMap[String, AttributeValue] = {
private def sequenceNumberAttributes(record: Record, expiry: Option[Instant]): JHashMap[String, AttributeValue] = {
import OffsetStoreDao.OffsetStoreAttributes._

val attributes = new JHashMap[String, AttributeValue]
@@ -311,6 +331,10 @@ import software.amazon.awssdk.services.dynamodb.model.WriteRequest
val timestampMicros = InstantFactory.toEpochMicros(record.timestamp)
attributes.put(Timestamp, AttributeValue.fromN(timestampMicros.toString))

expiry.foreach { expiryTimestamp =>
attributes.put(Expiry, AttributeValue.fromN(expiryTimestamp.getEpochSecond.toString))
}

attributes
}

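For reference, a standalone sketch of how the `expiry` value written by these methods is derived (the 7-day TTL here is illustrative):

```scala
import java.time.Instant

import scala.concurrent.duration._

import software.amazon.awssdk.services.dynamodb.model.AttributeValue

val offsetTimeToLive = 7.days

// expiration timestamp computed at write time and stored as epoch seconds,
// the format DynamoDB TTL expects for the TTL attribute
val expiry = Instant.now().plusSeconds(offsetTimeToLive.toSeconds)
val expiryAttribute: AttributeValue = AttributeValue.fromN(expiry.getEpochSecond.toString)
```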