Allow getAll with partition keys for entities not having sort key (#254)
* Allow getAll with partition keys for entities not having sort key

* fixed key in async dynamo db
musketyr authored Jul 11, 2024
1 parent 4844b91 commit 5d5dae1
Showing 12 changed files with 73 additions and 0 deletions.
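In short, the commit adds a partition-key-only batch load alongside the existing partition-plus-sort-key `getAll` variants, for entities that declare no sort key. As a hedged sketch of the user-facing shape (the `OrderService` interface and `Order` entity are hypothetical; `@PartitionKey` and the declarative-service wiring come from the project, mirroring the `DynamoDBEntityNoRangeService` test fixture further down):

```java
import java.util.List;

// hypothetical declarative service for an entity that declares only a partition key;
// @PartitionKey and the Order entity are assumed to be provided by the project
public interface OrderService {

    // existing behaviour: load one entity by its partition key
    Order get(@PartitionKey String orderId);

    // new in this commit: load several entities at once by their partition keys
    List<Order> getAll(@PartitionKey List<String> orderIds);
}
```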
2 changes: 2 additions & 0 deletions docs/guide/src/docs/asciidoc/dynamodb.adoc
@@ -498,6 +498,8 @@ The following table summarizes the supported method signatures:

`List<DynamoDBEntity> getAll(@HashKey String parentId, String... rangeKeys);`

`List<DynamoDBEntityNoRange> getAll(@HashKey List<String> parentIds);`

| Loads a single entity or a list of entities from the table. The range key is required for tables that define a range key


@@ -145,6 +145,8 @@ default <R> Publisher<R> updateAll(Publisher<T> items, Function<UpdateBuilder<T,

Publisher<T> getAll(Object partitionKey, Publisher<?> sortKeys);

Publisher<T> getAll(Publisher<?> partitionKeys);

Publisher<T> get(Key key);

Publisher<Long> count(DetachedQuery<T> query);
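A hedged call-site sketch for the reactive interface above, using Reactor to supply the partition keys (the `AsyncDynamoDbService<Order>` instance and the `Order` entity are assumed):

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

final class OrderLookups {

    // minimal sketch: the new overload accepts any Publisher of raw partition key values
    static Publisher<Order> loadOrders(AsyncDynamoDbService<Order> orders) {
        return orders.getAll(Flux.just("order-1", "order-2", "order-3"));
    }
}
```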
@@ -284,6 +284,9 @@ private <T> Publisher<T> handleGet(AsyncDynamoDbService<T> service, MethodInvoca
Object partitionValue = partitionAndSort.getPartitionValue(params);

if (!partitionAndSort.hasSortKey()) {
if (partitionAndSort.isPartitionKeyPublisherOrIterable()) {
return service.getAll(partitionAndSort.getPartitionAttributeValues(conversionService, params));
}
return service.get(partitionValue, null);
}

@@ -230,6 +230,11 @@ public Publisher<T> getAll(Object partitionKey, Publisher<?> sortKeys) {
return doWithKeys(partitionKey, sortKeys, this::getAll);
}

@Override
public Publisher<T> getAll(Publisher<?> partitionKeys) {
return doWithKeys(partitionKeys, this::getAllByAttributeValue);
}

@Override
public Publisher<T> get(Key key) {
return Mono.fromFuture(table.getItem(key)).map(this::postLoad);
@@ -289,6 +294,19 @@ private DetachedQuery<T> simplePartitionAndSort(Object partitionKey, Object sort
});
}

private Publisher<T> getAllByAttributeValue(Publisher<AttributeValue> partitionKeys) {
TableSchema<T> tableSchema = table.tableSchema();
Map<AttributeValue, Integer> order = new ConcurrentHashMap<>();
AtomicInteger counter = new AtomicInteger();
Comparator<T> comparator = Comparator.comparingInt(i -> order.getOrDefault(tableSchema.attributeValue(i, tableSchema.tableMetadata().primaryPartitionKey()), 0));

return Flux.from(partitionKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> {
order.put(k, counter.getAndIncrement());
return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(k).build()).build();
}).toList()))).flatMap(r -> Flux.from(r.resultsForTable(table)).map(this::postLoad)).sort(comparator);

}

private Publisher<T> getAll(AttributeValue hashKey, Publisher<AttributeValue> rangeKeys) {
TableSchema<T> tableSchema = table.tableSchema();
Map<AttributeValue, Integer> order = new ConcurrentHashMap<>();
@@ -377,4 +395,9 @@ private <R> Publisher<R> doWithKeys(Object partitionKey, Publisher<?> sortKeys,
Optional<String> sortKeyName = table.tableSchema().tableMetadata().primarySortKey();
return function.apply(partitionKeyValue, Flux.from(sortKeys).map(key -> attributeConversionHelper.convert(table, sortKeyName.get(), key)));
}

private <R> Publisher<R> doWithKeys(Publisher<?> partitionKeys, Function<Publisher<AttributeValue>, Publisher<R>> function) {
String hashKeyName = table.tableSchema().tableMetadata().primaryPartitionKey();
return function.apply(Flux.from(partitionKeys).map(key -> attributeConversionHelper.convert(table, hashKeyName, key)));
}
}
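For orientation: `getAllByAttributeValue` above buffers the incoming keys into chunks of `BATCH_SIZE` (DynamoDB's BatchGetItem accepts a limited number of items per request), issues one `batchGetItem` per chunk, and records the position of every requested key in the `order` map so that the unordered batch results can be sorted back into request order; the blocking variant below applies the same idea within each batch. An SDK-free sketch of that re-ordering trick:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class RequestOrder {

    // remember the position of each requested key, then sort the (unordered)
    // batch results back into request order; results must be a mutable list
    static <K, T> List<T> restore(List<K> requestedKeys, List<T> results, Function<T, K> keyOf) {
        Map<K, Integer> position = new HashMap<>();
        for (int i = 0; i < requestedKeys.size(); i++) {
            position.put(requestedKeys.get(i), i);
        }
        results.sort(Comparator.comparingInt(item -> position.getOrDefault(keyOf.apply(item), 0)));
        return results;
    }
}
```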
@@ -224,6 +224,11 @@ public Publisher<T> getAll(Object partitionKey, Publisher<?> sortKeys) {
return doWithKeys(partitionKey, sortKeys, this::getAll);
}

@Override
public Publisher<T> getAll(Publisher<?> partitionKeys) {
return doWithKeys(partitionKeys, this::getAllByAttributeValue);
}

@Override
public T get(Key key) {
T item = table.getItem(key);
@@ -286,6 +291,22 @@ private DetachedQuery<T> simplePartitionAndSort(Object partitionKey, Object sort
});
}

private Publisher<T> getAllByAttributeValue(Publisher<AttributeValue> partitionKeys) {
TableSchema<T> tableSchema = table.tableSchema();
Map<AttributeValue, Integer> order = new ConcurrentHashMap<>();
AtomicInteger counter = new AtomicInteger();

return Flux.from(partitionKeys).buffer(BATCH_SIZE).map(batchRangeKeys -> enhancedClient.batchGetItem(b -> b.readBatches(batchRangeKeys.stream().map(k -> {
order.put(k, counter.getAndIncrement());
return ReadBatch.builder(tableSchema.itemType().rawClass()).mappedTableResource(table).addGetItem(Key.builder().partitionValue(k).build()).build();
})
.collect(Collectors.toList())))).flatMap(r -> {
Comparator<T> comparator = Comparator.comparingInt(i -> order.getOrDefault(tableSchema.attributeValue(i, tableSchema.tableMetadata().primaryPartitionKey()), 0));
List<T> it = r.resultsForTable(table).stream().sorted(comparator).collect(Collectors.toList());
return Flux.fromIterable(it).map(this::postLoad);
});
}

private Publisher<T> getAll(AttributeValue hashKey, Publisher<AttributeValue> rangeKeys) {
TableSchema<T> tableSchema = table.tableSchema();
Map<AttributeValue, Integer> order = new ConcurrentHashMap<>();
@@ -377,4 +398,9 @@ private <R> Publisher<R> doWithKeys(Object partitionKey, Publisher<?> sortKeys,
Optional<String> sortKeyName = table.tableSchema().tableMetadata().primarySortKey();
return function.apply(partitionKeyValue, Flux.from(sortKeys).map(key -> attributeConversionHelper.convert(table, sortKeyName.get(), key)));
}

private <R> Publisher<R> doWithKeys(Publisher<?> partitionKeys, Function<Publisher<AttributeValue>, Publisher<R>> function) {
String hashKeyName = table.tableSchema().tableMetadata().primaryPartitionKey();
return function.apply(Flux.from(partitionKeys).map(key -> attributeConversionHelper.convert(table, hashKeyName, key)));
}
}
@@ -145,6 +145,8 @@ default <R> int updateAll(Publisher<T> items, Function<UpdateBuilder<T, T>, Upda

Publisher<T> getAll(Object partitionKey, Publisher<?> sortKeys);

Publisher<T> getAll(Publisher<?> partitionKeys);

T get(Key key);

int count(DetachedQuery<T> query);
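A hedged call-site sketch for the blocking service interface above; note that even here the batch variant returns a `Publisher`, so a blocking caller typically collects it (the `DynamoDbService<Order>` instance and the `Order` entity are assumed):

```java
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

final class BlockingOrderLookups {

    // minimal sketch: feed the partition keys as a Publisher and collect the results
    static List<Order> loadOrders(DynamoDbService<Order> orders) {
        Publisher<Order> found = orders.getAll(Flux.just("order-1", "order-2", "order-3"));
        return Flux.from(found).collectList().block();
    }
}
```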
@@ -232,6 +232,9 @@ private <T> Object handleGet(DynamoDbService<T> service, MethodInvocationContext
Object partitionValue = partitionAndSort.getPartitionValue(params);

if (!partitionAndSort.hasSortKey()) {
if (partitionAndSort.isPartitionKeyPublisherOrIterable()) {
return publisherOrIterable(service.getAll(partitionAndSort.getPartitionAttributeValues(conversionService, params)), context.getReturnType().getType());
}
return service.get(partitionValue, null);
}

@@ -29,6 +29,7 @@
*
* @deprecated use @{@link PartitionKey} instead
*/
@Deprecated
@Inherited
@Documented
@Retention(RetentionPolicy.RUNTIME)
@@ -29,6 +29,7 @@
*
* @deprecated use @{@link SortKey} instead
*/
@Deprecated
@Inherited
@Documented
@Retention(RetentionPolicy.RUNTIME)
@@ -157,6 +157,10 @@ public Publisher<?> getSortAttributeValues(ConversionService conversionService,
return sortKey == null ? Flux.empty() : toPublisher(conversionService, Object.class, sortKey.getFirstArgument(), params);
}

public Publisher<?> getPartitionAttributeValues(ConversionService conversionService, Map<String, MutableArgumentValue<?>> params) {
return partitionKey == null ? Flux.empty() : toPublisher(conversionService, Object.class, partitionKey, params);
}


public <T> Consumer<QueryBuilder<T>> generateQuery(MethodInvocationContext<Object, Object> context, ConversionService conversionService) {
return q -> {
@@ -217,6 +221,10 @@ public boolean isSortKeyPublisherOrIterable() {
return sortKey.getFirstArgument().getType().isArray() || Iterable.class.isAssignableFrom(sortKey.getFirstArgument().getType()) || Publisher.class.isAssignableFrom(sortKey.getFirstArgument().getType());
}

public boolean isPartitionKeyPublisherOrIterable() {
return partitionKey.getType().isArray() || Iterable.class.isAssignableFrom(partitionKey.getType()) || Publisher.class.isAssignableFrom(partitionKey.getType());
}

public boolean isCustomized() {
return index != null || consistent || descending || !filters.isEmpty() || sortKey != null && sortKey.getOperator() != Filter.Operator.EQ || lastEvaluatedKey != null || limit != null || page != null;
}
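The `isPartitionKeyPublisherOrIterable` check above treats an array, an `Iterable`, or a `Publisher` parameter as a plural partition key. Purely to illustrate those shapes (a hypothetical interface; only the `List` form appears in the documentation change above, so the other two are assumptions based on the check itself):

```java
import java.util.List;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

// hypothetical service, illustrating the parameter shapes detected by the check above
public interface OrderBatchReads {
    List<Order> getAll(@PartitionKey String[] orderIds);          // array
    List<Order> getAll(@PartitionKey List<String> orderIds);      // Iterable (documented form)
    Publisher<Order> getAll(@PartitionKey Flux<String> orderIds); // Publisher
}
```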
@@ -60,6 +60,7 @@ class DefaultDynamoDBServiceNoRangeSpec extends Specification {
service.count('5') == 1
service.get('6')
service.count('6') == 1
service.getAll(['1', '2', '3', '4', '5', '6']).parentId == ['1', '2', '3', '4', '5', '6']

when:
service.increment('1')
@@ -35,6 +35,7 @@
public interface DynamoDBEntityNoRangeService {

DynamoDBEntityNoRange get(@PartitionKey String parentId);
List<DynamoDBEntityNoRange> getAll(@PartitionKey List<String> parentIds);
DynamoDBEntityNoRange save(DynamoDBEntityNoRange entity);
Flowable<DynamoDBEntityNoRange> saveAll(DynamoDBEntityNoRange... entities);
