hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From min...@apache.org
Subject hadoop git commit: HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo via mingma)
Date Wed, 02 Sep 2015 22:58:29 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 1d56325a8 -> f81d12668


HDFS-8890. Allow admin to specify which blockpools the balancer should run on. (Chris Trezzo
via mingma)

(cherry picked from commit d31a41c35927f02f2fb40d19380b5df4bb2b6d57)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f81d1266
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f81d1266
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f81d1266

Branch: refs/heads/branch-2
Commit: f81d12668f6c5d8b2d1689ccac2c6ecf91a4eee6
Parents: 1d56325
Author: Ming Ma <mingma@apache.org>
Authored: Wed Sep 2 15:55:42 2015 -0700
Committer: Ming Ma <mingma@apache.org>
Committed: Wed Sep 2 15:57:55 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/balancer/Balancer.java   |  82 ++++++---
 .../src/site/markdown/HDFSCommands.md           |   2 +
 .../hdfs/server/balancer/TestBalancer.java      |  43 ++++-
 .../TestBalancerWithMultipleNameNodes.java      | 179 ++++++++++++++++---
 5 files changed, 253 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f81d1266/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index def33d3..9553ec3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -535,6 +535,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-328. Improve fs -setrep error message for invalid replication factors.
     (Daniel Templeton via wang)
 
+    HDFS-8890. Allow admin to specify which blockpools the balancer should run
+    on. (Chris Trezzo via mingma)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f81d1266/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 9d3ddd4..c4a4edc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -180,6 +180,8 @@ public class Balancer {
       + "\tExcludes the specified datanodes."
       + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
       + "\tIncludes only the specified datanodes."
+      + "\n\t[-blockpools <comma-separated list of blockpool ids>]"
+      + "\tThe balancer will only run on blockpools included in this list."
       + "\n\t[-idleiterations <idleiterations>]"
       + "\tNumber of consecutive idle iterations (-1 for Infinite) before "
       + "exit."
@@ -653,22 +655,27 @@ public class Balancer {
         done = true;
         Collections.shuffle(connectors);
         for(NameNodeConnector nnc : connectors) {
-          final Balancer b = new Balancer(nnc, p, conf);
-          final Result r = b.runOneIteration();
-          r.print(iteration, System.out);
-
-          // clean all lists
-          b.resetData(conf);
-          if (r.exitStatus == ExitStatus.IN_PROGRESS) {
-            done = false;
-          } else if (r.exitStatus != ExitStatus.SUCCESS) {
-            //must be an error statue, return.
-            return r.exitStatus.getExitCode();
-          }
-        }
+          if (p.blockpools.size() == 0
+              || p.blockpools.contains(nnc.getBlockpoolID())) {
+            final Balancer b = new Balancer(nnc, p, conf);
+            final Result r = b.runOneIteration();
+            r.print(iteration, System.out);
+
+            // clean all lists
+            b.resetData(conf);
+            if (r.exitStatus == ExitStatus.IN_PROGRESS) {
+              done = false;
+            } else if (r.exitStatus != ExitStatus.SUCCESS) {
+              // must be an error statue, return.
+              return r.exitStatus.getExitCode();
+            }
 
-        if (!done) {
-          Thread.sleep(sleeptime);
+            if (!done) {
+              Thread.sleep(sleeptime);
+            }
+          } else {
+            LOG.info("Skipping blockpool " + nnc.getBlockpoolID());
+          }
         }
       }
     } finally {
@@ -700,12 +707,12 @@ public class Balancer {
   }
 
   static class Parameters {
-    static final Parameters DEFAULT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0,
-        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String>emptySet(), Collections.<String>emptySet(),
-        Collections.<String>emptySet(),
-        false);
+    static final Parameters DEFAULT =
+        new Parameters(BalancingPolicy.Node.INSTANCE, 10.0,
+            NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            Collections.<String> emptySet(), Collections.<String> emptySet(),
+            false);
 
     final BalancingPolicy policy;
     final double threshold;
@@ -719,19 +726,25 @@ public class Balancer {
      */
     final Set<String> sourceNodes;
     /**
+     * A set of block pools to run the balancer on.
+     */
+    final Set<String> blockpools;
+    /**
      * Whether to run the balancer during upgrade.
      */
     final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
         Set<String> excludedNodes, Set<String> includedNodes,
-        Set<String> sourceNodes, boolean runDuringUpgrade) {
+        Set<String> sourceNodes, Set<String> blockpools,
+        boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
       this.excludedNodes = excludedNodes;
       this.includedNodes = includedNodes;
       this.sourceNodes = sourceNodes;
+      this.blockpools = blockpools;
       this.runDuringUpgrade = runDuringUpgrade;
     }
 
@@ -743,10 +756,11 @@ public class Balancer {
               + " #excluded nodes = %s,"
               + " #included nodes = %s,"
               + " #source nodes = %s,"
+              + " #blockpools = %s,"
               + " run during upgrade = %s]",
-          Balancer.class.getSimpleName(), getClass().getSimpleName(),
-          policy, threshold, maxIdleIteration,
-          excludedNodes.size(), includedNodes.size(), sourceNodes.size(),
+          Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
+          threshold, maxIdleIteration, excludedNodes.size(),
+          includedNodes.size(), sourceNodes.size(), blockpools.size(),
           runDuringUpgrade);
     }
   }
@@ -790,6 +804,7 @@ public class Balancer {
       Set<String> excludedNodes = Parameters.DEFAULT.excludedNodes;
       Set<String> includedNodes = Parameters.DEFAULT.includedNodes;
       Set<String> sourceNodes = Parameters.DEFAULT.sourceNodes;
+      Set<String> blockpools = Parameters.DEFAULT.blockpools;
       boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
       if (args != null) {
@@ -829,6 +844,14 @@ public class Balancer {
             } else if ("-source".equalsIgnoreCase(args[i])) {
               sourceNodes = new HashSet<>();
               i = processHostList(args, i, "source", sourceNodes);
+            } else if ("-blockpools".equalsIgnoreCase(args[i])) {
+              checkArgument(
+                  ++i < args.length,
+                  "blockpools value is missing: args = "
+                      + Arrays.toString(args));
+              blockpools = parseBlockPoolList(args[i]);
+              LOG.info("Balancer will run on the following blockpools: "
+                  + blockpools.toString());
             } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
               checkArgument(++i < args.length,
                   "idleiterations value is missing: args = " + Arrays
@@ -854,8 +877,8 @@ public class Balancer {
         }
       }
       
-      return new Parameters(policy, threshold, maxIdleIteration,
-          excludedNodes, includedNodes, sourceNodes, runDuringUpgrade);
+      return new Parameters(policy, threshold, maxIdleIteration, excludedNodes,
+          includedNodes, sourceNodes, blockpools, runDuringUpgrade);
     }
 
     private static int processHostList(String[] args, int i, String type,
@@ -882,6 +905,11 @@ public class Balancer {
       return i;
     }
 
+    private static Set<String> parseBlockPoolList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
     private static void printUsage(PrintStream out) {
       out.println(USAGE + "\n");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f81d1266/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 5fa7220..b4b870b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -265,6 +265,7 @@ Usage:
               [-policy <policy>]
               [-exclude [-f <hosts-file> | <comma-separated list of hosts>]]
               [-include [-f <hosts-file> | <comma-separated list of hosts>]]
+              [-blockpools <comma-separated list of blockpool ids>]
               [-idleiterations <idleiterations>]
 
 | COMMAND\_OPTION | Description |
@@ -273,6 +274,7 @@ Usage:
 | `-threshold` \<threshold\> | Percentage of disk capacity. This overwrites the default threshold. |
 | `-exclude -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Excludes the specified datanodes from being balanced by the balancer. |
 | `-include -f` \<hosts-file\> \| \<comma-separated list of hosts\> | Includes only the specified datanodes to be balanced by the balancer. |
+| `-blockpools` \<comma-separated list of blockpool ids\> | The balancer will only run on blockpools included in this list. |
 | `-idleiterations` \<iterations\> | Maximum number of idle iterations before exit. This overwrites the default idleiterations(5). |
 
 Runs a cluster balancing utility. An administrator can simply press Ctrl-C to stop the rebalancing process. See [Balancer](./HdfsUserGuide.html#Balancer) for more details.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f81d1266/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 5a8c9f8..125d056 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -644,7 +644,7 @@ public class TestBalancer {
             Balancer.Parameters.DEFAULT.maxIdleIteration,
             nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
             Balancer.Parameters.DEFAULT.sourceNodes,
-            false);
+            Balancer.Parameters.DEFAULT.blockpools, false);
       }
 
       int expectedExcludedNodes = 0;
@@ -885,7 +885,7 @@ public class TestBalancer {
           Balancer.Parameters.DEFAULT.maxIdleIteration,
           datanodes, Balancer.Parameters.DEFAULT.includedNodes,
           Balancer.Parameters.DEFAULT.sourceNodes,
-          false);
+          Balancer.Parameters.DEFAULT.blockpools, false);
       final int r = Balancer.run(namenodes, p, conf);
       assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     } finally {
@@ -1080,6 +1080,34 @@ public class TestBalancer {
     } catch (IllegalArgumentException e) {
 
     }
+
+    parameters = new String[] { "-blockpools" };
+    try {
+      Balancer.Cli.parse(parameters);
+      fail("IllegalArgumentException is expected when a value "
+          + "is not specified for the blockpool flag");
+    } catch (IllegalArgumentException e) {
+
+    }
+  }
+
+  @Test
+  public void testBalancerCliParseBlockpools() {
+    String[] parameters = new String[] { "-blockpools", "bp-1,bp-2,bp-3" };
+    Balancer.Parameters p = Balancer.Cli.parse(parameters);
+    assertEquals(3, p.blockpools.size());
+
+    parameters = new String[] { "-blockpools", "bp-1" };
+    p = Balancer.Cli.parse(parameters);
+    assertEquals(1, p.blockpools.size());
+
+    parameters = new String[] { "-blockpools", "bp-1,,bp-2" };
+    p = Balancer.Cli.parse(parameters);
+    assertEquals(3, p.blockpools.size());
+
+    parameters = new String[] { "-blockpools", "bp-1," };
+    p = Balancer.Cli.parse(parameters);
+    assertEquals(1, p.blockpools.size());
   }
 
 
@@ -1387,7 +1415,7 @@ public class TestBalancer {
               Parameters.DEFAULT.excludedNodes,
               Parameters.DEFAULT.includedNodes,
               Parameters.DEFAULT.sourceNodes,
-              true);
+              Balancer.Parameters.DEFAULT.blockpools, true);
       assertEquals(ExitStatus.SUCCESS.getExitCode(),
           Balancer.run(namenodes, runDuringUpgrade, conf));
 
@@ -1590,7 +1618,8 @@ public class TestBalancer {
             BalancingPolicy.Node.INSTANCE, 1,
             NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
             Collections.<String> emptySet(), Collections.<String> emptySet(),
-            Collections.<String> emptySet(), false);
+            Collections.<String> emptySet(),
+            Balancer.Parameters.DEFAULT.blockpools, false);
 
         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
         final int r = Balancer.run(namenodes, p, conf);
@@ -1609,7 +1638,7 @@ public class TestBalancer {
           BalancingPolicy.Node.INSTANCE, 1,
           NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
           Collections.<String> emptySet(), Collections.<String> emptySet(),
-          sourceNodes, false);
+          sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
 
         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 50);
         final int r = Balancer.run(namenodes, p, conf);
@@ -1624,7 +1653,7 @@ public class TestBalancer {
           BalancingPolicy.Node.INSTANCE, 1,
           NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
           Collections.<String> emptySet(), Collections.<String> emptySet(),
-          sourceNodes, false);
+          sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
 
         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
         final int r = Balancer.run(namenodes, p, conf);
@@ -1641,7 +1670,7 @@ public class TestBalancer {
           BalancingPolicy.Node.INSTANCE, 1,
           NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
           Collections.<String> emptySet(), Collections.<String> emptySet(),
-          sourceNodes, false);
+          sourceNodes, Balancer.Parameters.DEFAULT.blockpools, false);
 
         conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1);
         final int r = Balancer.run(namenodes, p, conf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f81d1266/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
index f51757c..b07ad89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java
@@ -21,8 +21,13 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +47,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
@@ -60,6 +67,7 @@ public class TestBalancerWithMultipleNameNodes {
   private static final long CAPACITY = 500L;
   private static final String RACK0 = "/rack0";
   private static final String RACK1 = "/rack1";
+  private static final String RACK2 = "/rack2";
 
   private static final String FILE_NAME = "/tmp.txt";
   private static final Path FILE_PATH = new Path(FILE_NAME);
@@ -76,16 +84,20 @@ public class TestBalancerWithMultipleNameNodes {
     final MiniDFSCluster cluster;
     final ClientProtocol[] clients;
     final short replication;
-    
+    final Balancer.Parameters parameters;
+
     Suite(MiniDFSCluster cluster, final int nNameNodes, final int nDataNodes,
-        Configuration conf) throws IOException {
+        Balancer.Parameters parameters, Configuration conf) throws IOException {
       this.conf = conf;
       this.cluster = cluster;
       clients = new ClientProtocol[nNameNodes];
       for(int i = 0; i < nNameNodes; i++) {
         clients[i] = cluster.getNameNode(i).getRpcServer();
       }
-      replication = (short)Math.max(1, nDataNodes - 1);
+      // hard coding replication factor to 1 so logical and raw HDFS size are
+      // equal
+      replication = 1;
+      this.parameters = parameters;
     }
   }
 
@@ -104,11 +116,9 @@ public class TestBalancerWithMultipleNameNodes {
       ) throws IOException, InterruptedException, TimeoutException {
     final ExtendedBlock[][] blocks = new ExtendedBlock[s.clients.length][];
     for(int n = 0; n < s.clients.length; n++) {
-      final long fileLen = size/s.replication;
-      createFile(s, n, fileLen);
-
-      final List<LocatedBlock> locatedBlocks = s.clients[n].getBlockLocations(
-          FILE_NAME, 0, fileLen).getLocatedBlocks();
+      createFile(s, n, size);
+      final List<LocatedBlock> locatedBlocks =
+          s.clients[n].getBlockLocations(FILE_NAME, 0, size).getLocatedBlocks();
 
       final int numOfBlocks = locatedBlocks.size();
       blocks[n] = new ExtendedBlock[numOfBlocks];
@@ -151,9 +161,14 @@ public class TestBalancerWithMultipleNameNodes {
     wait(s.clients, totalUsed, totalCapacity);
     LOG.info("BALANCER 1");
 
+    // get storage reports for relevant blockpools so that we can compare
+    // blockpool usages after balancer has run
+    Map<Integer, DatanodeStorageReport[]> preBalancerPoolUsages =
+        getStorageReports(s);
+
     // start rebalancing
     final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
+    final int r = Balancer.run(namenodes, s.parameters, s.conf);
     Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
 
     LOG.info("BALANCER 2");
@@ -189,7 +204,7 @@ public class TestBalancerWithMultipleNameNodes {
       balanced = true;
       for(int d = 0; d < used.length; d++) {
         final double p = used[d]*100.0/cap[d];
-        balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
+        balanced = p <= avg + s.parameters.threshold;
         if (!balanced) {
           if (i % 100 == 0) {
             LOG.warn("datanodes " + d + " is not yet balanced: "
@@ -203,6 +218,89 @@ public class TestBalancerWithMultipleNameNodes {
       }
     }
     LOG.info("BALANCER 6");
+    // cluster is balanced, verify that only selected blockpools were touched
+    Map<Integer, DatanodeStorageReport[]> postBalancerPoolUsages =
+        getStorageReports(s);
+    Assert.assertEquals(preBalancerPoolUsages.size(),
+        postBalancerPoolUsages.size());
+    for (Map.Entry<Integer, DatanodeStorageReport[]> entry
+        : preBalancerPoolUsages.entrySet()) {
+      compareTotalPoolUsage(entry.getValue(),
+          postBalancerPoolUsages.get(entry.getKey()));
+    }
+  }
+
+  /**
+   * Compare the total blockpool usage on each datanode to ensure that nothing
+   * was balanced.
+   *
+   * @param preReports storage reports from pre balancer run
+   * @param postReports storage reports from post balancer run
+   */
+  private static void compareTotalPoolUsage(DatanodeStorageReport[] preReports,
+      DatanodeStorageReport[] postReports) {
+    Assert.assertNotNull(preReports);
+    Assert.assertNotNull(postReports);
+    Assert.assertEquals(preReports.length, postReports.length);
+    for (DatanodeStorageReport preReport : preReports) {
+      String dnUuid = preReport.getDatanodeInfo().getDatanodeUuid();
+      for(DatanodeStorageReport postReport : postReports) {
+        if(postReport.getDatanodeInfo().getDatanodeUuid().equals(dnUuid)) {
+          Assert.assertEquals(getTotalPoolUsage(preReport),
+              getTotalPoolUsage(postReport));
+          LOG.info("Comparision of datanode pool usage pre/post balancer run. "
+              + "PrePoolUsage: " + getTotalPoolUsage(preReport)
+              + ", PostPoolUsage: " + getTotalPoolUsage(postReport));
+          break;
+        }
+      }
+    }
+  }
+
+  private static long getTotalPoolUsage(DatanodeStorageReport report) {
+    long usage = 0L;
+    for (StorageReport sr : report.getStorageReports()) {
+      usage += sr.getBlockPoolUsed();
+    }
+    return usage;
+  }
+
+  /**
+   * Get the storage reports for all blockpools that were not specified by the
+   * balancer blockpool parameters. If none were specified then the parameter
+   * was not set and do not return any reports.
+   *
+   * @param s suite for the test
+   * @return a map of storage reports where the key is the blockpool index
+   * @throws IOException
+   */
+  private static Map<Integer,
+    DatanodeStorageReport[]> getStorageReports(Suite s) throws IOException {
+    Map<Integer, DatanodeStorageReport[]> reports =
+        new HashMap<Integer, DatanodeStorageReport[]>();
+    if (s.parameters.blockpools.size() == 0) {
+      // the blockpools parameter was not set, so we don't need to track any
+      // blockpools.
+      return Collections.emptyMap();
+    }
+    for (int i = 0; i < s.clients.length; i++) {
+      if (s.parameters.blockpools.contains(s.cluster.getNamesystem(i)
+          .getBlockPoolId())) {
+        // we want to ensure that blockpools not specified by the balancer
+        // parameters were left alone. Therefore, if the pool was specified,
+        // skip it. Note: this code assumes the clients in the suite are ordered
+        // the same way that they are indexed via cluster#getNamesystem(index).
+        continue;
+      } else {
+        LOG.info("Tracking usage of blockpool id: "
+            + s.cluster.getNamesystem(i).getBlockPoolId());
+        reports.put(i,
+            s.clients[i].getDatanodeStorageReport(DatanodeReportType.LIVE));
+      }
+    }
+    LOG.info("Tracking " + reports.size()
+        + " blockpool(s) for pre/post balancer usage.");
+    return reports;
   }
 
   private static void sleep(long ms) {
@@ -220,25 +318,31 @@ public class TestBalancerWithMultipleNameNodes {
   }
 
   /**
-   * First start a cluster and fill the cluster up to a certain size.
-   * Then redistribute blocks according the required distribution.
-   * Finally, balance the cluster.
-   * 
+   * First start a cluster and fill the cluster up to a certain size. Then
+   * redistribute blocks according the required distribution. Finally, balance
+   * the cluster.
+   *
    * @param nNameNodes Number of NameNodes
-   * @param distributionPerNN The distribution for each NameNode. 
+   * @param nNameNodesToBalance Number of NameNodes to run the balancer on
+   * @param distributionPerNN The distribution for each NameNode.
    * @param capacities Capacities of the datanodes
    * @param racks Rack names
    * @param conf Configuration
    */
   private void unevenDistribution(final int nNameNodes,
-      long distributionPerNN[], long capacities[], String[] racks,
-      Configuration conf) throws Exception {
+      final int nNameNodesToBalance, long distributionPerNN[],
+      long capacities[], String[] racks, Configuration conf) throws Exception {
     LOG.info("UNEVEN 0");
     final int nDataNodes = distributionPerNN.length;
     if (capacities.length != nDataNodes || racks.length != nDataNodes) {
       throw new IllegalArgumentException("Array length is not the same");
     }
 
+    if (nNameNodesToBalance > nNameNodes) {
+      throw new IllegalArgumentException("Number of namenodes to balance is "
+          + "greater than the number of namenodes.");
+    }
+
     // calculate total space that need to be filled
     final long usedSpacePerNN = TestBalancer.sum(distributionPerNN);
 
@@ -248,7 +352,7 @@ public class TestBalancerWithMultipleNameNodes {
       LOG.info("UNEVEN 1");
       final MiniDFSCluster cluster = new MiniDFSCluster
           .Builder(new Configuration(conf))
-          .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+              .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(nNameNodes))
           .numDataNodes(nDataNodes)
           .racks(racks)
           .simulatedCapacities(capacities)
@@ -258,7 +362,7 @@ public class TestBalancerWithMultipleNameNodes {
         cluster.waitActive();
         DFSTestUtil.setFederatedConfiguration(cluster, conf);
         LOG.info("UNEVEN 3");
-        final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+        final Suite s = new Suite(cluster, nNameNodes, nDataNodes, null, conf);
         blocks = generateBlocks(s, usedSpacePerNN);
         LOG.info("UNEVEN 4");
       } finally {
@@ -280,7 +384,20 @@ public class TestBalancerWithMultipleNameNodes {
       try {
         cluster.waitActive();
         LOG.info("UNEVEN 12");
-        final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+        Set<String> blockpools = new HashSet<String>();
+        for (int i = 0; i < nNameNodesToBalance; i++) {
+          blockpools.add(cluster.getNamesystem(i).getBlockPoolId());
+        }
+        Balancer.Parameters params =
+            new Balancer.Parameters(Balancer.Parameters.DEFAULT.policy,
+                Balancer.Parameters.DEFAULT.threshold,
+                Balancer.Parameters.DEFAULT.maxIdleIteration,
+                Balancer.Parameters.DEFAULT.excludedNodes,
+                Balancer.Parameters.DEFAULT.includedNodes,
+                Balancer.Parameters.DEFAULT.sourceNodes, blockpools,
+                Balancer.Parameters.DEFAULT.runDuringUpgrade);
+        final Suite s =
+            new Suite(cluster, nNameNodes, nDataNodes, params, conf);
         for(int n = 0; n < nNameNodes; n++) {
           // redistribute blocks
           final Block[][] blocksDN = TestBalancer.distributeBlocks(
@@ -336,7 +453,9 @@ public class TestBalancerWithMultipleNameNodes {
     try {
       cluster.waitActive();
       LOG.info("RUN_TEST 1");
-      final Suite s = new Suite(cluster, nNameNodes, nDataNodes, conf);
+      final Suite s =
+          new Suite(cluster, nNameNodes, nDataNodes,
+              Balancer.Parameters.DEFAULT, conf);
       long totalCapacity = TestBalancer.sum(capacities);
 
       LOG.info("RUN_TEST 2");
@@ -378,10 +497,26 @@ public class TestBalancerWithMultipleNameNodes {
   @Test
   public void testUnevenDistribution() throws Exception {
     final Configuration conf = createConf();
-    unevenDistribution(2,
+    unevenDistribution(2, 2,
         new long[] {30*CAPACITY/100, 5*CAPACITY/100},
         new long[]{CAPACITY, CAPACITY},
         new String[] {RACK0, RACK1},
         conf);
   }
+
+  @Test
+  public void testBalancing1OutOf2Blockpools() throws Exception {
+    final Configuration conf = createConf();
+    unevenDistribution(2, 1, new long[] { 30 * CAPACITY / 100,
+        5 * CAPACITY / 100 }, new long[] { CAPACITY, CAPACITY }, new String[] {
+        RACK0, RACK1 }, conf);
+  }
+
+  @Test
+  public void testBalancing2OutOf3Blockpools() throws Exception {
+    final Configuration conf = createConf();
+    unevenDistribution(3, 2, new long[] { 30 * CAPACITY / 100,
+        5 * CAPACITY / 100, 10 * CAPACITY / 100 }, new long[] { CAPACITY,
+        CAPACITY, CAPACITY }, new String[] { RACK0, RACK1, RACK2 }, conf);
+  }
 }


Mime
View raw message