Skip to content

Commit

Permalink
feat: Support for pluggable metric reporting (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
leviramsey authored Oct 2, 2024
1 parent 5f966b0 commit f62012d
Show file tree
Hide file tree
Showing 9 changed files with 198 additions and 3 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ lazy val docs = project
.settings(dontPublish)
.settings(
name := "Akka Persistence plugin for Amazon DynamoDB",
libraryDependencies ++= Dependencies.docs,
libraryDependencies ++= (Dependencies.TestDeps.cloudwatchMetricPublisher +: Dependencies.docs),
makeSite := makeSite.dependsOn(LocalRootProject / ScalaUnidoc / doc).value,
previewPath := (Paradox / siteSubdirName).value,
Preprocess / siteSubdirName := s"api/akka-persistence-dynamodb/${projectInfoVersion.value}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.metrics.MetricPublisher

object ClientProvider extends ExtensionId[ClientProvider] {
def createExtension(system: ActorSystem[_]): ClientProvider = new ClientProvider(system)
Expand All @@ -35,6 +36,7 @@ object ClientProvider extends ExtensionId[ClientProvider] {
class ClientProvider(system: ActorSystem[_]) extends Extension {
private val clients = new ConcurrentHashMap[String, DynamoDbAsyncClient]
private val clientSettings = new ConcurrentHashMap[String, ClientSettings]
private val metricsProvider = AWSClientMetricsResolver.resolve(system)

CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseBeforeActorSystemTerminate, "close DynamoDB clients") { () =>
Expand All @@ -48,7 +50,7 @@ class ClientProvider(system: ActorSystem[_]) extends Extension {
configLocation,
configLocation => {
val settings = clientSettingsFor(configLocation)
createClient(settings)
createClient(settings, metricsProvider.map(_.metricPublisherFor(configLocation)))
})
}

Expand All @@ -63,7 +65,7 @@ class ClientProvider(system: ActorSystem[_]) extends Extension {
}
}

private def createClient(settings: ClientSettings): DynamoDbAsyncClient = {
private def createClient(settings: ClientSettings, metricsPublisher: Option[MetricPublisher]): DynamoDbAsyncClient = {
val httpClientBuilder = NettyNioAsyncHttpClient.builder
.maxConcurrency(settings.http.maxConcurrency)
.maxPendingConnectionAcquires(settings.http.maxPendingConnectionAcquires)
Expand Down Expand Up @@ -98,6 +100,8 @@ class ClientProvider(system: ActorSystem[_]) extends Extension {
overrideConfigurationBuilder = overrideConfigurationBuilder.apiCallAttemptTimeout(timeout.toJava)
}

metricsPublisher.foreach { mp => overrideConfigurationBuilder.addMetricPublisher(mp) }

overrideConfigurationBuilder.build()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.dynamodb.util

import akka.actor.ClassicActorSystemProvider
import akka.actor.ExtendedActorSystem
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi

import software.amazon.awssdk.metrics.MetricCollection
import software.amazon.awssdk.metrics.MetricPublisher

import scala.jdk.CollectionConverters.ListHasAsScala

import java.util.concurrent.ConcurrentHashMap

/**
* Service Provider Interface for injecting AWS SDK MetricPublisher into the underlying DynamoDB client (see
* https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/metrics-list.html).
*
* Implementations must include a single constructor with one argument: an `akka.actor.ClassicActorSystemProvider`. To
* setup your implementation, add a setting to your 'application.conf':
*
* {{{
* akka.persistence.dynamodb.client.metrics-providers += com.myexample.MyAWSMetricsProvider
* }}}
*/
@ApiMayChange
trait AWSClientMetricsProvider {

/**
* Given an overall config path for Akka Persistence DynamoDB (e.g. 'akka.persistence.dynamodb') returns an instance
* of an AWS SDK MetricPublisher which publishes SDK client metrics to the location of this implementation's choosing.
*/
def metricPublisherFor(configLocation: String): MetricPublisher
}

/** INTERNAL API */
@InternalApi
private[dynamodb] object AWSClientMetricsResolver {
def resolve(system: ClassicActorSystemProvider): Option[AWSClientMetricsProvider] = {
val providersPath = "akka.persistence.dynamodb.client.metrics-providers"
val config = system.classicSystem.settings.config
if (!config.hasPath(providersPath)) {
None
} else {
val fqcns = config.getStringList(providersPath)

fqcns.size match {
case 0 => None
case 1 => Some(createProvider(system, fqcns.get(0)))
case _ =>
val providers = fqcns.asScala.toSeq.map(fqcn => createProvider(system, fqcn))
Some(new EnsembleAWSClientMetricsProvider(providers))
}
}
}

def createProvider(system: ClassicActorSystemProvider, fqcn: String): AWSClientMetricsProvider = {
system.classicSystem
.asInstanceOf[ExtendedActorSystem]
.dynamicAccess
.createInstanceFor[AWSClientMetricsProvider](fqcn, List(classOf[ClassicActorSystemProvider] -> system))
.get
}

// This technically does not follow the construction convention that would allow it
// to be reflectively constructed, but we don't reflectively construct it
private class EnsembleAWSClientMetricsProvider(providers: Seq[AWSClientMetricsProvider])
extends AWSClientMetricsProvider {
private val instances = new ConcurrentHashMap[String, MetricPublisher]()

def metricPublisherFor(configLocation: String): MetricPublisher =
instances.computeIfAbsent(
configLocation,
path =>
new MetricPublisher {
private val publishers = providers.map(_.metricPublisherFor(configLocation))

def publish(metricCollection: MetricCollection): Unit = {
publishers.foreach(_.publish(metricCollection))
}

def close(): Unit = {
publishers.foreach(_.close())
}
})

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package akka.persistence.dynamodb.util

import scala.concurrent.duration._
import scala.jdk.CollectionConverters.ListHasAsScala
import scala.jdk.OptionConverters._

import akka.actor.ClassicActorSystemProvider
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.testkit.typed.scaladsl.ActorTestKitBase
import akka.util.JavaDurationConverters._
Expand All @@ -17,6 +19,8 @@ import org.scalatest.wordspec.AnyWordSpec
import software.amazon.awssdk.core.retry.RetryMode
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.metrics.MetricCollection
import software.amazon.awssdk.metrics.MetricPublisher

class ClientProviderSpec extends AnyWordSpec with Matchers with OptionValues {

Expand Down Expand Up @@ -62,6 +66,22 @@ class ClientProviderSpec extends AnyWordSpec with Matchers with OptionValues {
compressionConfiguration.minimumCompressionThresholdInBytes shouldBe 10240
}

"create client with a MetricPublisher" in withActorTestKit("""
akka.persistence.dynamodb.client {
region = "us-east-1"
metrics-providers += akka.persistence.dynamodb.util.TestNoopMetricsProvider
}
""") { testKit =>
val clientConfigLocation = "akka.persistence.dynamodb.client"
val client = ClientProvider(testKit.system).clientFor(clientConfigLocation)

val clientConfiguration = client.serviceClientConfiguration
val overrideConfiguration = clientConfiguration.overrideConfiguration
val metricPublishers = overrideConfiguration.metricPublishers.asScala.toSeq
metricPublishers.size shouldBe 1
metricPublishers should contain(TestNoopMetricsProvider.publisher)
}

"create client with configured settings" in withActorTestKit("""
akka.persistence.dynamodb.client {
call-timeout = 3 seconds
Expand Down Expand Up @@ -158,3 +178,15 @@ class ClientProviderSpec extends AnyWordSpec with Matchers with OptionValues {
}

}

class TestNoopMetricsProvider(system: ClassicActorSystemProvider) extends AWSClientMetricsProvider {
def metricPublisherFor(configLocation: String): MetricPublisher = TestNoopMetricsProvider.publisher
}

object TestNoopMetricsProvider {
val publisher =
new MetricPublisher {
def publish(collection: MetricCollection): Unit = ()
def close(): Unit = ()
}
}
1 change: 1 addition & 0 deletions docs/src/main/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This Akka Persistence plugin allows for using Amazon DynamoDB as a backend for A
* [Query Plugin](query.md)
* [Projection](projection.md)
* [Configuration](config.md)
* [Observability](observability.md)
* [Database cleanup](cleanup.md)
* [Contributing](contributing.md)

Expand Down
29 changes: 29 additions & 0 deletions docs/src/main/paradox/observability.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Observability

This plugin supports injecting an [AWS `MetricPublisher`](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/design/core/metrics/Design.md) into the underlying DynamoDB SDK client. This injection is accomplished by defining a class @scala[extending]@java[implementing] @apidoc[akka.persistence.dynamodb.util.AWSClientMetricsProvider].

Your implementation must expose a single constructor with one argument: an `akka.actor.ClassicActorSystemProvider`. Its `metricPublisherFor` method will take the config path to the `client` section of this instance of the plugin @ref:[configuration](config.md#multiple-plugins).

The AWS SDK provides an implementation of `MetricPublisher` which publishes to [Amazon CloudWatch](https://docs.aws.amazon.com/cloudwatch/). An `AWSClientMetricsProvider` providing [this `MetricPublisher`](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/metrics.html) with defaults would look like:

Scala
: @@snip [cloudwatch default](/docs/src/test/scala/docs/CloudWatchProvider.scala) { #cloudwatch-default }

Java
: @@snip [cloudwatch default](/docs/src/test/java/jdocs/CloudWatchWithDefaultConfigurationMetricsProvider.java) { #cloudwatch-default }

To register your provider implementation with the plugin, add its fully-qualified class name to the configuration path `akka.persistence.dynamodb.client.metrics-providers` (e.g. in `application.conf`):

```
akka.persistence.dynamodb.client.metrics-providers += domain.package.CloudWatchWithDefaultConfigurationMetricsProvider
```

In a test case, it may be useful to set the entire list of `metrics-providers` explicitly:

```
akka.persistence.dynamodb.client.metrics-providers = [ "domain.package.CloudWatchWithDefaultConfigurationMetricsProvider" ]
```

If multiple providers are specified, they will automatically be combined into a "meta-provider" which provides a publisher which will publish using _all_ of the specified providers' respective publishers.

If implementing your own `MetricPublisher`, [Amazon recommends that care be taken to not block the thread calling the methods of the `MetricPublisher`](https://github.com/aws/aws-sdk-java-v2/blob/master/docs/design/core/metrics/Design.md#performance): all I/O and "heavy" computation should be performed asynchronously and control immediately returned to the caller.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package jdocs;

// #cloudwatch-default
import akka.actor.ClassicActorSystemProvider;
import akka.persistence.dynamodb.util.AWSClientMetricsProvider;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher;

public class CloudWatchWithDefaultConfigurationMetricsProvider implements AWSClientMetricsProvider {
public CloudWatchWithDefaultConfigurationMetricsProvider(ClassicActorSystemProvider system) {
}

@Override
public MetricPublisher metricPublisherFor(String configLocation) {
// These are just the defaults... a more elaborate configuration using its builder is possible
return CloudWatchMetricPublisher.create();
}
}
// #cloudwatch-default
16 changes: 16 additions & 0 deletions docs/src/test/scala/docs/CloudWatchProvider.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package docs

// #cloudwatch-default
import akka.actor.ClassicActorSystemProvider
import akka.persistence.dynamodb.util.AWSClientMetricsProvider
import software.amazon.awssdk.metrics.MetricPublisher
import software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchMetricPublisher

class CloudWatchWithDefaultConfigurationMetricsProvider(system: ClassicActorSystemProvider)
extends AWSClientMetricsProvider {
def metricPublisherFor(configLocation: String): MetricPublisher = {
// These are just the defaults... a more elaborate configuration using its builder is possible
CloudWatchMetricPublisher.create()
}
}
// #cloudwatch-default
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ object Dependencies {
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.12" % Test // ApacheV2
val junit = "junit" % "junit" % "4.12" % Test // Eclipse Public License 1.0
val junitInterface = "com.novocode" % "junit-interface" % "0.11" % Test // "BSD 2-Clause"

val cloudwatchMetricPublisher = "software.amazon.awssdk" % "cloudwatch-metric-publisher" % AwsSdkVersion % Test
}

import Compile._
Expand Down

0 comments on commit f62012d

Please sign in to comment.