Skip to content

Commit

Permalink
feat: Store timestamp and prepare GSI (#7)
Browse files Browse the repository at this point in the history
* always increasing timestamp by at least 1 micros
  • Loading branch information
patriknw authored May 29, 2024
1 parent 7226af6 commit 6fe9e4f
Show file tree
Hide file tree
Showing 10 changed files with 362 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,36 @@ package akka.persistence.dynamodb.internal

import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.atomic.AtomicReference

import akka.annotation.InternalApi

/**
* INTERNAL API
*/
@InternalApi private[akka] object InstantFactory {
private val previousNow = new AtomicReference(Instant.EPOCH)

/**
* Current time truncated to microseconds.
* Current time truncated to microseconds. Within this JVM it's guaranteed to be equal to or greater than previous
* invocation of `now`.
*/
def now(): Instant =
Instant.now().truncatedTo(ChronoUnit.MICROS)
def now(): Instant = {
val n = Instant.now().truncatedTo(ChronoUnit.MICROS)
previousNow.updateAndGet { prev =>
// monotonically increasing, at least 1 microsecond more than previous timestamp
if (!n.isAfter(prev)) prev.plus(1, ChronoUnit.MICROS)
else n
}
}

def toEpochMicros(instant: Instant): Long =
instant.getEpochSecond * 1_000_000 + (instant.getNano / 1000)

def fromEpochMicros(micros: Long): Instant = {
val epochSeconds = micros / 1_000_000
val nanos = (micros % 1_000_000) * 1000
Instant.ofEpochSecond(epochSeconds, nanos)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.Persistence
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
Expand Down Expand Up @@ -56,15 +57,17 @@ import software.amazon.awssdk.services.dynamodb.model.Update

// it's always the same persistenceId for all events
val persistenceId = events.head.persistenceId
val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)

def putItemAttributes(item: SerializedJournalItem) = {
import JournalAttributes._
val attributes = new JHashMap[String, AttributeValue]
attributes.put(Pid, AttributeValue.fromS(persistenceId))
attributes.put(SeqNr, AttributeValue.fromN(item.seqNr.toString))
attributes.put(Slice, AttributeValue.fromN(slice.toString))
attributes.put(EntityType, AttributeValue.fromS(item.entityType))
attributes.put(EntityTypeSlice, AttributeValue.fromS(s"$entityType-$slice"))
val timestampMicros = InstantFactory.toEpochMicros(item.writeTimestamp)
attributes.put(Timestamp, AttributeValue.fromN(timestampMicros.toString))
attributes.put(EventSerId, AttributeValue.fromN(item.serId.toString))
attributes.put(EventSerManifest, AttributeValue.fromS(item.serManifest))
attributes.put(EventPayload, AttributeValue.fromB(SdkBytes.fromByteArray(item.payload.get)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,12 @@

package akka.persistence.dynamodb.internal

import java.time.Instant

import scala.jdk.CollectionConverters._

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.persistence.dynamodb.DynamoDBSettings
import akka.persistence.typed.PersistenceId
import akka.stream.scaladsl.Source
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
Expand Down Expand Up @@ -60,11 +57,9 @@ import software.amazon.awssdk.services.dynamodb.model.QueryRequest
}

SerializedJournalItem(
slice = item.get(Slice).n().toInt,
entityType = item.get(EntityType).s(),
persistenceId = item.get(Pid).s(),
seqNr = item.get(SeqNr).n().toLong,
writeTimestamp = Instant.EPOCH,
writeTimestamp = InstantFactory.fromEpochMicros(item.get(Timestamp).n().toLong),
payload = Some(item.get(EventPayload).b().asByteArray()),
serId = item.get(EventSerId).n().toInt,
serManifest = item.get(EventSerManifest).s(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import java.time.Instant
import akka.annotation.InternalApi

final case class SerializedJournalItem(
slice: Int,
entityType: String,
persistenceId: String,
seqNr: Long,
writeTimestamp: Instant,
Expand All @@ -30,9 +28,9 @@ final case class SerializedEventMetadata(serId: Int, serManifest: String, payloa
// FIXME should attribute names be shorter?
val Pid = "pid"
val SeqNr = "seq_nr"
val Slice = "slice"
// redundant to store entity type, but needed for the bySlices GSI
val EntityType = "entity_type"
// needed for the bySlices GSI
val EntityTypeSlice = "entity_type_slice"
val Timestamp = "timestamp"
val EventSerId = "event_ser_id"
val EventSerManifest = "event_ser_manifest"
val EventPayload = "event_payload"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package akka.persistence.dynamodb.journal

import java.time.Instant

import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down Expand Up @@ -34,7 +32,6 @@ import akka.persistence.dynamodb.query.scaladsl.DynamoDBReadJournal
import akka.persistence.journal.AsyncWriteJournal
import akka.persistence.journal.Tagged
import akka.persistence.query.PersistenceQuery
import akka.persistence.typed.PersistenceId
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
Expand Down Expand Up @@ -104,8 +101,7 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
}

override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
def atomicWrite(atomicWrite: AtomicWrite): Future[Instant] = {
val timestamp = InstantFactory.now()
def atomicWrite(atomicWrite: AtomicWrite): Future[Done] = {
val serialized: Try[Seq[SerializedJournalItem]] = Try {
atomicWrite.payload.map { pr =>
val (event, tags) = pr.payload match {
Expand All @@ -115,9 +111,6 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
(other.asInstanceOf[AnyRef], Set.empty[String])
}

val entityType = PersistenceId.extractEntityType(pr.persistenceId)
val slice = persistenceExt.sliceForPersistenceId(pr.persistenceId)

val serializedEvent = event match {
case s: SerializedEvent => s // already serialized
case _ =>
Expand All @@ -136,9 +129,10 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
SerializedEventMetadata(id, metaManifest, serializedMeta)
}

// monotonically increasing, at least 1 microsecond more than previous timestamp
val timestamp = InstantFactory.now()

SerializedJournalItem(
slice,
entityType,
pr.persistenceId,
pr.sequenceNr,
timestamp,
Expand All @@ -153,14 +147,14 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e

serialized match {
case Success(writes) =>
journalDao.writeEvents(writes).map(_ => timestamp)(ExecutionContexts.parasitic)
journalDao.writeEvents(writes)
case Failure(exc) =>
Future.failed(exc)
}
}

val persistenceId = messages.head.persistenceId
val writeResult: Future[Instant] =
val writeResult: Future[Done] =
if (messages.size == 1)
atomicWrite(messages.head)
else {
Expand All @@ -170,8 +164,9 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
atomicWrite(batch)
}

// FiXME pubSub not added yet
val writeAndPublishResult: Future[Done] =
publish(messages, writeResult)
writeResult

writesInProgress.put(persistenceId, writeAndPublishResult)
writeAndPublishResult.onComplete { _ =>
Expand All @@ -180,11 +175,6 @@ private[dynamodb] final class DynamoDBJournal(config: Config, cfgPath: String) e
writeAndPublishResult.map(_ => Nil)(ExecutionContexts.parasitic)
}

private def publish(messages: immutable.Seq[AtomicWrite], timestamp: Future[Instant]): Future[Done] = {
// FiXME pubSub not added yet
Future.successful(Done)
}

override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
log.debug("asyncDeleteMessagesTo persistenceId [{}], toSequenceNr [{}]", persistenceId, toSequenceNr)
journalDao.deleteEventsTo(persistenceId, toSequenceNr, resetSequenceNumber = false)
Expand Down
104 changes: 104 additions & 0 deletions core/src/test/scala/akka/persistence/dynamodb/TestActors.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb

import akka.Done
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior

object TestActors {
object Persister {

import akka.persistence.typed.scaladsl.Effect

sealed trait Command
final case class Persist(payload: Any) extends Command
final case class PersistWithAck(payload: Any, replyTo: ActorRef[Done]) extends Command
final case class PersistAll(payloads: List[Any]) extends Command
final case class Ping(replyTo: ActorRef[Done]) extends Command
final case class GetState(replyTo: ActorRef[String]) extends Command
final case class GetSeqNr(replyTo: ActorRef[Long]) extends Command
final case class Stop(replyTo: ActorRef[Done]) extends Command

def apply(pid: String): Behavior[Command] =
apply(PersistenceId.ofUniqueId(pid))

def apply(pid: PersistenceId): Behavior[Command] = {
apply(pid, "", "", Set.empty)
}

def apply(pid: PersistenceId, tags: Set[String]): Behavior[Command] = {
apply(pid, "", "", tags)
}

def apply(
pid: PersistenceId,
journalPluginId: String,
snapshotPluginId: String,
tags: Set[String]): Behavior[Command] = {
Behaviors.setup { context =>
eventSourcedBehavior(pid, context)
.withJournalPluginId(journalPluginId)
.withSnapshotPluginId(snapshotPluginId)
.snapshotWhen { case (_, event, _) =>
event.toString.contains("snap")
}
.withTagger(_ => tags)
}
}

def eventSourcedBehavior(
pid: PersistenceId,
context: ActorContext[Command]): EventSourcedBehavior[Command, Any, String] = {
EventSourcedBehavior[Command, Any, String](
persistenceId = pid,
"",
{ (state, command) =>
command match {
case command: Persist =>
context.log.debugN(
"Persist [{}], pid [{}], seqNr [{}]",
command.payload,
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payload)
case command: PersistWithAck =>
context.log.debugN(
"Persist [{}], pid [{}], seqNr [{}]",
command.payload,
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payload).thenRun(_ => command.replyTo ! Done)
case command: PersistAll =>
if (context.log.isDebugEnabled)
context.log.debugN(
"PersistAll [{}], pid [{}], seqNr [{}]",
command.payloads.mkString(","),
pid.id,
EventSourcedBehavior.lastSequenceNumber(context) + 1)
Effect.persist(command.payloads)
case Ping(replyTo) =>
replyTo ! Done
Effect.none
case GetState(replyTo) =>
replyTo ! state
Effect.none
case GetSeqNr(replyTo) =>
replyTo ! EventSourcedBehavior.lastSequenceNumber(context)
Effect.none
case Stop(replyTo) =>
replyTo ! Done
Effect.stop()
}
},
(state, event) => if (state == "") event.toString else s"$state|$event")
}
}
}
28 changes: 28 additions & 0 deletions core/src/test/scala/akka/persistence/dynamodb/TestData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb

import java.util.concurrent.atomic.AtomicLong

import akka.persistence.typed.PersistenceId

object TestData {
private val start = 0L // could be something more unique, like currentTimeMillis
private val pidCounter = new AtomicLong(start)
private val entityTypeCounter = new AtomicLong(start)
}

trait TestData {
import TestData.pidCounter
import TestData.entityTypeCounter

def nextPid(): String = s"p-${pidCounter.incrementAndGet()}"

def nextEntityType(): String = s"TestEntity-${entityTypeCounter.incrementAndGet()}"

def nextPersistenceId(entityType: String): PersistenceId =
PersistenceId.of(entityType, s"${pidCounter.incrementAndGet()}")

}
Loading

0 comments on commit 6fe9e4f

Please sign in to comment.