Skip to content

Commit

Permalink
Dedicated error for unprocessed DynamoDB batches
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Jun 27, 2024
1 parent 7457d8a commit a538d93
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedUpdate;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.UpdateBuilder;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.events.DynamoDbEvent;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.exception.FailedBatchRequestException;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Nullable;
Expand Down Expand Up @@ -168,7 +169,7 @@ public Publisher<T> saveAll(Publisher<T> itemsToSave) {
if (unprocesseded.isEmpty()) {
return Flux.fromIterable(r.getT2()).doOnNext(i -> publisher.publishEvent(DynamoDbEvent.postPersist(i)));
}
return Flux.error(new IllegalArgumentException("Following items couldn't be saved:" + unprocesseded.stream().map(Object::toString).collect(Collectors.joining(", "))));
return Flux.error(new FailedBatchRequestException("Failed to save items", unprocesseded));
});
}

Expand Down Expand Up @@ -216,9 +217,7 @@ public Publisher<T> deleteAll(Publisher<T> items) {
r.getT2().forEach(i -> publisher.publishEvent(DynamoDbEvent.postRemove(i)));
return Flux.fromIterable(r.getT2());
}
return Flux.error(new IllegalArgumentException("Following items couldn't be deleted:" + unprocesseded.stream()
.map(k -> tableSchema.mapToItem(k.keyMap(tableSchema, TableMetadata.primaryIndexName()))).map(Object::toString)
.collect(Collectors.joining(", "))));
return Flux.error(new FailedBatchRequestException("Failed to delete items", unprocesseded));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.DetachedUpdate;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.builder.UpdateBuilder;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.events.DynamoDbEvent;
import com.agorapulse.micronaut.amazon.awssdk.dynamodb.exception.FailedBatchRequestException;
import io.micronaut.context.event.ApplicationEventPublisher;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.beans.BeanIntrospection;
Expand Down Expand Up @@ -167,7 +168,7 @@ public Publisher<T> saveAll(Publisher<T> itemsToSave) {
return Flux.fromIterable(saved);
}

throw new IllegalArgumentException("Following items couldn't be saved:" + unprocessed.stream().map(Object::toString).collect(Collectors.joining(", ")));
throw new FailedBatchRequestException("Failed to save items", unprocessed);
}

@Override
Expand Down Expand Up @@ -210,9 +211,7 @@ public int deleteAll(Publisher<T> items) {
return deleted.size();
}

throw new IllegalArgumentException("Following items couldn't be deleted:" + unprocessed.stream()
.map(k -> tableSchema.mapToItem(k.keyMap(tableSchema, TableMetadata.primaryIndexName()))).map(Object::toString)
.collect(Collectors.joining(", ")));
throw new FailedBatchRequestException("Failed to delete items", unprocessed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* Copyright 2018-2024 Agorapulse.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.agorapulse.micronaut.amazon.awssdk.dynamodb.exception;


import java.util.List;

/**
* Exception thrown when a batch request fails.
* <p>
* The exception contains a list of unprocessed items - for save operation it is a list of items that were not saved and for delete operation it is a list of keys that were not deleted.
*/
public class FailedBatchRequestException extends IllegalArgumentException {

private final transient List<?> unprocessedItems;

public FailedBatchRequestException(String s, List<?> unprocessedItems) {
super(s);
this.unprocessedItems = unprocessedItems;
}

public List<?> getUnprocessedItems() {
return unprocessedItems;
}

}

0 comments on commit a538d93

Please sign in to comment.