hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1096994 - in /hadoop/hdfs/trunk: CHANGES.txt src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
Date Wed, 27 Apr 2011 05:31:15 GMT
Author: eli
Date: Wed Apr 27 05:31:15 2011
New Revision: 1096994

URL: http://svn.apache.org/viewvc?rev=1096994&view=rev
Log:
HDFS-1808. TestBalancer waits forever, errs without giving information. Contributed by Matt Foley

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1096994&r1=1096993&r2=1096994&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Apr 27 05:31:15 2011
@@ -220,6 +220,9 @@ Trunk (unreleased changes)
     by throwing an error to indicate the editlog needs to be empty.
     (suresh)
 
+    HDFS-1808. TestBalancer waits forever, errs without giving information.
+    (Matt Foley via eli)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1096994&r1=1096993&r2=1096994&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed Apr 27 05:31:15 2011
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.mortbay.log.Log;
 
 import junit.framework.TestCase;
 /**
@@ -53,6 +55,9 @@ public class TestBalancer extends TestCa
 
   ClientProtocol client;
 
+  static final long TIMEOUT = 20000L; //msec
+  static final double CAPACITY_ALLOWED_VARIANCE = 0.005;  // 0.5%
+  static final double BALANCE_ALLOWED_VARIANCE = 0.11;    // 10%+delta
   static final int DEFAULT_BLOCK_SIZE = 10;
   private Balancer balancer;
   private Random r = new Random();
@@ -186,28 +191,101 @@ public class TestBalancer extends TestCa
     cluster.shutdown();
   }
 
-  /* wait for one heartbeat */
-  private void waitForHeartBeat( long expectedUsedSpace, long expectedTotalSpace )
-  throws IOException {
-    long[] status = client.getStats();
-    while(status[0] != expectedTotalSpace || status[1] != expectedUsedSpace ) {
+  /**
+   * Wait until heartbeat gives expected results, within CAPACITY_ALLOWED_VARIANCE, 
+   * summed over all nodes.  Times out after TIMEOUT msec.
+   * @param expectedUsedSpace
+   * @param expectedTotalSpace
+   * @throws IOException - if getStats() fails
+   * @throws TimeoutException
+   */
+  private void waitForHeartBeat(long expectedUsedSpace, long expectedTotalSpace)
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+             : System.currentTimeMillis() + timeout;
+    
+    while (true) {
+      long[] status = client.getStats();
+      double totalSpaceVariance = Math.abs((double)status[0] - expectedTotalSpace) 
+          / expectedTotalSpace;
+      double usedSpaceVariance = Math.abs((double)status[1] - expectedUsedSpace) 
+          / expectedUsedSpace;
+      if (totalSpaceVariance < CAPACITY_ALLOWED_VARIANCE 
+          && usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
+        break; //done
+
+      if (System.currentTimeMillis() > failtime) {
+        throw new TimeoutException("Cluster failed to reached expected values of "
+            + "totalSpace (current: " + status[0] 
+            + ", expected: " + expectedTotalSpace 
+            + "), or usedSpace (current: " + status[1] 
+            + ", expected: " + expectedUsedSpace
+            + "), in more than " + timeout + " msec.");
+      }
       try {
         Thread.sleep(100L);
       } catch(InterruptedException ignored) {
       }
-      status = client.getStats();
     }
   }
+  
+  /**
+   * Wait until balanced: each datanode gives utilization within 
+   * BALANCE_ALLOWED_VARIANCE of average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  private void waitForBalancer(long totalUsedSpace, long totalCapacity) 
+  throws IOException, TimeoutException {
+    long timeout = TIMEOUT;
+    long failtime = (timeout <= 0L) ? Long.MAX_VALUE
+        : System.currentTimeMillis() + timeout;
+    final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
+    boolean balanced;
+    do {
+      DatanodeInfo[] datanodeReport = 
+          client.getDatanodeReport(DatanodeReportType.ALL);
+      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
+      balanced = true;
+      for (DatanodeInfo datanode : datanodeReport) {
+        double nodeUtilization = ((double)datanode.getDfsUsed())
+            / datanode.getCapacity();
+        if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
+          balanced = false;
+          if (System.currentTimeMillis() > failtime) {
+            throw new TimeoutException(
+                "Rebalancing expected avg utilization to become "
+                + avgUtilization + ", but on datanode " + datanode
+                + " it remains at " + nodeUtilization
+                + " after more than " + TIMEOUT + " msec.");
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException ignored) {
+          }
+          break;
+        }
+      }
+    } while (!balanced);
+  }
 
-  /* This test start a one-node cluster, fill the node to be 30% full;
-   * It then adds an empty node and start balancing.
-   * @param newCapacity new node's capacity
-   * @param new 
+  /** This test start a cluster with specified number of nodes, 
+   * and fills it to be 30% full (with a single file replicated identically
+   * to all datanodes);
+   * It then adds one new empty node and starts balancing.
+   * 
+   * @param conf - configuration
+   * @param capacities - array of capacities of original nodes in cluster
+   * @param racks - array of racks for original nodes in cluster
+   * @param newCapacity - new node's capacity
+   * @param newRack - new node's rack
+   * @throws Exception
    */
-  private void test(Configuration conf, long[] capacities, String[] racks, 
+  private void doTest(Configuration conf, long[] capacities, String[] racks, 
       long newCapacity, String newRack) throws Exception {
+    assertEquals(capacities.length, racks.length);
     int numOfDatanodes = capacities.length;
-    assertEquals(numOfDatanodes, racks.length);
     cluster = new MiniDFSCluster.Builder(conf)
                                 .numDataNodes(capacities.length)
                                 .racks(racks)
@@ -247,26 +325,8 @@ public class TestBalancer extends TestCa
     balancer.run(new String[0]);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
-    boolean balanced;
-    do {
-      DatanodeInfo[] datanodeReport = 
-        client.getDatanodeReport(DatanodeReportType.ALL);
-      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
-      balanced = true;
-      double avgUtilization = ((double)totalUsedSpace)/totalCapacity*100;
-      for(DatanodeInfo datanode:datanodeReport) {
-        if(Math.abs(avgUtilization-
-            ((double)datanode.getDfsUsed())/datanode.getCapacity()*100)>10) {
-          balanced = false;
-          try {
-            Thread.sleep(100);
-          } catch(InterruptedException ignored) {
-          }
-          break;
-        }
-      }
-    } while(!balanced);
-
+    Log.info("Rebalancing.");
+    waitForBalancer(totalUsedSpace, totalCapacity);
   }
 
   private void runBalancerDefaultConstructor(Configuration conf,
@@ -279,37 +339,19 @@ public class TestBalancer extends TestCa
     balancer.run(new String[0]);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
-    boolean balanced;
-    do {
-      DatanodeInfo[] datanodeReport = client
-          .getDatanodeReport(DatanodeReportType.ALL);
-      assertEquals(datanodeReport.length, cluster.getDataNodes().size());
-      balanced = true;
-      double avgUtilization = ((double) totalUsedSpace) / totalCapacity * 100;
-      for (DatanodeInfo datanode : datanodeReport) {
-        if (Math.abs(avgUtilization - ((double) datanode.getDfsUsed())
-            / datanode.getCapacity() * 100) > 10) {
-          balanced = false;
-          try {
-            Thread.sleep(100);
-          } catch (InterruptedException ignored) {
-          }
-          break;
-        }
-      }
-    } while (!balanced);
-
+    Log.info("Rebalancing with default ctor.");
+    waitForBalancer(totalUsedSpace, totalCapacity);
   }
   
   /** one-node cluster test*/
   private void oneNodeTest(Configuration conf) throws Exception {
     // add an empty node with half of the CAPACITY & the same rack
-    test(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
+    doTest(conf, new long[]{CAPACITY}, new String[]{RACK0}, CAPACITY/2, RACK0);
   }
   
   /** two-node cluster test */
   private void twoNodeTest(Configuration conf) throws Exception {
-    test(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
         CAPACITY, RACK2);
   }
   



Mime
View raw message