Skip to content

Commit

Permalink
ISSUE-1307 Allow to specify secondary bucket suffix (#1309)
Browse files Browse the repository at this point in the history
  • Loading branch information
quantranhong1999 authored Nov 18, 2024
1 parent 6f18936 commit 0d3754e
Show file tree
Hide file tree
Showing 10 changed files with 535 additions and 83 deletions.
2 changes: 2 additions & 0 deletions docs/modules/ROOT/pages/tmail-backend/configure/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ Specified to TMail backend, we can configure the following configurations in the
| S3 service endpoint
| objectstorage.s3.secondary.region
| S3 region
| objectstorage.s3.secondary.bucket.suffix
| Optional string. Defaults to empty. The suffix of bucket names for the secondary blob store. e.g. "-copy".
| objectstorage.s3.secondary.accessKeyId
| https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys[S3 access key id]
| objectstorage.s3.secondary.secretKey
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package com.linagora.tmail.blob.secondaryblobstore;

import static com.linagora.tmail.blob.secondaryblobstore.ObjectStorageIdentity.PRIMARY;
import static com.linagora.tmail.blob.secondaryblobstore.ObjectStorageIdentity.SECONDARY;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobStoreDAO;
import org.apache.james.blob.api.BucketName;
Expand All @@ -21,32 +18,24 @@ public static class FailedBlobOperationListenerGroup extends Group {

private final BlobStoreDAO primaryBlobStoreDAO;
private final BlobStoreDAO secondaryBlobStoreDAO;
private final String secondaryBucketSuffix;

public FailedBlobOperationListener(BlobStoreDAO primaryBlobStoreDAO,
BlobStoreDAO secondaryBlobStoreDAO) {
BlobStoreDAO secondaryBlobStoreDAO,
String secondaryBucketSuffix) {
this.primaryBlobStoreDAO = primaryBlobStoreDAO;
this.secondaryBlobStoreDAO = secondaryBlobStoreDAO;
this.secondaryBucketSuffix = secondaryBucketSuffix;
}

@Override
public Publisher<Void> reactiveEvent(Event event) {
FailedBlobEvents.BlobEvent blobEvent = (FailedBlobEvents.BlobEvent) event;
ObjectStorageIdentity failedObjectStorage = blobEvent.getFailedObjectStorage();
BlobStoreDAO successfulBlobStoreDAO = successfulBlobStoreDAO(failedObjectStorage);
BlobStoreDAO failedBlobStoreDAO = failedBlobStoreDAO(failedObjectStorage);

if (blobEvent instanceof FailedBlobEvents.BlobAddition blobAdditionEvent) {
return Mono.from(copyBlob(blobAdditionEvent.bucketName(), blobAdditionEvent.blobId(), successfulBlobStoreDAO, failedBlobStoreDAO));
}

if (blobEvent instanceof FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
return Mono.from(failedBlobStoreDAO.delete(blobsDeletionEvent.bucketName(), blobsDeletionEvent.blobIds()));
}

if (blobEvent instanceof FailedBlobEvents.BucketDeletion bucketDeletionEvent) {
return Mono.from(failedBlobStoreDAO.deleteBucket(bucketDeletionEvent.bucketName()));
}
return Mono.empty();
return switch (event) {
case FailedBlobEvents.BlobAddition blobAdditionEvent -> handleFailedBlobsAdditionEvent(blobAdditionEvent);
case FailedBlobEvents.BlobsDeletion blobsDeletionEvent -> handleFailedBlobsDeletionEvent(blobsDeletionEvent);
case FailedBlobEvents.BucketDeletion bucketDeletionEvent -> handleFailedBucketDeletionEvent(bucketDeletionEvent);
default -> Mono.empty();
};
}

@Override
Expand All @@ -61,25 +50,48 @@ public Group getDefaultGroup() {
return new FailedBlobOperationListenerGroup();
}

private Mono<Void> copyBlob(BucketName bucketName, BlobId blobId, BlobStoreDAO fromBlobStore, BlobStoreDAO toBlobStore) {
return Mono.from(fromBlobStore.readReactive(bucketName, blobId))
.flatMap(inputStream -> Mono.from(toBlobStore.save(bucketName, blobId, inputStream)))
private Mono<Void> handleFailedBlobsAdditionEvent(FailedBlobEvents.BlobAddition blobAdditionEvent) {
return switch (blobAdditionEvent.failedObjectStorage()) {
case PRIMARY -> readFromSecondaryAndSaveToPrimary(blobAdditionEvent.bucketName(), blobAdditionEvent.blobId());
case SECONDARY -> readFromPrimaryAndSaveToSecondary(blobAdditionEvent.bucketName(), blobAdditionEvent.blobId());
};
}

private Mono<Void> readFromSecondaryAndSaveToPrimary(BucketName bucketName, BlobId blobId) {
return Mono.from(secondaryBlobStoreDAO.readReactive(withSuffix(bucketName), blobId))
.flatMap(inputStream -> Mono.from(primaryBlobStoreDAO.save(bucketName, blobId, inputStream)))
.subscribeOn(Schedulers.boundedElastic());
}

private Mono<Void> readFromPrimaryAndSaveToSecondary(BucketName bucketName, BlobId blobId) {
return Mono.from(primaryBlobStoreDAO.readReactive(bucketName, blobId))
.flatMap(inputStream -> Mono.from(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, inputStream)))
.subscribeOn(Schedulers.boundedElastic());
}

private BlobStoreDAO successfulBlobStoreDAO(ObjectStorageIdentity failedObjectStorage) {
if (SECONDARY.equals(failedObjectStorage)) {
return primaryBlobStoreDAO;
} else {
return secondaryBlobStoreDAO;
}
private Mono<Void> handleFailedBlobsDeletionEvent(FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
return switch (blobsDeletionEvent.failedObjectStorage()) {
case PRIMARY -> deleteBlobsFromPrimaryBucket(blobsDeletionEvent);
case SECONDARY -> deleteBlobsFromSecondaryBucket(blobsDeletionEvent);
};
}

private Mono<Void> deleteBlobsFromPrimaryBucket(FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
return Mono.from(primaryBlobStoreDAO.delete(blobsDeletionEvent.bucketName(), blobsDeletionEvent.blobIds()));
}

private Mono<Void> deleteBlobsFromSecondaryBucket(FailedBlobEvents.BlobsDeletion blobsDeletionEvent) {
return Mono.from(secondaryBlobStoreDAO.delete(withSuffix(blobsDeletionEvent.bucketName()), blobsDeletionEvent.blobIds()));
}

private Publisher<Void> handleFailedBucketDeletionEvent(FailedBlobEvents.BucketDeletion bucketDeletionEvent) {
return switch (bucketDeletionEvent.failedObjectStorage()) {
case PRIMARY -> primaryBlobStoreDAO.deleteBucket(bucketDeletionEvent.bucketName());
case SECONDARY -> secondaryBlobStoreDAO.deleteBucket(withSuffix(bucketDeletionEvent.bucketName()));
};
}

private BlobStoreDAO failedBlobStoreDAO(ObjectStorageIdentity failedObjectStorage) {
if (PRIMARY.equals(failedObjectStorage)) {
return primaryBlobStoreDAO;
} else {
return secondaryBlobStoreDAO;
}
private BucketName withSuffix(BucketName bucketName) {
return BucketName.of(bucketName.asString() + secondaryBucketSuffix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,16 @@ public boolean isSuccess() {

private final BlobStoreDAO primaryBlobStoreDAO;
private final BlobStoreDAO secondaryBlobStoreDAO;
private final String secondaryBucketSuffix;
private final EventBus eventBus;

public SecondaryBlobStoreDAO(BlobStoreDAO primaryBlobStoreDAO,
BlobStoreDAO secondaryBlobStoreDAO,
String secondaryBucketSuffix,
EventBus eventBus) {
this.primaryBlobStoreDAO = primaryBlobStoreDAO;
this.secondaryBlobStoreDAO = secondaryBlobStoreDAO;
this.secondaryBucketSuffix = secondaryBucketSuffix;
this.eventBus = eventBus;
}

Expand All @@ -98,7 +101,7 @@ public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStore
return primaryBlobStoreDAO.read(bucketName, blobId);
} catch (Exception ex) {
LOGGER.warn("Fail to read from the first blob store with bucket name {} and blobId {}. Use second blob store", bucketName.asString(), blobId.asString(), ex);
return secondaryBlobStoreDAO.read(bucketName, blobId);
return secondaryBlobStoreDAO.read(withSuffix(bucketName), blobId);
}
}

Expand All @@ -107,7 +110,7 @@ public Mono<InputStream> readReactive(BucketName bucketName, BlobId blobId) {
return Mono.from(primaryBlobStoreDAO.readReactive(bucketName, blobId))
.onErrorResume(ex -> {
LOGGER.warn("Fail to read from the first blob store with bucket name {} and blobId {}. Use second blob store", bucketName.asString(), blobId.asString(), ex);
return Mono.from(secondaryBlobStoreDAO.readReactive(bucketName, blobId));
return Mono.from(secondaryBlobStoreDAO.readReactive(withSuffix(bucketName), blobId));
});
}

Expand All @@ -116,14 +119,14 @@ public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
return Mono.from(primaryBlobStoreDAO.readBytes(bucketName, blobId))
.onErrorResume(ex -> {
LOGGER.warn("Fail to read from the first blob store with bucket name {} and blobId {}. Use second blob store", bucketName.asString(), blobId.asString(), ex);
return Mono.from(secondaryBlobStoreDAO.readBytes(bucketName, blobId));
return Mono.from(secondaryBlobStoreDAO.readBytes(withSuffix(bucketName), blobId));
});
}

@Override
public Mono<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
return Flux.merge(asSavingStatus(primaryBlobStoreDAO.save(bucketName, blobId, data), ObjectStorageIdentity.PRIMARY),
asSavingStatus(secondaryBlobStoreDAO.save(bucketName, blobId, data), ObjectStorageIdentity.SECONDARY))
asSavingStatus(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, data), ObjectStorageIdentity.SECONDARY))
.collectList()
.flatMap(savingStatuses -> merge(savingStatuses,
failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobAddition(Event.EventId.random(), bucketName, blobId, failedObjectStorage), NO_REGISTRATION_KEYS)));
Expand All @@ -136,7 +139,7 @@ public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputSt
fileBackedOutputStream -> Mono.fromCallable(() -> IOUtils.copy(inputStream, fileBackedOutputStream))
.flatMap(size -> Flux.merge(
asSavingStatus(primaryBlobStoreDAO.save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size)), ObjectStorageIdentity.PRIMARY),
asSavingStatus(secondaryBlobStoreDAO.save(bucketName, blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size)), ObjectStorageIdentity.SECONDARY))
asSavingStatus(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, new FileBackedOutputStreamByteSource(fileBackedOutputStream, size)), ObjectStorageIdentity.SECONDARY))
.collectList()
.flatMap(savingStatuses -> merge(savingStatuses,
failedObjectStorage -> eventBus.dispatch(
Expand All @@ -149,7 +152,7 @@ public Mono<Void> save(BucketName bucketName, BlobId blobId, InputStream inputSt
@Override
public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
return Flux.merge(asSavingStatus(primaryBlobStoreDAO.save(bucketName, blobId, content), ObjectStorageIdentity.PRIMARY),
asSavingStatus(secondaryBlobStoreDAO.save(bucketName, blobId, content), ObjectStorageIdentity.SECONDARY))
asSavingStatus(secondaryBlobStoreDAO.save(withSuffix(bucketName), blobId, content), ObjectStorageIdentity.SECONDARY))
.collectList()
.flatMap(savingStatuses -> merge(savingStatuses,
failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobAddition(Event.EventId.random(), bucketName, blobId, failedObjectStorage), NO_REGISTRATION_KEYS)));
Expand All @@ -158,7 +161,7 @@ public Mono<Void> save(BucketName bucketName, BlobId blobId, ByteSource content)
@Override
public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
return Flux.merge(asSavingStatus(primaryBlobStoreDAO.delete(bucketName, blobId), ObjectStorageIdentity.PRIMARY),
asSavingStatus(secondaryBlobStoreDAO.delete(bucketName, blobId), ObjectStorageIdentity.SECONDARY))
asSavingStatus(secondaryBlobStoreDAO.delete(withSuffix(bucketName), blobId), ObjectStorageIdentity.SECONDARY))
.collectList()
.flatMap(savingStatuses -> merge(savingStatuses,
failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobsDeletion(Event.EventId.random(), bucketName, ImmutableList.of(blobId), failedObjectStorage), NO_REGISTRATION_KEYS)));
Expand All @@ -167,7 +170,7 @@ public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
@Override
public Mono<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) {
return Flux.merge(asSavingStatus(primaryBlobStoreDAO.delete(bucketName, blobIds), ObjectStorageIdentity.PRIMARY),
asSavingStatus(secondaryBlobStoreDAO.delete(bucketName, blobIds), ObjectStorageIdentity.SECONDARY))
asSavingStatus(secondaryBlobStoreDAO.delete(withSuffix(bucketName), blobIds), ObjectStorageIdentity.SECONDARY))
.collectList()
.flatMap(savingStatuses -> merge(savingStatuses,
failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BlobsDeletion(Event.EventId.random(), bucketName, blobIds, failedObjectStorage), NO_REGISTRATION_KEYS)));
Expand All @@ -176,7 +179,7 @@ public Mono<Void> delete(BucketName bucketName, Collection<BlobId> blobIds) {
@Override
public Mono<Void> deleteBucket(BucketName bucketName) {
return Flux.merge(asSavingStatus(primaryBlobStoreDAO.deleteBucket(bucketName), ObjectStorageIdentity.PRIMARY),
asSavingStatus(secondaryBlobStoreDAO.deleteBucket(bucketName), ObjectStorageIdentity.SECONDARY))
asSavingStatus(secondaryBlobStoreDAO.deleteBucket(withSuffix(bucketName)), ObjectStorageIdentity.SECONDARY))
.collectList()
.flatMap(savingStatuses -> merge(savingStatuses,
failedObjectStorage -> eventBus.dispatch(new FailedBlobEvents.BucketDeletion(Event.EventId.random(), bucketName, failedObjectStorage), NO_REGISTRATION_KEYS)));
Expand Down Expand Up @@ -213,6 +216,10 @@ private Mono<Void> merge(List<SavingStatus> savingStatuses, Function<ObjectStora
return partialFailureHandler.apply(failedSavingStatus.objectStorageIdentity());
}

private BucketName withSuffix(BucketName bucketName) {
return BucketName.of(bucketName.asString() + secondaryBucketSuffix);
}

@VisibleForTesting
public BlobStoreDAO getFirstBlobStoreDAO() {
return primaryBlobStoreDAO;
Expand Down
Loading

0 comments on commit 0d3754e

Please sign in to comment.