Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Check for clock skew with response header #113

Merged
merged 3 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,12 @@ akka.persistence.dynamodb {
}
}
// #client-settings

akka.persistence.dynamodb {
clock-skew-detection {
# When the local clock and time in AWS response diverge by more than this duration
# a warning is logged. Can be disabled by setting this to "off".
# This check only has precision of seconds.
warning-tolerance = 2 seconds
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package akka.persistence.dynamodb

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
Expand Down Expand Up @@ -55,6 +56,8 @@ object DynamoDBSettings {
if (indexName.nonEmpty) indexName else snapshotTable + "_slice_idx"
}

val clockSkewSettings = new ClockSkewSettings(config)

new DynamoDBSettings(
journalTable,
journalPublishEvents,
Expand All @@ -63,7 +66,8 @@ object DynamoDBSettings {
cleanupSettings,
timeToLiveSettings,
journalBySliceGsi,
snapshotBySliceGsi)
snapshotBySliceGsi,
clockSkewSettings)
}

/**
Expand All @@ -74,6 +78,9 @@ object DynamoDBSettings {

}

/**
* Use `DynamoDBSettings.apply` or `DynamoDBSettings.create` for construction.
*/
final class DynamoDBSettings private (
val journalTable: String,
val journalPublishEvents: Boolean,
Expand All @@ -82,7 +89,8 @@ final class DynamoDBSettings private (
val cleanupSettings: CleanupSettings,
val timeToLiveSettings: TimeToLiveSettings,
val journalBySliceGsi: String,
val snapshotBySliceGsi: String)
val snapshotBySliceGsi: String,
val clockSkewSettings: ClockSkewSettings)

final class QuerySettings(config: Config) {
val refreshInterval: FiniteDuration = config.getDuration("refresh-interval").toScala
Expand Down Expand Up @@ -278,6 +286,22 @@ final class EventSourcedEntityTimeToLiveSettings(config: Config) {
val snapshotTimeToLive: Option[FiniteDuration] = ConfigHelpers.optDuration(config, "snapshot-time-to-live")
}

/**
* INTERNAL API
*/
@InternalStableApi
final class ClockSkewSettings(config: Config) {
val warningTolerance: FiniteDuration = {
val path = "clock-skew-detection.warning-tolerance"
Helpers.toRootLowerCase(config.getString(path)) match {
case "off" | "none" => Duration.Zero
case _ => config.getDuration(path).toScala
}
}

override def toString: String = s"ClockSkewSettings($warningTolerance)"
}

private[akka] object ConfigHelpers {
def optString(config: Config, path: String): Option[String] = {
if (config.hasPath(path)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ package akka.persistence.dynamodb.internal

import java.nio.ByteBuffer
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.util.concurrent.CompletionException
import java.util.Base64
import java.util.Locale
import java.util.{ HashMap => JHashMap }
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.util.control.NonFatal

import akka.Done
import akka.actor.typed.ActorSystem
Expand All @@ -25,6 +29,7 @@ import akka.persistence.typed.PersistenceId
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.core.SdkResponse
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.Delete
Expand Down Expand Up @@ -58,6 +63,36 @@ import software.amazon.awssdk.services.dynamodb.model.Update

private implicit val ec: ExecutionContext = system.executionContext

private val dateHeaderLogCounter = new AtomicLong
private val dateHeaderParser = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss z", Locale.US)
private val clockSkewToleranceMillis = settings.clockSkewSettings.warningTolerance.toMillis

private def checkClockSkew(response: SdkResponse): Unit = {
try {
if (clockSkewToleranceMillis > 0 &&
dateHeaderLogCounter.getAndIncrement() % 1000 == 0) {
val dateHeaderOpt = response.sdkHttpResponse().firstMatchingHeader("Date")
if (dateHeaderOpt.isPresent) {
val dateHeader = dateHeaderOpt.get
val awsInstant = Instant.from(dateHeaderParser.parse(dateHeader))
val now = Instant.now()
// The Date header only has precision of seconds so this is just a rough check
if (math.abs(java.time.Duration.between(awsInstant, now).toMillis) >= clockSkewToleranceMillis) {
log.warn(
"Possible clock skew, make sure clock synchronization is installed. " +
"Local time [{}] vs DynamoDB response time [{}]",
now,
awsInstant)
}
}
}
} catch {
case NonFatal(exc) =>
log.warn("check clock skew failed", exc)
}

}

def writeEvents(events: Seq[SerializedJournalItem]): Future[Done] = {
require(events.nonEmpty)

Expand Down Expand Up @@ -118,7 +153,14 @@ import software.amazon.awssdk.services.dynamodb.model.Update
response.consumedCapacity.capacityUnits)
}
}
result.map(_ => Done)(ExecutionContext.parasitic)
result
.map { response =>
checkClockSkew(response)
Done
}(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
} else {
val writeItems =
events.map { item =>
Expand Down Expand Up @@ -160,7 +202,10 @@ import software.amazon.awssdk.services.dynamodb.model.Update
}
}
result
.map(_ => Done)(ExecutionContext.parasitic)
.map { response =>
checkClockSkew(response)
Done
}(ExecutionContext.parasitic)
.recoverWith { case c: CompletionException =>
Future.failed(c.getCause)
}(ExecutionContext.parasitic)
Expand Down