diff --git a/docs/modules/ROOT/pages/tmail-backend/configure/index.adoc b/docs/modules/ROOT/pages/tmail-backend/configure/index.adoc
index a61e0216cd..e4373f2551 100644
--- a/docs/modules/ROOT/pages/tmail-backend/configure/index.adoc
+++ b/docs/modules/ROOT/pages/tmail-backend/configure/index.adoc
@@ -209,6 +209,8 @@ Specified to TMail backend, we can configure the following configurations in the
 | S3 service endpoint
 | objectstorage.s3.secondary.region
 | S3 region
+| objectstorage.s3.secondary.bucket.suffix
+| Optional string. Defaults to empty. Suffix appended to bucket names on the secondary blob store, e.g. "-copy".
 | objectstorage.s3.secondary.accessKeyId
 | https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys[S3 access key id]
 | objectstorage.s3.secondary.secretKey
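For illustration only, a secondary object storage block in blob.properties could look like the sketch below. The region, accessKeyId, secretKey and bucket.suffix keys come from the table above; the enabled and endpoint key names are assumptions made for this example and should be checked against the full property list.

    # assumed key name for enabling the secondary S3 blob store
    objectstorage.s3.secondary.enabled=true
    # assumed key name for the S3 service endpoint
    objectstorage.s3.secondary.endpoint=http://secondary-s3.internal:9000
    objectstorage.s3.secondary.region=eu-west-1
    objectstorage.s3.secondary.accessKeyId=secondaryAccessKey
    objectstorage.s3.secondary.secretKey=secondarySecretKey
    # buckets on the secondary store get this suffix, e.g. bucket "tmail" is mirrored as "tmail-copy"
    objectstorage.s3.secondary.bucket.suffix=-copy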
diff --git a/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/FailedBlobOperationListener.java b/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/FailedBlobOperationListener.java
index fc515e41bb..c438db0911 100644
--- a/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/FailedBlobOperationListener.java
+++ b/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/FailedBlobOperationListener.java
@@ -1,8 +1,5 @@
 package com.linagora.tmail.blob.secondaryblobstore;
 
-import static com.linagora.tmail.blob.secondaryblobstore.ObjectStorageIdentity.PRIMARY;
-import static com.linagora.tmail.blob.secondaryblobstore.ObjectStorageIdentity.SECONDARY;
-
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
@@ -21,32 +18,24 @@ public static class FailedBlobOperationListenerGroup extends Group {
 
     private final BlobStoreDAO primaryBlobStoreDAO;
     private final BlobStoreDAO secondaryBlobStoreDAO;
+    private final String secondaryBucketSuffix;
 
     public FailedBlobOperationListener(BlobStoreDAO primaryBlobStoreDAO,
-                                       BlobStoreDAO secondaryBlobStoreDAO) {
+                                       BlobStoreDAO secondaryBlobStoreDAO,
+                                       String secondaryBucketSuffix) {
         this.primaryBlobStoreDAO = primaryBlobStoreDAO;
         this.secondaryBlobStoreDAO = secondaryBlobStoreDAO;
+        this.secondaryBucketSuffix = secondaryBucketSuffix;
     }
 
     @Override
     public Publisher<Void> reactiveEvent(Event event) {
-        FailedBlobEvents.BlobEvent blobEvent = (FailedBlobEvents.BlobEvent) event;
-        ObjectStorageIdentity failedObjectStorage = blobEvent.getFailedObjectStorage();
-        BlobStoreDAO successfulBlobStoreDAO = successfulBlobStoreDAO(failedObjectStorage);
-        BlobStoreDAO failedBlobStoreDAO = failedBlobStoreDAO(failedObjectStorage);
-
-        if (blobEvent instanceof FailedBlobEvents.BlobAddition blobAdditionEvent) {
-            return Mono.from(copyBlob(blobAdditionEvent.bucketName(), blobAdditionEvent.blobId(), successfulBlobStoreDAO, failedBlobStoreDAO));
-        }
-
-        if (blobEvent instanceof FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
-            return Mono.from(failedBlobStoreDAO.delete(blobsDeletionEvent.bucketName(), blobsDeletionEvent.blobIds()));
-        }
-
-        if (blobEvent instanceof FailedBlobEvents.BucketDeletion bucketDeletionEvent) {
-            return Mono.from(failedBlobStoreDAO.deleteBucket(bucketDeletionEvent.bucketName()));
-        }
-        return Mono.empty();
+        return switch (event) {
+            case FailedBlobEvents.BlobAddition blobAdditionEvent -> handleFailedBlobsAdditionEvent(blobAdditionEvent);
+            case FailedBlobEvents.BlobsDeletion blobsDeletionEvent -> handleFailedBlobsDeletionEvent(blobsDeletionEvent);
+            case FailedBlobEvents.BucketDeletion bucketDeletionEvent -> handleFailedBucketDeletionEvent(bucketDeletionEvent);
+            default -> Mono.empty();
+        };
     }
 
     @Override
@@ -61,25 +50,48 @@ public Group getDefaultGroup() {
         return new FailedBlobOperationListenerGroup();
     }
 
-    private Mono<Void> copyBlob(BucketName bucketName, BlobId blobId, BlobStoreDAO fromBlobStore, BlobStoreDAO toBlobStore) {
-        return Mono.from(fromBlobStore.readReactive(bucketName, blobId))
-            .flatMap(inputStream -> Mono.from(toBlobStore.save(bucketName, blobId, inputStream)))
+    private Mono<Void> handleFailedBlobsAdditionEvent(FailedBlobEvents.BlobAddition blobAdditionEvent) {
+        return switch (blobAdditionEvent.failedObjectStorage()) {
+            case PRIMARY -> readFromSecondaryAndSaveToPrimary(blobAdditionEvent.bucketName(), blobAdditionEvent.blobId());
+            case SECONDARY -> readFromPrimaryAndSaveToSecondary(blobAdditionEvent.bucketName(), blobAdditionEvent.blobId());
+        };
+    }
+
+    private Mono<Void> readFromSecondaryAndSaveToPrimary(BucketName bucketName, BlobId blobId) {
+        return Mono.from(secondaryBlobStoreDAO.readReactive(withSuffix(bucketName), blobId))
+            .flatMap(inputStream -> Mono.from(primaryBlobStoreDAO.save(bucketName, blobId, inputStream)))
+            .subscribeOn(Schedulers.boundedElastic());
+    }
+
+    private Mono<Void> readFromPrimaryAndSaveToSecondary(BucketName bucketName, BlobId blobId) {
+        return Mono.from(primaryBlobStoreDAO.readReactive(bucketName, blobId))
+            .flatMap(inputStream -> Mono.from(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, inputStream)))
             .subscribeOn(Schedulers.boundedElastic());
     }
 
-    private BlobStoreDAO successfulBlobStoreDAO(ObjectStorageIdentity failedObjectStorage) {
-        if (SECONDARY.equals(failedObjectStorage)) {
-            return primaryBlobStoreDAO;
-        } else {
-            return secondaryBlobStoreDAO;
-        }
+    private Mono<Void> handleFailedBlobsDeletionEvent(FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
+        return switch (blobsDeletionEvent.failedObjectStorage()) {
+            case PRIMARY -> deleteBlobsFromPrimaryBucket(blobsDeletionEvent);
+            case SECONDARY -> deleteBlobsFromSecondaryBucket(blobsDeletionEvent);
+        };
+    }
+
+    private Mono<Void> deleteBlobsFromPrimaryBucket(FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
+        return Mono.from(primaryBlobStoreDAO.delete(blobsDeletionEvent.bucketName(), blobsDeletionEvent.blobIds()));
+    }
+
+    private Mono<Void> deleteBlobsFromSecondaryBucket(FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
+        return Mono.from(secondaryBlobStoreDAO.delete(withSuffix(blobsDeletionEvent.bucketName()), blobsDeletionEvent.blobIds()));
+    }
+
+    private Publisher<Void> handleFailedBucketDeletionEvent(FailedBlobEvents.BucketDeletion bucketDeletionEvent) {
+        return switch (bucketDeletionEvent.failedObjectStorage()) {
+            case PRIMARY -> primaryBlobStoreDAO.deleteBucket(bucketDeletionEvent.bucketName());
+            case SECONDARY -> secondaryBlobStoreDAO.deleteBucket(withSuffix(bucketDeletionEvent.bucketName()));
+        };
     }
 
-    private BlobStoreDAO failedBlobStoreDAO(ObjectStorageIdentity failedObjectStorage) {
-        if (PRIMARY.equals(failedObjectStorage)) {
-            return primaryBlobStoreDAO;
-        } else {
-            return secondaryBlobStoreDAO;
-        }
+    private BucketName withSuffix(BucketName bucketName) {
+        return BucketName.of(bucketName.asString() + secondaryBucketSuffix);
     }
 }
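The replay path above resolves the secondary bucket by appending the configured suffix. A minimal sketch of how the listener is built and what the name resolution amounts to, assuming primaryDAO and secondaryDAO are already-constructed BlobStoreDAO instances:

    import org.apache.james.blob.api.BlobStoreDAO;
    import org.apache.james.blob.api.BucketName;

    import com.linagora.tmail.blob.secondaryblobstore.FailedBlobOperationListener;

    class SecondaryBucketNamingSketch {
        // Sketch only: a write that failed on the SECONDARY store is replayed by reading the blob
        // from the primary bucket (e.g. "tmail") and saving it into the suffixed secondary bucket ("tmail-copy").
        static FailedBlobOperationListener listener(BlobStoreDAO primaryDAO, BlobStoreDAO secondaryDAO) {
            return new FailedBlobOperationListener(primaryDAO, secondaryDAO, "-copy");
        }

        // What withSuffix(...) resolves to for a given primary-side bucket and suffix.
        static BucketName secondaryBucket(BucketName primaryBucket, String suffix) {
            return BucketName.of(primaryBucket.asString() + suffix);
        }
    }

Deletion replays are routed the same way: the suffixed name is only ever used against the secondary store, while the primary keeps the plain bucket name.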
diff --git a/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/SecondaryBlobStoreDAO.java b/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/SecondaryBlobStoreDAO.java
index 5af5c373e7..ec0b875f87 100644
--- a/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/SecondaryBlobStoreDAO.java
+++ b/tmail-backend/blob/secondary-blob-store/src/main/java/com/linagora/tmail/blob/secondaryblobstore/SecondaryBlobStoreDAO.java
@@ -82,13 +82,16 @@ public boolean isSuccess() {
 
     private final BlobStoreDAO primaryBlobStoreDAO;
     private final BlobStoreDAO secondaryBlobStoreDAO;
+    private final String secondaryBucketSuffix;
     private final EventBus eventBus;
 
     public SecondaryBlobStoreDAO(BlobStoreDAO primaryBlobStoreDAO,
                                  BlobStoreDAO secondaryBlobStoreDAO,
+                                 String secondaryBucketSuffix,
                                  EventBus eventBus) {
         this.primaryBlobStoreDAO = primaryBlobStoreDAO;
         this.secondaryBlobStoreDAO = secondaryBlobStoreDAO;
+        this.secondaryBucketSuffix = secondaryBucketSuffix;
         this.eventBus = eventBus;
     }
 
@@ -98,7 +101,7 @@ public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStore
             return primaryBlobStoreDAO.read(bucketName, blobId);
         } catch (Exception ex) {
             LOGGER.warn("Fail to read from the first blob store with bucket name {} and blobId {}. Use second blob store", bucketName.asString(), blobId.asString(), ex);
-            return secondaryBlobStoreDAO.read(bucketName, blobId);
+            return secondaryBlobStoreDAO.read(withSuffix(bucketName), blobId);
         }
     }
 
@@ -107,7 +110,7 @@ public Mono readReactive(BucketName bucketName, BlobId blobId) {
         return Mono.from(primaryBlobStoreDAO.readReactive(bucketName, blobId))
             .onErrorResume(ex -> {
                 LOGGER.warn("Fail to read from the first blob store with bucket name {} and blobId {}. Use second blob store", bucketName.asString(), blobId.asString(), ex);
-                return Mono.from(secondaryBlobStoreDAO.readReactive(bucketName, blobId));
+                return Mono.from(secondaryBlobStoreDAO.readReactive(withSuffix(bucketName), blobId));
             });
     }
 
@@ -116,14 +119,14 @@ public Mono readBytes(BucketName bucketName, BlobId blobId) {
         return Mono.from(primaryBlobStoreDAO.readBytes(bucketName, blobId))
             .onErrorResume(ex -> {
                 LOGGER.warn("Fail to read from the first blob store with bucket name {} and blobId {}. Use second blob store", bucketName.asString(), blobId.asString(), ex);
-                return Mono.from(secondaryBlobStoreDAO.readBytes(bucketName, blobId));
+                return Mono.from(secondaryBlobStoreDAO.readBytes(withSuffix(bucketName), blobId));
             });
     }
 
     @Override
     public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
         return Flux.merge(asSavingStatus(primaryBlobStoreDAO.save(bucketName, blobId, data), ObjectStorageIdentity.PRIMARY),
-                asSavingStatus(secondaryBlobStoreDAO.save(bucketName, blobId, data), ObjectStorageIdentity.SECONDARY))
+                asSavingStatus(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, data), ObjectStorageIdentity.SECONDARY))
             .collectList()
             .flatMap(savingStatuses -> merge(savingStatuses,
                 failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobAddition(Event.EventId.random(), bucketName, blobId, failedObjectStorage), NO_REGISTRATION_KEYS)));
@@ -136,7 +139,7 @@ public Mono save(BucketName bucketName, BlobId blobId, InputStream inputSt
             fileBackedOutputStream -> Mono.fromCallable(() -> IOUtils.copy(inputStream, fileBackedOutputStream))
                 .flatMap(size -> Flux.merge(
                         asSavingStatus(primaryBlobStoreDAO.save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size)), ObjectStorageIdentity.PRIMARY),
-                        asSavingStatus(secondaryBlobStoreDAO.save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size)), ObjectStorageIdentity.SECONDARY))
+                        asSavingStatus(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size)), ObjectStorageIdentity.SECONDARY))
                     .collectList()
                     .flatMap(savingStatuses -> merge(savingStatuses,
                         failedObjectStorage -> eventBus.dispatch(
@@ -149,7 +152,7 @@ public Mono save(BucketName bucketName, BlobId blobId, InputStream inputSt
     @Override
     public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
         return Flux.merge(asSavingStatus(primaryBlobStoreDAO.save(bucketName, blobId, content), ObjectStorageIdentity.PRIMARY),
-                asSavingStatus(secondaryBlobStoreDAO.save(bucketName, blobId, content), ObjectStorageIdentity.SECONDARY))
+                asSavingStatus(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, content), ObjectStorageIdentity.SECONDARY))
             .collectList()
             .flatMap(savingStatuses -> merge(savingStatuses,
                 failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobAddition(Event.EventId.random(), bucketName, blobId, failedObjectStorage), NO_REGISTRATION_KEYS)));
@@ -158,7 +161,7 @@ public Mono save(BucketName bucketName, BlobId blobId, ByteSource content)
     @Override
     public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
         return Flux.merge(asSavingStatus(primaryBlobStoreDAO.delete(bucketName, blobId), ObjectStorageIdentity.PRIMARY),
-                asSavingStatus(secondaryBlobStoreDAO.delete(bucketName, blobId), ObjectStorageIdentity.SECONDARY))
+                asSavingStatus(secondaryBlobStoreDAO.delete(withSuffix(bucketName), blobId), ObjectStorageIdentity.SECONDARY))
             .collectList()
             .flatMap(savingStatuses -> merge(savingStatuses,
                 failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobsDeletion(Event.EventId.random(), bucketName, ImmutableList.of(blobId), failedObjectStorage), NO_REGISTRATION_KEYS)));
@@ -167,7 +170,7 @@ public Mono delete(BucketName bucketName, BlobId blobId) {
     @Override
     public Mono<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) {
         return Flux.merge(asSavingStatus(primaryBlobStoreDAO.delete(bucketName, blobIds), ObjectStorageIdentity.PRIMARY),
-                asSavingStatus(secondaryBlobStoreDAO.delete(bucketName, blobIds), ObjectStorageIdentity.SECONDARY))
+                asSavingStatus(secondaryBlobStoreDAO.delete(withSuffix(bucketName), blobIds), ObjectStorageIdentity.SECONDARY))
             .collectList()
             .flatMap(savingStatuses -> merge(savingStatuses,
                 failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobsDeletion(Event.EventId.random(), bucketName, blobIds, failedObjectStorage), NO_REGISTRATION_KEYS)));
@@ -176,7 +179,7 @@ public Mono delete(BucketName bucketName, Collection blobIds) {
     @Override
     public Mono<Void> deleteBucket(BucketName bucketName) {
         return Flux.merge(asSavingStatus(primaryBlobStoreDAO.deleteBucket(bucketName), ObjectStorageIdentity.PRIMARY),
-                asSavingStatus(secondaryBlobStoreDAO.deleteBucket(bucketName), ObjectStorageIdentity.SECONDARY))
+                asSavingStatus(secondaryBlobStoreDAO.deleteBucket(withSuffix(bucketName)), ObjectStorageIdentity.SECONDARY))
             .collectList()
             .flatMap(savingStatuses -> merge(savingStatuses,
                 failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BucketDeletion(Event.EventId.random(), bucketName, failedObjectStorage), NO_REGISTRATION_KEYS)));
@@ -213,6 +216,10 @@ private Mono merge(List savingStatuses, Function Mono.from(testee.readReactive(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isInstanceOf(SdkClientException.class)
+            .hasMessage("Unable to execute HTTP request: Read timed out");
+    }
+
+    @Test
+    public void readBytesShouldReturnDataWhenBlobDoesNotExistInThePrimaryBlobStore() {
+        Mono.from(secondaryBlobStoreDAO.save(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertThat(Mono.from(testee.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveBytesShouldSaveDataToBothBlobStores() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveBytesShouldEventuallySaveDataToBothBlobStoresWhenPrimaryStorageIsDown() {
+        primaryS3.pause();
+        unpauseS3AfterAwhile(primaryS3);
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveBytesShouldEventuallySaveDataToBothBlobStoresWhenSecondStorageIsDown() {
+        secondaryS3.pause();
+        unpauseS3AfterAwhile(secondaryS3);
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveInputStreamShouldSaveDataToBothBlobStores() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY))).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveInputStreamShouldEventuallySaveDataToBothBlobStoresWhenPrimaryStorageIsDown() {
+        primaryS3.pause();
+        unpauseS3AfterAwhile(primaryS3);
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY))).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveInputStreamShouldEventuallySaveDataToBothBlobStoresWhenSecondStorageIsDown() {
+        secondaryS3.pause();
+        unpauseS3AfterAwhile(secondaryS3);
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteArrayInputStream(SHORT_BYTEARRAY))).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void saveByteSourceShouldSaveDataToBothBlobStores() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        assertThat(Mono.from(primaryBlobStoreDAO.readBytes(TEST_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+        assertThat(Mono.from(secondaryBlobStoreDAO.readBytes(TEST_SECONDARY_BUCKET_NAME, TEST_BLOB_ID)).block())
+            .isEqualTo(SHORT_BYTEARRAY);
+    }
+
+    @Test
+    public void deleteBlobShouldDeleteInBothBlobStores() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+        Mono.from(testee.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBlobShouldEventuallyDeleteInBothBlobStoresWhenPrimaryStorageIsDown() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        primaryS3.pause();
+        unpauseS3AfterAwhile(primaryS3);
+        Mono.from(testee.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBlobShouldEventuallyDeleteInBothBlobStoresWhenSecondStorageIsDown() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        secondaryS3.pause();
+        unpauseS3AfterAwhile(secondaryS3);
+        Mono.from(testee.delete(TEST_BUCKET_NAME, TEST_BLOB_ID)).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBlobsShouldDeleteInBothBlobStores() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+        Mono.from(testee.delete(TEST_BUCKET_NAME, ImmutableList.of(TEST_BLOB_ID))).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBlobsShouldEventuallyDeleteInBothBlobStoresWhenPrimaryStorageIsDown() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        primaryS3.pause();
+        unpauseS3AfterAwhile(primaryS3);
+        Mono.from(testee.delete(TEST_BUCKET_NAME, ImmutableList.of(TEST_BLOB_ID))).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBlobsShouldEventuallyDeleteInBothBlobStoresWhenSecondStorageIsDown() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        secondaryS3.pause();
+        unpauseS3AfterAwhile(secondaryS3);
+        Mono.from(testee.delete(TEST_BUCKET_NAME, ImmutableList.of(TEST_BLOB_ID))).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBlobs(TEST_BUCKET_NAME)).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBucketShouldDeleteInBothBlobStores() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+        Mono.from(testee.deleteBucket(TEST_BUCKET_NAME)).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBuckets()).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBuckets()).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBucketShouldEventuallyDeleteInBothBlobStoresWhenPrimaryStorageIsDown() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        primaryS3.pause();
+        unpauseS3AfterAwhile(primaryS3);
+        Mono.from(testee.deleteBucket(TEST_BUCKET_NAME)).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBuckets()).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBuckets()).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    public void deleteBucketShouldEventuallyDeleteInBothBlobStoresWhenSecondStorageIsDown() {
+        Mono.from(testee.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ByteSource.wrap(SHORT_BYTEARRAY))).block();
+
+        secondaryS3.pause();
+        unpauseS3AfterAwhile(secondaryS3);
+        Mono.from(testee.deleteBucket(TEST_BUCKET_NAME)).block();
+
+        assertThat(Flux.from(primaryBlobStoreDAO.listBuckets()).collectList().block())
+            .isEmpty();
+        assertThat(Flux.from(secondaryBlobStoreDAO.listBuckets()).collectList().block())
+            .isEmpty();
+    }
+
+    @Test
+    @Override
+    public void saveInputStreamShouldThrowOnIOException() {
+        BlobStoreDAO store = testee();
+
+        assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, getThrowingInputStream())).block())
+            .getCause()
+            .isInstanceOf(IOException.class);
+    }
+
+    @Test
+    @Override
+    public void saveShouldThrowWhenNullData() {
+        BlobStoreDAO store = testee();
+
+        assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, (byte[]) null)).block())
+            .isInstanceOf(ObjectStoreException.class);
+    }
+
+    @Test
+    @Override
+    public void saveByteSourceShouldThrowOnIOException() {
+        BlobStoreDAO store = testee();
+
+        assertThatThrownBy(() -> Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, new ByteSource() {
+            @Override
+            public InputStream openStream() throws IOException {
+                return getThrowingInputStream();
+            }
+        })).block())
+            .isInstanceOf(ObjectStoreException.class);
+    }
+
+    private void unpauseS3AfterAwhile(DockerAwsS3Container s3) {
+        new Thread(() -> {
+            try {
+                Thread.sleep(5000);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            s3.unpause();
+        }).start();
+    }
+}
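The header and setup of the test class above were lost from this excerpt, so the construction of testee is not visible. Under the constructor introduced by this patch it would look roughly like the sketch below; the DAO and event bus instances are assumed to be provided by the test fixture.

    import org.apache.james.blob.api.BlobStoreDAO;
    import org.apache.james.events.EventBus;

    import com.linagora.tmail.blob.secondaryblobstore.SecondaryBlobStoreDAO;

    class SecondaryBlobStoreDAOSetupSketch {
        // Sketch: every call on the returned DAO targets bucket <name> on the primary store
        // and <name> + suffix on the secondary store, and dispatches FailedBlobEvents on partial failure.
        static SecondaryBlobStoreDAO testee(BlobStoreDAO primaryBlobStoreDAO,
                                            BlobStoreDAO secondaryBlobStoreDAO,
                                            EventBus eventBus) {
            return new SecondaryBlobStoreDAO(primaryBlobStoreDAO, secondaryBlobStoreDAO, "-copy", eventBus);
        }
    }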
diff --git a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreConfiguration.java b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreConfiguration.java
index d122d928ee..92491b9f76 100644
--- a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreConfiguration.java
+++ b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreConfiguration.java
@@ -8,7 +8,6 @@
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.blob.aes.CryptoConfig;
-import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
 import org.apache.james.modules.mailbox.ConfigurationComponent;
 import org.apache.james.server.blob.deduplication.StorageStrategy;
 import org.apache.james.server.core.filesystem.FileSystemImpl;
@@ -19,7 +18,7 @@
 import io.vavr.control.Try;
 
 public record BlobStoreConfiguration(BlobStoreImplName implementation,
-                                     Optional<S3BlobStoreConfiguration> maybeSecondaryS3BlobStoreConfiguration,
+                                     Optional<SecondaryS3BlobStoreConfiguration> maybeSecondaryS3BlobStoreConfiguration,
                                      boolean cacheEnabled,
                                      StorageStrategy storageStrategy,
                                      Optional<CryptoConfig> cryptoConfig,
@@ -42,14 +41,14 @@ default RequireSecondaryS3BlobStoreConfig s3() {
 
     @FunctionalInterface
     public interface RequireSecondaryS3BlobStoreConfig {
-        RequireCache secondaryS3BlobStore(Optional<S3BlobStoreConfiguration> maybeS3BlobStoreConfiguration);
+        RequireCache secondaryS3BlobStore(Optional<SecondaryS3BlobStoreConfiguration> maybeSecondaryS3BlobStoreConfiguration);
 
         default RequireCache noSecondaryS3BlobStore() {
             return secondaryS3BlobStore(Optional.empty());
         }
 
-        default RequireCache secondaryS3BlobStore(S3BlobStoreConfiguration s3BlobStoreConfiguration) {
-            return secondaryS3BlobStore(Optional.of(s3BlobStoreConfiguration));
+        default RequireCache secondaryS3BlobStore(SecondaryS3BlobStoreConfiguration secondaryS3BlobStoreConfiguration) {
+            return secondaryS3BlobStore(Optional.of(secondaryS3BlobStoreConfiguration));
         }
     }
 
@@ -215,7 +214,7 @@ private static Optional parseCryptoConfig(Configuration configurat
         return Optional.empty();
     }
 
-    private static Optional<S3BlobStoreConfiguration> parseS3BlobStoreConfiguration(Configuration configuration) throws ConfigurationException {
+    private static Optional<SecondaryS3BlobStoreConfiguration> parseS3BlobStoreConfiguration(Configuration configuration) throws ConfigurationException {
         if (configuration.getBoolean(OBJECT_STORAGE_S3_SECONDARY_ENABLED, false)) {
            return Optional.of(SecondaryS3BlobStoreConfigurationReader.from(configuration));
         } else {
diff --git a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreModulesChooser.java b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreModulesChooser.java
index d1e0230d4f..cf4818fd5e 100644
--- a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreModulesChooser.java
+++ b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/BlobStoreModulesChooser.java
@@ -11,7 +11,6 @@
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.cassandra.cache.CachedBlobStore;
 import org.apache.james.blob.objectstorage.aws.JamesS3MetricPublisher;
-import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
 import org.apache.james.blob.objectstorage.aws.S3BlobStoreDAO;
 import org.apache.james.blob.objectstorage.aws.S3ClientFactory;
 import org.apache.james.events.EventBus;
@@ -84,9 +83,9 @@ BlobStoreDAO provideNoSecondaryBlobStoreDAO(@Named(INITIAL_BLOBSTORE_DAO) BlobSt
     }
 
     static class SecondaryObjectStorageModule extends AbstractModule {
-        private final S3BlobStoreConfiguration secondaryS3BlobStoreConfiguration;
+        private final SecondaryS3BlobStoreConfiguration secondaryS3BlobStoreConfiguration;
 
-        public SecondaryObjectStorageModule(S3BlobStoreConfiguration secondaryS3BlobStoreConfiguration) {
+        public SecondaryObjectStorageModule(SecondaryS3BlobStoreConfiguration secondaryS3BlobStoreConfiguration) {
             this.secondaryS3BlobStoreConfiguration = secondaryS3BlobStoreConfiguration;
         }
 
@@ -96,15 +95,15 @@ public SecondaryObjectStorageModule(S3BlobStoreConfiguration secondaryS3BlobStor
         BlobStoreDAO getSecondaryS3BlobStoreDAO(BlobId.Factory blobIdFactory,
                                                 MetricFactory metricFactory,
                                                 GaugeRegistry gaugeRegistry) {
-            S3ClientFactory s3SecondaryClientFactory = new S3ClientFactory(secondaryS3BlobStoreConfiguration,
+            S3ClientFactory s3SecondaryClientFactory = new S3ClientFactory(secondaryS3BlobStoreConfiguration.s3BlobStoreConfiguration(),
                 () -> new JamesS3MetricPublisher(metricFactory, gaugeRegistry, "secondary_s3"));
-            return new S3BlobStoreDAO(s3SecondaryClientFactory, secondaryS3BlobStoreConfiguration, blobIdFactory);
+            return new S3BlobStoreDAO(s3SecondaryClientFactory, secondaryS3BlobStoreConfiguration.s3BlobStoreConfiguration(), blobIdFactory);
         }
 
         @ProvidesIntoSet
         TmailReactiveGroupEventListener provideFailedBlobOperationListener(@Named(INITIAL_BLOBSTORE_DAO) BlobStoreDAO firstBlobStoreDAO,
                                                                            @Named(SECOND_BLOB_STORE_DAO) BlobStoreDAO secondBlobStoreDAO) {
-            return new FailedBlobOperationListener(firstBlobStoreDAO, secondBlobStoreDAO);
+            return new FailedBlobOperationListener(firstBlobStoreDAO, secondBlobStoreDAO, secondaryS3BlobStoreConfiguration.secondaryBucketSuffix());
         }
 
         @Provides
@@ -113,7 +112,7 @@ TmailReactiveGroupEventListener provideFailedBlobOperationListener(@Named(INITIA
         BlobStoreDAO provideSecondaryBlobStoreDAO(@Named(INITIAL_BLOBSTORE_DAO) BlobStoreDAO firstBlobStoreDAO,
                                                   @Named(SECOND_BLOB_STORE_DAO) BlobStoreDAO secondBlobStoreDAO,
                                                   @Named(TmailInjectNameConstants.TMAIL_EVENT_BUS_INJECT_NAME) EventBus eventBus) {
-            return new SecondaryBlobStoreDAO(firstBlobStoreDAO, secondBlobStoreDAO, eventBus);
+            return new SecondaryBlobStoreDAO(firstBlobStoreDAO, secondBlobStoreDAO, secondaryS3BlobStoreConfiguration.secondaryBucketSuffix(), eventBus);
         }
     }
 
@@ -172,8 +171,8 @@ public BlobStoreDAO provideMultiSaveblobStoreDAO(@Named(MAYBE_ENCRYPTION_BLOBSTO
         }
     }
 
-    public static Module chooseSecondaryObjectStorageModule(Optional<S3BlobStoreConfiguration> maybeS3BlobStoreConfiguration) {
-        return maybeS3BlobStoreConfiguration
+    public static Module chooseSecondaryObjectStorageModule(Optional<SecondaryS3BlobStoreConfiguration> maybeSecondaryS3BlobStoreConfiguration) {
+        return maybeSecondaryS3BlobStoreConfiguration
             .map(configuration -> (Module) new SecondaryObjectStorageModule(configuration))
             .orElse(new NoSecondaryObjectStorageModule());
     }
diff --git a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfiguration.java b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfiguration.java
new file mode 100644
index 0000000000..3768a53c9c
--- /dev/null
+++ b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfiguration.java
@@ -0,0 +1,6 @@
+package com.linagora.tmail.blob.guice;
+
+import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;
+
+public record SecondaryS3BlobStoreConfiguration(S3BlobStoreConfiguration s3BlobStoreConfiguration, String secondaryBucketSuffix) {
+}
diff --git a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfigurationReader.java b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfigurationReader.java
index 79b77991fb..cbb255a533 100644
--- a/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfigurationReader.java
+++ b/tmail-backend/guice/blob-guice/src/main/java/com/linagora/tmail/blob/guice/SecondaryS3BlobStoreConfigurationReader.java
@@ -22,6 +22,7 @@ public class SecondaryS3BlobStoreConfigurationReader {
     private static final String OBJECTSTORAGE_NAMESPACE = "objectstorage.namespace";
     private static final String OBJECTSTORAGE_BUCKET_PREFIX = "objectstorage.bucketPrefix";
     private static final String OBJECTSTORAGE_S3_REGION = "objectstorage.s3.secondary.region";
+    private static final String OBJECTSTORAGE_S3_BUCKET_SUFFIX = "objectstorage.s3.secondary.bucket.suffix";
     private static final String OBJECTSTORAGE_S3_HTTP_CONCURRENCY = "objectstorage.s3.http.concurrency";
     private static final String OBJECTSTORAGE_S3_READ_TIMEOUT = "objectstorage.s3.read.timeout";
     private static final String OBJECTSTORAGE_S3_WRITE_TIMEOUT = "objectstorage.s3.write.timeout";
@@ -30,7 +31,7 @@ public class SecondaryS3BlobStoreConfigurationReader {
     private static final String OBJECTSTORAGE_S3_UPLOAD_RETRY_MAX_ATTEMPTS = "objectstorage.s3.upload.retry.maxAttempts";
     private static final String OBJECTSTORAGE_S3_UPLOAD_RETRY_BACKOFF_DURATION_MILLIS = "objectstorage.s3.upload.retry.backoffDurationMillis";
 
-    public static S3BlobStoreConfiguration from(Configuration configuration) throws ConfigurationException {
+    public static SecondaryS3BlobStoreConfiguration from(Configuration configuration) throws ConfigurationException {
         Optional<Integer> httpConcurrency = Optional.ofNullable(configuration.getInteger(OBJECTSTORAGE_S3_HTTP_CONCURRENCY, null));
         Optional<String> namespace = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_NAMESPACE, null));
         Optional<String> bucketPrefix = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_BUCKET_PREFIX, null));
@@ -54,7 +55,10 @@ public static S3BlobStoreConfiguration from(Configuration configuration) throws
                 .jitter(UPLOAD_RETRY_BACKOFF_JETTY_DEFAULT)
                 .filter(SdkException.class::isInstance));
 
-        return S3BlobStoreConfiguration.builder()
+        String secondaryBucketSuffix = Optional.ofNullable(configuration.getString(OBJECTSTORAGE_S3_BUCKET_SUFFIX, null))
+            .orElse("");
+
+        return new SecondaryS3BlobStoreConfiguration(S3BlobStoreConfiguration.builder()
             .authConfiguration(SecondaryAwsS3ConfigurationReader.from(configuration))
             .region(region)
             .defaultBucketName(namespace.map(BucketName::of))
@@ -65,7 +69,8 @@ public static S3BlobStoreConfiguration from(Configuration configuration) throws
             .writeTimeout(writeTimeout)
             .connectionTimeout(connectionTimeout)
             .uploadRetrySpec(uploadRetrySpec)
-            .build();
+            .build(),
+            secondaryBucketSuffix);
     }
 }
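The new SecondaryS3BlobStoreConfiguration record pairs the plain S3 settings with the bucket suffix; the reader above falls back to an empty suffix when objectstorage.s3.secondary.bucket.suffix is absent. A sketch of building it by hand (the S3BlobStoreConfiguration instance is assumed to exist), mirroring the integration test below:

    import org.apache.james.blob.objectstorage.aws.S3BlobStoreConfiguration;

    import com.linagora.tmail.blob.guice.SecondaryS3BlobStoreConfiguration;

    class SecondaryS3ConfigurationSketch {
        // Sketch: wrap an already-built S3BlobStoreConfiguration together with the bucket suffix.
        static SecondaryS3BlobStoreConfiguration withCopySuffix(S3BlobStoreConfiguration secondaryS3) {
            return new SecondaryS3BlobStoreConfiguration(secondaryS3, "-copy");
        }
    }

The result is what RequireSecondaryS3BlobStoreConfig.secondaryS3BlobStore(...) now accepts in the BlobStoreConfiguration DSL.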
diff --git a/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java b/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java
index 1c497fe7b6..ae70cd3572 100644
--- a/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java
+++ b/tmail-backend/integration-tests/jmap/distributed-jmap-integration-tests/src/test/java/com/linagora/tmail/james/DistributedLinagoraSecondaryBlobStoreTest.java
@@ -60,6 +60,7 @@
 import com.google.inject.multibindings.Multibinder;
 import com.google.inject.name.Named;
 import com.linagora.tmail.blob.guice.BlobStoreConfiguration;
+import com.linagora.tmail.blob.guice.SecondaryS3BlobStoreConfiguration;
 import com.linagora.tmail.blob.secondaryblobstore.FailedBlobEvents;
 import com.linagora.tmail.blob.secondaryblobstore.FailedBlobOperationListener;
 import com.linagora.tmail.blob.secondaryblobstore.SecondaryBlobStoreDAO;
@@ -126,9 +127,10 @@ public EventDeadLetters getEventDeadLetters() {
     static final String ANDRE_PASSWORD = "andrepassword";
     static final String ACCEPT_RFC8621_VERSION_HEADER = "application/json; jmapVersion=rfc-8621";
     static final String ACCOUNT_ID = "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6";
+    static final String SECONDARY_BUCKET_SUFFIX = "-secondary-bucket-suffix";
 
     static DockerAwsS3Container secondaryS3 = new DockerAwsS3Container();
-    static S3BlobStoreConfiguration secondaryS3Configuration;
+    static SecondaryS3BlobStoreConfiguration secondaryS3Configuration;
 
     static {
         secondaryS3.start();
@@ -138,13 +140,14 @@ public EventDeadLetters getEventDeadLetters() {
             .secretKey(DockerAwsS3Container.SECRET_ACCESS_KEY)
             .build();
 
-        secondaryS3Configuration = S3BlobStoreConfiguration.builder()
+        secondaryS3Configuration = new SecondaryS3BlobStoreConfiguration(S3BlobStoreConfiguration.builder()
             .authConfiguration(authConfiguration)
             .region(secondaryS3.dockerAwsS3().region())
             .uploadRetrySpec(Optional.of(Retry.backoff(3, java.time.Duration.ofSeconds(1))
                 .filter(UPLOAD_RETRY_EXCEPTION_PREDICATE)))
             .readTimeout(Optional.of(Duration.ofMillis(500)))
-            .build();
+            .build(),
+            SECONDARY_BUCKET_SUFFIX);
     }
 
     @RegisterExtension
@@ -238,8 +241,9 @@ void sendEmailShouldResultingInSavingDataToBothObjectStorages(GuiceJamesServer s
         calmlyAwait.atMost(TEN_SECONDS)
             .untilAsserted(() -> {
                 BucketName bucketName = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBuckets()).collectList().block().getFirst();
+                BucketName secondaryBucketName = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBuckets()).collectList().block().getFirst();
                 List<BlobId> blobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
-                List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
+                List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(secondaryBucketName)).collectList().block();
                 assertThat(blobIds).hasSameSizeAs(blobIds2);
                 assertThat(blobIds).hasSameElementsAs(blobIds2);
             });
@@ -268,14 +272,14 @@ void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecon
         List<BlobId> blobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
         calmlyAwait.atMost(ONE_MINUTE)
             .untilAsserted(() -> {
-                List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
+                List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(BucketName.of(bucketName.asString() + SECONDARY_BUCKET_SUFFIX))).collectList().block();
                 assertThat(blobIds2).hasSameSizeAs(blobIds);
                 assertThat(blobIds2).hasSameElementsAs(blobIds);
             });
     }
 
     @Test
-    void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDownForLongTime(GuiceJamesServer server) throws Exception {
+    void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecondStorageIsDownForLongTime(GuiceJamesServer server) {
         secondaryS3.pause();
 
         given()
@@ -315,7 +319,7 @@ void sendEmailShouldResultingInEventuallySavingDataToBothObjectStoragesWhenSecon
         List<BlobId> expectedBlobIds = Flux.from(blobStoreProbe.getPrimaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
         calmlyAwait.atMost(TEN_SECONDS)
             .untilAsserted(() -> {
-                List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(bucketName)).collectList().block();
+                List<BlobId> blobIds2 = Flux.from(blobStoreProbe.getSecondaryBlobStoreDAO().listBlobs(BucketName.of(bucketName.asString() + SECONDARY_BUCKET_SUFFIX))).collectList().block();
                 assertThat(blobIds2).hasSameSizeAs(expectedBlobIds);
                 assertThat(blobIds2).hasSameElementsAs(expectedBlobIds);
             });
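One point worth keeping in mind, sketched below under the assumption that no suffix is configured: the reader defaults to an empty string, so secondary bucket names stay identical to the primary ones and existing dual-blob-store deployments keep their current bucket layout.

    import org.apache.james.blob.api.BucketName;

    class DefaultSuffixSketch {
        // Mirrors withSuffix(...): with the default "" suffix the resolved secondary
        // bucket name is exactly the primary one, so behaviour is unchanged.
        static BucketName resolveSecondaryBucket(BucketName bucketName, String secondaryBucketSuffix) {
            return BucketName.of(bucketName.asString() + secondaryBucketSuffix);
        }
    }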