HDFS-17358. EC: infinite lease recovery caused by the length of RWR equals to zero or datanode does not have the replica. (apache#6509). Contributed by farmmamba.

Reviewed-by: Tao Li <[email protected]>
Reviewed-by: Haiyang Hu <[email protected]>
Signed-off-by: Shuyan Zhang <[email protected]>
hfutatzhanghb authored Feb 27, 2024
1 parent a897e74 commit 15af529
Showing 3 changed files with 66 additions and 11 deletions.
@@ -386,6 +386,8 @@ protected void recover() throws IOException {
       Map<Long, BlockRecord> syncBlocks = new HashMap<>(locs.length);
       final int dataBlkNum = ecPolicy.getNumDataUnits();
       final int totalBlkNum = dataBlkNum + ecPolicy.getNumParityUnits();
+      int zeroLenReplicaCnt = 0;
+      int dnNotHaveReplicaCnt = 0;
       //check generation stamps
       for (int i = 0; i < locs.length; i++) {
         DatanodeID id = locs[i];
@@ -419,10 +421,14 @@ protected void recover() throws IOException {
           if (info == null) {
             LOG.debug("Block recovery: DataNode: {} does not have " +
                 "replica for block: (block={}, internalBlk={})", id, block, internalBlk);
+            dnNotHaveReplicaCnt++;
           } else {
             LOG.debug("Block recovery: Ignored replica with invalid "
                 + "generation stamp or length: {} from DataNode: {} by block: {}",
                 info, id, block);
+            if (info.getNumBytes() == 0) {
+              zeroLenReplicaCnt++;
+            }
           }
         }
       } catch (RecoveryInProgressException ripE) {
@@ -436,9 +442,18 @@ protected void recover() throws IOException {
             "datanode={})", block, internalBlk, id, e);
         }
       }
-      checkLocations(syncBlocks.size());

-      final long safeLength = getSafeLength(syncBlocks);
+      final long safeLength;
+      if (dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locs.length - ecPolicy.getNumDataUnits()) {
+        checkLocations(syncBlocks.size());
+        safeLength = getSafeLength(syncBlocks);
+      } else {
+        safeLength = 0;
+        LOG.warn("Block recovery: {} datanodes do not have the replica of block {}." +
+            " {} datanodes have zero-length replica. Will remove this block.",
+            dnNotHaveReplicaCnt, block, zeroLenReplicaCnt);
+      }
+
       LOG.debug("Recovering block {}, length={}, safeLength={}, syncList={}", block,
           block.getNumBytes(), safeLength, syncBlocks);

@@ -452,11 +467,13 @@ protected void recover() throws IOException {
           rurList.add(r);
         }
       }
-      assert rurList.size() >= dataBlkNum : "incorrect safe length";

-      // Recovery the striped block by truncating internal blocks to the safe
-      // length. Abort if there is any failure in this step.
-      truncatePartialBlock(rurList, safeLength);
+      if (safeLength > 0) {
+        Preconditions.checkArgument(rurList.size() >= dataBlkNum, "incorrect safe length");
+        // Recovery the striped block by truncating internal blocks to the safe
+        // length. Abort if there is any failure in this step.
+        truncatePartialBlock(rurList, safeLength);
+      }

       // notify Namenode the new size and locations
       final DatanodeID[] newLocs = new DatanodeID[totalBlkNum];
@@ -469,11 +486,20 @@ protected void recover() throws IOException {
         int index = (int) (r.rInfo.getBlockId() &
             HdfsServerConstants.BLOCK_GROUP_INDEX_MASK);
         newLocs[index] = r.id;
-        newStorages[index] = r.storageID;
+        if (r.storageID != null) {
+          newStorages[index] = r.storageID;
+        }
       }
       ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
           safeLength, recoveryId);
       DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(bpid);
+      if (safeLength == 0) {
+        nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
+            newBlock.getNumBytes(), true, true, newLocs, newStorages);
+        LOG.info("After block recovery, the length of new block is 0. " +
+            "Will remove this block: {} from file.", newBlock);
+        return;
+      }
       nn.commitBlockSynchronization(block, newBlock.getGenerationStamp(),
           newBlock.getNumBytes(), true, false, newLocs, newStorages);
     }
@@ -527,8 +553,8 @@ long getSafeLength(Map<Long, BlockRecord> syncBlocks) {
     private void checkLocations(int locationCount)
        throws IOException {
      if (locationCount < ecPolicy.getNumDataUnits()) {
-        throw new IOException(block + " has no enough internal blocks" +
-            ", unable to start recovery. Locations=" + Arrays.asList(locs));
+        throw new IOException(block + " has no enough internal blocks(current: " + locationCount +
+            "), unable to start recovery. Locations=" + Arrays.asList(locs));
      }
    }
  }
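The hunks above all touch the striped-block recovery task (the `recover()`, `getSafeLength()`, and `checkLocations()` methods): before computing a safe length, recovery now counts internal replicas that are missing on their datanode or are zero-length RWRs (replicas in the ReplicaWaitingToBeRecovered state) and gives up cleanly when too few usable replicas remain. A minimal standalone sketch of that guard follows; `RecoveryDecision` and `canComputeSafeLength` are illustrative names, not part of the Hadoop API:

```java
// Standalone model of the new guard in recover(); illustrative only.
public final class RecoveryDecision {

  /**
   * A striped block group reports locsCount candidate internal blocks.
   * A safe length can only be computed when at most
   * (locsCount - numDataUnits) of them are unusable, i.e. missing on
   * their datanode or zero-length RWRs; otherwise fewer than
   * numDataUnits usable replicas remain and recovery must treat the
   * block as empty.
   */
  public static boolean canComputeSafeLength(int locsCount, int numDataUnits,
      int dnNotHaveReplicaCnt, int zeroLenReplicaCnt) {
    return dnNotHaveReplicaCnt + zeroLenReplicaCnt <= locsCount - numDataUnits;
  }

  public static void main(String[] args) {
    // RS-6-3 policy: 6 data units, 3 parity units, 9 reported locations.
    System.out.println(canComputeSafeLength(9, 6, 1, 2)); // true  -> recover normally
    System.out.println(canComputeSafeLength(9, 6, 2, 2)); // false -> safeLength = 0, remove block
  }
}
```

When the guard fails, `safeLength` is forced to 0, truncation is skipped, and `commitBlockSynchronization` is called with its deleteblock argument set to true, so the NameNode removes the now-empty block from the file instead of leaving lease recovery to retry indefinitely.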
@@ -133,8 +133,8 @@ private BlockReader createBlockReader(long offsetInBlock) {
           block.getNumBytes() - offsetInBlock, true, "", peer, source,
           null, stripedReader.getCachingStrategy(), -1, conf);
     } catch (IOException e) {
-      LOG.info("Exception while creating remote block reader, datanode {}",
-          source, e);
+      LOG.info("Exception while creating remote block reader for {}, datanode {}",
+          block, source, e);
       IOUtils.closeStream(peer);
       return null;
     }
@@ -259,6 +259,35 @@ public void testSafeLength() {
     checkSafeLength(1024 * 1024 * 1024, 6442450944L); // Length of: 1 GiB
   }

+  /**
+   * 1. Write 1MB data, then flush it.
+   * 2. Mock client quiet exceptionally.
+   * 3. Trigger lease recovery.
+   * 4. Lease recovery successfully.
+   */
+  @Test
+  public void testLeaseRecoveryWithManyZeroLengthReplica() {
+    int curCellSize = (int)1024 * 1024;
+    try {
+      final FSDataOutputStream out = dfs.create(p);
+      final DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out
+          .getWrappedStream();
+      for (int pos = 0; pos < curCellSize; pos++) {
+        out.write(StripedFileTestUtil.getByte(pos));
+      }
+      for (int i = 0; i < dataBlocks + parityBlocks; i++) {
+        StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
+        waitStreamerAllAcked(s);
+        stopBlockStream(s);
+      }
+      recoverLease();
+      LOG.info("Trigger recover lease manually successfully.");
+    } catch (Throwable e) {
+      String msg = "failed testCase" + StringUtils.stringifyException(e);
+      Assert.fail(msg);
+    }
+  }
+
   private void checkSafeLength(int blockLength, long expectedSafeLength) {
     int[] blockLengths = new int[]{blockLength, blockLength, blockLength, blockLength,
         blockLength, blockLength};
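The new test drives exactly the scenario the fix targets: it writes 1 MiB of data, waits for every streamer to be acked, then kills each block stream so the client appears to have died without closing the file (step 2's "Mock client quiet exceptionally" presumably means "quit exceptionally"), and finally triggers lease recovery. The helpers it calls (`waitStreamerAllAcked`, `stopBlockStream`, `recoverLease`) live elsewhere in this test class and are not part of the diff. As a rough sketch of what such a `recoverLease()` helper is assumed to do, polling `DistributedFileSystem#recoverLease` (a real client API) until the NameNode closes the file:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

// Hedged sketch, not the test's actual helper: poll recoverLease()
// until lease recovery completes and the file is closed.
final class LeaseRecoveryProbe {
  static void forceLeaseRecovery(DistributedFileSystem dfs, Path p)
      throws Exception {
    while (!dfs.recoverLease(p)) {
      Thread.sleep(1000); // retry until the NameNode closes the file
    }
  }
}
```

Before this fix, a block group left with only zero-length RWR replicas could make such a loop spin forever, which is the infinite lease recovery named in the commit title.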
