Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D1AB6DE64 for ; Sun, 16 Dec 2012 08:28:51 +0000 (UTC) Received: (qmail 96088 invoked by uid 500); 16 Dec 2012 08:28:50 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 95655 invoked by uid 500); 16 Dec 2012 08:28:47 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 95609 invoked by uid 99); 16 Dec 2012 08:28:46 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 16 Dec 2012 08:28:46 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 16 Dec 2012 08:28:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id D661823888E4 for ; Sun, 16 Dec 2012 08:28:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1422482 - in /hadoop/common/branches/branch-1: ./ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/server/balancer/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop... Date: Sun, 16 Dec 2012 08:28:21 -0000 To: common-commits@hadoop.apache.org From: szetszwo@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121216082822.D661823888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: szetszwo Date: Sun Dec 16 08:28:17 2012 New Revision: 1422482 URL: http://svn.apache.org/viewvc?rev=1422482&view=rev Log: HDFS-3942. Backport HDFS-3495 and HDFS-4234: Update Balancer to support new NetworkTopology with NodeGroup and use generic code for choosing datanode in Balancer. Contributed by Junping Du Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1422482&r1=1422481&r2=1422482&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Sun Dec 16 08:28:17 2012 @@ -51,6 +51,10 @@ Release 1.2.0 - unreleased HADOOP-7096. Allow setting of end-of-record delimiter for TextInputFormat (Ahmed Radwan, backported by suresh) + HDFS-3942. 
Backport HDFS-3495 and HDFS-4234: Update Balancer to support new + NetworkTopology with NodeGroup and use generic code for choosing datanode + in Balancer. (Junping Du via szetszwo) + IMPROVEMENTS HDFS-3515. Port HDFS-1457 to branch-1. (eli) Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1422482&r1=1422481&r2=1422482&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java (original) +++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/net/NetworkTopology.java Sun Dec 16 08:28:17 2012 @@ -26,6 +26,8 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; /** The class represents a cluster of computer with a tree hierarchical * network topology. @@ -341,6 +343,19 @@ public class NetworkTopology { public NetworkTopology() { clusterMap = new InnerNode(InnerNode.ROOT); } + + /** + * Get an instance of NetworkTopology based on the value of the configuration + * parameter net.topology.impl. + * + * @param conf the configuration to be used + * @return an instance of NetworkTopology + */ + public static NetworkTopology getInstance(Configuration conf){ + return (NetworkTopology) ReflectionUtils.newInstance( + conf.getClass("net.topology.impl", NetworkTopology.class, + NetworkTopology.class), conf); + } /** Add a leaf node * Update node counter & rack counter if neccessary Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1422482&r1=1422481&r2=1422482&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sun Dec 16 08:28:17 2012 @@ -60,7 +60,6 @@ import org.apache.hadoop.hdfs.protocol.F import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyDefault; @@ -79,6 +78,7 @@ import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; @@ -173,7 +173,7 @@ import org.apache.hadoop.util.ToolRunner *
    *
  The cluster is balanced. Exiting *
  No block can be moved. Exiting... - *
  No block has been moved for 3 iterations. Exiting... + *
  No block has been moved for 5 iterations. Exiting... *
  Received an IO exception: failure reason. Exiting... *
  Another balancer is running. Exiting... *
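The NetworkTopology.java hunk above introduces a getInstance(Configuration) factory keyed on "net.topology.impl", and the Balancer and FSNamesystem hunks below switch to it so that both components build the same, possibly node-group-aware, topology from configuration. A minimal usage sketch follows; it is illustrative only and not part of the patch, and the NetworkTopologyWithNodeGroup class name is taken from the new test's initConf() further down, assuming that implementation is on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;

    public class TopologyFactoryExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // With "net.topology.impl" unset, the factory falls back to the
        // plain rack-aware NetworkTopology.
        NetworkTopology rackAware = NetworkTopology.getInstance(conf);

        // Opt in to node-group awareness, as TestBalancerWithNodeGroup does.
        conf.set("net.topology.impl",
            "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
        NetworkTopology nodeGroupAware = NetworkTopology.getInstance(conf);

        System.out.println(rackAware.getClass().getName());
        System.out.println(nodeGroupAware.getClass().getName());
      }
    }
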
@@ -227,7 +227,7 @@ public class Balancer implements Tool { private Map datanodes = new HashMap(); - private NetworkTopology cluster = new NetworkTopology(); + private NetworkTopology cluster; private double avgUtilization = 0.0D; @@ -542,7 +542,7 @@ public class Balancer implements Tool { } /** Decide if still need to move more bytes */ - protected boolean isMoveQuotaFull() { + protected boolean hasSpaceForScheduling() { return scheduledSize0 && - (!srcBlockList.isEmpty() || blocksToReceive>0)) { + while(!isTimeUp && scheduledSize > 0 && + (!srcBlockList.isEmpty() || blocksToReceive > 0)) { PendingBlockMove pendingBlock = chooseNextBlockToMove(); if (pendingBlock != null) { // move the block @@ -796,7 +796,8 @@ public class Balancer implements Tool { */ private void checkReplicationPolicyCompatibility(Configuration conf) throws UnsupportedActionException { - if (BlockPlacementPolicy.getInstance(conf, null, null).getClass() != BlockPlacementPolicyDefault.class) { + if (!(BlockPlacementPolicy.getInstance(conf, null, null) + instanceof BlockPlacementPolicyDefault)) { throw new UnsupportedActionException( "Balancer without BlockPlacementPolicyDefault"); } @@ -1065,6 +1066,36 @@ public class Balancer implements Tool { LOG.info(msg); } + /** A matcher interface for matching nodes. */ + private interface Matcher { + /** Given the cluster topology, does the left node match the right node? */ + boolean match(NetworkTopology cluster, Node left, Node right); + } + + /** Match datanodes in the same node group. */ + static final Matcher SAME_NODE_GROUP = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return cluster.isOnSameNodeGroup(left, right); + } + }; + + /** Match datanodes in the same rack. */ + static final Matcher SAME_RACK = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return cluster.isOnSameRack(left, right); + } + }; + + /** Match any datanode with any other datanode. */ + static final Matcher ANY_OTHER = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return left != right; + } + }; + /* Decide all pairs and * the number of bytes to move from a source to a target * Maximum bytes to be moved per node is @@ -1072,10 +1103,15 @@ public class Balancer implements Tool { * Return total number of bytes to move in this iteration */ private long chooseNodes() { - // Match nodes on the same rack first - chooseNodes(true); - // Then match nodes on different racks - chooseNodes(false); + // First, match nodes on the same node group if cluster is node group aware + if (cluster.isNodeGroupAware()) { + chooseNodes(SAME_NODE_GROUP); + } + + // Then, match nodes on the same rack + chooseNodes(SAME_RACK); + // At last, match all remaining nodes + chooseNodes(ANY_OTHER); assert (datanodes.size() == overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ @@ -1089,167 +1125,99 @@ public class Balancer implements Tool { } return bytesToMove; } - - /* if onRack is true, decide all pairs - * where source and target are on the same rack; Otherwise - * decide all pairs where source and target are - * on different racks - */ - private void chooseNodes(boolean onRack) { + + /** Decide all pairs according to the matcher. */ + private void chooseNodes(final Matcher matcher) { /* first step: match each overUtilized datanode (source) to * one or more underUtilized datanodes (targets). 
*/ - chooseTargets(underUtilizedDatanodes.iterator(), onRack); + chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher); /* match each remaining overutilized datanode (source) to * below average utilized datanodes (targets). * Note only overutilized datanodes that haven't had that max bytes to move * satisfied in step 1 are selected */ - chooseTargets(belowAvgUtilizedDatanodes.iterator(), onRack); + chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher); - /* match each remaining underutilized datanode to - * above average utilized datanodes. + /* match each remaining underutilized datanode (target) to + * above average utilized datanodes (source). * Note only underutilized datanodes that have not had that max bytes to * move satisfied in step 1 are selected. */ - chooseSources(aboveAvgUtilizedDatanodes.iterator(), onRack); + chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher); } - - /* choose targets from the target candidate list for each over utilized - * source datanode. OnRackTarget determines if the chosen target - * should be on the same rack as the source + + /** + * For each datanode, choose matching nodes from the candidates. Either the + * datanodes or the candidates are source nodes with (utilization > Avg), and + * the others are target nodes with (utilization < Avg). */ - private void chooseTargets( - Iterator targetCandidates, boolean onRackTarget ) { - for (Iterator srcIterator = overUtilizedDatanodes.iterator(); - srcIterator.hasNext();) { - Source source = srcIterator.next(); - while (chooseTarget(source, targetCandidates, onRackTarget)) { - } - if (!source.isMoveQuotaFull()) { - srcIterator.remove(); + private void + chooseDatanodes(Collection datanodes, Collection candidates, + Matcher matcher) { + for (Iterator i = datanodes.iterator(); i.hasNext();) { + final D datanode = i.next(); + for(; chooseForOneDatanode(datanode, candidates, matcher); ); + if (!datanode.hasSpaceForScheduling()) { + i.remove(); } } - return; } - /* choose sources from the source candidate list for each under utilized - * target datanode. onRackSource determines if the chosen source - * should be on the same rack as the target + /** + * For the given datanode, choose a candidate and then schedule it. + * @return true if a candidate is chosen; false if no candidates is chosen. */ - private void chooseSources( - Iterator sourceCandidates, boolean onRackSource) { - for (Iterator targetIterator = - underUtilizedDatanodes.iterator(); targetIterator.hasNext();) { - BalancerDatanode target = targetIterator.next(); - while (chooseSource(target, sourceCandidates, onRackSource)) { - } - if (!target.isMoveQuotaFull()) { - targetIterator.remove(); - } - } - return; - } + private boolean chooseForOneDatanode( + BalancerDatanode dn, Collection candidates, Matcher matcher) { + final Iterator i = candidates.iterator(); + final C chosen = chooseCandidate(dn, i, matcher); - /* For the given source, choose targets from the target candidate list. 
- * OnRackTarget determines if the chosen target - * should be on the same rack as the source - */ - private boolean chooseTarget(Source source, - Iterator targetCandidates, boolean onRackTarget) { - if (!source.isMoveQuotaFull()) { + if (chosen == null) { return false; } - boolean foundTarget = false; - BalancerDatanode target = null; - while (!foundTarget && targetCandidates.hasNext()) { - target = targetCandidates.next(); - if (!target.isMoveQuotaFull()) { - targetCandidates.remove(); - continue; - } - if (onRackTarget) { - // choose from on-rack nodes - if (cluster.isOnSameRack(source.datanode, target.datanode)) { - foundTarget = true; - } - } else { - // choose from off-rack nodes - if (!cluster.isOnSameRack(source.datanode, target.datanode)) { - foundTarget = true; - } - } + if (dn instanceof Source) { + matchSourceWithTargetToMove((Source)dn, chosen); + } else { + matchSourceWithTargetToMove((Source)chosen, dn); } - if (foundTarget) { - assert(target != null):"Choose a null target"; - long size = Math.min(source.availableSizeToMove(), - target.availableSizeToMove()); - NodeTask nodeTask = new NodeTask(target, size); - source.addNodeTask(nodeTask); - target.incScheduledSize(nodeTask.getSize()); - sources.add(source); - targets.add(target); - if (!target.isMoveQuotaFull()) { - targetCandidates.remove(); - } - LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " - +source.datanode.getName() + " to " + target.datanode.getName()); - return true; + if (!chosen.hasSpaceForScheduling()) { + i.remove(); } - return false; + return true; } - /* For the given target, choose sources from the source candidate list. - * OnRackSource determines if the chosen source - * should be on the same rack as the target - */ - private boolean chooseSource(BalancerDatanode target, - Iterator sourceCandidates, boolean onRackSource) { - if (!target.isMoveQuotaFull()) { - return false; - } - boolean foundSource = false; - Source source = null; - while (!foundSource && sourceCandidates.hasNext()) { - source = sourceCandidates.next(); - if (!source.isMoveQuotaFull()) { - sourceCandidates.remove(); - continue; - } - if (onRackSource) { - // choose from on-rack nodes - if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { - foundSource = true; - } - } else { - // choose from off-rack nodes - if (!cluster.isOnSameRack(source.datanode, target.datanode)) { - foundSource = true; + private void matchSourceWithTargetToMove( + Source source, BalancerDatanode target) { + long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); + NodeTask nodeTask = new NodeTask(target, size); + source.addNodeTask(nodeTask); + target.incScheduledSize(nodeTask.getSize()); + sources.add(source); + targets.add(target); + LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " + +source.datanode.getName() + " to " + target.datanode.getName()); + } + + /** Choose a candidate for the given datanode. 
*/ + private + C chooseCandidate(D dn, Iterator candidates, Matcher matcher) { + if (dn.hasSpaceForScheduling()) { + for(; candidates.hasNext(); ) { + final C c = candidates.next(); + if (!c.hasSpaceForScheduling()) { + candidates.remove(); + } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) { + return c; } } } - if (foundSource) { - assert(source != null):"Choose a null source"; - long size = Math.min(source.availableSizeToMove(), - target.availableSizeToMove()); - NodeTask nodeTask = new NodeTask(target, size); - source.addNodeTask(nodeTask); - target.incScheduledSize(nodeTask.getSize()); - sources.add(source); - targets.add(target); - if ( !source.isMoveQuotaFull()) { - sourceCandidates.remove(); - } - LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " - +source.datanode.getName() + " to " + target.datanode.getName()); - return true; - } - return false; + return null; } private static class BytesMoved { - private long bytesMoved = 0L;; + private long bytesMoved = 0L; private synchronized void inc( long bytes ) { bytesMoved += bytes; } @@ -1391,6 +1359,11 @@ public class Balancer implements Tool { if (block.isLocatedOnDatanode(target)) { return false; } + + if (cluster.isNodeGroupAware() && + isOnSameNodeGroupWithReplicas(target, block, source)) { + return false; + } boolean goodBlock = false; if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { @@ -1423,16 +1396,38 @@ public class Balancer implements Tool { return goodBlock; } + /** + * Check if there are any replica (other than source) on the same node group + * with target. If true, then target is not a good candidate for placing + * specific block replica as we don't want 2 replicas under the same nodegroup + * after balance. + * @param target targetDataNode + * @param block dataBlock + * @param source sourceDataNode + * @return true if there are any replica (other than source) on the same node + * group with target + */ + private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target, + BalancerBlock block, Source source) { + for (BalancerDatanode loc : block.locations) { + if (loc != source && + cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) { + return true; + } + } + return false; + } + /* reset all fields in a balancer preparing for the next iteration */ private void resetData() { - this.cluster = new NetworkTopology(); + this.cluster = NetworkTopology.getInstance(conf); this.overUtilizedDatanodes.clear(); this.aboveAvgUtilizedDatanodes.clear(); this.belowAvgUtilizedDatanodes.clear(); this.underUtilizedDatanodes.clear(); this.datanodes.clear(); this.sources.clear(); - this.targets.clear(); + this.targets.clear(); this.avgUtilization = 0.0D; cleanGlobalBlockList(); this.movedBlocks.cleanup(); @@ -1505,7 +1500,7 @@ public class Balancer implements Tool { Formatter formatter = new Formatter(System.out); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); int iterations = 0; - while (true ) { + while (true) { /* get all live datanodes of a cluster and their disk usage * decide the number of bytes need to be moved */ @@ -1655,6 +1650,7 @@ public class Balancer implements Tool { /** set this balancer's configuration */ public void setConf(Configuration conf) { this.conf = conf; + this.cluster = NetworkTopology.getInstance(conf); movedBlocks.setWinWidth(conf); } Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1422482&r1=1422481&r2=1422482&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sun Dec 16 08:28:17 2012 @@ -524,9 +524,7 @@ public class FSNamesystem implements FSC DFSUtil.getInvalidateWorkPctPerIteration(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); - this.clusterMap = (NetworkTopology) ReflectionUtils.newInstance( - conf.getClass("net.topology.impl", NetworkTopology.class, - NetworkTopology.class), conf); + this.clusterMap = NetworkTopology.getInstance(conf); this.maxCorruptFilesReturned = conf.getInt( DFSConfigKeys.DFS_MAX_CORRUPT_FILES_RETURNED_KEY, @@ -3863,7 +3861,6 @@ public class FSNamesystem implements FSC // Is the block being reported the last block of an underconstruction file? boolean blockUnderConstruction = false; if (fileINode.isUnderConstruction()) { - INodeFileUnderConstruction cons = (INodeFileUnderConstruction) fileINode; Block last = fileINode.getLastBlock(); if (last == null) { // This should never happen, but better to handle it properly than to throw Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1422482&r1=1422481&r2=1422482&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSCluster.java Sun Dec 16 08:28:17 2012 @@ -27,6 +27,7 @@ import java.nio.channels.FileChannel; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -69,13 +70,12 @@ public class MiniDFSCluster { } private Configuration conf; - private NameNode nameNode; - private int numDataNodes; - private ArrayList dataNodes = + protected NameNode nameNode; + protected int numDataNodes; + protected List dataNodes = new ArrayList(); private File base_dir; - private File data_dir; - + protected File data_dir; /** * This null constructor is used only when wishing to start a data node cluster @@ -439,8 +439,6 @@ public class MiniDFSCluster { waitActive(); } - - /** * Modify the config and start up the DataNodes. The info port for * DataNodes is guaranteed to use a free port. Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java?rev=1422482&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/MiniDFSClusterWithNodeGroup.java Sun Dec 16 08:28:17 2012 @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.StaticMapping; + +public class MiniDFSClusterWithNodeGroup extends MiniDFSCluster { + + private static String[] NODE_GROUPS = null; + private static final Log LOG = LogFactory.getLog(MiniDFSClusterWithNodeGroup.class); + + public MiniDFSClusterWithNodeGroup(int nameNodePort, + Configuration conf, + int numDataNodes, + boolean format, + boolean manageDfsDirs, + StartupOption operation, + String[] racks, + long[] simulatedCapacities) throws IOException { + super(nameNodePort, conf, numDataNodes, format, manageDfsDirs, + manageDfsDirs, operation, racks, null, simulatedCapacities); + } + + public MiniDFSClusterWithNodeGroup(int nameNodePort, + Configuration conf, + int numDataNodes, + boolean format, + boolean manageDfsDirs, + StartupOption operation, + String[] racks, + String[] hosts, + long[] simulatedCapacities) throws IOException { + super(nameNodePort, conf, numDataNodes, format, manageDfsDirs, + manageDfsDirs, operation, racks, hosts, simulatedCapacities); + } + + // NODE_GROUPS should be set before constructor being executed. + public static void setNodeGroups(String[] nodeGroups) { + NODE_GROUPS = nodeGroups; + } + + public synchronized void startDataNodes(Configuration conf, int numDataNodes, + boolean manageDfsDirs, StartupOption operation, + String[] racks, String[] nodeGroups, String[] hosts, + long[] simulatedCapacities) throws IOException { + conf.set("slave.host.name", "127.0.0.1"); + + int curDatanodesNum = dataNodes.size(); + // for mincluster's the default initialDelay for BRs is 0 + if (conf.get(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) { + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0); + } + // If minicluster's name node is null assume that the conf has been + // set with the right address:port of the name node. 
+ // + if (nameNode != null) { // set conf from the name node + InetSocketAddress nnAddr = nameNode.getNameNodeAddress(); + int nameNodePort = nnAddr.getPort(); + FileSystem.setDefaultUri(conf, + "hdfs://"+ nnAddr.getHostName() + + ":" + Integer.toString(nameNodePort)); + } + if (racks != null && numDataNodes > racks.length ) { + throw new IllegalArgumentException( "The length of racks [" + racks.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } + + if (nodeGroups != null && numDataNodes > nodeGroups.length ) { + throw new IllegalArgumentException( "The length of nodeGroups [" + nodeGroups.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } + + if (hosts != null && numDataNodes > hosts.length ) { + throw new IllegalArgumentException( "The length of hosts [" + hosts.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } + //Generate some hostnames if required + if (racks != null && hosts == null) { + hosts = new String[numDataNodes]; + for (int i = curDatanodesNum; i < curDatanodesNum + numDataNodes; i++) { + hosts[i - curDatanodesNum] = "host" + i + ".foo.com"; + } + } + + if (simulatedCapacities != null + && numDataNodes > simulatedCapacities.length) { + throw new IllegalArgumentException( "The length of simulatedCapacities [" + + simulatedCapacities.length + + "] is less than the number of datanodes [" + numDataNodes + "]."); + } + + + // Set up the right ports for the datanodes + conf.set("dfs.datanode.address", "127.0.0.1:0"); + conf.set("dfs.datanode.http.address", "127.0.0.1:0"); + conf.set("dfs.datanode.ipc.address", "127.0.0.1:0"); + + String [] dnArgs = (operation == null || + operation != StartupOption.ROLLBACK) ? + null : new String[] {operation.getName()}; + + for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; i++) { + Configuration dnConf = new Configuration(conf); + + if (manageDfsDirs) { + File dir1 = new File(data_dir, "data"+(2*i+1)); + File dir2 = new File(data_dir, "data"+(2*i+2)); + dir1.mkdirs(); + dir2.mkdirs(); + if (!dir1.isDirectory() || !dir2.isDirectory()) { + throw new IOException("Mkdirs failed to create directory for DataNode " + + i + ": " + dir1 + " or " + dir2); + } + dnConf.set(DataNode.DATA_DIR_KEY, dir1.getPath() + "," + dir2.getPath()); + } + if (simulatedCapacities != null) { + dnConf.setBoolean("dfs.datanode.simulateddatastorage", true); + dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY, + simulatedCapacities[i-curDatanodesNum]); + } + LOG.info("Starting DataNode " + i + " with " + + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": " + + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); + if (hosts != null) { + dnConf.set("slave.host.name", hosts[i - curDatanodesNum]); + LOG.info("Starting DataNode " + i + " with hostname set to: " + + dnConf.get("slave.host.name")); + } + if (racks != null) { + String name = hosts[i - curDatanodesNum]; + if (nodeGroups == null) { + LOG.info("Adding node with hostname : " + name + " to rack " + + racks[i-curDatanodesNum]); + StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum]); + } else { + LOG.info("Adding node with hostname : " + name + " to serverGroup " + + nodeGroups[i-curDatanodesNum] + " and rack " + + racks[i-curDatanodesNum]); + StaticMapping.addNodeToRack(name,racks[i-curDatanodesNum] + + nodeGroups[i-curDatanodesNum]); + } + } + Configuration newconf = new Configuration(dnConf); // save config + if (hosts != null) { + NetUtils.addStaticResolution(hosts[i - curDatanodesNum], "localhost"); + 
} + DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf); + if(dn == null) + throw new IOException("Cannot start DataNode in " + + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY)); + //since the HDFS does things based on IP:port, we need to add the mapping + //for IP:port to rackId + String ipAddr = dn.getSelfAddr().getAddress().getHostAddress(); + if (racks != null) { + int port = dn.getSelfAddr().getPort(); + if (nodeGroups == null) { + LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + + " to rack " + racks[i-curDatanodesNum]); + StaticMapping.addNodeToRack(ipAddr + ":" + port, + racks[i-curDatanodesNum]); + } else { + LOG.info("Adding node with IP:port : " + ipAddr + ":" + port + " to nodeGroup " + + nodeGroups[i-curDatanodesNum] + " and rack " + racks[i-curDatanodesNum]); + StaticMapping.addNodeToRack(ipAddr + ":" + port, racks[i-curDatanodesNum] + + nodeGroups[i-curDatanodesNum]); + } + } + DataNode.runDatanodeDaemon(dn); + dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs)); + } + curDatanodesNum += numDataNodes; + this.numDataNodes += numDataNodes; + waitActive(); + } + + public synchronized void startDataNodes(Configuration conf, int numDataNodes, + boolean manageDfsDirs, StartupOption operation, + String[] racks, String[] nodeGroups, String[] hosts, + long[] simulatedCapacities, + boolean setupHostsFile) throws IOException { + startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, nodeGroups, + hosts, simulatedCapacities); + } + + // This is for initialize from parent class. + @Override + public synchronized void startDataNodes(Configuration conf, int numDataNodes, + boolean manageDfsDirs, StartupOption operation, + String[] racks, String[] hosts, + long[] simulatedCapacities) throws IOException { + startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, NODE_GROUPS, hosts, + simulatedCapacities); + } + +} + Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java?rev=1422482&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java Sun Dec 16 08:28:17 2012 @@ -0,0 +1,367 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.balancer; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeoutException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSClusterWithNodeGroup; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; +import org.apache.hadoop.net.NetworkTopology; +import org.junit.Assert; +import org.junit.Test; + +/** + * This class tests if a balancer schedules tasks correctly. + */ +public class TestBalancerWithNodeGroup { + + final private static long CAPACITY = 500L; + final private static String RACK0 = "/rack0"; + final private static String RACK1 = "/rack1"; + final private static String NODEGROUP0 = "/nodegroup0"; + final private static String NODEGROUP1 = "/nodegroup1"; + final private static String NODEGROUP2 = "/nodegroup2"; + final static private String fileName = "/tmp.txt"; + final static private Path filePath = new Path(fileName); + private static final Log LOG = LogFactory.getLog( + "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup"); + MiniDFSClusterWithNodeGroup cluster; + + ClientProtocol client; + private Balancer balancer; + + static final long TIMEOUT = 20000L; //msec + static final double CAPACITY_ALLOWED_VARIANCE = 0.005; // 0.5% + static final double BALANCE_ALLOWED_VARIANCE = 0.11; // 10%+delta + static final int DEFAULT_BLOCK_SIZE = 5; + private static final Random r = new Random(); + + static { + Balancer.setBlockMoveWaitTime(1000L) ; + } + + private void initConf(Configuration conf) { + conf.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + conf.setInt("io.bytes.per.checksum", DEFAULT_BLOCK_SIZE); + conf.setLong("dfs.heartbeat.interval", 1L); + conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true); + conf.setLong("dfs.balancer.movedWinWidth", 2000L); + conf.set("net.topology.impl", "org.apache.hadoop.net.NetworkTopologyWithNodeGroup"); + conf.set("dfs.block.replicator.classname", + "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup"); + } + + /* create a file with a length of fileLen */ + private void createFile(long fileLen, short replicationFactor) + throws IOException { + FileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, filePath, fileLen, + replicationFactor, r.nextLong()); + DFSTestUtil.waitReplication(fs, filePath, replicationFactor); + } + + // create and initiate conf for balancer + private Configuration createConf() { + Configuration conf = new Configuration(); + initConf(conf); + return conf; + } + + /** + * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, + * summed over all nodes. Times out after TIMEOUT msec. + * @param expectedUsedSpace + * @param expectedTotalSpace + * @throws IOException - if getStats() fails + * @throws TimeoutException + */ + private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace) + throws IOException, TimeoutException { + long timeout = TIMEOUT; + long failtime = (timeout <= 0L) ? 
Long.MAX_VALUE + : System.currentTimeMillis() + timeout; + + while (true) { + long[] status = client.getStats(); + double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) + / expectedTotalSpace; + double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) + / expectedUsedSpace; + if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE + && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE) + break; //done + + if (System.currentTimeMillis() > failtime) { + throw new TimeoutException("Cluster failed to reached expected values of " + + "totalSpace (current: " + status[0] + + ", expected: " + expectedTotalSpace + + "), or usedSpace (current: " + status[1] + + ", expected: " + expectedUsedSpace + + "), in more than " + timeout + " msec."); + } + try { + Thread.sleep(100L); + } catch(InterruptedException ignored) { + } + } + } + + /** + * Wait until balanced: each datanode gives utilization within + * BALANCE_ALLOWED_VARIANCE of average + * @throws IOException + * @throws TimeoutException + */ + private void waitForBalancer(long totalUsedSpace, long totalCapacity) + throws IOException, TimeoutException { + long timeout = TIMEOUT; + long failtime = (timeout <= 0L) ? Long.MAX_VALUE + : System.currentTimeMillis() + timeout; + final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; + boolean balanced; + do { + DatanodeInfo[] datanodeReport = + client.getDatanodeReport(DatanodeReportType.ALL); + assertEquals(datanodeReport.length, cluster.getDataNodes().size()); + balanced = true; + for (DatanodeInfo datanode : datanodeReport) { + double nodeUtilization = ((double)datanode.getDfsUsed()) + / datanode.getCapacity(); + if (Math.abs(avgUtilization - nodeUtilization) > + BALANCE_ALLOWED_VARIANCE) { + balanced = false; + if (System.currentTimeMillis() > failtime) { + throw new TimeoutException( + "Rebalancing expected avg utilization to become " + + avgUtilization + ", but on datanode " + datanode + + " it remains at " + nodeUtilization + + " after more than " + TIMEOUT + " msec."); + } + try { + Thread.sleep(100); + } catch (InterruptedException ignored) { + } + break; + } + } + } while (!balanced); + } + + /** Start balancer and check if the cluster is balanced after the run */ + private void runBalancer(Configuration conf, + long totalUsedSpace, long totalCapacity) throws Exception { + waitForHeartBeat(totalUsedSpace, totalCapacity); + + // start rebalancing + balancer = new Balancer(conf); + + final int r = balancer.run(new String[0]); + + assertEquals(Balancer.SUCCESS, r); + + waitForHeartBeat(totalUsedSpace, totalCapacity); + LOG.info("Rebalancing with default factor."); + waitForBalancer(totalUsedSpace, totalCapacity); + } + + private void runBalancerCanFinish(Configuration conf, + long totalUsedSpace, long totalCapacity) throws Exception { + waitForHeartBeat(totalUsedSpace, totalCapacity); + + balancer = new Balancer(conf); + final int r = balancer.run(new String[0]); + Assert.assertTrue(r == Balancer.SUCCESS || + (r == Balancer.NO_MOVE_PROGRESS)); + waitForHeartBeat(totalUsedSpace, totalCapacity); + LOG.info("Rebalancing ends successful."); + } + + /** + * Create a 4 nodes cluster: 2 nodes (n0, n1) in RACK0/NODEGROUP0, 1 node (n2) + * in RACK1/NODEGROUP1 and 1 node (n3) in RACK1/NODEGROUP2. Fill the cluster + * to 60% and 3 replicas, so n2 and n3 will have replica for all blocks according + * to replica placement policy with NodeGroup. 
As a result, n2 and n3 will be + * filled with 80% (60% x 4 / 3), and no blocks can be migrated from n2 and n3 + * to n0 or n1 as balancer policy with node group. Thus, we expect the balancer + * to end in 5 iterations without move block process. + */ + @Test + public void testBalancerEndInNoMoveProgress() throws Exception { + Configuration conf = createConf(); + long[] capacities = new long[]{CAPACITY, CAPACITY, CAPACITY, CAPACITY}; + String[] racks = new String[]{RACK0, RACK0, RACK1, RACK1}; + String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP0, NODEGROUP1, NODEGROUP2}; + + int numOfDatanodes = capacities.length; + assertEquals(numOfDatanodes, racks.length); + assertEquals(numOfDatanodes, nodeGroups.length); + MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); + cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length, + true, true, null, racks, capacities); + try { + cluster.waitActive(); + client = DFSClient.createNamenode(conf); + + long totalCapacity = 0L; + for(long capacity : capacities) { + totalCapacity += capacity; + } + + // fill up the cluster to be 60% full + long totalUsedSpace = totalCapacity * 6 / 10; + + createFile(totalUsedSpace / 3, (short) 3); + + // run balancer which can finish in 5 iterations with no block movement. + runBalancerCanFinish(conf, totalUsedSpace, totalCapacity); + + } finally { + cluster.shutdown(); + } + } + + /** + * Create a cluster with even distribution, and a new empty node is added to + * the cluster, then test rack locality for balancer policy. + */ + @Test + public void testBalancerWithRackLocality() throws Exception { + Configuration conf = createConf(); + long[] capacities = new long[]{CAPACITY, CAPACITY}; + String[] racks = new String[]{RACK0, RACK1}; + String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1}; + + int numOfDatanodes = capacities.length; + assertEquals(numOfDatanodes, racks.length); + MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); + + MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); + cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length, + true, true, null, racks, capacities); + try { + cluster.waitActive(); + + client = DFSClient.createNamenode(conf); + + long totalCapacity = 0L; + for(long capacity : capacities) { + totalCapacity += capacity; + } + + // fill up the cluster to be 30% full + long totalUsedSpace = totalCapacity * 3 / 10; + createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes); + + long newCapacity = CAPACITY; + String newRack = RACK1; + String newNodeGroup = NODEGROUP2; + + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, + new String[]{newNodeGroup}, new long[] {newCapacity}); + + totalCapacity += newCapacity; + + // run balancer and validate results + runBalancer(conf, totalUsedSpace, totalCapacity); + + DatanodeInfo[] datanodeReport = + client.getDatanodeReport(DatanodeReportType.ALL); + + Map rackToUsedCapacity = new HashMap(); + for (DatanodeInfo datanode: datanodeReport) { + String rack = NetworkTopology.getFirstHalf(datanode.getNetworkLocation()); + int usedCapacity = (int) datanode.getDfsUsed(); + + if (rackToUsedCapacity.get(rack) != null) { + rackToUsedCapacity.put(rack, usedCapacity + rackToUsedCapacity.get(rack)); + } else { + rackToUsedCapacity.put(rack, usedCapacity); + } + } + assertEquals(rackToUsedCapacity.size(), 2); + assertEquals(rackToUsedCapacity.get(RACK0), rackToUsedCapacity.get(RACK1)); + + } finally { + cluster.shutdown(); + } + 
} + + /** Create a cluster with even distribution, and a new empty node is added to + * the cluster, then test rack locality for balancer policy. + **/ + @Test + public void testBalancerWithNodeGroup() throws Exception { + Configuration conf = createConf(); + long[] capacities = new long[]{CAPACITY, CAPACITY}; + String[] racks = new String[]{RACK0, RACK1}; + String[] nodeGroups = new String[]{NODEGROUP0, NODEGROUP1}; + + int numOfDatanodes = capacities.length; + assertEquals(numOfDatanodes, racks.length); + MiniDFSClusterWithNodeGroup.setNodeGroups(nodeGroups); + cluster = new MiniDFSClusterWithNodeGroup(0, conf, capacities.length, + true, true, null, racks, capacities); + try { + cluster.waitActive(); + client = DFSClient.createNamenode(conf); + + long totalCapacity = 0L; + for(long capacity : capacities) { + totalCapacity += capacity; + } + + // fill up the cluster to be 30% full + long totalUsedSpace = totalCapacity*3/10; + + createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes); + + long newCapacity = CAPACITY; + String newRack = RACK1; + String newNodeGroup = NODEGROUP2; + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, new String[]{newRack}, + new String[]{newNodeGroup}, new long[] {newCapacity}); + + totalCapacity += newCapacity; + + // run balancer and validate results + runBalancer(conf, totalUsedSpace, totalCapacity); + + } finally { + cluster.shutdown(); + } + } + +} \ No newline at end of file
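
Taken together, the changes wire node-group awareness through the balancer: setConf() now builds the topology via NetworkTopology.getInstance(conf); chooseNodes() pairs sources and targets with the SAME_NODE_GROUP matcher first, then SAME_RACK, then ANY_OTHER; isOnSameNodeGroupWithReplicas() rejects a move that would leave two replicas in one node group; and the compatibility check now accepts any subclass of BlockPlacementPolicyDefault, presumably so that BlockPlacementPolicyWithNodeGroup passes it. Below is a hedged sketch of the client configuration this implies, copied from the two keys TestBalancerWithNodeGroup.initConf() sets; whether a real cluster needs anything further (for example a topology mapping that emits /rack/nodegroup paths) is an assumption, not something this commit spells out:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.net.NetworkTopology;

    public class NodeGroupBalancerConf {
      /** Builds the node-group-aware configuration used by the new test. */
      public static Configuration create() {
        Configuration conf = new Configuration();
        // Topology implementation consulted by NetworkTopology.getInstance(conf).
        conf.set("net.topology.impl",
            "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
        // Block placement policy; assumed to extend BlockPlacementPolicyDefault,
        // so the balancer's relaxed instanceof check in this commit accepts it.
        conf.set("dfs.block.replicator.classname",
            "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
        return conf;
      }

      public static void main(String[] args) {
        Configuration conf = create();
        NetworkTopology topology = NetworkTopology.getInstance(conf);
        System.out.println("Topology: " + topology.getClass().getName());
        System.out.println("Node-group aware: " + topology.isNodeGroupAware());
      }
    }

With such a configuration in place, the stock balancer entry point (for example the hadoop balancer command) picks up the node-group-aware topology in Balancer.setConf() and applies the SAME_NODE_GROUP/SAME_RACK/ANY_OTHER ordering shown above.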