Skip to content

Commit

Permalink
feat: support time to live when writing events or snapshots (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter authored Jul 30, 2024
1 parent ac6a3b1 commit ec26096
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 5 deletions.
8 changes: 8 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ akka.persistence.dynamodb {
# (such as when events are deleted on snapshot). Set to a duration to expire items after this time
# following the triggered deletion. Disabled when set to `off` or `none`.
use-time-to-live-for-deletes = off

# Set a time-to-live duration on all events when they are originally created and stored.
# Disabled when set to `off` or `none`.
event-time-to-live = off

# Set a time-to-live duration on all snapshots when they are originally created and stored.
# Disabled when set to `off` or `none`.
snapshot-time-to-live = off
}
}
// #time-to-live-settings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ final class TimeToLiveSettings(config: Config) {

val useTimeToLiveForDeletes: Option[FiniteDuration] =
ConfigHelpers.optDuration(config, "use-time-to-live-for-deletes")

val eventTimeToLive: Option[FiniteDuration] = ConfigHelpers.optDuration(config, "event-time-to-live")

val snapshotTimeToLive: Option[FiniteDuration] = ConfigHelpers.optDuration(config, "snapshot-time-to-live")
}

private[dynamodb] object ConfigHelpers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ import software.amazon.awssdk.services.dynamodb.model.Update
attributes.put(MetaPayload, AttributeValue.fromB(SdkBytes.fromByteArray(meta.payload)))
}

settings.timeToLiveSettings.eventTimeToLive.foreach { timeToLive =>
val expiryTimestamp = Instant.now().plusSeconds(timeToLive.toSeconds)
attributes.put(Expiry, AttributeValue.fromN(expiryTimestamp.getEpochSecond.toString))
}

attributes
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
attributes.put(MetaPayload, AttributeValue.fromB(SdkBytes.fromByteArray(meta.payload)))
}

settings.timeToLiveSettings.snapshotTimeToLive.foreach { timeToLive =>
val expiryTimestamp = Instant.now().plusSeconds(timeToLive.toSeconds)
attributes.put(Expiry, AttributeValue.fromN(expiryTimestamp.getEpochSecond.toString))
}

val request = PutItemRequest
.builder()
.tableName(settings.snapshotTable)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object DynamoDBJournalSpec {
.parseString("""akka.persistence.dynamodb.with-meta = true""")
.withFallback(DynamoDBJournalSpec.testConfig())

def configWithTTL: Config =
def configWithTTLForDeletes: Config =
ConfigFactory
.parseString("""
akka.persistence.dynamodb.time-to-live {
Expand All @@ -43,6 +43,16 @@ object DynamoDBJournalSpec {
""")
.withFallback(DynamoDBJournalSpec.testConfig())

def configWithEventTTL: Config =
ConfigFactory
.parseString("""
akka.persistence.dynamodb.time-to-live {
check-expiry = on
event-time-to-live = 1 hour
}
""")
.withFallback(DynamoDBJournalSpec.testConfig())

def testConfig(): Config = {
ConfigFactory
.parseString(s"""
Expand Down Expand Up @@ -80,8 +90,8 @@ class DynamoDBJournalWithMetaSpec extends DynamoDBJournalBaseSpec(DynamoDBJourna
protected override def supportsMetadata: CapabilityFlag = CapabilityFlag.on()
}

class DynamoDBJournalWithTTLSpec
extends DynamoDBJournalBaseSpec(DynamoDBJournalSpec.configWithTTL)
class DynamoDBJournalWithTTLForDeletesSpec
extends DynamoDBJournalBaseSpec(DynamoDBJournalSpec.configWithTTLForDeletes)
with Inspectors
with OptionValues {

Expand Down Expand Up @@ -199,3 +209,23 @@ class DynamoDBJournalWithTTLSpec

}
}

class DynamoDBJournalWithEventTTLSpec
extends DynamoDBJournalBaseSpec(DynamoDBJournalSpec.configWithEventTTL)
with Inspectors
with OptionValues {

// check persisted events all have expiry set

override protected def beforeEach(): Unit = {
import akka.persistence.dynamodb.internal.JournalAttributes._
super.beforeEach() // test events written
val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds
val eventItems = getEventItemsFor(pid)
eventItems.size shouldBe 5
forAll(eventItems) { eventItem =>
eventItem.get(Expiry).value.n.toLong should (be <= expected and be > expected - 10) // within 10s
eventItem.get(ExpiryMarker) shouldBe None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package akka.persistence.dynamodb.snapshot

import scala.concurrent.duration._

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.CapabilityFlag
Expand All @@ -27,7 +29,7 @@ import org.scalatest.Pending
object DynamoDBSnapshotStoreSpec {
val config: Config = TestConfig.config

def configWithTTL: Config =
def configWithTTLForDeletes: Config =
ConfigFactory
.parseString("""
akka.persistence.dynamodb.time-to-live {
Expand All @@ -37,6 +39,16 @@ object DynamoDBSnapshotStoreSpec {
}
""")
.withFallback(config)

def configWithSnapshotTTL: Config =
ConfigFactory
.parseString("""
akka.persistence.dynamodb.time-to-live {
check-expiry = on
snapshot-time-to-live = 1 hour
}
""")
.withFallback(config)
}

abstract class DynamoDBSnapshotStoreBaseSpec(config: Config)
Expand Down Expand Up @@ -67,6 +79,8 @@ abstract class DynamoDBSnapshotStoreBaseSpec(config: Config)

protected def usingTTLForDeletes: Boolean = false

protected def usingSnapshotTTL: Boolean = false

// Note: these depend on populating the database with snapshots in SnapshotStoreSpec.beforeEach
// mostly covers the important bits of the skipped tests but for an update-in-place snapshot store

Expand Down Expand Up @@ -99,6 +113,12 @@ abstract class DynamoDBSnapshotStoreBaseSpec(config: Config)
val sequenceNr = result.snapshot.get.metadata.sequenceNr
sequenceNr shouldBe 15

if (usingSnapshotTTL) {
val expected = System.currentTimeMillis / 1000 + 1.hour.toSeconds
val snapshotItem = getSnapshotItemFor(pid).value
snapshotItem.get(SnapshotAttributes.Expiry).value.n.toLong should (be <= expected and be > expected - 10)
}

val md = SnapshotMetadata(pid, sequenceNr, timestamp = 0)
val cmd = DeleteSnapshot(md)
val sub = TestProbe()
Expand All @@ -122,6 +142,12 @@ abstract class DynamoDBSnapshotStoreBaseSpec(config: Config)

class DynamoDBSnapshotStoreSpec extends DynamoDBSnapshotStoreBaseSpec(DynamoDBSnapshotStoreSpec.config)

class DynamoDBSnapshotStoreWithTTLSpec extends DynamoDBSnapshotStoreBaseSpec(DynamoDBSnapshotStoreSpec.configWithTTL) {
class DynamoDBSnapshotStoreWithTTLForDeletesSpec
extends DynamoDBSnapshotStoreBaseSpec(DynamoDBSnapshotStoreSpec.configWithTTLForDeletes) {
override protected def usingTTLForDeletes: Boolean = true
}

class DynamoDBSnapshotStoreWithSnapshotTTLSpec
extends DynamoDBSnapshotStoreBaseSpec(DynamoDBSnapshotStoreSpec.configWithSnapshotTTL) {
override protected def usingSnapshotTTL: Boolean = true
}
6 changes: 6 additions & 0 deletions docs/src/main/paradox/journal.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ deleted events can be configured to expire in 7 days, rather than being deleted

@@ snip [use time-to-live for deletes](/docs/src/test/scala/docs/config/TimeToLiveSettingsDocExample.scala) { #use-time-to-live-for-deletes type=conf }

While it is recommended to keep all events in an event sourced system, so that new @ref:[projections](projection.md)
can be re-built, setting a time to live expiry on events or snapshots when they are created and stored is supported.
For example, events can be configured to expire in 3 days and snapshots in 5 days, using configuration:

@@ snip [event and snapshot time-to-live](/docs/src/test/scala/docs/config/TimeToLiveSettingsDocExample.scala) { #time-to-live type=conf }

The @ref[EventSourcedCleanup tool](cleanup.md#event-sourced-cleanup-tool) can also be used to set an expiration
timestamp on events or snapshots.

Expand Down
17 changes: 17 additions & 0 deletions docs/src/test/scala/docs/config/TimeToLiveSettingsDocExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ object TimeToLiveSettingsDocExample {
}
//#use-time-to-live-for-deletes
"""))

val ttlConfig: Config = ConfigFactory.load(ConfigFactory.parseString("""
//#time-to-live
akka.persistence.dynamodb.time-to-live {
event-time-to-live = 3 days
snapshot-time-to-live = 5 days
}
//#time-to-live
"""))
}

class TimeToLiveSettingsDocExample extends AnyWordSpec with Matchers {
Expand All @@ -51,5 +60,13 @@ class TimeToLiveSettingsDocExample extends AnyWordSpec with Matchers {
settings.timeToLiveSettings.useTimeToLiveForDeletes shouldBe Some(7.days)
}

"have example of setting event-time-to-live and snapshot-time-to-live" in {
val settings = dynamoDBSettings(ttlConfig)
settings.timeToLiveSettings.checkExpiry shouldBe false
settings.timeToLiveSettings.useTimeToLiveForDeletes shouldBe None
settings.timeToLiveSettings.eventTimeToLive shouldBe Some(3.days)
settings.timeToLiveSettings.snapshotTimeToLive shouldBe Some(5.days)
}

}
}

0 comments on commit ec26096

Please sign in to comment.