hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject hadoop git commit: HDFS-8549. Abort the balancer if an upgrade is in progress. Contributed by Andrew Wang. Backport HDFS-11808 by Akira Ajisaka.
Date Tue, 06 Jun 2017 00:46:49 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 f8969e220 -> db81fbbab


HDFS-8549. Abort the balancer if an upgrade is in progress. Contributed by Andrew Wang.
Backport HDFS-11808 by Akira Ajisaka.

(cherry picked from commit a7a7768341f1b7d3a8f2686e2f4d00c57f2e1d4f)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db81fbba
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db81fbba
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db81fbba

Branch: refs/heads/branch-2.7
Commit: db81fbbab1fb0b9c63cc6494380d9105da3782f9
Parents: f8969e2
Author: Andrew Wang <wang@apache.org>
Authored: Wed Jun 10 13:42:57 2015 -0700
Committer: Konstantin V Shvachko <shv@apache.org>
Committed: Mon Jun 5 17:27:52 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hadoop/hdfs/server/balancer/Balancer.java   | 74 +++++++++++++-----
 .../hadoop/hdfs/server/balancer/ExitStatus.java |  3 +-
 .../hdfs/server/balancer/NameNodeConnector.java | 16 ++++
 .../hdfs/server/balancer/TestBalancer.java      | 82 ++++++++++++++++++--
 5 files changed, 150 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/db81fbba/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2538699..2b58537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -102,6 +102,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11648. Lazy construct the IIP pathname.
     (daryn via kihwal, backported by zhz)
 
+    HDFS-8549. Abort the balancer if an upgrade is in progress. (wang)
+    Backport HDFS-11808 by Akira Ajisaka.
+
   OPTIMIZATIONS
 
     HDFS-10896. Move lock logging logic from FSNamesystem into FSNamesystemLock.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db81fbba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 4fb6f0e..d5befee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -183,10 +183,17 @@ public class Balancer {
       + "\n\t[-include [-f <hosts-file> | <comma-separated list of hosts>]]"
       + "\tIncludes only the specified datanodes."
       + "\n\t[-idleiterations <idleiterations>]"
-      + "\tNumber of consecutive idle iterations (-1 for Infinite) before exit.";
-  
+      + "\tNumber of consecutive idle iterations (-1 for Infinite) before "
+      + "exit."
+      + "\n\t[-runDuringUpgrade]"
+      + "\tWhether to run the balancer during an ongoing HDFS upgrade."
+      + "This is usually not desired since it will not affect used space "
+      + "on over-utilized machines.";
+
   private final Dispatcher dispatcher;
+  private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
+  private final boolean runDuringUpgrade;
   private final double threshold;
   private final long maxSizeToMove;
 
@@ -262,6 +269,7 @@ public class Balancer {
         DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
 
+    this.nnc = theblockpool;
     this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
         p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
         maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize,
@@ -272,6 +280,7 @@ public class Balancer {
     this.maxSizeToMove = getLong(conf,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT);
+    this.runDuringUpgrade = p.runDuringUpgrade;
   }
   
   private static long getCapacity(DatanodeStorageReport report, StorageType t) {
@@ -333,7 +342,7 @@ public class Balancer {
           if (thresholdDiff <= 0) { // within threshold
             aboveAvgUtilized.add(s);
           } else {
-            overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             overUtilized.add(s);
           }
           g = s;
@@ -342,7 +351,7 @@ public class Balancer {
           if (thresholdDiff <= 0) { // within threshold
             belowAvgUtilized.add(g);
           } else {
-            underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
+            underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
             underUtilized.add(g);
           }
         }
@@ -364,17 +373,17 @@ public class Balancer {
   private static long computeMaxSize2Move(final long capacity, final long remaining,
       final double utilizationDiff, final double threshold, final long max) {
     final double diff = Math.min(threshold, Math.abs(utilizationDiff));
-    long maxSizeToMove = precentage2bytes(diff, capacity);
+    long maxSizeToMove = percentage2bytes(diff, capacity);
     if (utilizationDiff < 0) {
       maxSizeToMove = Math.min(remaining, maxSizeToMove);
     }
     return Math.min(max, maxSizeToMove);
   }
 
-  private static long precentage2bytes(double precentage, long capacity) {
-    Preconditions.checkArgument(precentage >= 0,
-        "precentage = " + precentage + " < 0");
-    return (long)(precentage * capacity / 100.0);
+  private static long percentage2bytes(double percentage, long capacity) {
+    Preconditions.checkArgument(percentage >= 0, "percentage = %s < 0",
+        percentage);
+    return (long)(percentage * capacity / 100.0);
   }
 
   /* log the over utilized & under utilized nodes */
@@ -565,7 +574,13 @@ public class Balancer {
         LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove)
             + " to make the cluster balanced." );
       }
-      
+
+      // Should not run the balancer during an unfinalized upgrade, since moved
+      // blocks are not deleted on the source datanode.
+      if (!runDuringUpgrade && nnc.isUpgrading()) {
+        return newResult(ExitStatus.UNFINALIZED_UPGRADE, bytesLeftToMove, -1);
+      }
+
       /* Decide all the nodes that will participate in the block move and
        * the number of bytes that need to be moved from one node to another
        * in this iteration. Maximum bytes to be moved per node is
@@ -700,7 +715,8 @@ public class Balancer {
     static final Parameters DEFAULT = new Parameters(
         BalancingPolicy.Node.INSTANCE, 10.0,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS,
-        Collections.<String> emptySet(), Collections.<String> emptySet());
+        Collections.<String> emptySet(), Collections.<String> emptySet(),
+        false);
 
     final BalancingPolicy policy;
     final double threshold;
@@ -709,23 +725,34 @@ public class Balancer {
     Set<String> nodesToBeExcluded;
     //include only these nodes in balancing operations
     Set<String> nodesToBeIncluded;
+    /**
+     * Whether to run the balancer during upgrade.
+     */
+    final boolean runDuringUpgrade;
 
     Parameters(BalancingPolicy policy, double threshold, int maxIdleIteration,
-        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded,
+        boolean runDuringUpgrade) {
       this.policy = policy;
       this.threshold = threshold;
       this.maxIdleIteration = maxIdleIteration;
       this.nodesToBeExcluded = nodesToBeExcluded;
       this.nodesToBeIncluded = nodesToBeIncluded;
+      this.runDuringUpgrade = runDuringUpgrade;
     }
 
     @Override
     public String toString() {
-      return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold +
-          ", max idle iteration = " + maxIdleIteration +
-          ", number of nodes to be excluded = "+ nodesToBeExcluded.size() +
-          ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]";
+      return String.format("%s.%s [%s,"
+              + " threshold = %s,"
+              + " max idle iteration = %s, "
+              + "number of nodes to be excluded = %s,"
+              + " number of nodes to be included = %s,"
+              + " run during upgrade = %s]",
+          Balancer.class.getSimpleName(), getClass().getSimpleName(),
+          policy, threshold, maxIdleIteration,
+          nodesToBeExcluded.size(), nodesToBeIncluded.size(),
+          runDuringUpgrade);
     }
   }
 
@@ -767,6 +794,7 @@ public class Balancer {
       int maxIdleIteration = Parameters.DEFAULT.maxIdleIteration;
       Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
       Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
+      boolean runDuringUpgrade = Parameters.DEFAULT.runDuringUpgrade;
 
       if (args != null) {
         try {
@@ -822,9 +850,16 @@ public class Balancer {
               }
             } else if ("-idleiterations".equalsIgnoreCase(args[i])) {
               checkArgument(++i < args.length,
-                  "idleiterations value is missing: args = " + Arrays.toString(args));
+                  "idleiterations value is missing: args = " + Arrays
+                      .toString(args));
               maxIdleIteration = Integer.parseInt(args[i]);
               LOG.info("Using a idleiterations of " + maxIdleIteration);
+            } else if ("-runDuringUpgrade".equalsIgnoreCase(args[i])) {
+              runDuringUpgrade = true;
+              LOG.info("Will run the balancer even during an ongoing HDFS "
+                  + "upgrade. Most users will not want to run the balancer "
+                  + "during an upgrade since it will not affect used space "
+                  + "on over-utilized machines.");
             } else {
               throw new IllegalArgumentException("args = "
                   + Arrays.toString(args));
@@ -838,7 +873,8 @@ public class Balancer {
         }
       }
       
-      return new Parameters(policy, threshold, maxIdleIteration, nodesTobeExcluded, nodesTobeIncluded);
+      return new Parameters(policy, threshold, maxIdleIteration,
+          nodesTobeExcluded, nodesTobeIncluded, runDuringUpgrade);
     }
 
     private static void printUsage(PrintStream out) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db81fbba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
index e36258f..6bf2986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/ExitStatus.java
@@ -29,7 +29,8 @@ public enum ExitStatus {
   NO_MOVE_PROGRESS(-3),
   IO_EXCEPTION(-4),
   ILLEGAL_ARGUMENTS(-5),
-  INTERRUPTED(-6);
+  INTERRUPTED(-6),
+  UNFINALIZED_UPGRADE(-7);
 
   private final int code;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db81fbba/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
index 30c6fab..0041841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
@@ -44,7 +44,9 @@ import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -164,6 +166,20 @@ public class NameNodeConnector implements Closeable {
     return namenode.getBlocks(datanode, size);
   }
 
+  /**
+   * @return true if an upgrade is in progress, false if not.
+   * @throws IOException
+   */
+  public boolean isUpgrading() throws IOException {
+    // fsimage upgrade
+    final boolean isUpgrade = !namenode.isUpgradeFinalized();
+    // rolling upgrade
+    RollingUpgradeInfo info = fs.rollingUpgrade(
+        HdfsConstants.RollingUpgradeAction.QUERY);
+    final boolean isRollingUpgrade = (info != null && !info.isFinalized());
+    return (isUpgrade || isRollingUpgrade);
+  }
+
   /** @return live datanode storage reports. */
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/db81fbba/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 398ad5a..a884ecb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -740,7 +740,8 @@ public class TestBalancer {
             Balancer.Parameters.DEFAULT.policy,
             Balancer.Parameters.DEFAULT.threshold,
             Balancer.Parameters.DEFAULT.maxIdleIteration,
-            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded(),
+            false);
       }
 
       int expectedExcludedNodes = 0;
@@ -986,7 +987,8 @@ public class TestBalancer {
           Balancer.Parameters.DEFAULT.policy,
           Balancer.Parameters.DEFAULT.threshold,
           Balancer.Parameters.DEFAULT.maxIdleIteration,
-          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded);
+          datanodes, Balancer.Parameters.DEFAULT.nodesToBeIncluded,
+          false);
       final int r = Balancer.run(namenodes, p, conf);
       assertEquals(ExitStatus.SUCCESS.getExitCode(), r);
     } finally {
@@ -1417,12 +1419,7 @@ public class TestBalancer {
       Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
 
       // Run Balancer
-      Balancer.Parameters p = new Balancer.Parameters(
-        Parameters.DEFAULT.policy,
-        Parameters.DEFAULT.threshold,
-        Balancer.Parameters.DEFAULT.maxIdleIteration,
-        Parameters.DEFAULT.nodesToBeExcluded,
-        Parameters.DEFAULT.nodesToBeIncluded);
+      final Balancer.Parameters p = Parameters.DEFAULT;
       final int r = Balancer.run(namenodes, p, conf);
 
       // Validate no RAM_DISK block should be moved
@@ -1437,7 +1434,76 @@ public class TestBalancer {
   }
 
   /**
+   * Check that the balancer exits when there is an unfinalized upgrade.
+   */
+  @Test(timeout=300000)
+  public void testBalancerDuringUpgrade() throws Exception {
+    final int SEED = 0xFADED;
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
 
+    final int BLOCK_SIZE = 1024*1024;
+    cluster = new MiniDFSCluster
+        .Builder(conf)
+        .numDataNodes(1)
+        .storageCapacities(new long[] { BLOCK_SIZE * 10 })
+        .storageTypes(new StorageType[] { DEFAULT })
+        .storagesPerDatanode(1)
+        .build();
+
+    try {
+      cluster.waitActive();
+      // Create a file on the single DN
+      final String METHOD_NAME = GenericTestUtils.getMethodName();
+      final Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
+
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, path1, BLOCK_SIZE, BLOCK_SIZE * 2, BLOCK_SIZE,
+          (short) 1, SEED);
+
+      // Add another DN with the same capacity, cluster is now unbalanced
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.triggerHeartbeats();
+      Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+
+      // Run balancer
+      final Balancer.Parameters p = Parameters.DEFAULT;
+
+      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
+      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.PREPARE);
+      fs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+
+      // Rolling upgrade should abort the balancer
+      assertEquals(ExitStatus.UNFINALIZED_UPGRADE.getExitCode(),
+          Balancer.run(namenodes, p, conf));
+
+      // Should work with the -runDuringUpgrade flag.
+      final Balancer.Parameters runDuringUpgrade =
+          new Balancer.Parameters(Parameters.DEFAULT.policy,
+              Parameters.DEFAULT.threshold,
+              Parameters.DEFAULT.maxIdleIteration,
+              Parameters.DEFAULT.nodesToBeExcluded,
+              Parameters.DEFAULT.nodesToBeIncluded,
+              true);
+      assertEquals(ExitStatus.SUCCESS.getExitCode(),
+          Balancer.run(namenodes, runDuringUpgrade, conf));
+
+      // Finalize the rolling upgrade
+      fs.rollingUpgrade(HdfsConstants.RollingUpgradeAction.FINALIZE);
+
+      // Should also work after finalization.
+      assertEquals(ExitStatus.SUCCESS.getExitCode(),
+          Balancer.run(namenodes, p, conf));
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
    * Test special case. Two replicas belong to same block should not in same node.
    * We have 2 nodes.
    * We have a block in (DN0,SSD) and (DN1,DISK).


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message