From: kihwal@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Date: Thu, 31 Jan 2013 23:05:13 -0000
Subject: svn commit: r1441249 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/

Author: kihwal
Date: Thu Jan 31 23:05:13 2013
New Revision: 1441249

URL: http://svn.apache.org/viewvc?rev=1441249&view=rev
Log:
merge -r 1199023:1199024 Merging from trunk to branch-0.23 to fix HDFS-2495

Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1441249&r1=1441248&r2=1441249&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jan 31 23:05:13 2013
@@ -14,6 +14,9 @@ Release 0.23.7 - UNRELEASED
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
 
+    HDFS-2495. Increase granularity of write operations in ReplicationMonitor
+    thus reducing contention for write lock. (Tomasz Nykiel via hairong)
+
   BUG FIXES
 
     HDFS-4288. NN accepts incremental BR as IBR in safemode (daryn via kihwal)
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1441249&r1=1441248&r2=1441249&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Thu Jan 31 23:05:13 2013
@@ -958,15 +958,7 @@ public class BlockManager {
       chooseUnderReplicatedBlocks(blocksToProcess);
 
     // replicate blocks
-    int scheduledReplicationCount = 0;
-    for (int i=0; i<blocksToReplicate.size(); i++) {
-      for (Block block : blocksToReplicate.get(i)) {
-        if (computeReplicationWorkForBlock(block, i)) {
-          scheduledReplicationCount++;
-        }
-      }
-    }
-    return scheduledReplicationCount;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
-  boolean computeReplicationWorkForBlock(Block block, int priority) {
+  int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
+    int scheduledWork = 0;
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
+        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
+          for (Block block : blocksToReplicate.get(priority)) {
+            // block should belong to a file
+            fileINode = blocksMap.getINode(block);
+            // abandoned block or block reopened for append
+            if(fileINode == null || fileINode.isUnderConstruction()) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              continue;
+            }
 
-        requiredReplication = fileINode.getReplication();
+            requiredReplication = fileINode.getReplication();
 
-        // get a source data-node
-        containingNodes = new ArrayList<DatanodeDescriptor>();
-        liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(
-            block, containingNodes, liveReplicaNodes, numReplicas, priority);
-        if(srcNode == null) // block can not be replicated from any node
-        {
-          return false;
-        }
-
-        assert liveReplicaNodes.size() == numReplicas.liveReplicas();
-        // do not schedule more if enough replicas is already pending
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-                                pendingReplications.getNumReplicas(block);
+            // get a source data-node
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+            NumberReplicas numReplicas = new NumberReplicas();
+            srcNode = chooseSourceDatanode(
+                block, containingNodes, liveReplicaNodes, numReplicas, priority);
+            if(srcNode == null) // block can not be replicated from any node
+              continue;
+
+            assert liveReplicaNodes.size() == numReplicas.liveReplicas();
+            // do not schedule more if enough replicas is already pending
+            numEffectiveReplicas = numReplicas.liveReplicas() +
+                pendingReplications.getNumReplicas(block);
 
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
-            blockLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
-          }
-        }
+            if (numEffectiveReplicas >= requiredReplication) {
+              if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                   (blockHasEnoughRacks(block)) ) {
+                neededReplications.remove(block, priority); // remove from neededReplications
+                replIndex--;
+                blockLog.info("BLOCK* "
+                    + "Removing block " + block
+                    + " from neededReplications as it has enough replicas.");
+                continue;
+              }
+            }
 
-        if (numReplicas.liveReplicas() < requiredReplication) {
-          additionalReplRequired = requiredReplication - numEffectiveReplicas;
-        } else {
-          additionalReplRequired = 1; //Needed on a new rack
+            if (numReplicas.liveReplicas() < requiredReplication) {
+              additionalReplRequired = requiredReplication
+                  - numEffectiveReplicas;
+            } else {
+              additionalReplRequired = 1; // Needed on a new rack
+            }
+            work.add(new ReplicationWork(block, fileINode, srcNode,
+                containingNodes, liveReplicaNodes, additionalReplRequired,
+                priority));
+          }
         }
-      }
     } finally {
       namesystem.writeUnlock();
     }
-
-    // Exclude all of the containing nodes from being targets.
-    // This list includes decommissioning or corrupt nodes.
-    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
-    for (DatanodeDescriptor dn : containingNodes) {
-      excludedNodes.put(dn, dn);
-    }
-    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    // It is costly to extract the filename for which chooseTargets is called,
-    // so for now we pass in the Inode itself.
-    DatanodeDescriptor targets[] =
-        blockplacement.chooseTarget(fileINode, additionalReplRequired,
-        srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
-    if(targets.length == 0)
-      return false;
+    HashMap<Node, Node> excludedNodes
+        = new HashMap<Node, Node>();
+    for(ReplicationWork rw : work){
+      // Exclude all of the containing nodes from being targets.
+      // This list includes decommissioning or corrupt nodes.
+      excludedNodes.clear();
+      for (DatanodeDescriptor dn : rw.containingNodes) {
+        excludedNodes.put(dn, dn);
+      }
+
+      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+      // It is costly to extract the filename for which chooseTargets is called,
+      // so for now we pass in the Inode itself.
+      rw.targets = blockplacement.chooseTarget(rw.fileINode,
+          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
+          excludedNodes, rw.block.getNumBytes());
+    }
 
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        // Recheck since global lock was released
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if(targets == null || targets.length == 0){
+          rw.targets = null;
+          continue;
         }
-        requiredReplication = fileINode.getReplication();
-        // do not schedule more if enough replicas is already pending
-        NumberReplicas numReplicas = countNodes(block);
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-          pendingReplications.getNumReplicas(block);
-
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
+        synchronized (neededReplications) {
+          Block block = rw.block;
+          int priority = rw.priority;
+          // Recheck since global lock was released
+          // block should belong to a file
+          fileINode = blocksMap.getINode(block);
+          // abandoned block or block reopened for append
+          if(fileINode == null || fileINode.isUnderConstruction()) {
            neededReplications.remove(block, priority); // remove from neededReplications
+            rw.targets = null;
            replIndex--;
-            blockLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
+            continue;
          }
-        }
+          requiredReplication = fileINode.getReplication();
 
-        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-             (!blockHasEnoughRacks(block)) ) {
-          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
-            //No use continuing, unless a new rack in this case
-            return false;
+          // do not schedule more if enough replicas is already pending
+          NumberReplicas numReplicas = countNodes(block);
+          numEffectiveReplicas = numReplicas.liveReplicas() +
+            pendingReplications.getNumReplicas(block);
+
+          if (numEffectiveReplicas >= requiredReplication) {
+            if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                 (blockHasEnoughRacks(block)) ) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              rw.targets = null;
+              NameNode.stateChangeLog.info("BLOCK* "
+                  + "Removing block " + block
+                  + " from neededReplications as it has enough replicas.");
+              continue;
+            }
          }
-        }
-        // Add block to the to be replicated list
-        srcNode.addBlockToBeReplicated(block, targets);
+          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+               (!blockHasEnoughRacks(block)) ) {
+            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+              //No use continuing, unless a new rack in this case
+              continue;
+            }
+          }
 
-        for (DatanodeDescriptor dn : targets) {
-          dn.incBlocksScheduled();
-        }
+          // Add block to the to be replicated list
+          rw.srcNode.addBlockToBeReplicated(block, targets);
+          scheduledWork++;
 
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        pendingReplications.add(block, targets.length);
-        if(blockLog.isDebugEnabled()) {
-          blockLog.debug(
-              "BLOCK* block " + block
-              + " is moved from neededReplications to pendingReplications");
-        }
+          for (DatanodeDescriptor dn : targets) {
+            dn.incBlocksScheduled();
+          }
 
-        // remove from neededReplications
-        if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
+          // Move the block-replication into a "pending" state.
+          // The reason we use 'pending' is so we can retry
+          // replications that fail after an appropriate amount of time.
+          pendingReplications.add(block, targets.length);
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* block " + block
+                + " is moved from neededReplications to pendingReplications");
+          }
+
+          // remove from neededReplications
+          if(numEffectiveReplicas + targets.length >= requiredReplication) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+          }
        }
-        if (blockLog.isInfoEnabled()) {
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+
+    if (blockLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if (targets != null && targets.length != 0) {
          StringBuilder targetList = new StringBuilder("datanode(s)");
          for (int k = 0; k < targets.length; k++) {
            targetList.append(' ');
            targetList.append(targets[k].getName());
          }
          blockLog.info(
-              "BLOCK* ask "
-              + srcNode.getName() + " to replicate "
-              + block + " to " + targetList);
-          if(blockLog.isDebugEnabled()) {
-            blockLog.debug(
-                "BLOCK* neededReplications = " + neededReplications.size()
-                + " pendingReplications = " + pendingReplications.size());
-          }
+              "BLOCK* ask "
+              + rw.srcNode.getName() + " to replicate "
+              + rw.block + " to " + targetList);
        }
      }
-    } finally {
-      namesystem.writeUnlock();
+    }
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug(
+          "BLOCK* neededReplications = " + neededReplications.size()
+          + " pendingReplications = " + pendingReplications.size());
    }
-    return true;
+    return scheduledWork;
  }

  /**
@@ -2723,4 +2744,34 @@ public class BlockManager {
    return workFound;
  }

+  private static class ReplicationWork {
+
+    private Block block;
+    private INodeFile fileINode;
+
+    private DatanodeDescriptor srcNode;
+    private List<DatanodeDescriptor> containingNodes;
+    private List<DatanodeDescriptor> liveReplicaNodes;
+    private int additionalReplRequired;
+
+    private DatanodeDescriptor targets[];
+    private int priority;
+
+    public ReplicationWork(Block block,
+        INodeFile fileINode,
+        DatanodeDescriptor srcNode,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeDescriptor> liveReplicaNodes,
+        int additionalReplRequired,
+        int priority) {
+      this.block = block;
+      this.fileINode = fileINode;
+      this.srcNode = srcNode;
+      this.containingNodes = containingNodes;
+      this.liveReplicaNodes = liveReplicaNodes;
+      this.additionalReplRequired = additionalReplRequired;
+      this.priority = priority;
+      this.targets = null;
+    }
+  }
 }
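The BlockManager change above follows a gather / compute / recommit pattern: candidate blocks are collected into ReplicationWork items during one short write-lock section, the expensive chooseTarget calls run with the lock released, and the results are re-validated and committed under the lock again. The following standalone sketch only illustrates that pattern; every name in it (WorkItem, chooseTargets, the ReentrantReadWriteLock standing in for the namesystem lock) is a hypothetical stand-in and not Hadoop code.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    /**
     * Illustrative sketch of the batching pattern used by HDFS-2495:
     * gather work under the write lock, compute placement without the
     * lock, then re-take the lock to validate and commit the results.
     */
    public class BatchedSchedulerSketch {
      // Hypothetical stand-in for a ReplicationWork-style record.
      static class WorkItem {
        final String blockId;
        String[] targets;                  // filled in outside the lock
        WorkItem(String blockId) { this.blockId = blockId; }
      }

      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<String> needed = new ArrayList<String>();

      int scheduleWork() {
        List<WorkItem> work = new ArrayList<WorkItem>();

        // Phase 1: one short write-lock section to snapshot the candidates.
        lock.writeLock().lock();
        try {
          for (String blockId : needed) {
            work.add(new WorkItem(blockId));
          }
        } finally {
          lock.writeLock().unlock();
        }

        // Phase 2: expensive placement decisions, NOT holding the lock.
        for (WorkItem w : work) {
          w.targets = chooseTargets(w.blockId);
        }

        // Phase 3: re-take the lock, re-check state, and commit the results.
        int scheduled = 0;
        lock.writeLock().lock();
        try {
          for (WorkItem w : work) {
            if (w.targets == null || w.targets.length == 0) {
              continue;                    // nothing usable was found
            }
            if (!needed.contains(w.blockId)) {
              continue;                    // state changed while unlocked
            }
            needed.remove(w.blockId);      // commit: move out of the queue
            scheduled++;
          }
        } finally {
          lock.writeLock().unlock();
        }
        return scheduled;
      }

      // Placeholder for the costly placement step (chooseTarget in BlockManager).
      private String[] chooseTargets(String blockId) {
        return new String[] { "dn-" + blockId.hashCode() };
      }
    }

Keeping the placement computation outside the lock is what lets one ReplicationMonitor pass schedule many blocks while holding the write lock only briefly at either end, which is the contention reduction the log message describes.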
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1441249&r1=1441248&r2=1441249&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Thu Jan 31 23:05:13 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.bl
 import static org.junit.Assert.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -359,25 +360,35 @@ public class TestBlockManager {
     bm.blocksMap.addINode(blockInfo, iNode);
     return blockInfo;
   }
-  
+
   private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
-    assertEquals("Block not initially pending replication",
-        0, bm.pendingReplications.getNumReplicas(block));
-    assertTrue("computeReplicationWork should indicate replication is needed",
-        bm.computeReplicationWorkForBlock(block, 1));
+    // list for priority 1
+    List<Block> list_p1 = new ArrayList<Block>();
+    list_p1.add(block);
+
+    // list of lists for each priority
+    List<List<Block>> list_all = new ArrayList<List<Block>>();
+    list_all.add(new ArrayList<Block>()); // for priority 0
+    list_all.add(list_p1); // for priority 1
+
+    assertEquals("Block not initially pending replication", 0,
+        bm.pendingReplications.getNumReplicas(block));
+    assertEquals(
+        "computeReplicationWork should indicate replication is needed", 1,
+        bm.computeReplicationWorkForBlocks(list_all));
     assertTrue("replication is pending after work is computed",
        bm.pendingReplications.getNumReplicas(block) > 0);
-    
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
-        getAllPendingReplications();
+
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
    assertEquals(1, repls.size());
-    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries()
+        .iterator().next();
    DatanodeDescriptor[] targets = repl.getValue().targets;
-    
+
    DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
    pipeline[0] = repl.getKey();
    System.arraycopy(targets, 0, pipeline, 1, targets.length);
-    
+
    return pipeline;
  }
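The updated test builds the argument to computeReplicationWorkForBlocks as a list of per-priority lists, with the outer index acting as the priority. A minimal, self-contained illustration of that shape follows; it uses String in place of Block and an invented block name purely for demonstration, so none of it is taken from the Hadoop sources.

    import java.util.ArrayList;
    import java.util.List;

    public class PriorityListSketch {
      public static void main(String[] args) {
        // Index in the outer list == replication priority.
        List<List<String>> blocksByPriority = new ArrayList<List<String>>();
        blocksByPriority.add(new ArrayList<String>());   // priority 0: nothing queued
        List<String> p1 = new ArrayList<String>();
        p1.add("blk_1073741825");                        // hypothetical block name
        blocksByPriority.add(p1);                        // priority 1: one block

        // A scheduler in the style of computeReplicationWorkForBlocks walks
        // every priority bucket in one call and reports how many it handled.
        int candidates = 0;
        for (List<String> bucket : blocksByPriority) {
          candidates += bucket.size();
        }
        System.out.println("candidate blocks across all priorities: " + candidates);
      }
    }

Because the scheduler now processes every bucket in a single call, the test asserts on the returned count (1) rather than on a per-block boolean as before.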