Skip to content

Commit

Permalink
chore: Check for clock skew with response header (#113)
Browse files Browse the repository at this point in the history
* tolerance config
  • Loading branch information
patriknw authored Dec 4, 2024
1 parent 8c8f997 commit 1ee98ab
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,12 @@ akka.persistence.dynamodb {
}
}
// #client-settings

akka.persistence.dynamodb {
clock-skew-detection {
# When the local clock and time in AWS response diverge by more than this duration
# a warning is logged. Can be disabled by setting this to "off".
# This check only has precision of seconds.
warning-tolerance = 2 seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.dynamodb

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
Expand Down Expand Up @@ -55,6 +56,8 @@ object DynamoDBSettings {
if (indexName.nonEmpty) indexName else snapshotTable + "_slice_idx"
}

val clockSkewSettings = new ClockSkewSettings(config)

new DynamoDBSettings(
journalTable,
journalPublishEvents,
Expand All @@ -63,7 +66,8 @@ object DynamoDBSettings {
cleanupSettings,
timeToLiveSettings,
journalBySliceGsi,
snapshotBySliceGsi)
snapshotBySliceGsi,
clockSkewSettings)
}

/**
Expand All @@ -74,6 +78,9 @@ object DynamoDBSettings {

}

/**
* Use `DynamoDBSettings.apply` or `DynamoDBSettings.create` for construction.
*/
final class DynamoDBSettings private (
val journalTable: String,
val journalPublishEvents: Boolean,
Expand All @@ -82,7 +89,8 @@ final class DynamoDBSettings private (
val cleanupSettings: CleanupSettings,
val timeToLiveSettings: TimeToLiveSettings,
val journalBySliceGsi: String,
val snapshotBySliceGsi: String)
val snapshotBySliceGsi: String,
val clockSkewSettings: ClockSkewSettings)

final class QuerySettings(config: Config) {
val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala
Expand Down Expand Up @@ -278,6 +286,22 @@ final class EventSourcedEntityTimeToLiveSettings(config: Config) {
val snapshotTimeToLive: Option[FiniteDuration] = ConfigHelpers.optDuration(config, "snapshot-time-to-live")
}

/**
* INTERNAL API
*/
@InternalStableApi
final class ClockSkewSettings(config: Config) {
val warningTolerance: FiniteDuration = {
val path = "clock-skew-detection.warning-tolerance"
Helpers.toRootLowerCase(config.getString(path)) match {
case "off" | "none" => Duration.Zero
case _ => config.getDuration(path).toScala
}
}

override def toString: String = s"ClockSkewSettings($warningTolerance)"
}

private[akka] object ConfigHelpers {
def optString(config: Config, path: String): Option[String] = {
if (config.hasPath(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ package akka.persistence.dynamodb.internal

import java.nio.ByteBuffer
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.util.concurrent.CompletionException
import java.util.Base64
import java.util.Locale
import java.util.{ HashMap => JHashMap }
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
Expand All @@ -25,6 +29,7 @@ import akka.persistence.typed.PersistenceId
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.core.SdkResponse
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.Delete
Expand Down Expand Up @@ -58,6 +63,36 @@ import software.amazon.awssdk.services.dynamodb.model.Update

private implicit val ec: ExecutionContext = system.executionContext

private val dateHeaderLogCounter = new AtomicLong
private val dateHeaderParser = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
private val clockSkewToleranceMillis = settings.clockSkewSettings.warningTolerance.toMillis

private def checkClockSkew(response: SdkResponse): Unit = {
try {
if (clockSkewToleranceMillis > 0 &&
dateHeaderLogCounter.getAndIncrement() % 1000 == 0) {
val dateHeaderOpt = response.sdkHttpResponse().firstMatchingHeader("Date")
if (dateHeaderOpt.isPresent) {
val dateHeader = dateHeaderOpt.get
val awsInstant = Instant.from(dateHeaderParser.parse(dateHeader))
val now = Instant.now()
// The Date header only has precision of seconds so this is just a rough check
if (math.abs(java.time.Duration.between(awsInstant, now).toMillis) >= clockSkewToleranceMillis) {
log.warn(
"Possible clock skew, make sure clock synchronization is installed. " +
"Local time [{}] vs DynamoDB response time [{}]",
now,
awsInstant)
}
}
}
} catch {
case NonFatal(exc) =>
log.warn("check clock skew failed", exc)
}

}

def writeEvents(events: Seq[SerializedJournalItem]): Future[Done] = {
require(events.nonEmpty)

Expand Down Expand Up @@ -118,7 +153,14 @@ import software.amazon.awssdk.services.dynamodb.model.Update
response.consumedCapacity.capacityUnits)
}
}
result.map(_ => Done)(ExecutionContext.parasitic)
result
.map { response =>
checkClockSkew(response)
Done
}(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
} else {
val writeItems =
events.map { item =>
Expand Down Expand Up @@ -160,7 +202,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.map { response =>
checkClockSkew(response)
Done
}(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
Expand Down

0 comments on commit 1ee98ab

Please sign in to comment.