Subject: svn commit: r1428739 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/
Date: Fri, 04 Jan 2013 08:09:44 -0000
To: hdfs-commits@hadoop.apache.org
From: szetszwo@apache.org

Author: szetszwo
Date: Fri Jan 4 08:09:43 2013
New Revision: 1428739

URL: http://svn.apache.org/viewvc?rev=1428739&view=rev
Log:
HDFS-4270. Introduce soft and hard limits for max replication so that
replications of the highest priority are allowed to choose a source datanode
that has reached its soft limit but not the hard limit.  Contributed by Derek Dagit

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1428739&r1=1428738&r2=1428739&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jan 4 08:09:43 2013
@@ -443,6 +443,11 @@ Release 2.0.3-alpha - Unreleased

     HDFS-4326. bump up Tomcat version for HttpFS to 6.0.36. (tucu via acmurthy)

+    HDFS-4270. Introduce soft and hard limits for max replication so that
+    replications of the highest priority are allowed to choose a source datanode
+    that has reached its soft limit but not the hard limit. (Derek Dagit via
+    szetszwo)
+
   OPTIMIZATIONS

   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1428739&r1=1428738&r2=1428739&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan 4 08:09:43 2013
@@ -143,6 +143,8 @@ public class DFSConfigKeys extends Commo
   public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
   public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
   public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
+  public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
+  public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
   public static final String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
   public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
   public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1428739&r1=1428738&r2=1428739&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jan 4 08:09:43 2013
@@ -189,10 +189,16 @@ public class BlockManager {
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
-  /** The maximum number of outgoing replication streams
-   * a given node should have at one time
-   */
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time considering all but the highest priority replications needed.
+   */
   int maxReplicationStreams;
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time.
+   */
+  int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -263,9 +269,16 @@ public class BlockManager {
     this.minReplication = (short)minR;
     this.maxReplication = (short)maxR;

-    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+    this.maxReplicationStreams =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.replicationStreamsHardLimit =
+        conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+    this.shouldCheckForEnoughRacks =
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -435,7 +448,8 @@ public class BlockManager {
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
-        containingLiveReplicasNodes, numReplicas);
+        containingLiveReplicasNodes, numReplicas,
+        UnderReplicatedBlocks.LEVEL);
     assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
     int usableReplicas = numReplicas.liveReplicas() +
                          numReplicas.decommissionedReplicas();
@@ -1145,11 +1159,12 @@ public class BlockManager {
           liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
           NumberReplicas numReplicas = new NumberReplicas();
           srcNode = chooseSourceDatanode(
-              block, containingNodes, liveReplicaNodes, numReplicas);
+              block, containingNodes, liveReplicaNodes, numReplicas,
+              priority);
           if(srcNode == null) { // block can not be replicated from any node
             LOG.debug("Block " + block + " cannot be repl from any node");
             continue;
-          }
+          }

           assert liveReplicaNodes.size() == numReplicas.liveReplicas();
           // do not schedule more if enough replicas is already pending
@@ -1339,16 +1354,34 @@ public class BlockManager {
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
    * Otherwise we choose a random node among those that did not reach their
-   * replication limit.
+   * replication limits. However, if the replication is of the highest priority
+   * and all nodes have reached their replication limits, we will choose a
+   * random node despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
+   *
+   * @param block Block for which a replication source is needed
+   * @param containingNodes List to be populated with nodes found to contain the
+   *                        given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found to
+   *                                    contain live replicas of the given block
+   * @param numReplicas NumberReplicas instance to be initialized with the
+   *                    counts of live, corrupt, excess, and
+   *                    decommissioned replicas of the given
+   *                    block.
+   * @param priority integer representing replication priority of the given
+   *                 block
+   * @return the DatanodeDescriptor of the chosen node from which to replicate
+   *         the given block
    */
-  private DatanodeDescriptor chooseSourceDatanode(
+  @VisibleForTesting
+  DatanodeDescriptor chooseSourceDatanode(
       Block block,
       List<DatanodeDescriptor> containingNodes,
       List<DatanodeDescriptor> nodesContainingLiveReplicas,
-      NumberReplicas numReplicas) {
+      NumberReplicas numReplicas,
+      int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
@@ -1377,8 +1410,15 @@ public class BlockManager {
       // If so, do not select the node as src node
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      {
         continue; // already reached replication limit
+      }
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+      {
+        continue;
+      }
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1428739&r1=1428738&r2=1428739&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Fri Jan 4 08:09:43 2013
@@ -19,10 +19,13 @@ package org.apache.hadoop.hdfs.server.bl

 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -429,4 +432,57 @@ public class TestBlockManager {
     }
     return repls;
   }
+
+  /**
+   * Test that a source node for a highest-priority replication is chosen even if all available
+   * source nodes have reached their replication limits.
+   */
+  @Test
+  public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
+    bm.maxReplicationStreams = 0;
+    bm.replicationStreamsHardLimit = 1;
+
+    long blockId = 42; // arbitrary
+    Block aBlock = new Block(blockId, 0, 0);
+
+    List<DatanodeDescriptor> origNodes = getNodes(0, 1);
+    // Add the block to the first node.
+    addBlockOnNodes(blockId,origNodes.subList(0,1));
+
+    List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+
+    assertNotNull("Chooses source node for a highest-priority replication"
+        + " even if all available source nodes have reached their replication"
+        + " limits below the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+
+    assertNull("Does not choose a source node for a less-than-highest-priority"
+        + " replication since all available source nodes have reached"
+        + " their replication limits.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+
+    // Increase the replication count to test replication count > hard limit
+    DatanodeDescriptor targets[] = { origNodes.get(1) };
+    origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
+
+    assertNull("Does not choose a source node for a highest-priority"
+        + " replication when all available nodes exceed the hard limit.",
+        bm.chooseSourceDatanode(
+            aBlock,
+            cntNodes,
+            liveNodes,
+            new NumberReplicas(),
+            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+  }
 }
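
For readers skimming the diff, the rule that the BlockManager change above adds to chooseSourceDatanode can be restated in isolation. The fragment below is an illustrative sketch rather than code from this commit: the class and the isEligibleSource helper are hypothetical, while DatanodeDescriptor, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY, and getNumberOfBlocksToBeReplicated() are the existing HDFS names used in the patch.

package org.apache.hadoop.hdfs.server.blockmanagement;

/** Illustrative sketch only; not part of r1428739. */
class ReplicationSourceLimitSketch {
  /**
   * Hypothetical helper restating the per-node checks that chooseSourceDatanode
   * now applies when picking a replication source.
   */
  static boolean isEligibleSource(DatanodeDescriptor node, int priority,
      int maxReplicationStreams, int replicationStreamsHardLimit) {
    int pendingReplications = node.getNumberOfBlocksToBeReplicated();
    // Soft limit: skipped only when the block is in the highest-priority queue.
    if (priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
        && pendingReplications >= maxReplicationStreams) {
      return false;
    }
    // Hard limit: enforced for every priority, including the highest.
    return pendingReplications < replicationStreamsHardLimit;
  }
}

With the defaults (soft limit 2, hard limit 4), a datanode with 2 or 3 pending outgoing replications can still serve as a source for highest-priority blocks, but never once it reaches the hard limit.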
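
The two limits are exposed through the configuration keys added to DFSConfigKeys above. A minimal sketch of overriding them programmatically follows; the values shown are just the compiled-in defaults from this commit, and a real cluster would normally set the corresponding properties in hdfs-site.xml instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

/** Illustrative sketch only; the values shown are the defaults introduced by this commit. */
public class ReplicationLimitConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Soft limit consulted for all but the highest-priority replications
    // (dfs.namenode.replication.max-streams, default 2).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 2);
    // Hard limit that applies even to the highest-priority replications
    // (dfs.namenode.replication.max-streams-hard-limit, default 4).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 4);
  }
}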