diff --git a/core/src/main/scala/akka/persistence/dynamodb/util/TableSettings.scala b/core/src/main/scala/akka/persistence/dynamodb/util/TableSettings.scala new file mode 100644 index 0000000..ec3b6c6 --- /dev/null +++ b/core/src/main/scala/akka/persistence/dynamodb/util/TableSettings.scala @@ -0,0 +1,146 @@ +/* + * Copyright (C) 2024 Lightbend Inc. + */ + +package akka.persistence.dynamodb.util + +object TableSettings { + + /** + * Default table settings for running with DynamoDB local. + */ + val Local = new TableSettings(throughput = ThroughputSettings.Local) + + /** + * Scala API: Create table settings for CreateTables utilities. + * + * @param throughput + * throughput settings for the table + * @return + * table settings + */ + def apply(throughput: ThroughputSettings): TableSettings = + new TableSettings(throughput) + + /** + * Java API: Create table settings for CreateTables utilities. + * + * @param throughput + * throughput settings for the table + * @return + * table settings + */ + def create(throughput: ThroughputSettings): TableSettings = + new TableSettings(throughput) +} + +/** + * Table settings for CreateTables utilities. + * + * @param throughput + * throughput settings for the table + */ +final class TableSettings(val throughput: ThroughputSettings) + +object IndexSettings { + + /** + * Default index settings for running with DynamoDB local. + */ + val Local = new IndexSettings(enabled = true, throughput = ThroughputSettings.Local) + + /** + * Disabled index settings. + */ + val Disabled = new IndexSettings(enabled = false, throughput = ThroughputSettings.Local) + + /** + * Scala API: Create index settings for CreateTables utilities. + * + * @param throughput + * throughput settings for the index + * @return + * index settings for an enabled index + */ + def apply(throughput: ThroughputSettings): IndexSettings = + new IndexSettings(enabled = true, throughput) + + /** + * Java API: Create index settings for CreateTables utilities. + * + * @param throughput + * throughput settings for the index + * @return + * index settings for an enabled index + */ + def create(throughput: ThroughputSettings): IndexSettings = + new IndexSettings(enabled = true, throughput) +} + +/** + * Index settings for CreateTables utilities. + * + * @param enabled + * whether the index is enabled + * @param throughput + * throughput settings for the index + */ +final class IndexSettings(val enabled: Boolean, val throughput: ThroughputSettings) + +object ThroughputSettings { + + /** + * Default throughput settings for running with DynamoDB local. + */ + val Local: ThroughputSettings = provisioned(readCapacityUnits = 5L, writeCapacityUnits = 5L) + + /** + * Create provisioned throughput settings. + * + * @param readCapacityUnits + * the maximum number of strongly consistent reads consumed per second + * @param writeCapacityUnits + * the maximum number of writes consumed per second + * @return + * provisioned throughput settings + */ + def provisioned(readCapacityUnits: Long, writeCapacityUnits: Long): ThroughputSettings = + new ProvisionedThroughputSettings(readCapacityUnits, writeCapacityUnits) + + /** + * Create on-demand throughput settings. + * + * @param maxReadRequestUnits + * the maximum number of read request units (for no maximum, set to -1) + * @param maxWriteRequestUnits + * the maximum number of write request units (for no maximum, set to -1) + * @return + * on-demand throughput settings + */ + def onDemand(maxReadRequestUnits: Long, maxWriteRequestUnits: Long): ThroughputSettings = + new OnDemandThroughputSettings(maxReadRequestUnits, maxWriteRequestUnits) +} + +sealed trait ThroughputSettings + +/** + * Provisioned throughput settings. + * + * @param readCapacityUnits + * the maximum number of strongly consistent reads consumed per second + * @param writeCapacityUnits + * the maximum number of writes consumed per second + */ +final class ProvisionedThroughputSettings(val readCapacityUnits: Long, val writeCapacityUnits: Long) + extends ThroughputSettings + +/** + * On-demand throughput settings. + * + * @param maxReadRequestUnits + * the maximum number of read request units (for no maximum, set to -1) + * @param maxWriteRequestUnits + * the maximum number of write request units (for no maximum, set to -1) + */ +final class OnDemandThroughputSettings(val maxReadRequestUnits: Long, val maxWriteRequestUnits: Long) + extends ThroughputSettings diff --git a/core/src/main/scala/akka/persistence/dynamodb/util/javadsl/CreateTables.scala b/core/src/main/scala/akka/persistence/dynamodb/util/javadsl/CreateTables.scala index b4c9e4b..75b8a2f 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/util/javadsl/CreateTables.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/util/javadsl/CreateTables.scala @@ -11,6 +11,8 @@ import scala.jdk.FutureConverters._ import akka.Done import akka.actor.typed.ActorSystem import akka.persistence.dynamodb.DynamoDBSettings +import akka.persistence.dynamodb.util.IndexSettings +import akka.persistence.dynamodb.util.TableSettings import akka.persistence.dynamodb.util.scaladsl import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient @@ -20,12 +22,34 @@ object CreateTables { settings: DynamoDBSettings, client: DynamoDbAsyncClient, deleteIfExists: Boolean): CompletionStage[Done] = - scaladsl.CreateTables.createJournalTable(system, settings, client, deleteIfExists).asJava + createJournalTable(system, settings, client, deleteIfExists, TableSettings.Local, IndexSettings.Local) + + def createJournalTable( + system: ActorSystem[_], + settings: DynamoDBSettings, + client: DynamoDbAsyncClient, + deleteIfExists: Boolean, + tableSettings: TableSettings, + sliceIndexSettings: IndexSettings): CompletionStage[Done] = + scaladsl.CreateTables + .createJournalTable(system, settings, client, deleteIfExists, tableSettings, sliceIndexSettings) + .asJava def createSnapshotsTable( system: ActorSystem[_], settings: DynamoDBSettings, client: DynamoDbAsyncClient, deleteIfExists: Boolean): CompletionStage[Done] = - scaladsl.CreateTables.createSnapshotsTable(system, settings, client, deleteIfExists).asJava + createSnapshotsTable(system, settings, client, deleteIfExists, TableSettings.Local, IndexSettings.Local) + + def createSnapshotsTable( + system: ActorSystem[_], + settings: DynamoDBSettings, + client: DynamoDbAsyncClient, + deleteIfExists: Boolean, + tableSettings: TableSettings, + sliceIndexSettings: IndexSettings): CompletionStage[Done] = + scaladsl.CreateTables + .createSnapshotsTable(system, settings, client, deleteIfExists, tableSettings, sliceIndexSettings) + .asJava } diff --git a/core/src/main/scala/akka/persistence/dynamodb/util/scaladsl/CreateTables.scala b/core/src/main/scala/akka/persistence/dynamodb/util/scaladsl/CreateTables.scala index 5d69cc4..949433d 100644 --- a/core/src/main/scala/akka/persistence/dynamodb/util/scaladsl/CreateTables.scala +++ b/core/src/main/scala/akka/persistence/dynamodb/util/scaladsl/CreateTables.scala @@ -16,6 +16,10 @@ import akka.Done import akka.actor.typed.ActorSystem import akka.dispatch.ExecutionContexts import akka.persistence.dynamodb.DynamoDBSettings +import akka.persistence.dynamodb.util.IndexSettings +import akka.persistence.dynamodb.util.OnDemandThroughputSettings +import akka.persistence.dynamodb.util.ProvisionedThroughputSettings +import akka.persistence.dynamodb.util.TableSettings import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest @@ -24,6 +28,7 @@ import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement import software.amazon.awssdk.services.dynamodb.model.KeyType +import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput import software.amazon.awssdk.services.dynamodb.model.Projection import software.amazon.awssdk.services.dynamodb.model.ProjectionType import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput @@ -36,7 +41,9 @@ object CreateTables { system: ActorSystem[_], settings: DynamoDBSettings, client: DynamoDbAsyncClient, - deleteIfExists: Boolean): Future[Done] = { + deleteIfExists: Boolean, + tableSettings: TableSettings = TableSettings.Local, + sliceIndexSettings: IndexSettings = IndexSettings.Local): Future[Done] = { import akka.persistence.dynamodb.internal.JournalAttributes._ implicit val ec: ExecutionContext = system.executionContext @@ -44,41 +51,89 @@ object CreateTables { client.describeTable(DescribeTableRequest.builder().tableName(settings.journalTable).build()).asScala def create(): Future[Done] = { - val sliceIndex = GlobalSecondaryIndex - .builder() - .indexName(settings.journalBySliceGsi) - .keySchema( - KeySchemaElement.builder().attributeName(EntityTypeSlice).keyType(KeyType.HASH).build(), - KeySchemaElement.builder().attributeName(Timestamp).keyType(KeyType.RANGE).build()) - .projection( - // FIXME we could skip a few attributes - Projection.builder().projectionType(ProjectionType.ALL).build()) - // FIXME config - .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(10L).writeCapacityUnits(10L).build()) - .build() - - val req = CreateTableRequest - .builder() + val sliceIndex = if (sliceIndexSettings.enabled) { + var indexBuilder = + GlobalSecondaryIndex.builder + .indexName(settings.journalBySliceGsi) + .keySchema( + KeySchemaElement.builder().attributeName(EntityTypeSlice).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(Timestamp).keyType(KeyType.RANGE).build()) + .projection(Projection.builder().projectionType(ProjectionType.ALL).build()) + + indexBuilder = sliceIndexSettings.throughput match { + case provisioned: ProvisionedThroughputSettings => + indexBuilder.provisionedThroughput( + ProvisionedThroughput.builder + .readCapacityUnits(provisioned.readCapacityUnits) + .writeCapacityUnits(provisioned.writeCapacityUnits) + .build()) + case onDemand: OnDemandThroughputSettings => + indexBuilder.onDemandThroughput( + OnDemandThroughput.builder + .maxReadRequestUnits(onDemand.maxReadRequestUnits) + .maxWriteRequestUnits(onDemand.maxWriteRequestUnits) + .build()) + } + + Some(indexBuilder.build()) + } else None + + var requestBuilder = CreateTableRequest.builder .tableName(settings.journalTable) .keySchema( KeySchemaElement.builder().attributeName(Pid).keyType(KeyType.HASH).build(), KeySchemaElement.builder().attributeName(SeqNr).keyType(KeyType.RANGE).build()) - .attributeDefinitions( - AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build(), - AttributeDefinition.builder().attributeName(SeqNr).attributeType(ScalarAttributeType.N).build(), - AttributeDefinition.builder().attributeName(EntityTypeSlice).attributeType(ScalarAttributeType.S).build(), - AttributeDefinition.builder().attributeName(Timestamp).attributeType(ScalarAttributeType.N).build()) - // FIXME config - .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()) - .globalSecondaryIndexes(sliceIndex) - .build() - - client.createTable(req).asScala.map(_ => Done) + + val tableAttributes = Seq( + AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build(), + AttributeDefinition.builder().attributeName(SeqNr).attributeType(ScalarAttributeType.N).build()) + + val tableWithIndexAttributes = tableAttributes ++ Seq( + AttributeDefinition.builder().attributeName(EntityTypeSlice).attributeType(ScalarAttributeType.S).build(), + AttributeDefinition.builder().attributeName(Timestamp).attributeType(ScalarAttributeType.N).build()) + + requestBuilder = sliceIndex match { + case Some(index) => + requestBuilder + .attributeDefinitions(tableWithIndexAttributes: _*) + .globalSecondaryIndexes(index) + case None => + requestBuilder.attributeDefinitions(tableAttributes: _*) + } + + requestBuilder = tableSettings.throughput match { + case provisioned: ProvisionedThroughputSettings => + requestBuilder.provisionedThroughput( + ProvisionedThroughput.builder + .readCapacityUnits(provisioned.readCapacityUnits) + .writeCapacityUnits(provisioned.writeCapacityUnits) + .build()) + case onDemand: OnDemandThroughputSettings => + requestBuilder.onDemandThroughput( + OnDemandThroughput.builder + .maxReadRequestUnits(onDemand.maxReadRequestUnits) + .maxWriteRequestUnits(onDemand.maxWriteRequestUnits) + .build()) + } + + client + .createTable(requestBuilder.build()) + .asScala + .map(_ => Done) + .recoverWith { case c: CompletionException => + Future.failed(c.getCause) + }(ExecutionContexts.parasitic) } def delete(): Future[Done] = { val req = DeleteTableRequest.builder().tableName(settings.journalTable).build() - client.deleteTable(req).asScala.map(_ => Done) + client + .deleteTable(req) + .asScala + .map(_ => Done) + .recoverWith { case c: CompletionException => + Future.failed(c.getCause) + }(ExecutionContexts.parasitic) } existingTable.transformWith { @@ -100,7 +155,9 @@ object CreateTables { system: ActorSystem[_], settings: DynamoDBSettings, client: DynamoDbAsyncClient, - deleteIfExists: Boolean): Future[Done] = { + deleteIfExists: Boolean, + tableSettings: TableSettings = TableSettings.Local, + sliceIndexSettings: IndexSettings = IndexSettings.Local): Future[Done] = { import akka.persistence.dynamodb.internal.SnapshotAttributes._ implicit val ec: ExecutionContext = system.executionContext @@ -109,32 +166,70 @@ object CreateTables { client.describeTable(DescribeTableRequest.builder().tableName(settings.snapshotTable).build()).asScala def create(): Future[Done] = { - val sliceIndex = GlobalSecondaryIndex - .builder() - .indexName(settings.snapshotBySliceGsi) - .keySchema( - KeySchemaElement.builder().attributeName(EntityTypeSlice).keyType(KeyType.HASH).build(), - KeySchemaElement.builder().attributeName(EventTimestamp).keyType(KeyType.RANGE).build()) - .projection(Projection.builder().projectionType(ProjectionType.ALL).build()) - // FIXME config - .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()) - .build() - - val request = CreateTableRequest - .builder() + val sliceIndex = if (sliceIndexSettings.enabled) { + var indexBuilder = + GlobalSecondaryIndex.builder + .indexName(settings.snapshotBySliceGsi) + .keySchema( + KeySchemaElement.builder().attributeName(EntityTypeSlice).keyType(KeyType.HASH).build(), + KeySchemaElement.builder().attributeName(EventTimestamp).keyType(KeyType.RANGE).build()) + .projection(Projection.builder().projectionType(ProjectionType.ALL).build()) + + indexBuilder = sliceIndexSettings.throughput match { + case provisioned: ProvisionedThroughputSettings => + indexBuilder.provisionedThroughput( + ProvisionedThroughput.builder + .readCapacityUnits(provisioned.readCapacityUnits) + .writeCapacityUnits(provisioned.writeCapacityUnits) + .build()) + case onDemand: OnDemandThroughputSettings => + indexBuilder.onDemandThroughput( + OnDemandThroughput.builder + .maxReadRequestUnits(onDemand.maxReadRequestUnits) + .maxWriteRequestUnits(onDemand.maxWriteRequestUnits) + .build()) + } + + Some(indexBuilder.build()) + } else None + + var requestBuilder = CreateTableRequest.builder .tableName(settings.snapshotTable) .keySchema(KeySchemaElement.builder().attributeName(Pid).keyType(KeyType.HASH).build()) - .attributeDefinitions( - AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build(), - AttributeDefinition.builder().attributeName(EntityTypeSlice).attributeType(ScalarAttributeType.S).build(), - AttributeDefinition.builder().attributeName(EventTimestamp).attributeType(ScalarAttributeType.N).build()) - // FIXME config - .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()) - .globalSecondaryIndexes(sliceIndex) - .build() + + val tableAttributes = + Seq(AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build()) + + val tableWithIndexAttributes = tableAttributes ++ Seq( + AttributeDefinition.builder().attributeName(EntityTypeSlice).attributeType(ScalarAttributeType.S).build(), + AttributeDefinition.builder().attributeName(EventTimestamp).attributeType(ScalarAttributeType.N).build()) + + requestBuilder = sliceIndex match { + case Some(index) => + requestBuilder + .attributeDefinitions(tableWithIndexAttributes: _*) + .globalSecondaryIndexes(index) + case None => + requestBuilder.attributeDefinitions(tableAttributes: _*) + } + + requestBuilder = tableSettings.throughput match { + case provisioned: ProvisionedThroughputSettings => + requestBuilder.provisionedThroughput( + ProvisionedThroughput.builder + .readCapacityUnits(provisioned.readCapacityUnits) + .writeCapacityUnits(provisioned.writeCapacityUnits) + .build()) + case onDemand: OnDemandThroughputSettings => + requestBuilder.onDemandThroughput( + OnDemandThroughput.builder + .maxReadRequestUnits(onDemand.maxReadRequestUnits) + .maxWriteRequestUnits(onDemand.maxWriteRequestUnits) + .build()) + } client - .createTable(request) + .createTable(requestBuilder.build()) .asScala .map(_ => Done) .recoverWith { case c: CompletionException => diff --git a/projection/src/main/scala/akka/projection/dynamodb/javadsl/CreateTables.scala b/projection/src/main/scala/akka/projection/dynamodb/javadsl/CreateTables.scala index d957337..3a0343b 100644 --- a/projection/src/main/scala/akka/projection/dynamodb/javadsl/CreateTables.scala +++ b/projection/src/main/scala/akka/projection/dynamodb/javadsl/CreateTables.scala @@ -10,6 +10,7 @@ import scala.jdk.FutureConverters._ import akka.Done import akka.actor.typed.ActorSystem +import akka.persistence.dynamodb.util.TableSettings import akka.projection.dynamodb.DynamoDBProjectionSettings import akka.projection.dynamodb.scaladsl import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient @@ -20,6 +21,15 @@ object CreateTables { settings: DynamoDBProjectionSettings, client: DynamoDbAsyncClient, deleteIfExists: Boolean): CompletionStage[Done] = - scaladsl.CreateTables.createTimestampOffsetStoreTable(system, settings, client, deleteIfExists).asJava + createTimestampOffsetStoreTable(system, settings, client, deleteIfExists, TableSettings.Local) + def createTimestampOffsetStoreTable( + system: ActorSystem[_], + settings: DynamoDBProjectionSettings, + client: DynamoDbAsyncClient, + deleteIfExists: Boolean, + tableSettings: TableSettings): CompletionStage[Done] = + scaladsl.CreateTables + .createTimestampOffsetStoreTable(system, settings, client, deleteIfExists, tableSettings) + .asJava } diff --git a/projection/src/main/scala/akka/projection/dynamodb/scaladsl/CreateTables.scala b/projection/src/main/scala/akka/projection/dynamodb/scaladsl/CreateTables.scala index b3c282d..5e25304 100644 --- a/projection/src/main/scala/akka/projection/dynamodb/scaladsl/CreateTables.scala +++ b/projection/src/main/scala/akka/projection/dynamodb/scaladsl/CreateTables.scala @@ -15,6 +15,9 @@ import scala.util.Success import akka.Done import akka.actor.typed.ActorSystem import akka.dispatch.ExecutionContexts +import akka.persistence.dynamodb.util.OnDemandThroughputSettings +import akka.persistence.dynamodb.util.ProvisionedThroughputSettings +import akka.persistence.dynamodb.util.TableSettings import akka.projection.dynamodb.DynamoDBProjectionSettings import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition @@ -23,6 +26,7 @@ import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement import software.amazon.awssdk.services.dynamodb.model.KeyType +import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType @@ -32,7 +36,8 @@ object CreateTables { system: ActorSystem[_], settings: DynamoDBProjectionSettings, client: DynamoDbAsyncClient, - deleteIfExists: Boolean): Future[Done] = { + deleteIfExists: Boolean, + tableSettings: TableSettings = TableSettings.Local): Future[Done] = { import akka.projection.dynamodb.internal.OffsetStoreDao.OffsetStoreAttributes._ implicit val ec: ExecutionContext = system.executionContext @@ -40,8 +45,7 @@ object CreateTables { client.describeTable(DescribeTableRequest.builder().tableName(settings.timestampOffsetTable).build()).asScala def create(): Future[Done] = { - val req = CreateTableRequest - .builder() + var requestBuilder = CreateTableRequest.builder .tableName(settings.timestampOffsetTable) .keySchema( KeySchemaElement.builder().attributeName(NameSlice).keyType(KeyType.HASH).build(), @@ -49,11 +53,24 @@ object CreateTables { .attributeDefinitions( AttributeDefinition.builder().attributeName(NameSlice).attributeType(ScalarAttributeType.S).build(), AttributeDefinition.builder().attributeName(Pid).attributeType(ScalarAttributeType.S).build()) - .provisionedThroughput(ProvisionedThroughput.builder().readCapacityUnits(5L).writeCapacityUnits(5L).build()) - .build() + + requestBuilder = tableSettings.throughput match { + case provisioned: ProvisionedThroughputSettings => + requestBuilder.provisionedThroughput( + ProvisionedThroughput.builder + .readCapacityUnits(provisioned.readCapacityUnits) + .writeCapacityUnits(provisioned.writeCapacityUnits) + .build()) + case onDemand: OnDemandThroughputSettings => + requestBuilder.onDemandThroughput( + OnDemandThroughput.builder + .maxReadRequestUnits(onDemand.maxReadRequestUnits) + .maxWriteRequestUnits(onDemand.maxWriteRequestUnits) + .build()) + } client - .createTable(req) + .createTable(requestBuilder.build()) .asScala .map(_ => Done) .recoverWith { case c: CompletionException =>