Skip to content

Commit

Permalink
feat: Projection and offset store structure (#10)
Browse files Browse the repository at this point in the history
* starting with atLeastOnceAsync
* no real DynamoDB implementation in offset store yet
  • Loading branch information
patriknw authored Jun 4, 2024
1 parent b4c80ee commit 1ec74e7
Show file tree
Hide file tree
Showing 10 changed files with 1,447 additions and 3 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ lazy val root = (project in file("."))
publishTo := Some(Resolver.file("Unused transient repository", file("target/unusedrepo"))))
.enablePlugins(ScalaUnidocPlugin)
.disablePlugins(SitePlugin, MimaPlugin, CiReleasePlugin)
.aggregate(core, docs)
.aggregate(core, projection, docs)

def suffixFileFilter(suffix: String): FileFilter = new SimpleFileFilter(f => f.getAbsolutePath.endsWith(suffix))

Expand All @@ -96,6 +96,13 @@ lazy val core = (project in file("core"))
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(CiReleasePlugin)

lazy val projection = (project in file("projection"))
.dependsOn(core)
.settings(common)
.settings(name := "akka-projection-dynamodb", libraryDependencies ++= Dependencies.projection)
.enablePlugins(AutomateHeaderPlugin)
.disablePlugins(CiReleasePlugin)

lazy val docs = project
.in(file("docs"))
.enablePlugins(AkkaParadoxPlugin, ParadoxSitePlugin, PreprocessPlugin, PublishRsyncPlugin)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ akka.persistence.dynamodb {
behind-current-time = 10 seconds
}

# In-memory buffer holding events when reading from database.
# In-memory buffer holding events when reading from DynamoDB.
buffer-size = 100
}
}
Expand Down
16 changes: 15 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,24 @@ object Dependencies {
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3
val AkkaVersion = System.getProperty("override.akka.version", "2.9.3")
val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }
val AkkaProjectionVersion = "1.5.4"
val AkkaProjectionVersionInDocs = "current"
val AwsSdkVersion = "2.25.59"

object Compile {
val akkaActorTyped = "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion
val akkaStream = "com.typesafe.akka" %% "akka-stream" % AkkaVersion
val akkaPersistence = "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion
val akkaPersistenceTyped = "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion
val akkaPersistenceQuery = "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion
val akkaProjectionEventsourced = "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion
val dynamodbSdk = "software.amazon.awssdk" % "dynamodb" % AwsSdkVersion

}

object TestDeps {
val akkaStreamTyped = "com.typesafe.akka" %% "akka-stream-typed" % AkkaVersion % Test
val akkaPersistenceTyped = "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion % Test
val akkaPersistenceTyped = Compile.akkaPersistenceTyped % Test
val akkaShardingTyped = "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion % Test
val akkaPersistenceTck = "com.typesafe.akka" %% "akka-persistence-tck" % AkkaVersion % Test
val akkaTestkit = "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test
Expand All @@ -52,6 +55,17 @@ object Dependencies {
TestDeps.logback,
TestDeps.scalaTest)

val projection = Seq(
dynamodbSdk.exclude("software.amazon.awssdk", "apache-client"),
Compile.akkaProjectionEventsourced,
Compile.akkaPersistenceTyped,
TestDeps.akkaStreamTestkit,
TestDeps.akkaTestkit,
TestDeps.akkaJackson,
TestDeps.akkaStreamTyped,
TestDeps.logback,
TestDeps.scalaTest)

val docs =
Seq(TestDeps.akkaPersistenceTyped)
}
38 changes: 38 additions & 0 deletions projection/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# This defines the default configuration for akka-projection-dynamodb.
# Make your edits/overrides in your application.conf.

//#projection-config
akka.projection.dynamodb {

offset-store {
# the DynamoDB table name for the offset store
timestamp-offset-table = "timestamp_offset"

# The offset store will keep track of persistence ids and sequence numbers
# within this time window from latest offset.
time-window = 5 minutes

# Keep this number of entries. Don't evict old entries until this threshold
# has been reached.
keep-number-of-entries = 10000

# Remove old entries outside the time-window from the offset store memory
# with this frequency.
evict-interval = 10 seconds

# Trying to batch insert offsets in batches of this size.
offset-batch-size = 20
}

# By default it shares DynamoDB client with akka-persistence-dynamodb (write side).
# To use a separate client for projections this can be
# set to another config path that defines the config based on
# akka.persistence.dynamodb.client config.
use-client = "akka.persistence.dynamodb.client"

# Filtered events are not actually filtered but passed through the handling flow
# for atLeastOnceFlow, in some applications this is fine, set to false to disable
# the info logged when seeing such filtered events
warn-about-filtered-events-in-flow = true
}
//#projection-config
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.dynamodb

import java.time.{ Duration => JDuration }

import scala.concurrent.duration._

import akka.actor.typed.ActorSystem
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config

object DynamoDBProjectionSettings {

val DefaultConfigPath = "akka.projection.dynamodb"

/**
* Scala API: Load configuration from `akka.projection.dynamodb`.
*/
def apply(system: ActorSystem[_]): DynamoDBProjectionSettings =
apply(system.settings.config.getConfig(DefaultConfigPath))

/**
* Java API: Load configuration from `akka.projection.dynamodb`.
*/
def create(system: ActorSystem[_]): DynamoDBProjectionSettings =
apply(system)

/**
* Scala API: From custom configuration corresponding to `akka.projection.dynamodb`.
*/
def apply(config: Config): DynamoDBProjectionSettings = {
new DynamoDBProjectionSettings(
timestampOffsetTable = config.getString("offset-store.timestamp-offset-table"),
useClient = config.getString("use-connection-factory"),
timeWindow = config.getDuration("offset-store.time-window"),
keepNumberOfEntries = config.getInt("offset-store.keep-number-of-entries"),
evictInterval = config.getDuration("offset-store.evict-interval"),
warnAboutFilteredEventsInFlow = config.getBoolean("warn-about-filtered-events-in-flow"),
offsetBatchSize = config.getInt("offset-store.offset-batch-size"))
}

/**
* Java API: From custom configuration corresponding to `akka.projection.dynamodb`.
*/
def create(config: Config): DynamoDBProjectionSettings =
apply(config)

}

final class DynamoDBProjectionSettings private (
val timestampOffsetTable: String,
val useClient: String,
val timeWindow: JDuration,
val keepNumberOfEntries: Int,
val evictInterval: JDuration,
val warnAboutFilteredEventsInFlow: Boolean,
val offsetBatchSize: Int) {

def withTimestampOffsetTable(timestampOffsetTable: String): DynamoDBProjectionSettings =
copy(timestampOffsetTable = timestampOffsetTable)

def withUseClient(clientConfigPath: String): DynamoDBProjectionSettings =
copy(useClient = clientConfigPath)

def withTimeWindow(timeWindow: FiniteDuration): DynamoDBProjectionSettings =
copy(timeWindow = timeWindow.asJava)

def withTimeWindow(timeWindow: JDuration): DynamoDBProjectionSettings =
copy(timeWindow = timeWindow)

def withKeepNumberOfEntries(keepNumberOfEntries: Int): DynamoDBProjectionSettings =
copy(keepNumberOfEntries = keepNumberOfEntries)

def withEvictInterval(evictInterval: FiniteDuration): DynamoDBProjectionSettings =
copy(evictInterval = evictInterval.asJava)

def withEvictInterval(evictInterval: JDuration): DynamoDBProjectionSettings =
copy(evictInterval = evictInterval)

def withWarnAboutFilteredEventsInFlow(warnAboutFilteredEventsInFlow: Boolean): DynamoDBProjectionSettings =
copy(warnAboutFilteredEventsInFlow = warnAboutFilteredEventsInFlow)

def withOffsetBatchSize(offsetBatchSize: Int): DynamoDBProjectionSettings =
copy(offsetBatchSize = offsetBatchSize)

private def copy(
timestampOffsetTable: String = timestampOffsetTable,
useClient: String = useClient,
timeWindow: JDuration = timeWindow,
keepNumberOfEntries: Int = keepNumberOfEntries,
evictInterval: JDuration = evictInterval,
warnAboutFilteredEventsInFlow: Boolean = warnAboutFilteredEventsInFlow,
offsetBatchSize: Int = offsetBatchSize) =
new DynamoDBProjectionSettings(
timestampOffsetTable,
useClient,
timeWindow,
keepNumberOfEntries,
evictInterval,
warnAboutFilteredEventsInFlow,
offsetBatchSize)

override def toString =
s"DynamoDBProjectionSettings($timestampOffsetTable, $useClient, $timeWindow, $keepNumberOfEntries, $evictInterval, $warnAboutFilteredEventsInFlow, $offsetBatchSize)"
}
Loading

0 comments on commit 1ec74e7

Please sign in to comment.