hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1076502 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/
Date Thu, 03 Mar 2011 02:31:48 GMT
Author: szetszwo
Date: Thu Mar  3 02:31:48 2011
New Revision: 1076502

URL: http://svn.apache.org/viewvc?rev=1076502&view=rev
Log:
HDFS-1683. Test Balancer with multiple NameNodes.

Added:
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1076502&r1=1076501&r2=1076502&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Thu Mar  3 02:31:48 2011
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
 
     HDFS-1707. Federation: Failure in browsing data on new namenodes. (jitendra)
 
+    HDFS-1683. Test Balancer with multiple NameNodes.  (szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1076502&r1=1076501&r2=1076502&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
(original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
Thu Mar  3 02:31:48 2011
@@ -471,6 +471,12 @@ public class Balancer {
     private List<PendingBlockMove> pendingBlocks = 
       new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); 
     
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "[" + getName()
+          + ", utilization=" + utilization + "]";
+    }
+
     /* Constructor 
      * Depending on avgutil & threshold, calculate maximum bytes to move 
      */
@@ -844,7 +850,7 @@ public class Balancer {
     }
 
     //logging
-    logImbalancedNodes();
+    logNodes();
     
     assert (this.datanodes.size() == 
       overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
@@ -856,25 +862,20 @@ public class Balancer {
   }
 
   /* log the over utilized & under utilized nodes */
-  private void logImbalancedNodes() {
-    StringBuilder msg = new StringBuilder();
-    msg.append(overUtilizedDatanodes.size());
-    msg.append(" over utilized nodes:");
-    for (Source node : overUtilizedDatanodes) {
-      msg.append( " " );
-      msg.append( node.getName() );
-    }
-    LOG.info(msg);
-    msg = new StringBuilder();
-    msg.append(underUtilizedDatanodes.size());
-    msg.append(" under utilized nodes: ");
-    for (BalancerDatanode node : underUtilizedDatanodes) {
-      msg.append( " " );
-      msg.append( node.getName() );
-    }
-    LOG.info(msg);
+  private void logNodes() {
+    logNodes("over-utilized", overUtilizedDatanodes);
+    if (LOG.isTraceEnabled()) {
+      logNodes("above-average", aboveAvgUtilizedDatanodes);
+      logNodes("below-average", belowAvgUtilizedDatanodes);
+    }
+    logNodes("underutilized", underUtilizedDatanodes);
   }
-  
+
+  private static <T extends BalancerDatanode> void logNodes(
+      String name, Collection<T> nodes) {
+    LOG.info(nodes.size() + " " + name + ": " + nodes);
+  }
+
   /* Decide all <source, target> pairs and
    * the number of bytes to move from a source to a target
    * Maximum bytes to be moved per node is
@@ -1310,7 +1311,7 @@ public class Balancer {
         return ReturnStatus.SUCCESS;
       } else {
         LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
-            +" bytes to make the cluster balanced." );
+            + " to make the cluster balanced." );
       }
       
       /* Decide all the nodes that will participate in the block move and
@@ -1324,7 +1325,7 @@ public class Balancer {
         return ReturnStatus.NO_MOVE_BLOCK;
       } else {
         LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) +
-            "bytes in this iteration");
+            " in this iteration");
       }
 
       formatter.format("%-24s %10d  %19s  %18s  %17s\n", 
@@ -1395,12 +1396,12 @@ public class Balancer {
       }
     
       boolean done = false;
-      for(int iterations = 0; !done; iterations++) {
+      for(int iteration = 0; !done; iteration++) {
         done = true;
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
           final Balancer b = new Balancer(nnc, p, conf);
-          final ReturnStatus r = b.run(iterations, formatter);
+          final ReturnStatus r = b.run(iteration, formatter);
           if (r == ReturnStatus.IN_PROGRESS) {
             done = false;
           } else if (r != ReturnStatus.SUCCESS) {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1076502&r1=1076501&r2=1076502&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Thu Mar  3 02:31:48 2011
@@ -745,6 +745,8 @@ public class DataNode extends Configured
         conf.getBoolean("dfs.datanode.simulateddatastorage", false);
       
       if (simulatedFSDataset) {
+        initFsDataSet(conf, dataDirs);
+
         bpRegistration.setStorageID(datanodeId.getStorageID()); //same as DN
         bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
         bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
@@ -758,8 +760,8 @@ public class DataNode extends Configured
 
         bpRegistration.setStorageID(storage.getStorageID());
         bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
+        initFsDataSet(conf, dataDirs);
       }
-      initFsDataSet(conf, dataDirs);
       data.addBlockPool(blockPoolId, conf);
     }
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1076502&r1=1076501&r2=1076502&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
(original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
Thu Mar  3 02:31:48 2011
@@ -719,7 +719,7 @@ public class MiniDFSCluster {
         conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
       }
       if (simulatedCapacities != null) {
-        dnConf.setBoolean("dfs.datanode.simulateddatastorage", true);
+        dnConf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
         dnConf.setLong(SimulatedFSDataset.CONFIG_PROPERTY_CAPACITY,
             simulatedCapacities[i-curDatanodesNum]);
       }
@@ -1404,7 +1404,25 @@ public class MiniDFSCluster {
     sdataset.injectBlocks(bpid, blocksToInject);
     dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
   }
-  
+
+  /**
+   * Multiple-NameNode version of {@link #injectBlocks(Iterable[])}.
+   */
+  public void injectBlocks(int nameNodeIndex, int dataNodeIndex,
+      Iterable<Block> blocksToInject) throws IOException {
+    if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
+      throw new IndexOutOfBoundsException();
+    }
+    FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset();
+    if (!(dataSet instanceof SimulatedFSDataset)) {
+      throw new IOException("injectBlocks is valid only for SimilatedFSDataset");
+    }
+    String bpid = getNamesystem(nameNodeIndex).getBlockPoolId();
+    SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
+    sdataset.injectBlocks(bpid, blocksToInject);
+    dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
+  }
+
   /**
    * This method is valid only if the data nodes have simulated data
    * @param blocksToInject - blocksToInject[] is indexed in the same order as the list 

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1076502&r1=1076501&r2=1076502&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
(original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
Thu Mar  3 02:31:48 2011
@@ -57,18 +57,18 @@ public class TestBalancer extends TestCa
   ClientProtocol client;
 
   static final int DEFAULT_BLOCK_SIZE = 10;
-  private Random r = new Random();
+  private static final Random r = new Random();
 
   static {
     Balancer.setBlockMoveWaitTime(1000L) ;
   }
 
-  private void initConf(Configuration conf) {
+  static void initConf(Configuration conf) {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
-    conf.setLong("dfs.heartbeat.interval", 1L);
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
-    conf.setLong("dfs.balancer.movedWinWidth", 2000L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
   }
 
   /* create a file with a length of <code>fileLen</code> */
@@ -113,8 +113,8 @@ public class TestBalancer extends TestCa
   }
 
   /* Distribute all blocks according to the given distribution */
-  Block[][] distributeBlocks(ExtendedBlock[] blocks, short replicationFactor, 
-      final long[] distribution ) {
+  static Block[][] distributeBlocks(ExtendedBlock[] blocks,
+      short replicationFactor, final long[] distribution) {
     // make a copy
     long[] usedSpace = new long[distribution.length];
     System.arraycopy(distribution, 0, usedSpace, 0, distribution.length);
@@ -145,6 +145,14 @@ public class TestBalancer extends TestCa
     return results;
   }
 
+  static long sum(long[] x) {
+    long s = 0L;
+    for(long a : x) {
+      s += a;
+    }
+    return s;
+  }
+
   /* we first start a cluster and fill the cluster up to a certain size.
    * then redistribute blocks according the required distribution.
    * Afterwards a balancer is running to balance the cluster.
@@ -157,10 +165,7 @@ public class TestBalancer extends TestCa
     }
 
     // calculate total space that need to be filled
-    long totalUsedSpace=0L;
-    for(int i=0; i<distribution.length; i++) {
-      totalUsedSpace += distribution[i];
-    }
+    final long totalUsedSpace = sum(distribution);
 
     // fill the cluster
     ExtendedBlock[] blocks = generateBlocks(conf, totalUsedSpace,
@@ -183,10 +188,7 @@ public class TestBalancer extends TestCa
     for(int i = 0; i < blocksDN.length; i++)
       cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
 
-    long totalCapacity = 0L;
-    for(long capacity:capacities) {
-      totalCapacity += capacity;
-    }
+    final long totalCapacity = sum(capacities);
     runBalancer(conf, totalUsedSpace, totalCapacity);
     cluster.shutdown();
   }
@@ -222,10 +224,8 @@ public class TestBalancer extends TestCa
       cluster.waitActive();
       client = DFSClient.createNamenode(conf);
 
-      long totalCapacity=0L;
-      for(long capacity:capacities) {
-        totalCapacity += capacity;
-      }
+      long totalCapacity = sum(capacities);
+      
       // fill up the cluster to be 30% full
       long totalUsedSpace = totalCapacity*3/10;
       createFile(totalUsedSpace/numOfDatanodes, (short)numOfDatanodes);
@@ -333,10 +333,8 @@ public class TestBalancer extends TestCa
       cluster.waitActive();
       client = DFSClient.createNamenode(conf);
 
-      long totalCapacity = 0L;
-      for (long capacity : capacities) {
-        totalCapacity += capacity;
-      }
+      long totalCapacity = sum(capacities);
+
       // fill up the cluster to be 30% full
       long totalUsedSpace = totalCapacity * 3 / 10;
       createFile(totalUsedSpace / numOfDatanodes, (short) numOfDatanodes);

Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java?rev=1076502&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
(added)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
Thu Mar  3 02:31:48 2011
@@ -0,0 +1,388 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test balancer with multiple NameNodes
+ */
+public class TestBalancerWithMultipleNameNodes {
+  static final Log LOG = Balancer.LOG;
+  {
+    ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.OFF);
+//    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.OFF);
+  }
+
+  
+  private static final long CAPACITY = 500L;
+  private static final String RACK0 = "/rack0";
+  private static final String RACK1 = "/rack1";
+  private static final String RACK2 = "/rack2";
+
+  private static final String FILE_NAME = "/tmp.txt";
+  private static final Path FILE_PATH = new Path(FILE_NAME);
+  
+  private static final Random RANDOM = new Random();
+
+  static {
+    Balancer.setBlockMoveWaitTime(1000L) ;
+  }
+
+  /** Common objects used in various methods. */
+  private static class Suite {
+    final Configuration conf;
+    final MiniDFSCluster cluster;
+    final ClientProtocol[] clients;
+    final short replication;
+    
+    Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
+        Configuration conf) throws IOException {
+      this.conf = conf;
+      this.cluster = cluster;
+      clients = new ClientProtocol[nNameNodes];
+      for(int i = 0; i < nNameNodes; i++) {
+        clients[i] = cluster.getNameNode(i);
+      }
+      replication = (short)Math.max(1, nDataNodes - 1);
+    }
+  }
+
+  /* create a file with a length of <code>len</code> */
+  private static void createFile(Suite s, int index, long len
+      ) throws IOException {
+    final FileSystem fs = s.cluster.getFileSystem(index);
+    DFSTestUtil.createFile(fs, FILE_PATH, len, s.replication, RANDOM.nextLong());
+    DFSTestUtil.waitReplication(fs, FILE_PATH, s.replication);
+  }
+
+  /* for each NameNode, create a file of <code>size/replication</code> bytes
+   * and collect copies of the blocks of that file
+   */
+  private static ExtendedBlock[][] generateBlocks(Suite s, long size
+      ) throws IOException {
+    final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
+    for(int n = 0; n < s.clients.length; n++) {
+      final long fileLen = size/s.replication;
+      createFile(s, n, fileLen);
+
+      final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
+          FILE_NAME, 0, fileLen).getLocatedBlocks();
+
+      final int numOfBlocks = locatedBlocks.size();
+      blocks[n] = new ExtendedBlock[numOfBlocks];
+      for(int i = 0; i < numOfBlocks; i++) {
+        final ExtendedBlock b = locatedBlocks.get(i).getBlock();
+        blocks[n][i] = new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
+            b.getNumBytes(), b.getGenerationStamp());
+      }
+    }
+    return blocks;
+  }
+
+  /* wait until every client reports the expected total and used space */
+  static void wait(final ClientProtocol[] clients,
+      long expectedUsedSpace, long expectedTotalSpace) throws IOException {
+    LOG.info("WAIT expectedUsedSpace=" + expectedUsedSpace
+        + ", expectedTotalSpace=" + expectedTotalSpace);
+    for(int n = 0; n < clients.length; n++) {
+      int i = 0;
+      for(boolean done = false; !done; ) {
+        final long[] s = clients[n].getStats();
+        done = s[0] == expectedTotalSpace && s[1] == expectedUsedSpace;
+        if (!done) {
+          sleep(100L);
+          if (++i % 100 == 0) {
+            LOG.warn("WAIT i=" + i + ", s=[" + s[0] + ", " + s[1] + "]");
+          }
+        }
+      }
+    }
+  }
+
+  static void runBalancer(Suite s,
+      final long totalUsed, final long totalCapacity) throws Exception {
+    final double avg = totalUsed*100.0/totalCapacity;
+
+    LOG.info("BALANCER 0: totalUsed=" + totalUsed
+        + ", totalCapacity=" + totalCapacity
+        + ", avg=" + avg);
+    wait(s.clients, totalUsed, totalCapacity);
+    LOG.info("BALANCER 1");
+
+    // start rebalancing
+    final List<InetSocketAddress> namenodes =new ArrayList<InetSocketAddress>();
+    namenodes.add(NameNode.getServiceAddress(s.conf, true));
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
+    Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
+
+    LOG.info("BALANCER 2");
+    wait(s.clients, totalUsed, totalCapacity);
+    LOG.info("BALANCER 3");
+
+    int i = 0;
+    for(boolean balanced = false; !balanced; i++) {
+      final long[] used = new long[s.cluster.getDataNodes().size()];
+      final long[] cap = new long[used.length];
+
+      for(int n = 0; n < s.clients.length; n++) {
+        final DatanodeInfo[] datanodes = s.clients[n].getDatanodeReport(
+            DatanodeReportType.ALL);
+        Assert.assertEquals(datanodes.length, used.length);
+
+        for(int d = 0; d < datanodes.length; d++) {
+          if (n == 0) {
+            used[d] = datanodes[d].getDfsUsed();
+            cap[d] = datanodes[d].getCapacity();
+            if (i % 100 == 0) {
+              LOG.warn("datanodes[" + d
+                  + "]: getDfsUsed()=" + datanodes[d].getDfsUsed()
+                  + ", getCapacity()=" + datanodes[d].getCapacity());
+            }
+          } else {
+            Assert.assertEquals(used[d], datanodes[d].getDfsUsed());
+            Assert.assertEquals(cap[d], datanodes[d].getCapacity());
+          }
+        }
+      }
+
+      balanced = true;
+      for(int d = 0; d < used.length; d++) {
+        final double p = used[d]*100.0/cap[d];
+        balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold;
+        if (!balanced) {
+          if (i % 100 == 0) {
+            LOG.warn("datanodes " + d + " is not yet balanced: "
+                + "used=" + used[d] + ", cap=" + cap[d] + ", avg=" + avg);
+            LOG.warn("TestBalancer.sum(used)=" + TestBalancer.sum(used)
+                + ", TestBalancer.sum(cap)=" + TestBalancer.sum(cap));
+          }
+          sleep(100);
+          break;
+        }
+      }
+    }
+    LOG.info("BALANCER 6");
+  }
+
+  private static void sleep(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch(InterruptedException e) {
+      LOG.error(e);
+    }
+  }
+  
+  private static Configuration createConf() {
+    final Configuration conf = new HdfsConfiguration();
+    TestBalancer.initConf(conf);
+    return conf;
+  }
+
+  /**
+   * First start a cluster and fill the cluster up to a certain size.
+   * Then redistribute blocks according to the required distribution.
+   * Finally, balance the cluster.
+   * 
+   * @param nNameNodes Number of NameNodes
+   * @param distributionPerNN The distribution for each NameNode. 
+   * @param capacities Capacities of the datanodes
+   * @param racks Rack names
+   * @param conf Configuration
+   */
+  private void unevenDistribution(final int nNameNodes,
+      long distributionPerNN[], long capacities[], String[] racks,
+      Configuration conf) throws Exception {
+    LOG.info("UNEVEN 0");
+    final int nDataNodes = distributionPerNN.length;
+    if (capacities.length != nDataNodes || racks.length != nDataNodes) {
+      throw new IllegalArgumentException("Array length is not the same");
+    }
+
+    // calculate total space that needs to be filled
+    final long usedSpacePerNN = TestBalancer.sum(distributionPerNN);
+
+    // fill the cluster
+    final ExtendedBlock[][] blocks;
+    {
+      LOG.info("UNEVEN 1");
+      final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+          .numNameNodes(nNameNodes)
+          .numDataNodes(nDataNodes)
+          .racks(racks)
+          .simulatedCapacities(capacities)
+          .build();
+      LOG.info("UNEVEN 2");
+      try {
+        cluster.waitActive();
+        LOG.info("UNEVEN 3");
+        final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+        blocks = generateBlocks(s, usedSpacePerNN);
+        LOG.info("UNEVEN 4");
+      } finally {
+        cluster.shutdown();
+      }
+    }
+
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY, "0.0f");
+    {
+      LOG.info("UNEVEN 10");
+      final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+          .numNameNodes(nNameNodes)
+          .numDataNodes(nDataNodes)
+          .racks(racks)
+          .simulatedCapacities(capacities)
+          .format(false)
+          .build();
+      LOG.info("UNEVEN 11");
+      try {
+        cluster.waitActive();
+        LOG.info("UNEVEN 12");
+        final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+        for(int n = 0; n < nNameNodes; n++) {
+          // redistribute blocks
+          final Block[][] blocksDN = TestBalancer.distributeBlocks(
+              blocks[n], s.replication, distributionPerNN);
+    
+          for(int d = 0; d < blocksDN.length; d++)
+            cluster.injectBlocks(n, d, Arrays.asList(blocksDN[d]));
+
+          LOG.info("UNEVEN 13: n=" + n);
+        }
+    
+        final long totalCapacity = TestBalancer.sum(capacities);
+        final long totalUsed = nNameNodes*usedSpacePerNN;
+        LOG.info("UNEVEN 14");
+        runBalancer(s, totalUsed, totalCapacity);
+        LOG.info("UNEVEN 15");
+      } finally {
+        cluster.shutdown();
+      }
+      LOG.info("UNEVEN 16");
+    }
+  }
+
+
+  /**
+   * This test starts a cluster and fills the DataNodes to be 30% full;
+   * it then adds an empty node and starts balancing.
+   *
+   * @param nNameNodes Number of NameNodes
+   * @param capacities Capacities of the datanodes
+   * @param racks Rack names
+   * @param newCapacity the capacity of the new DataNode
+   * @param newRack the rack for the new DataNode
+   * @param conf Configuration
+   */ 
+  private void runTest(final int nNameNodes, long[] capacities, String[] racks,
+      long newCapacity, String newRack, Configuration conf) throws Exception {
+    final int nDataNodes = capacities.length;
+    LOG.info("nNameNodes=" + nNameNodes + ", nDataNodes=" + nDataNodes);
+    Assert.assertEquals(nDataNodes, racks.length);
+
+    LOG.info("RUN_TEST -1");
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numNameNodes(nNameNodes)
+        .numDataNodes(nDataNodes)
+        .racks(racks)
+        .simulatedCapacities(capacities)
+        .build();
+    LOG.info("RUN_TEST 0");
+
+    try {
+      cluster.waitActive();
+      LOG.info("RUN_TEST 1");
+      final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+      long totalCapacity = TestBalancer.sum(capacities);
+
+      LOG.info("RUN_TEST 2");
+      // fill up the cluster to be 30% full
+      final long totalUsed = totalCapacity*3/10;
+      final long size = (totalUsed/nNameNodes)/s.replication;
+      for(int n = 0; n < nNameNodes; n++) {
+        createFile(s, n, size);
+      }
+
+      LOG.info("RUN_TEST 3");
+      // start up an empty node with the given capacity on the given rack
+      cluster.startDataNodes(conf, 1, true, null,
+          new String[]{newRack}, new long[]{newCapacity});
+
+      totalCapacity += newCapacity;
+
+      LOG.info("RUN_TEST 4");
+      // run balancer and validate results
+      runBalancer(s, totalUsed, totalCapacity);
+      LOG.info("RUN_TEST 5");
+    } finally {
+      cluster.shutdown();
+    }
+    LOG.info("RUN_TEST 6");
+  }
+  
+  /** Test a cluster with even distribution, 
+   * then a new empty node is added to the cluster
+   */
+  @Test
+  public void testBalancer() throws Exception {
+    final Configuration conf = createConf();
+    runTest(2, new long[]{CAPACITY}, new String[]{RACK0},
+        CAPACITY/2, RACK0, conf);
+  }
+
+  /** Test unevenly distributed cluster */
+  @Test
+  public void testUnevenDistribution() throws Exception {
+    final Configuration conf = createConf();
+    unevenDistribution(2,
+        new long[] {30*CAPACITY/100, 5*CAPACITY/100},
+        new long[]{CAPACITY, CAPACITY},
+        new String[] {RACK0, RACK1},
+        conf);
+  }
+}



Mime
View raw message