Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5CD221849C for ; Mon, 22 Feb 2016 19:47:43 +0000 (UTC) Received: (qmail 73538 invoked by uid 500); 22 Feb 2016 19:47:22 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 73437 invoked by uid 500); 22 Feb 2016 19:47:22 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 71211 invoked by uid 99); 22 Feb 2016 19:47:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Feb 2016 19:47:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0DA4CE0419; Mon, 22 Feb 2016 19:47:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arp@apache.org To: common-commits@hadoop.apache.org Date: Mon, 22 Feb 2016 19:48:00 -0000 Message-Id: <420e8820e16b423c8d53c601cc2c34fb@git.apache.org> In-Reply-To: <7839982659ae4ff6bd79ac8b2f5987b7@git.apache.org> References: <7839982659ae4ff6bd79ac8b2f5987b7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [41/50] [abbrv] hadoop git commit: HDFS-9818. Correctly handle EC reconstruction work caused by not enough racks. Contributed by Jing Zhao. HDFS-9818. Correctly handle EC reconstruction work caused by not enough racks. Contributed by Jing Zhao. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e54cc293 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e54cc293 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e54cc293 Branch: refs/heads/HDFS-1312 Commit: e54cc2931262bf49682a8323da9811976218c03b Parents: 6eae433 Author: Jing Zhao Authored: Fri Feb 19 19:02:23 2016 -0800 Committer: Jing Zhao Committed: Fri Feb 19 19:02:23 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hdfs/DFSStripedInputStream.java | 7 +- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../server/blockmanagement/BlockManager.java | 40 ++-- .../BlockPlacementPolicyDefault.java | 5 +- .../BlockPlacementPolicyRackFaultTolerant.java | 10 +- .../BlockPlacementStatusDefault.java | 10 +- .../BlockReconstructionWork.java | 5 + .../blockmanagement/ErasureCodingWork.java | 106 ++++++++- .../server/blockmanagement/ReplicationWork.java | 5 + .../erasurecode/ErasureCodingWorker.java | 25 ++- .../TestBlocksWithNotEnoughRacks.java | 12 +- ...constructStripedBlocksWithRackAwareness.java | 213 +++++++++++++++++++ 12 files changed, 388 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index d15e536..3483255 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -260,7 +260,12 @@ public class DFSStripedInputStream extends DFSInputStream { private void closeReader(BlockReaderInfo readerInfo) { if (readerInfo != null) { -// IOUtils.cleanup(null, readerInfo.reader); + if (readerInfo.reader != null) { + try { + readerInfo.reader.close(); + } catch (Throwable ignored) { + } + } readerInfo.skip(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a377243..1d0379c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -432,6 +432,9 @@ Trunk (Unreleased) HDFS-9794. Streamer threads may leak if failure happens when closing the striped outputstream. (jing9) + HDFS-9818. Correctly handle EC reconstruction work caused by not enough + racks. (jing9) + BREAKDOWN OF HDFS-7285 SUBTASKS AND RELATED JIRAS HDFS-7347. Configurable erasure coding policy for individual files and http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 67f5026..cc52b6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1628,7 +1628,7 @@ public class BlockManager implements BlockStatsMXBean { for (int i = 0 ; i < liveBlockIndices.size(); i++) { indices[i] = liveBlockIndices.get(i); } - return new ErasureCodingWork(block, bc, srcNodes, + return new ErasureCodingWork(getBlockPoolId(), block, bc, srcNodes, containingNodes, liveReplicaNodes, additionalReplRequired, priority, indices); } else { @@ -1638,6 +1638,16 @@ public class BlockManager implements BlockStatsMXBean { } } + private boolean isInNewRack(DatanodeDescriptor[] srcs, + DatanodeDescriptor target) { + for (DatanodeDescriptor src : srcs) { + if (src.getNetworkLocation().equals(target.getNetworkLocation())) { + return false; + } + } + return true; + } + private boolean validateReconstructionWork(BlockReconstructionWork rw) { BlockInfo block = rw.getBlock(); int priority = rw.getPriority(); @@ -1665,31 +1675,14 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo[] targets = rw.getTargets(); if ( (numReplicas.liveReplicas() >= requiredReplication) && (!isPlacementPolicySatisfied(block)) ) { - if (rw.getSrcNodes()[0].getNetworkLocation().equals( - targets[0].getDatanodeDescriptor().getNetworkLocation())) { - //No use continuing, unless a new rack in this case + if (!isInNewRack(rw.getSrcNodes(), targets[0].getDatanodeDescriptor())) { + // No use continuing, unless a new rack in this case return false; } } - // Add block to the to be reconstructed list - if (block.isStriped()) { - assert rw instanceof ErasureCodingWork; - assert rw.getTargets().length > 0; - assert pendingNum == 0 : "Should wait the previous reconstruction" - + " to finish"; - final ErasureCodingPolicy ecPolicy = - ((BlockInfoStriped) block).getErasureCodingPolicy(); - assert ecPolicy != null; - - rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded( - new ExtendedBlock(getBlockPoolId(), block), - rw.getSrcNodes(), rw.getTargets(), - ((ErasureCodingWork) rw).getLiveBlockIndicies(), ecPolicy); - } else { - rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets); - } - + // Add block to the datanode's task list + rw.addTaskToDatanode(); DatanodeStorageInfo.incrementBlocksScheduled(targets); // Move the block-replication into a "pending" state. @@ -3973,7 +3966,8 @@ public class BlockManager implements BlockStatsMXBean { .getPolicy(storedBlock.isStriped()); int numReplicas = storedBlock.isStriped() ? ((BlockInfoStriped) storedBlock) .getRealDataBlockNum() : storedBlock.getReplication(); - return placementPolicy.verifyBlockPlacement(locs, numReplicas).isPlacementPolicySatisfied(); + return placementPolicy.verifyBlockPlacement(locs, numReplicas) + .isPlacementPolicySatisfied(); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index ee891a5..e1a47ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -901,7 +901,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { locs = DatanodeDescriptor.EMPTY_ARRAY; if (!clusterMap.hasClusterEverBeenMultiRack()) { // only one rack - return new BlockPlacementStatusDefault(1, 1); + return new BlockPlacementStatusDefault(1, 1, 1); } int minRacks = 2; minRacks = Math.min(minRacks, numberOfReplicas); @@ -910,7 +910,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { Set racks = new TreeSet<>(); for (DatanodeInfo dn : locs) racks.add(dn.getNetworkLocation()); - return new BlockPlacementStatusDefault(racks.size(), minRacks); + return new BlockPlacementStatusDefault(racks.size(), minRacks, + clusterMap.getNumOfRacks()); } /** * Decide whether deleting the specified replica of the block still makes http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java index c803b97..c0d981c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyRackFaultTolerant.java @@ -160,14 +160,16 @@ public class BlockPlacementPolicyRackFaultTolerant extends BlockPlacementPolicyD locs = DatanodeDescriptor.EMPTY_ARRAY; if (!clusterMap.hasClusterEverBeenMultiRack()) { // only one rack - return new BlockPlacementStatusDefault(1, 1); + return new BlockPlacementStatusDefault(1, 1, 1); } // 1. Check that all locations are different. // 2. Count locations on different racks. - Set racks = new TreeSet(); - for (DatanodeInfo dn : locs) + Set racks = new TreeSet<>(); + for (DatanodeInfo dn : locs) { racks.add(dn.getNetworkLocation()); - return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas); + } + return new BlockPlacementStatusDefault(racks.size(), numberOfReplicas, + clusterMap.getNumOfRacks()); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java index 0b8b965..75bb65d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementStatusDefault.java @@ -21,15 +21,18 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus { private int requiredRacks = 0; private int currentRacks = 0; + private final int totalRacks; - public BlockPlacementStatusDefault(int currentRacks, int requiredRacks){ + public BlockPlacementStatusDefault(int currentRacks, int requiredRacks, + int totalRacks){ this.requiredRacks = requiredRacks; this.currentRacks = currentRacks; + this.totalRacks = totalRacks; } @Override public boolean isPlacementPolicySatisfied() { - return requiredRacks <= currentRacks; + return requiredRacks <= currentRacks || currentRacks >= totalRacks; } @Override @@ -38,7 +41,8 @@ public class BlockPlacementStatusDefault implements BlockPlacementStatus { return null; } return "Block should be additionally replicated on " + - (requiredRacks - currentRacks) + " more rack(s)."; + (requiredRacks - currentRacks) + + " more rack(s). Total number of racks in the cluster: " + totalRacks; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java index df9c164..c1998ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java @@ -108,4 +108,9 @@ abstract class BlockReconstructionWork { abstract void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set excludedNodes); + + /** + * add reconstruction task into a source datanode + */ + abstract void addTaskToDatanode(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index 85a25d5..38ad324 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@ -17,15 +17,23 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.net.Node; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; class ErasureCodingWork extends BlockReconstructionWork { private final byte[] liveBlockIndicies; + private final String blockPoolId; - public ErasureCodingWork(BlockInfo block, + public ErasureCodingWork(String blockPoolId, BlockInfo block, BlockCollection bc, DatanodeDescriptor[] srcNodes, List containingNodes, @@ -34,6 +42,7 @@ class ErasureCodingWork extends BlockReconstructionWork { int priority, byte[] liveBlockIndicies) { super(block, bc, srcNodes, containingNodes, liveReplicaStorages, additionalReplRequired, priority); + this.blockPoolId = blockPoolId; this.liveBlockIndicies = liveBlockIndicies; BlockManager.LOG.debug("Creating an ErasureCodingWork to {} reconstruct ", block); @@ -47,15 +56,92 @@ class ErasureCodingWork extends BlockReconstructionWork { void chooseTargets(BlockPlacementPolicy blockplacement, BlockStoragePolicySuite storagePolicySuite, Set excludedNodes) { - try { - // TODO: new placement policy for EC considering multiple writers - DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget( - getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0], - getLiveReplicaStorages(), false, excludedNodes, - getBlock().getNumBytes(), - storagePolicySuite.getPolicy(getBc().getStoragePolicyID())); - setTargets(chosenTargets); - } finally { + // TODO: new placement policy for EC considering multiple writers + DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget( + getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0], + getLiveReplicaStorages(), false, excludedNodes, + getBlock().getNumBytes(), + storagePolicySuite.getPolicy(getBc().getStoragePolicyID())); + setTargets(chosenTargets); + } + + /** + * @return true if the current source nodes cover all the internal blocks. + * I.e., we only need to have more racks. + */ + private boolean hasAllInternalBlocks() { + final BlockInfoStriped block = (BlockInfoStriped) getBlock(); + if (getSrcNodes().length < block.getRealTotalBlockNum()) { + return false; + } + BitSet bitSet = new BitSet(block.getTotalBlockNum()); + for (byte index : liveBlockIndicies) { + bitSet.set(index); + } + for (int i = 0; i < block.getRealDataBlockNum(); i++) { + if (!bitSet.get(i)) { + return false; + } + } + for (int i = block.getDataBlockNum(); i < block.getTotalBlockNum(); i++) { + if (!bitSet.get(i)) { + return false; + } + } + return true; + } + + /** + * We have all the internal blocks but not enough racks. Thus we do not need + * to do decoding but only simply make an extra copy of an internal block. In + * this scenario, use this method to choose the source datanode for simple + * replication. + * @return The index of the source datanode. + */ + private int chooseSource4SimpleReplication() { + Map> map = new HashMap<>(); + for (int i = 0; i < getSrcNodes().length; i++) { + final String rack = getSrcNodes()[i].getNetworkLocation(); + List dnList = map.get(rack); + if (dnList == null) { + dnList = new ArrayList<>(); + map.put(rack, dnList); + } + dnList.add(i); + } + List max = null; + for (Map.Entry> entry : map.entrySet()) { + if (max == null || entry.getValue().size() > max.size()) { + max = entry.getValue(); + } + } + assert max != null; + return max.get(0); + } + + @Override + void addTaskToDatanode() { + assert getTargets().length > 0; + BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock(); + + // if we already have all the internal blocks, but not enough racks, + // we only need to replicate one internal block to a new rack + if (hasAllInternalBlocks()) { + int sourceIndex = chooseSource4SimpleReplication(); + final byte blockIndex = liveBlockIndicies[sourceIndex]; + final DatanodeDescriptor source = getSrcNodes()[sourceIndex]; + final long internBlkLen = StripedBlockUtil.getInternalBlockLength( + stripedBlk.getNumBytes(), stripedBlk.getCellSize(), + stripedBlk.getDataBlockNum(), blockIndex); + final Block targetBlk = new Block( + stripedBlk.getBlockId() + blockIndex, internBlkLen, + stripedBlk.getGenerationStamp()); + source.addBlockToBeReplicated(targetBlk, getTargets()); + } else { + getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded( + new ExtendedBlock(blockPoolId, stripedBlk), + getSrcNodes(), getTargets(), getLiveBlockIndicies(), + stripedBlk.getErasureCodingPolicy()); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java index b44b9b1..24601a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java @@ -53,4 +53,9 @@ class ReplicationWork extends BlockReconstructionWork { getSrcNodes()[0].decrementPendingReplicationWithoutTargets(); } } + + @Override + void addTaskToDatanode() { + getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java index b08aa2e..1017e1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/ErasureCodingWorker.java @@ -177,8 +177,14 @@ public final class ErasureCodingWorker { Collection ecTasks) { for (BlockECReconstructionInfo reconstructionInfo : ecTasks) { try { - EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL - .submit(new ReconstructAndTransferBlock(reconstructionInfo)); + ReconstructAndTransferBlock task = + new ReconstructAndTransferBlock(reconstructionInfo); + if (task.hasValidTargets()) { + EC_RECONSTRUCTION_STRIPED_BLK_THREAD_POOL.submit(task); + } else { + LOG.warn("No missing internal block. Skip reconstruction for task:{}", + reconstructionInfo); + } } catch (Throwable e) { LOG.warn("Failed to reconstruct striped block {}", reconstructionInfo.getExtendedBlock().getLocalBlock(), e); @@ -292,6 +298,7 @@ public final class ErasureCodingWorker { private final CompletionService readService = new ExecutorCompletionService<>( EC_RECONSTRUCTION_STRIPED_READ_THREAD_POOL); + private final boolean hasValidTargets; ReconstructAndTransferBlock(BlockECReconstructionInfo reconstructionInfo) { ErasureCodingPolicy ecPolicy = reconstructionInfo @@ -339,10 +346,14 @@ public final class ErasureCodingWorker { seqNo4Targets[i] = 0; } - getTargetIndices(); + hasValidTargets = getTargetIndices(); cachingStrategy = CachingStrategy.newDefaultStrategy(); } + boolean hasValidTargets() { + return hasValidTargets; + } + private ByteBuffer allocateBuffer(int length) { return ByteBuffer.allocate(length); } @@ -505,24 +516,30 @@ public final class ErasureCodingWorker { } } - private void getTargetIndices() { + /** + * @return true if there is valid target for reconstruction + */ + private boolean getTargetIndices() { BitSet bitset = new BitSet(dataBlkNum + parityBlkNum); for (int i = 0; i < sources.length; i++) { bitset.set(liveIndices[i]); } int m = 0; int k = 0; + boolean hasValidTarget = false; for (int i = 0; i < dataBlkNum + parityBlkNum; i++) { if (!bitset.get(i)) { if (getBlockLen(blockGroup, i) > 0) { if (m < targets.length) { targetIndices[m++] = (short)i; + hasValidTarget = true; } } else { zeroStripeIndices[k++] = (short)i; } } } + return hasValidTarget; } /** the reading length should not exceed the length for reconstruction. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java index ea994a2..d91155a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java @@ -285,7 +285,7 @@ public class TestBlocksWithNotEnoughRacks { final DatanodeManager dm = ns.getBlockManager().getDatanodeManager(); try { - // Create a file with one block with a replication factor of 2 + // Create a file with one block with a replication factor of 3 final FileSystem fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, filePath, 1L, REPLICATION_FACTOR, 1L); ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath); @@ -315,8 +315,9 @@ public class TestBlocksWithNotEnoughRacks { dm.removeDatanode(dnId); // Make sure we have enough live replicas even though we are - // short one rack and therefore need one replica - DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1); + // short one rack. The cluster now has only 1 rack thus we just make sure + // we still have 3 replicas. + DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0); } finally { cluster.shutdown(); } @@ -357,9 +358,8 @@ public class TestBlocksWithNotEnoughRacks { // The block gets re-replicated to another datanode so it has a // sufficient # replicas, but not across racks, so there should - // be 1 rack, and 1 needed replica (even though there are 2 hosts - // available and only 2 replicas required). - DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 1); + // be 1 rack. + DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0); // Start the "failed" datanode, which has a replica so the block is // now over-replicated and therefore a replica should be removed but http://git-wip-us.apache.org/repos/asf/hadoop/blob/e54cc293/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java new file mode 100644 index 0000000..2164957 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.INodeFile; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS; + +public class TestReconstructStripedBlocksWithRackAwareness { + public static final Logger LOG = LoggerFactory.getLogger( + TestReconstructStripedBlocksWithRackAwareness.class); + + static { + GenericTestUtils.setLogLevel(BlockPlacementPolicy.LOG, Level.ALL); + GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL); + } + + private static final String[] hosts = new String[]{"host1", "host2", "host3", + "host4", "host5", "host6", "host7", "host8", "host9", "host10"}; + private static final String[] racks = new String[]{"/r1", "/r1", "/r2", "/r2", + "/r3", "/r3", "/r4", "/r4", "/r5", "/r6"}; + private static final List singleNodeRacks = Arrays.asList("host9", "host10"); + private static final short blockNum = (short) (NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS); + + private MiniDFSCluster cluster; + private DistributedFileSystem fs; + private FSNamesystem fsn; + private BlockManager bm; + + @Before + public void setup() throws Exception { + final HdfsConfiguration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, + false); + + cluster = new MiniDFSCluster.Builder(conf).racks(racks).hosts(hosts) + .numDataNodes(hosts.length).build(); + cluster.waitActive(); + + fsn = cluster.getNamesystem(); + bm = fsn.getBlockManager(); + + fs = cluster.getFileSystem(); + fs.setErasureCodingPolicy(new Path("/"), null); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * When there are all the internal blocks available but they are not placed on + * enough racks, NameNode should avoid normal decoding reconstruction but copy + * an internal block to a new rack. + * + * In this test, we first need to create a scenario that a striped block has + * all the internal blocks but distributed in <6 racks. Then we check if the + * replication monitor can correctly schedule the reconstruction work for it. + * + * For the 9 internal blocks + 5 racks setup, the test does the following: + * 1. create a 6 rack cluster with 10 datanodes, where there are 2 racks only + * containing 1 datanodes each + * 2. for a striped block with 9 internal blocks, there must be one internal + * block locating in a single-node rack. find this node and stop it + * 3. namenode will trigger reconstruction for the block and since the cluster + * has only 5 racks remaining, after the reconstruction we have 9 internal + * blocks distributed in 5 racks. + * 4. we bring the datanode back, now the cluster has 6 racks again + * 5. let the datanode call reportBadBlock, this will make the namenode to + * check if the striped block is placed in >= 6 racks, and the namenode will + * put the block into the under-replicated queue + * 6. now we can check if the replication monitor works as expected + */ + @Test + public void testReconstructForNotEnoughRacks() throws Exception { + final Path file = new Path("/foo"); + DFSTestUtil.createFile(fs, file, + BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2, (short) 1, 0L); + Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks()); + + final INodeFile fileNode = fsn.getFSDirectory() + .getINode4Write(file.toString()).asFile(); + BlockInfoStriped blockInfo = (BlockInfoStriped) fileNode.getLastBlock(); + + // find the internal block located in the single node rack + Block internalBlock = null; + String hostToStop = null; + for (DatanodeStorageInfo storage : blockInfo.storages) { + if (singleNodeRacks.contains(storage.getDatanodeDescriptor().getHostName())) { + hostToStop = storage.getDatanodeDescriptor().getHostName(); + internalBlock = blockInfo.getBlockOnStorage(storage); + } + } + Assert.assertNotNull(internalBlock); + Assert.assertNotNull(hostToStop); + + // delete the block on the chosen datanode + cluster.corruptBlockOnDataNodesByDeletingBlockFile( + new ExtendedBlock(bm.getBlockPoolId(), internalBlock)); + + // stop the chosen datanode + MiniDFSCluster.DataNodeProperties dnProp = null; + for (int i = 0; i < cluster.getDataNodes().size(); i++) { + DataNode dn = cluster.getDataNodes().get(i); + if (dn.getDatanodeId().getHostName().equals(hostToStop)) { + dnProp = cluster.stopDataNode(i); + cluster.setDataNodeDead(dn.getDatanodeId()); + LOG.info("stop datanode " + dn.getDatanodeId().getHostName()); + } + } + NetworkTopology topology = bm.getDatanodeManager().getNetworkTopology(); + Assert.assertEquals(5, topology.getNumOfRacks()); + + // make sure the reconstruction work can finish + // now we have 9 internal blocks in 5 racks + DFSTestUtil.waitForReplication(fs, file, blockNum, 15 * 1000); + + // we now should have 9 internal blocks distributed in 5 racks + Set rackSet = new HashSet<>(); + for (DatanodeStorageInfo storage : blockInfo.storages) { + rackSet.add(storage.getDatanodeDescriptor().getNetworkLocation()); + } + Assert.assertEquals(5, rackSet.size()); + + // restart the stopped datanode + cluster.restartDataNode(dnProp); + cluster.waitActive(); + + // make sure we have 6 racks again + topology = bm.getDatanodeManager().getNetworkTopology(); + Assert.assertEquals(hosts.length, topology.getNumOfLeaves()); + Assert.assertEquals(6, topology.getNumOfRacks()); + + // pause all the heartbeats + DataNode badDn = null; + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + if (dn.getDatanodeId().getHostName().equals(hostToStop)) { + badDn = dn; + } + } + assert badDn != null; + // let the DN report the bad block, so that the namenode will put the block + // into under-replicated queue. note that the block still has 9 internal + // blocks but in 5 racks + badDn.reportBadBlocks(new ExtendedBlock(bm.getBlockPoolId(), internalBlock)); + + // check if replication monitor correctly schedule the replication work + boolean scheduled = false; + for (int i = 0; i < 5; i++) { // retry 5 times + for (DatanodeStorageInfo storage : blockInfo.storages) { + if (storage != null) { + DatanodeDescriptor dn = storage.getDatanodeDescriptor(); + Assert.assertEquals(0, dn.getNumberOfBlocksToBeErasureCoded()); + if (dn.getNumberOfBlocksToBeReplicated() == 1) { + scheduled = true; + } + } + } + if (scheduled) { + break; + } + Thread.sleep(1000); + } + Assert.assertTrue(scheduled); + } +}