Add sanity check for blob id in blob store (linkedin#2930)
Add a sanity check to the BlobStore put method to make sure each blob id belongs to the partition of this blob store.
justinlin-linkedin authored Oct 31, 2024
1 parent 132e023 commit 9d28fce
Showing 4 changed files with 88 additions and 41 deletions.
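
In short: BlobStore.put now fails fast with an IllegalArgumentException whenever a message write set carries a blob id that was minted for a different partition, mirroring the existing checkDuplicates sanity check, and the replica-metadata error message now includes the offending blob id. A minimal caller's-eye sketch of the new behavior (the store and writeSet variables here are hypothetical, for illustration only):

// Hypothetical sketch: `store` is a started BlobStore serving partition 1 and
// `writeSet` is a MessageWriteSet whose single BlobId was created for partition 2.
try {
  store.put(writeSet);
} catch (IllegalArgumentException e) {
  // The new checkPartition(...) fires before anything is written to the log:
  // "WriteSet contains unexpected blob id : <id>, it belongs to partition 2
  //  but this blobstore's partition is 1"
}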
@@ -1260,7 +1260,7 @@ void processReplicaMetadataResponse(Set<MessageInfo> missingRemoteStoreMessages,
   BlobId blobId = (BlobId) messageInfo.getStoreKey();
   if (remoteReplicaInfo.getLocalReplicaId().getPartitionId().compareTo(blobId.getPartition()) != 0) {
     throw new IllegalStateException(
-        "Blob id is not in the expected partition Actual partition " + blobId.getPartition()
+        "Blob id " + blobId.getID() + " is not in the expected partition Actual partition " + blobId.getPartition()
             + " Expected partition " + remoteReplicaInfo.getLocalReplicaId().getPartitionId());
   }
   BlobId localKey = (BlobId) remoteKeyToLocalKeyMap.get(messageInfo.getStoreKey());
95 changes: 58 additions & 37 deletions ambry-store/src/main/java/com/github/ambry/store/BlobStore.java
@@ -19,6 +19,7 @@
 import com.github.ambry.clustermap.ReplicaSealStatus;
 import com.github.ambry.clustermap.ReplicaState;
 import com.github.ambry.clustermap.ReplicaStatusDelegate;
+import com.github.ambry.commons.BlobId;
 import com.github.ambry.config.StoreConfig;
 import com.github.ambry.messageformat.DeleteMessageFormatInputStream;
 import com.github.ambry.messageformat.MessageFormatException;
@@ -340,15 +341,14 @@ public void start() throws StoreException {
    */
   @Override
   public Long getBlobContentCRC(MessageInfo msg) throws StoreException, IOException {
-    EnumSet<StoreGetOptions> storeGetOptions = EnumSet.of(StoreGetOptions.Store_Include_Deleted,
-        StoreGetOptions.Store_Include_Expired);
+    EnumSet<StoreGetOptions> storeGetOptions =
+        EnumSet.of(StoreGetOptions.Store_Include_Deleted, StoreGetOptions.Store_Include_Expired);
     MessageReadSet rdset = null;
     try {
       StoreInfo stinfo = this.get(Collections.singletonList(msg.getStoreKey()), storeGetOptions);
       rdset = stinfo.getMessageReadSet();
       MessageInfo minfo = stinfo.getMessageReadSetInfo().get(0);
-      rdset.doPrefetch(0, minfo.getSize() - MessageFormatRecord.Crc_Size,
-          MessageFormatRecord.Crc_Size);
+      rdset.doPrefetch(0, minfo.getSize() - MessageFormatRecord.Crc_Size, MessageFormatRecord.Crc_Size);
       return rdset.getPrefetchedData(0).getLong(0);
     } finally {
       if (rdset != null && rdset.count() > 0 && rdset.getPrefetchedData(0) != null) {
@@ -552,6 +552,7 @@ public void put(MessageWriteSet messageSetToWrite) throws StoreException {
       throw new IllegalArgumentException("Message write set cannot be empty");
     }
     checkDuplicates(messageSetToWrite.getMessageSetInfo());
+    checkPartition(messageSetToWrite.getMessageSetInfo());
     final Timer.Context context = metrics.putResponse.time();
     try {
       Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
@@ -644,7 +645,8 @@ public void delete(List<MessageInfo> infosToDelete) throws StoreException {
       List<Short> lifeVersions = new ArrayList<>();
       Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
       for (MessageInfo info : infosToDelete) {
-        validateMessageInfoForDelete(info, indexEndOffsetBeforeCheck, indexValuesPriorToDelete, lifeVersions, originalPuts);
+        validateMessageInfoForDelete(info, indexEndOffsetBeforeCheck, indexValuesPriorToDelete, lifeVersions,
+            originalPuts);
       }
       synchronized (storeWriteLock) {
         Offset currentIndexEndOffset = index.getCurrentEndOffset();
@@ -663,7 +665,8 @@ public void delete(List<MessageInfo> infosToDelete) throws StoreException {
         }
         List<InputStream> inputStreams = new ArrayList<>(infosToDelete.size());
         List<MessageInfo> updatedInfos = new ArrayList<>(infosToDelete.size());
-        Offset endOffsetOfLastMessage = writeDeleteMessagesToLogSegment(infosToDelete, inputStreams, updatedInfos, lifeVersions);
+        Offset endOffsetOfLastMessage =
+            writeDeleteMessagesToLogSegment(infosToDelete, inputStreams, updatedInfos, lifeVersions);
         logger.trace("Store : {} delete mark written to log", dataDir);
         updateIndexAndStats(updatedInfos, endOffsetOfLastMessage, originalPuts, indexValuesPriorToDelete);
       }
@@ -686,25 +689,25 @@ public StoreBatchDeleteInfo batchDelete(List<MessageInfo> infosToDelete) throws
     checkStarted();
     List<MessageInfo> infosToDeleteCopy = new ArrayList<>(infosToDelete);
     checkDuplicates(infosToDelete);
-    StoreBatchDeleteInfo storeBatchDeleteInfo = new StoreBatchDeleteInfo(
-        replicaId.getPartitionId(), new ArrayList<>());
+    StoreBatchDeleteInfo storeBatchDeleteInfo = new StoreBatchDeleteInfo(replicaId.getPartitionId(), new ArrayList<>());
     final Timer.Context context = metrics.batchDeleteResponse.time();
     try {
       List<IndexValue> indexValuesPriorToDelete = new ArrayList<>();
       List<IndexValue> originalPuts = new ArrayList<>();
       List<Short> lifeVersions = new ArrayList<>();
       Offset indexEndOffsetBeforeCheck = index.getCurrentEndOffset();
       // Update infosToDelete to filteredInfosToDelete returned from this function.
-      infosToDelete = validateAndFilterMessageInfosForBatchDelete(infosToDelete, indexEndOffsetBeforeCheck, indexValuesPriorToDelete, lifeVersions,
-          originalPuts, storeBatchDeleteInfo);
-      if (infosToDelete.isEmpty()){
-        logger.trace("Batch Delete completely failed since all blobs failed. Received InfosToDelete: {}", infosToDeleteCopy);
+      infosToDelete = validateAndFilterMessageInfosForBatchDelete(infosToDelete, indexEndOffsetBeforeCheck,
+          indexValuesPriorToDelete, lifeVersions, originalPuts, storeBatchDeleteInfo);
+      if (infosToDelete.isEmpty()) {
+        logger.trace("Batch Delete completely failed since all blobs failed. Received InfosToDelete: {}",
+            infosToDeleteCopy);
         return storeBatchDeleteInfo;
       }
       synchronized (storeWriteLock) {
         // Update storeBatchDeleteInfo for the remaining infosToDelete via store lock operations.
-        validateAndBatchDelete(infosToDelete, indexEndOffsetBeforeCheck, indexValuesPriorToDelete,
-            lifeVersions, originalPuts, storeBatchDeleteInfo, infosToDeleteCopy);
+        validateAndBatchDelete(infosToDelete, indexEndOffsetBeforeCheck, indexValuesPriorToDelete, lifeVersions,
+            originalPuts, storeBatchDeleteInfo, infosToDeleteCopy);
       }
       onSuccess("BATCH_DELETE");
       return storeBatchDeleteInfo;
@@ -1630,6 +1633,24 @@ private void checkDuplicates(List<MessageInfo> infos) {
     }
   }

+  /**
+   * Sanity check, make sure all blobs belong to this partition.
+   * @param infos The list of {@link MessageInfo} to check partition
+   */
+  private void checkPartition(List<MessageInfo> infos) {
+    for (MessageInfo info : infos) {
+      StoreKey storeKey = info.getStoreKey();
+      if (storeKey instanceof BlobId && replicaId != null) {
+        BlobId blobId = (BlobId) storeKey;
+        if (blobId.getPartition().getId() != replicaId.getPartitionId().getId()) {
+          throw new IllegalArgumentException(
+              "WriteSet contains unexpected blob id : " + info.getStoreKey() + ", it belongs to partition "
+                  + blobId.getPartition() + " but this blobstore's partition is " + replicaId.getPartitionId());
+        }
+      }
+    }
+  }
+
   private MessageInfo fromIndexValue(StoreKey key, IndexValue value) {
     return new MessageInfo(key, value.getSize(), value.isDelete(), value.isTtlUpdate(), value.isUndelete(),
         value.getExpiresAtMs(), null, value.getAccountId(), value.getContainerId(), value.getOperationTimeInMs(),
@@ -1705,8 +1726,8 @@ public String toString() {
    * Runs 2 validations for blobs to delete for both delete and batchDelete. Throws StoreException for validation failure.
    * This message is separate from common validation function due to store locking required for these validations.
    */
-  private void validateDeleteForDuplicateAndLifeVersioConflicts(MessageInfo info, IndexValue value, short expectedLifeVersion)
-      throws StoreException {
+  private void validateDeleteForDuplicateAndLifeVersioConflicts(MessageInfo info, IndexValue value,
+      short expectedLifeVersion) throws StoreException {
     // There are several possible cases that can exist here. Delete has to follow either PUT, TTL_UPDATE or UNDELETE.
     // let EOBC be end offset before check, and [RECORD] means RECORD is optional
     // 1. PUT [TTL_UPDATE DELETE UNDELETE] EOBC DELETE
@@ -1722,8 +1743,8 @@ private void validateDeleteForDuplicateAndLifeVersioConflicts(MessageInfo info,
       // value being ttl update is fine, we can just append DELETE to it.
     } else {
       // For the extreme case, we log it out and throw an exception.
-      logger.warn("Concurrent operation for id " + info.getStoreKey() + " in store " + dataDir
-          + ". Newly added value " + value);
+      logger.warn("Concurrent operation for id " + info.getStoreKey() + " in store " + dataDir + ". Newly added value "
+          + value);
       throw new StoreException(
           "Cannot delete id " + info.getStoreKey() + " since there are concurrent operation while delete",
           StoreErrorCodes.Life_Version_Conflict);
@@ -1788,15 +1809,16 @@ private void validateMessageInfoForDelete(MessageInfo info, Offset indexEndOffsetBeforeCheck,
   /*
    * Runs validations and filtration for blobs to delete for batchDelete.
    */
-  private List<MessageInfo> validateAndFilterMessageInfosForBatchDelete(List<MessageInfo> infosToDelete, Offset indexEndOffsetBeforeCheck,
-      List<IndexValue> indexValuesPriorToDelete, List<Short> lifeVersions, List<IndexValue> originalPuts,
-      StoreBatchDeleteInfo storeBatchDeleteInfo) throws StoreException {
+  private List<MessageInfo> validateAndFilterMessageInfosForBatchDelete(List<MessageInfo> infosToDelete,
+      Offset indexEndOffsetBeforeCheck, List<IndexValue> indexValuesPriorToDelete, List<Short> lifeVersions,
+      List<IndexValue> originalPuts, StoreBatchDeleteInfo storeBatchDeleteInfo) throws StoreException {
     List<MessageInfo> filteredInfos = new ArrayList<>();
     for (MessageInfo info : infosToDelete) {
       try {
-        validateMessageInfoForDelete(info, indexEndOffsetBeforeCheck, indexValuesPriorToDelete, lifeVersions, originalPuts);
+        validateMessageInfoForDelete(info, indexEndOffsetBeforeCheck, indexValuesPriorToDelete, lifeVersions,
+            originalPuts);
         filteredInfos.add(info);
-      } catch (StoreException e){
+      } catch (StoreException e) {
         storeBatchDeleteInfo.addMessageErrorInfo(new MessageErrorInfo(info, e.getErrorCode()));
         if (e.getErrorCode() == StoreErrorCodes.IOError) {
           onError();
@@ -1818,16 +1840,14 @@ private Offset writeDeleteMessagesToLogSegment(List<MessageInfo> infosToDelete,
           new DeleteMessageFormatInputStream(info.getStoreKey(), info.getAccountId(), info.getContainerId(),
               info.getOperationTimeMs(), lifeVersions.get(i));
       // Don't change the lifeVersion here, there are other logic in markAsDeleted that relies on this lifeVersion.
-      updatedInfos.add(
-          new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(),
-              info.getOperationTimeMs(), info.getLifeVersion()));
+      updatedInfos.add(new MessageInfo(info.getStoreKey(), stream.getSize(), info.getAccountId(), info.getContainerId(),
+          info.getOperationTimeMs(), info.getLifeVersion()));
       inputStreams.add(stream);
       i++;
     }
     Offset endOffsetOfLastMessage = log.getEndOffset();
     MessageFormatWriteSet writeSet =
-        new MessageFormatWriteSet(new SequenceInputStream(Collections.enumeration(inputStreams)), updatedInfos,
-            false);
+        new MessageFormatWriteSet(new SequenceInputStream(Collections.enumeration(inputStreams)), updatedInfos, false);
     writeSet.writeTo(log);
     return endOffsetOfLastMessage;
   }
@@ -1837,23 +1857,22 @@ private Offset writeDeleteMessagesToLogSegment(List<MessageInfo> infosToDelete,
    * batchDelete.
    */
   private void updateIndexAndStats(List<MessageInfo> updatedInfos, Offset endOffsetOfLastMessage,
-      List<IndexValue> originalPuts, List<IndexValue> indexValuesPriorToDelete)
-      throws StoreException {
+      List<IndexValue> originalPuts, List<IndexValue> indexValuesPriorToDelete) throws StoreException {
     int correspondingPutIndex = 0;
     for (MessageInfo info : updatedInfos) {
       FileSpan fileSpan = log.getFileSpanForMessage(endOffsetOfLastMessage, info.getSize());
       IndexValue deleteIndexValue =
           index.markAsDeleted(info.getStoreKey(), fileSpan, null, info.getOperationTimeMs(), info.getLifeVersion());
       endOffsetOfLastMessage = fileSpan.getEndOffset();
-      blobStoreStats.handleNewDeleteEntry(info.getStoreKey(), deleteIndexValue,
-          originalPuts.get(correspondingPutIndex), indexValuesPriorToDelete.get(correspondingPutIndex));
+      blobStoreStats.handleNewDeleteEntry(info.getStoreKey(), deleteIndexValue, originalPuts.get(correspondingPutIndex),
+          indexValuesPriorToDelete.get(correspondingPutIndex));
       correspondingPutIndex++;
     }
     logger.trace("Store : {} delete has been marked in the index ", dataDir);
   }

   /*
-   * This method is used to validate and batch delete messages in the store. It is used for batch_delete. 
+   * This method is used to validate and batch delete messages in the store. It is used for batch_delete.
    */
   private void validateAndBatchDelete(List<MessageInfo> infosToDelete, Offset indexEndOffsetBeforeCheck,
       List<IndexValue> indexValuesPriorToDelete, List<Short> lifeVersions, List<IndexValue> originalPuts,
@@ -1874,22 +1893,24 @@ private void validateAndBatchDelete(List<MessageInfo> infosToDelete, Offset indexEndOffsetBeforeCheck,
           }
           i++;
           filteredInfos.add(info);
-        } catch (StoreException e){
+        } catch (StoreException e) {
           storeBatchDeleteInfo.addMessageErrorInfo(new MessageErrorInfo(info, e.getErrorCode()));
           if (e.getErrorCode() == StoreErrorCodes.IOError) {
             onError();
           }
         }
       }
       infosToDelete = filteredInfos;
-      if (infosToDelete.isEmpty()){
-        logger.trace("BATCH_DELETE: Operation completely failed since all blobs failed. Received InfosToDelete: {}", infosToDeleteCopy);
+      if (infosToDelete.isEmpty()) {
+        logger.trace("BATCH_DELETE: Operation completely failed since all blobs failed. Received InfosToDelete: {}",
+            infosToDeleteCopy);
         return;
       }
     }
     List<InputStream> inputStreams = new ArrayList<>(infosToDelete.size());
     List<MessageInfo> updatedInfos = new ArrayList<>(infosToDelete.size());
-    Offset endOffsetOfLastMessage = writeDeleteMessagesToLogSegment(infosToDelete, inputStreams, updatedInfos, lifeVersions);
+    Offset endOffsetOfLastMessage =
+        writeDeleteMessagesToLogSegment(infosToDelete, inputStreams, updatedInfos, lifeVersions);
     logger.trace("BATCH_DELETE: Store : {} delete mark written to log", dataDir);
     updateIndexAndStats(updatedInfos, endOffsetOfLastMessage, originalPuts, indexValuesPriorToDelete);
     for (MessageInfo info : updatedInfos) {
@@ -20,10 +20,11 @@
 import com.github.ambry.clustermap.HelixFactory;
 import com.github.ambry.clustermap.HelixParticipant;
 import com.github.ambry.clustermap.MockHelixParticipant;
+import com.github.ambry.clustermap.PartitionId;
 import com.github.ambry.clustermap.ReplicaId;
 import com.github.ambry.clustermap.ReplicaSealStatus;
 import com.github.ambry.clustermap.ReplicaStatusDelegate;
-import com.github.ambry.commons.ErrorMapping;
+import com.github.ambry.commons.BlobId;
 import com.github.ambry.config.ClusterMapConfig;
 import com.github.ambry.config.StoreConfig;
 import com.github.ambry.config.VerifiableProperties;
@@ -60,6 +61,7 @@
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -83,7 +85,6 @@
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 import sun.nio.ch.FileChannelImpl;
@@ -697,7 +698,6 @@ public void basicTest() throws InterruptedException, IOException, StoreException
     // cannot get without StoreGetOptions
     verifyGetFailure(id, StoreErrorCodes.TTL_Expired);

-    // with StoreGetOptions.Store_Include_Expired
     storeInfo = store.get(Collections.singletonList(id), EnumSet.of(StoreGetOptions.Store_Include_Expired));
     checkStoreInfo(storeInfo, Collections.singleton(id));

@@ -711,6 +711,31 @@

     // non existent ID has to fail
     verifyGetFailure(getUniqueId(), StoreErrorCodes.ID_Not_Found);
+
+    // Verify the partition failure
+    final String newPartitionString = "newPartitionId";
+    final long newPartitionLongId = 2L;
+    PartitionId newPartitionId = mock(PartitionId.class);
+    when(newPartitionId.toString()).thenReturn(newPartitionString);
+    when(newPartitionId.getId()).thenReturn(newPartitionLongId);
+    when(newPartitionId.getBytes()).thenReturn(new byte[]{0, 2, 0, 0, 0, 0, 0, 0, 0, (byte) newPartitionLongId});
+    BlobId blobId =
+        new BlobId(BlobId.BLOB_ID_V6, BlobId.BlobIdType.NATIVE, (byte) 1, (short) 1022, (short) 8, newPartitionId,
+            false, BlobId.BlobDataType.SIMPLE, UUID.randomUUID().toString());
+    long crc = random.nextLong();
+    MessageInfo info = new MessageInfo(blobId, 100, false, false, false, expiresAtMs, crc, blobId.getAccountId(),
+        blobId.getContainerId(), Utils.Infinite_Time, (short) -1);
+    ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes(100));
+    List<MessageInfo> infos = new ArrayList<>();
+    List<ByteBuffer> buffers = new ArrayList<>();
+    infos.add(info);
+    buffers.add(buffer);
+    try {
+      store.put(new MockMessageWriteSet(infos, buffers));
+      fail("should fail due to invalid partition");
+    } catch (IllegalArgumentException e) {
+      assertTrue(e.getMessage().contains("belongs to partition " + newPartitionString));
+    }
   }

   /**
@@ -88,6 +88,7 @@ public static class MockReplicaId implements ReplicaId {
       partitionId = mock(PartitionId.class);
       when(partitionId.toString()).thenReturn(storeId);
       when(partitionId.toPathString()).thenReturn(storeId);
+      when(partitionId.getId()).thenReturn(1L);
     }

     @Override