hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject hadoop git commit: HDFS-8549. Abort the balancer if an upgrade is in progress.
Date Wed, 10 Jun 2015 20:43:06 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk c7729efee -> a7a776834


HDFS-8549. Abort the balancer if an upgrade is in progress.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7a77683
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7a77683
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7a77683

Branch: refs/heads/trunk
Commit: a7a7768341f1b7d3a8f2686e2f4d00c57f2e1d4f
Parents: c7729ef
Author: Andrew Wang <wang@apache.org>
Authored: Wed Jun 10 13:42:57 2015 -0700
Committer: Andrew Wang <wang@apache.org>
Committed: Wed Jun 10 13:42:57 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  2 +
 .../hadoop/hdfs/server/balancer/Balancer.java   | 78 +++++++++++++-----
 .../hadoop/hdfs/server/balancer/ExitStatus.java |  3 +-
 .../hdfs/server/balancer/NameNodeConnector.java | 16 ++++
 .../hdfs/server/balancer/TestBalancer.java      | 83 ++++++++++++++++++--
 5 files changed, 152 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7a77683/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index fad2707..3c85773 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -618,6 +618,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8568. TestClusterId#testFormatWithEmptyClusterIdOption is failing.
     (Rakesh R. via xyao)
 
+    HDFS-8549. Abort the balancer if an upgrade is in progress. (wang)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7a77683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index bc7e448..8b7d802 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -180,12 +180,19 @@ public class Balancer {
       + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
       + "\tIncludes only the specified datanodes."
       + "\n\t[-idleiterations <idleiterations>]"
-      + "\tNumber of consecutive idle iterations (-1 for Infinite) before exit.";
-  
+      + "\tNumber of consecutive idle iterations (-1 for Infinite) before "
+      + "exit."
+      + "\n\t[-runDuringUpgrade]"
+      + "\tWhether to run the balancer during an ongoing HDFS upgrade."
+      + "This is usually not desired since it will not affect used space "
+      + "on over-utilized machines.";
+
   private final Dispatcher dispatcher;
+  private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final boolean runDuringUpgrade;
   private final double threshold;
-  
+
   // all data node lists
   private final Collection<Source> overUtilized = new LinkedList<Source>();
   private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>();
@@ -227,11 +234,13 @@ public class Balancer {
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
 
+    this.nnc = theblockpool;
     this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
         p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.runDuringUpgrade = p.runDuringUpgrade;
   }
   
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@@ -293,7 +302,7 @@ public class Balancer {
           if (thresholdDiff <= 0) { // within threshold
             aboveAvgUtilized.add(s);
           } else {
-            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             overUtilized.add(s);
           }
           g = s;
@@ -302,7 +311,7 @@ public class Balancer {
           if (thresholdDiff <= 0) { // within threshold
             belowAvgUtilized.add(g);
           } else {
-            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             underUtilized.add(g);
           }
         }
@@ -324,17 +333,17 @@ public class Balancer {
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final double threshold) {
     final double diff = Math.min(threshold, Math.abs(utilizationDiff));
-    long maxSizeToMove = precentage2bytes(diff, capacity);
+    long maxSizeToMove = percentage2bytes(diff, capacity);
     if (utilizationDiff < 0) {
       maxSizeToMove = Math.min(remaining, maxSizeToMove);
     }
     return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove);
   }
 
-  private static long precentage2bytes(double precentage, long capacity) {
-    Preconditions.checkArgument(precentage >= 0,
-        "precentage = " + precentage + " < 0");
-    return (long)(precentage * capacity / 100.0);
+  private static long percentage2bytes(double percentage, long capacity) {
+    Preconditions.checkArgument(percentage >= 0, "percentage = %s < 0",
+        percentage);
+    return (long)(percentage * capacity / 100.0);
   }
 
   /* log the over utilized & under utilized nodes */
@@ -516,7 +525,13 @@ public class Balancer {
         LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
             + " to make the cluster balanced." );
       }
-      
+
+      // Should not run the balancer during an unfinalized upgrade, since moved
+      // blocks are not deleted on the source datanode.
+      if (!runDuringUpgrade && nnc.isUpgrading()) {
+        return newResult(ExitStatus.UNFINALIZED_UPGRADE, bytesLeftToMove, -1);
+      }
+
       /* Decide all the nodes that will participate in the block move and
        * the number of bytes that need to be moved from one node to another
        * in this iteration. Maximum bytes to be moved per node is
@@ -530,7 +545,7 @@ public class Balancer {
         LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
             " in this iteration");
       }
-      
+
       /* For each pair of <source, target>, start a thread that repeatedly 
        * decide a block to be moved and its proxy source, 
        * then initiates the move until all bytes are moved or no more block
@@ -634,7 +649,8 @@ public class Balancer {
     static final Parameters DEFAULT = new Parameters(
         BalancingPolicy.Node.INSTANCE, 10.0,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String> emptySet(), Collections.<String> emptySet());
+        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        false);
 
     final BalancingPolicy policy;
     final double threshold;
@@ -643,23 +659,34 @@ public class Balancer {
     Set<String> nodesToBeExcluded;
     //include only these nodes in balancing operations
     Set<String> nodesToBeIncluded;
+    /**
+     * Whether to run the balancer during upgrade.
+     */
+    final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
+        boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
       this.nodesToBeExcluded = nodesToBeExcluded;
       this.nodesToBeIncluded = nodesToBeIncluded;
+      this.runDuringUpgrade = runDuringUpgrade;
     }
 
     @Override
     public String toString() {
-      return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold +
-          ", max idle iteration = " + maxIdleIteration +
-          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
-          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+      return String.format("%s.%s [%s,"
+              + " threshold = %s,"
+              + " max idle iteration = %s, "
+              + "number of nodes to be excluded = %s,"
+              + " number of nodes to be included = %s,"
+              + " run during upgrade = %s]",
+          Balancer.class.getSimpleName(), getClass().getSimpleName(),
+          policy, threshold, maxIdleIteration,
+          nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+          runDuringUpgrade);
     }
   }
 
@@ -701,6 +728,7 @@ public class Balancer {
       int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
       Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
       Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+      boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
       if (args != null) {
         try {
@@ -756,9 +784,16 @@ public class Balancer {
               }
             } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
               checkArgument(++i < args.length,
-                  "idleiterations value is missing: args = " + Arrays.toString(args));
+                  "idleiterations value is missing: args = " + Arrays
+                      .toString(args));
               maxIdleIteration = Integer.parseInt(args[i]);
               LOG.info("Using a idleiterations of " + maxIdleIteration);
+            } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) {
+              runDuringUpgrade = true;
+              LOG.info("Will run the balancer even during an ongoing HDFS "
+                  + "upgrade. Most users will not want to run the balancer "
+                  + "during an upgrade since it will not affect used space "
+                  + "on over-utilized machines.");
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
@@ -772,7 +807,8 @@ public class Balancer {
         }
       }
       
-      return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded);
+      return new Parameters(policy, threshold, maxIdleIteration,
+          nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
     }
 
     private static void printUsage(PrintStream out) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7a77683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
index e36258f..6bf2986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
@@ -29,7 +29,8 @@ public enum ExitStatus {
   NO_MOVE_PROGRESS(-3),
   IO_EXCEPTION(-4),
   ILLEGAL_ARGUMENTS(-5),
-  INTERRUPTED(-6);
+  INTERRUPTED(-6),
+  UNFINALIZED_UPGRADE(-7);
 
   private final int code;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7a77683/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 2e4f214..e62dd08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -43,7 +43,9 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -163,6 +165,20 @@ public class NameNodeConnector implements Closeable {
     return namenode.getBlocks(datanode, size);
   }
 
+  /**
+   * @return true if an upgrade is in progress, false if not.
+   * @throws IOException
+   */
+  public boolean isUpgrading() throws IOException {
+    // fsimage upgrade
+    final boolean isUpgrade = !namenode.isUpgradeFinalized();
+    // rolling upgrade
+    RollingUpgradeInfo info = fs.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.QUERY);
+    final boolean isRollingUpgrade = (info != null && !info.isFinalized());
+    return (isUpgrade || isRollingUpgrade);
+  }
+
   /** @return live datanode storage reports. */
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7a77683/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 92d31d0..1f7bade 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.LazyPersistTestCase;
@@ -627,7 +628,8 @@ public class TestBalancer {
             Balancer.Parameters.DEFAULT.policy,
             Balancer.Parameters.DEFAULT.threshold,
             Balancer.Parameters.DEFAULT.maxIdleIteration,
-            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
+            false);
       }
 
       int expectedExcludedNodes = 0;
@@ -866,7 +868,8 @@ public class TestBalancer {
           Balancer.Parameters.DEFAULT.policy,
           Balancer.Parameters.DEFAULT.threshold,
           Balancer.Parameters.DEFAULT.maxIdleIteration,
-          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
+          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
+          false);
       final int r = Balancer.run(namenodes, p, conf);
       assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     } finally {
@@ -1296,12 +1299,7 @@ public class TestBalancer {
       Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
 
       // Run Balancer
-      Balancer.Parameters p = new Balancer.Parameters(
-        Parameters.DEFAULT.policy,
-        Parameters.DEFAULT.threshold,
-        Balancer.Parameters.DEFAULT.maxIdleIteration,
-        Parameters.DEFAULT.nodesToBeExcluded,
-        Parameters.DEFAULT.nodesToBeIncluded);
+      final Balancer.Parameters p = Parameters.DEFAULT;
       final int r = Balancer.run(namenodes, p, conf);
 
       // Validate no RAM_DISK block should be moved
@@ -1316,6 +1314,75 @@ public class TestBalancer {
   }
 
   /**
+   * Check that the balancer exits when there is an unfinalized upgrade.
+   */
+  @Test(timeout=300000)
+  public void testBalancerDuringUpgrade() throws Exception {
+    final int SEED = 0xFADED;
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+
+    final int BLOCK_SIZE = 1024*1024;
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(1)
+        .storageCapacities(new long[] { BLOCK_SIZE * 10 })
+        .storageTypes(new StorageType[] { DEFAULT })
+        .storagesPerDatanode(1)
+        .build();
+
+    try {
+      cluster.waitActive();
+      // Create a file on the single DN
+      final String METHOD_NAME = GenericTestUtils.getMethodName();
+      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
+          (short) 1, SEED);
+
+      // Add another DN with the same capacity, cluster is now unbalanced
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.triggerHeartbeats();
+      Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+
+      // Run balancer
+      final Balancer.Parameters p = Parameters.DEFAULT;
+
+      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
+      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+      // Rolling upgrade should abort the balancer
+      assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
+          Balancer.run(namenodes, p, conf));
+
+      // Should work with the -runDuringUpgrade flag.
+      final Balancer.Parameters runDuringUpgrade =
+          new Balancer.Parameters(Parameters.DEFAULT.policy,
+              Parameters.DEFAULT.threshold,
+              Parameters.DEFAULT.maxIdleIteration,
+              Parameters.DEFAULT.nodesToBeExcluded,
+              Parameters.DEFAULT.nodesToBeIncluded,
+              true);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(),
+          Balancer.run(namenodes, runDuringUpgrade, conf));
+
+      // Finalize the rolling upgrade
+      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
+
+      // Should also work after finalization.
+      assertEquals(ExitStatus.SUCCESS.getExitCode(),
+          Balancer.run(namenodes, p, conf));
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
    * Test special case. Two replicas belong to same block should not in same node.
    * We have 2 nodes.
    * We have a block in (DN0,SSD) and (DN1,DISK).


Mime
View raw message