Skip to content

Commit

Permalink
Add crc checks in put operation (linkedin#2915)
Browse files Browse the repository at this point in the history
This change makes sure that the content of a chunk in a PUT operation is not modified in the frontend (e.g. due to illegal reference counts on a ByteBuf) while it is being written to the server.
  • Loading branch information
Arun-LinkedIn authored Oct 21, 2024
1 parent 8ff03be commit 639fa06
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class RouterConfig {
public static final String ROUTER_PUT_SUCCESS_TARGET = "router.put.success.target";
public static final String ROUTER_PUT_REMOTE_SUCCESS_TARGET = "router.put.remote.success.target";
public static final String ROUTER_PUT_REMOTE_ATTEMPT_LIMIT = "router.put.remote.attempt.limit";
public static final String ROUTER_VERIFY_CRC_FOR_PUT_REQUESTS = "router.verify.crc.for.put.requests";
public static final String ROUTER_REPLICATE_BLOB_REQUEST_PARALLELISM = "router.replicate.blob.request.parallelism";
public static final String ROUTER_REPLICATE_BLOB_SUCCESS_TARGET = "router.replicate.blob.success.target";
public static final String ROUTER_MAX_SLIPPED_PUT_ATTEMPTS = "router.max.slipped.put.attempts";
Expand Down Expand Up @@ -289,6 +290,10 @@ public class RouterConfig {
@Default("2")
public final int routerPutRemoteAttemptLimit;

/**
 * Whether the router uses a CRC to check that the content of a PUT data chunk has not changed between the time it
 * was read into the chunk and the time it is encrypted or acknowledged by a server. Defaults to {@code false}.
 */
@Config(ROUTER_VERIFY_CRC_FOR_PUT_REQUESTS)
@Default("false")
public final boolean routerVerifyCrcForPutRequests;

/**
* The maximum number of parallel requests issued at a time by the ReplicateBlob manager.
*/
Expand Down Expand Up @@ -946,6 +951,7 @@ public RouterConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getInt(ROUTER_GET_OPERATION_MIN_LOCAL_REPLICA_COUNT_TO_PRIORITIZE_LOCAL,
DEFAULT_ROUTER_GET_OPERATION_MIN_LOCAL_REPLICA_COUNT_TO_PRIORITIZE_LOCAL);
routerParanoidDurabilityEnabled = verifiableProperties.getBoolean(ROUTER_PARANOID_DURABILITY_ENABLED, false);
routerVerifyCrcForPutRequests = verifiableProperties.getBoolean(ROUTER_VERIFY_CRC_FOR_PUT_REQUESTS, false);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public static RestServiceErrorCode getRestServiceErrorCode(RouterErrorCode route
case UnexpectedInternalError:
case ChannelClosed:
case LifeVersionConflict:
case BlobCorrupted:
default:
return InternalServerError;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,10 @@ public enum RouterErrorCode {
/**
* Request is out of quota and should be throttled.
*/
TooManyRequests
TooManyRequests,

/**
* Blob content no longer satisfies CRC. This applies to uploads only
*/
BlobCorrupted
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public class NonBlockingRouterMetrics {
public final Counter chunkFillerUnexpectedErrorCount;
public final Counter operationFailureWithUnsetExceptionCount;
public final Counter missingDataChunkErrorCount;
public final Counter putBlobCRCMismatchCount;

// Performance metrics for operation managers.
public final Histogram putManagerPollTimeMs;
Expand Down Expand Up @@ -512,6 +513,8 @@ public NonBlockingRouterMetrics(ClusterMap clusterMap, RouterConfig routerConfig
metricRegistry.counter(MetricRegistry.name(NonBlockingRouter.class, "BackgroundDeleterNotFoundCount"));
backgroundDeleterExceptionCount =
metricRegistry.counter(MetricRegistry.name(NonBlockingRouter.class, "BackgroundDeleterExceptionCount"));
putBlobCRCMismatchCount =
metricRegistry.counter(MetricRegistry.name(NonBlockingRouter.class, "PutBlobCRCMismatchCount"));

// Performance metrics for operation managers.
putManagerPollTimeMs = metricRegistry.histogram(MetricRegistry.name(PutManager.class, "PutManagerPollTimeMs"));
Expand Down Expand Up @@ -986,6 +989,9 @@ private void onError(Exception exception) {
case ChannelClosed:
channelClosedErrorCount.inc();
break;
case BlobCorrupted:
putBlobCRCMismatchCount.inc();
break;
default:
unknownErrorCountForOperation.inc();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.github.ambry.rest.RestUtils;
import com.github.ambry.server.ServerErrorCode;
import com.github.ambry.store.StoreKey;
import com.github.ambry.utils.Crc32;
import com.github.ambry.utils.Pair;
import com.github.ambry.utils.Time;
import com.github.ambry.utils.Utils;
Expand All @@ -72,6 +73,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1192,6 +1195,8 @@ class PutChunk {
// Whether this chunk is compressed. Default is false for not compressed.
// This value is set after compression has completed. It is used to create PutRequest.
private boolean isChunkCompressed;
private final Crc32 chunkCrc32 = new Crc32();
private boolean isCrcVerified = false;

/**
* Construct a PutChunk
Expand Down Expand Up @@ -1232,6 +1237,8 @@ synchronized void releaseBlobContent() {
logger.trace("{}: releasing the chunk data for chunk {}", loggingContext, chunkIndex);
ReferenceCountUtil.safeRelease(buf);
buf = null;
chunkCrc32.reset();
isCrcVerified = false;
}
}

Expand Down Expand Up @@ -1444,6 +1451,22 @@ protected OperationTracker getOperationTracker() {
*/
private void encryptionCallback(EncryptJob.EncryptJobResult result, Exception exception) {
logger.trace("{}: Processing encrypt job callback for chunk at index {}", loggingContext, chunkIndex);

// Before using the encryption result, confirm that the plain-text chunk content still matches the CRC that was
// accumulated while the chunk was filled. On a mismatch the whole operation is failed with BlobCorrupted.
if (exception == null && !verifyCRC()) {
  // The original content is corrupted. Release everything held for this chunk and complete the operation.
  logger.error("CRC of the chunk {} is different before and after encryption", chunkBlobId);
  releaseBlobContent();
  if (result != null) {
    result.release();
  }
  // Exception messages are not passed through SLF4J formatting, so the blob id is concatenated into the message
  // explicitly rather than left behind a literal "{}" placeholder.
  setOperationExceptionAndComplete(
      new RouterException("CRC of chunk " + chunkBlobId + " is different before and after encryption", exception,
          RouterErrorCode.BlobCorrupted));
  routerMetrics.encryptTimeMs.update(time.milliseconds() - chunkEncryptReadyAtMs);
  routerCallback.onPollReady();
  return;
}

if (!isMetadataChunk()) {
// If this is a data blob, then release the content with or without exception.
// When there is no exception, then the encrypted data will be used.
Expand All @@ -1454,6 +1477,11 @@ private void encryptionCallback(EncryptJob.EncryptJobResult result, Exception ex
if (exception == null && !isOperationComplete()) {
if (!isMetadataChunk()) {
buf = result.getEncryptedBlobContent();
if (routerConfig.routerVerifyCrcForPutRequests) {
for (ByteBuffer byteBuffer : buf.nioBuffers()) {
chunkCrc32.update(byteBuffer);
}
}
}
encryptedPerBlobKey = result.getEncryptedKey();
chunkUserMetadata = result.getEncryptedUserMetadata().array();
Expand Down Expand Up @@ -1571,29 +1599,37 @@ void onFillComplete(boolean updateMetric) {
*/
int fillFrom(ByteBuf channelReadBuf) {
int toWrite;
ByteBuf slice;
if (buf == null) {
// If current buf is null, then only read up to routerMaxPutChunkSizeBytes.
toWrite = Math.min(channelReadBuf.readableBytes(), routerConfig.routerMaxPutChunkSizeBytes);
buf = channelReadBuf.readRetainedSlice(toWrite);
slice = channelReadBuf.readRetainedSlice(toWrite);
buf = slice;
buf.touch(loggingContext);
} else {
int remainingSize = routerConfig.routerMaxPutChunkSizeBytes - buf.readableBytes();
toWrite = Math.min(channelReadBuf.readableBytes(), remainingSize);
ByteBuf remainingSlice = channelReadBuf.readRetainedSlice(toWrite);
remainingSlice.touch(loggingContext);
slice = channelReadBuf.readRetainedSlice(toWrite);
slice.touch(loggingContext);
// buf already has some bytes
if (buf instanceof CompositeByteBuf) {
// Buf is already a CompositeByteBuf, so just add the slice read from channelReadBuf
((CompositeByteBuf) buf).addComponent(true, remainingSlice);
((CompositeByteBuf) buf).addComponent(true, slice);
} else {
int maxComponents = routerConfig.routerMaxPutChunkSizeBytes;
CompositeByteBuf composite = buf.isDirect() ? buf.alloc().compositeDirectBuffer(maxComponents)
: buf.alloc().compositeHeapBuffer(maxComponents);
composite.addComponents(true, buf, remainingSlice);
composite.addComponents(true, buf, slice);
buf = composite;
buf.touch(loggingContext);
}
}

// Update crc for the chunk data
if (routerConfig.routerVerifyCrcForPutRequests) {
chunkCrc32.update(slice.nioBuffer());
}

if (buf.readableBytes() == routerConfig.routerMaxPutChunkSizeBytes) {
if (chunkIndex == 0) {
// If first put chunk is full, but not yet prepared then mark it awaiting resolution instead of completing it.
Expand All @@ -1613,6 +1649,16 @@ int fillFrom(ByteBuf channelReadBuf) {
void checkAndMaybeComplete() {
boolean done = false;
// Now, check if this chunk is done.
if (chunkException != null && chunkException.getErrorCode() == RouterErrorCode.BlobCorrupted) {
logger.error("{} : Put chunk {} failed due to corruption. Failing entire operation", loggingContext,
getChunkBlobId());
// Append this blob to slipped put so that it is cleaned up later. If this is a composite blob, the rest of the
// chunks will be cleaned up automatically as part of PUT operation clean up in PutManager#onComplete()
appendSlippedPutBlobId(chunkBlobId);
setOperationExceptionAndComplete(chunkException);
return;
}

if (operationTracker.isDone() || (chunkException != null
&& chunkException.getErrorCode() == RouterErrorCode.TooManyRequests)) {
if (!operationTracker.hasSucceeded()) {
Expand Down Expand Up @@ -1863,8 +1909,18 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
} else {
ServerErrorCode putError = putResponse.getError();
if (putError == ServerErrorCode.No_Error) {
logger.trace("{}: The putRequest was successful for chunk {}", loggingContext, chunkIndex);
isSuccessful = true;
if (!verifyCRC()) {
logger.error("{}: PutRequest with response correlationId {}, blob id {} has a mismatch in crc {} ",
loggingContext, correlationId, chunkBlobId, requestInfo.getReplicaId().getDataNodeId());
setChunkException(
new RouterException("CRC mismatch of chunk content before and after writing to server",
RouterErrorCode.BlobCorrupted));
isSuccessful = false;
putRequestFinalState = TrackedRequestFinalState.FAILURE;
} else {
logger.trace("{}: The putRequest was successful for chunk {}", loggingContext, chunkIndex);
isSuccessful = true;
}
} else {
// chunkException will be set within processServerError.
logger.trace(
Expand All @@ -1890,8 +1946,7 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
// We use a separate metric to track the latency of remote vs local writes (such requests are routed through
// ParanoidDurabilityOperationTracker).
routerMetrics.routerPutRequestRemoteLatencyMs.update(requestLatencyMs);
}
else {
} else {
routerMetrics.routerPutRequestLocalLatencyMs.update(requestLatencyMs);
}
} else {
Expand All @@ -1900,6 +1955,23 @@ void handleResponse(ResponseInfo responseInfo, PutResponse putResponse) {
checkAndMaybeComplete();
}

/**
 * Checks that the chunk content currently held in {@code buf} still produces the CRC accumulated in
 * {@code chunkCrc32}.
 * <p>
 * The check is skipped, and reported as passing, when CRC verification is disabled in the router config, when this
 * is a metadata chunk, when the content has already been verified successfully, or when the chunk content has
 * already been released and there is nothing left to compare.
 * @return {@code true} if the check was skipped or the CRC of the chunk buffer is same as one calculated in chunk
 * filler thread; {@code false} if the two CRC values differ.
 */
boolean verifyCRC() {
  if (!routerConfig.routerVerifyCrcForPutRequests || isMetadataChunk() || isCrcVerified) {
    return true;
  }
  // releaseBlobContent() sets buf to null (and resets chunkCrc32). Read the field once and skip the check when the
  // content is gone, instead of failing with a NullPointerException.
  ByteBuf content = buf;
  if (content == null) {
    return true;
  }
  Crc32 currentCrc = new Crc32();
  for (ByteBuffer byteBuffer : content.nioBuffers()) {
    currentCrc.update(byteBuffer);
  }
  logger.trace("Chunk Id {}, state {}, Original CRC {}, current CRC {}", chunkBlobId, state.name(),
      this.chunkCrc32.getValue(), currentCrc.getValue());
  boolean crcMatches = this.chunkCrc32.getValue() == currentCrc.getValue();
  // Only remember a successful verification. A mismatch must not be cached as "verified", otherwise a later call
  // would report the corrupted content as valid.
  isCrcVerified = crcMatches;
  return crcMatches;
}

/**
* Process response if it was rejected due to quota compliance.
* @param correlationId correlation id of the request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ static boolean isSystemHealthError(Exception exception) {
case BlobUpdateNotAllowed:
isSystemHealthError = false;
break;
case BlobCorrupted:
case UnexpectedInternalError:
isInternalError = true;
break;
Expand Down
Loading

0 comments on commit 639fa06

Please sign in to comment.