Skip to content

Commit

Permalink
Server returns Blob_Deleted for deleted blob out of retention (linked…
Browse files Browse the repository at this point in the history
…in#2821)

* Server returns Blob_Deleted for deleted blob out of retention

* Changing tests

* Fix hard delete error
  • Loading branch information
justinlin-linkedin authored Jul 16, 2024
1 parent 32ac01c commit 1d989b0
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ public enum StoreGetOptions {
Store_Include_Expired,
/**
* This option indicates that the store needs to return the message even if it has been
* marked for deletion as long as the message has not been physically deleted from the
* store.
* marked for deletion as long as the message is still in delete retention window
*/
Store_Include_Deleted
Store_Include_Deleted,
/**
* This option indicates that the store needs to return the message as long as the message
* has not been physically deleted from the store.
*/
Store_Include_Compaction_Ready
}
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,7 @@ protected EnumSet<StoreGetOptions> getStoreGetOptions(GetRequest getRequest) {
storeGetOptions = EnumSet.of(StoreGetOptions.Store_Include_Deleted);
}
if (getRequest.getGetOption() == GetOption.Include_All) {
storeGetOptions = EnumSet.of(StoreGetOptions.Store_Include_Deleted, StoreGetOptions.Store_Include_Expired);
storeGetOptions = EnumSet.allOf(StoreGetOptions.class);
}
return storeGetOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Long getBlobContentCRC(MessageInfo msg) throws StoreException, IOExceptio
StoreGetOptions.Store_Include_Expired);
MessageReadSet rdset = null;
try {
StoreInfo stinfo = this.get(Collections.singletonList(msg.getStoreKey()), storeGetOptions);
StoreInfo stinfo = get(Collections.singletonList(msg.getStoreKey()), storeGetOptions);
rdset = stinfo.getMessageReadSet();
MessageInfo minfo = stinfo.getMessageReadSetInfo().get(0);
rdset.doPrefetch(0, minfo.getSize() - MessageFormatRecord.Crc_Size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,8 @@ private void persistCleanupToken() throws IOException, StoreException {
*/
private void performHardDeletes(List<MessageInfo> messageInfoList) throws StoreException {
try {
EnumSet<StoreGetOptions> getOptions = EnumSet.of(StoreGetOptions.Store_Include_Deleted);
EnumSet<StoreGetOptions> getOptions =
EnumSet.of(StoreGetOptions.Store_Include_Deleted, StoreGetOptions.Store_Include_Compaction_Ready);
List<BlobReadOptions> readOptionsList = new ArrayList<BlobReadOptions>(messageInfoList.size());

/* First create the readOptionsList */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1370,10 +1370,18 @@ BlobReadOptions getBlobReadInfo(StoreKey id, EnumSet<StoreGetOptions> getOptions
if (value == null) {
throw new StoreException("Id " + id + " not present in index " + dataDir, StoreErrorCodes.ID_Not_Found);
} else if (value.isDelete()) {
if (!getOptions.contains(StoreGetOptions.Store_Include_Deleted)) {
throw new StoreException("Id " + id + " has been deleted in index " + dataDir, StoreErrorCodes.ID_Deleted);
} else {
// Blob is deleted, as long as
// 1. get option includes Compaction_Ready blob
// 2. or get option includes Deleted blob and the blob is deleted no more than retention window time.
// we return the blob back to frontend
long deleteOperationTime = value.getOperationTimeInMs();
if (getOptions.contains(StoreGetOptions.Store_Include_Compaction_Ready) || (
getOptions.contains(StoreGetOptions.Store_Include_Deleted)
&& deleteOperationTime + TimeUnit.MINUTES.toMillis(config.storeDeletedMessageRetentionMinutes)
>= time.milliseconds())) {
readOptions = getDeletedBlobReadOptions(value, id, indexSegments);
} else {
throw new StoreException("Id " + id + " has been deleted in index " + dataDir, StoreErrorCodes.ID_Deleted);
}
} else if (isExpired(value) && !getOptions.contains(StoreGetOptions.Store_Include_Expired)) {
throw new StoreException("Id " + id + " has expired ttl in index " + dataDir, StoreErrorCodes.TTL_Expired);
Expand Down
43 changes: 26 additions & 17 deletions ambry-store/src/test/java/com/github/ambry/store/BlobStoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public CallableResult call() throws Exception {
// A list of keys grouped by the log segment that they belong to
private final List<Set<MockId>> idsByLogSegment = new ArrayList<>();
// Set of all deleted keys
private final Set<MockId> deletedKeys = Collections.newSetFromMap(new ConcurrentHashMap<MockId, Boolean>());
private final Map<MockId, Long> deletedKeys = new ConcurrentHashMap<MockId, Long>();
// Set of all expired keys
private final Set<MockId> expiredKeys = Collections.newSetFromMap(new ConcurrentHashMap<MockId, Boolean>());
// Set of all keys that are not deleted/expired
Expand Down Expand Up @@ -657,14 +657,28 @@ public void basicTest() throws InterruptedException, IOException, StoreException
checkStoreInfo(storeInfo, liveKeys);

MockMessageStoreHardDelete hd = (MockMessageStoreHardDelete) hardDelete;
for (MockId id : deletedKeys) {
for (MockId id : deletedKeys.keySet()) {
// cannot get without StoreGetOptions
verifyGetFailure(id, StoreErrorCodes.ID_Deleted);

// with StoreGetOptions.Store_Include_Deleted
storeInfo = store.get(Collections.singletonList(id), EnumSet.of(StoreGetOptions.Store_Include_Deleted));
storeInfo = store.get(Collections.singletonList(id), EnumSet.of(StoreGetOptions.Store_Include_Compaction_Ready));
checkStoreInfo(storeInfo, Collections.singleton(id));

long operationTimeMs = deletedKeys.get(id);
if (operationTimeMs + TimeUnit.MINUTES.toMillis(CuratedLogIndexState.deleteRetentionHour * 60)
>= time.milliseconds()) {
storeInfo = store.get(Collections.singletonList(id), EnumSet.of(StoreGetOptions.Store_Include_Deleted));
checkStoreInfo(storeInfo, Collections.singleton(id));
} else {
try {
store.get(Collections.singletonList(id), EnumSet.of(StoreGetOptions.Store_Include_Deleted));
fail("Should not be able to GET " + id);
} catch (StoreException e) {
assertEquals("Unexpected StoreErrorCode", StoreErrorCodes.ID_Deleted, e.getErrorCode());
}
}

// with all StoreGetOptions
storeInfo = store.get(Collections.singletonList(id), EnumSet.allOf(StoreGetOptions.class));
checkStoreInfo(storeInfo, Collections.singleton(id));
Expand Down Expand Up @@ -1228,7 +1242,7 @@ public void deleteLifeVersionTest() throws StoreException {
info = new MessageInfo(addedId, DELETE_RECORD_SIZE, true, false, false, Utils.Infinite_Time, null,
addedId.getAccountId(), addedId.getContainerId(), time.milliseconds(), lifeVersion);
store.delete(Collections.singletonList(info));
deletedKeys.add(addedId);
deletedKeys.put(addedId, time.milliseconds());
undeletedKeys.remove(addedId);
liveKeys.remove(addedId);
StoreInfo storeInfo = store.get(Arrays.asList(addedId), EnumSet.of(StoreGetOptions.Store_Include_Deleted));
Expand Down Expand Up @@ -1273,7 +1287,7 @@ public void deleteLifeVersionTest() throws StoreException {
info = new MessageInfo(addedId, DELETE_RECORD_SIZE, true, false, false, Utils.Infinite_Time, null,
addedId.getAccountId(), addedId.getContainerId(), time.milliseconds(), MessageInfo.LIFE_VERSION_FROM_FRONTEND);
store.delete(Collections.singletonList(info));
deletedKeys.add(addedId);
deletedKeys.put(addedId, time.milliseconds());
undeletedKeys.remove(addedId);
liveKeys.remove(addedId);
storeInfo = store.get(Arrays.asList(addedId), EnumSet.of(StoreGetOptions.Store_Include_Deleted));
Expand Down Expand Up @@ -1423,7 +1437,7 @@ public void putErrorCasesTest() throws StoreException {
@Test
public void deleteErrorCasesTest() throws StoreException {
// ID that is already deleted
verifyDeleteFailure(deletedKeys.iterator().next(), StoreErrorCodes.ID_Deleted);
verifyDeleteFailure(deletedKeys.keySet().iterator().next(), StoreErrorCodes.ID_Deleted);
// ID that does not exist
verifyDeleteFailure(getUniqueId(), StoreErrorCodes.ID_Not_Found);
MockId id = getUniqueId();
Expand Down Expand Up @@ -1555,12 +1569,12 @@ public void ttlUpdateErrorCasesTest() throws Exception {
inNoTtlUpdatePeriodTest();
// ID that is already updated
for (MockId ttlUpdated : ttlUpdatedKeys) {
if (!deletedKeys.contains(ttlUpdated)) {
if (!deletedKeys.containsKey(ttlUpdated)) {
verifyTtlUpdateFailure(ttlUpdated, Utils.Infinite_Time, StoreErrorCodes.Already_Updated);
}
}
// ID that is already deleted
for (MockId deleted : deletedKeys) {
for (MockId deleted : deletedKeys.keySet()) {
verifyTtlUpdateFailure(deleted, Utils.Infinite_Time, StoreErrorCodes.ID_Deleted);
}
// Attempt to set expiry time to anything other than infinity
Expand Down Expand Up @@ -1788,7 +1802,7 @@ public void findMissingKeysTest() throws StoreException {
@Test
public void isKeyDeletedTest() throws StoreException {
for (MockId id : allKeys.keySet()) {
assertEquals("Returned state is not as expected", deletedKeys.contains(id), store.isKeyDeleted(id));
assertEquals("Returned state is not as expected", deletedKeys.containsKey(id), store.isKeyDeleted(id));
}
for (MockId id : deletedAndShouldBeCompactedKeys) {
assertTrue("Returned state is not as expected", store.isKeyDeleted(id));
Expand Down Expand Up @@ -3007,7 +3021,7 @@ private MessageInfo delete(MockId idToDelete, long operationTimeMs, boolean forc
store.forceDelete(Collections.singletonList(info));
}

deletedKeys.add(idToDelete);
deletedKeys.put(idToDelete, operationTimeMs);
undeletedKeys.remove(idToDelete);
liveKeys.remove(idToDelete);
return info;
Expand Down Expand Up @@ -3081,7 +3095,7 @@ private void checkStoreInfo(StoreInfo storeInfo, Set<MockId> expectedKeys, short
assertEquals("ContainerId mismatch", expectedInfo.getContainerId(), messageInfo.getContainerId());
assertEquals("OperationTime mismatch", expectedInfo.getOperationTimeMs(), messageInfo.getOperationTimeMs());
assertEquals("isTTLUpdated not as expected", ttlUpdatedKeys.contains(id), messageInfo.isTtlUpdated());
assertEquals("isDeleted not as expected", deletedKeys.contains(id), messageInfo.isDeleted());
assertEquals("isDeleted not as expected", deletedKeys.containsKey(id), messageInfo.isDeleted());
assertEquals("isUndeleted not as expected", undeletedKeys.contains(id), messageInfo.isUndeleted());
if (IndexValue.hasLifeVersion(lifeVersion)) {
assertEquals("lifeVersion not as expected", lifeVersion, messageInfo.getLifeVersion());
Expand Down Expand Up @@ -3181,8 +3195,6 @@ private void setupTestState(boolean addTtlUpdates, boolean addUndelete) throws I
deletes++;
idsByLogSegment.get(2).add(idToDelete);
// 1 DELETE for the PUT in the same segment
deletedKeys.add(addedId);
liveKeys.remove(addedId);
delete(addedId);
deletes++;

Expand Down Expand Up @@ -3365,7 +3377,6 @@ private void addCuratedData(long sizeToWrite, boolean addTtlUpdates) throws Stor
// 1 DELETE for the expired PUT
delete(id);
deletedKeyCount++;
deletedKeys.add(id);
expiredKeys.remove(id);
idsGroupedByIndexSegment.add(idsInIndexSegment);
idsInLogSegment.addAll(idsInIndexSegment);
Expand Down Expand Up @@ -3501,8 +3512,6 @@ private MockId getIdToDelete(Set<MockId> ids, boolean ttlUpdated) {
if (deleteCandidate == null) {
throw new IllegalStateException("Could not find a key to delete in set: " + ids);
}
deletedKeys.add(deleteCandidate);
liveKeys.remove(deleteCandidate);
return deleteCandidate;
}

Expand Down Expand Up @@ -3626,7 +3635,7 @@ private void verifyGetFutures(List<Getter> getters, List<Future<CallableResult>>
} catch (ExecutionException e) {
StoreException storeException = (StoreException) e.getCause();
StoreErrorCodes expectedCode = StoreErrorCodes.ID_Not_Found;
if (deletedKeys.contains(id)) {
if (deletedKeys.containsKey(id)) {
expectedCode = StoreErrorCodes.ID_Deleted;
} else if (expiredKeys.contains(id)) {
expectedCode = StoreErrorCodes.TTL_Expired;
Expand Down

0 comments on commit 1d989b0

Please sign in to comment.