hadoop-common-commits mailing list archives

From: kih...@apache.org
Subject: hadoop git commit: HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source DataNode. Contributed by Mark Wagner and Zhe Zhang.
Date: Mon, 21 Nov 2016 16:39:13 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 38024eb8f -> 774b0cd84


HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source DataNode. Contributed by Mark Wagner and Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/774b0cd8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/774b0cd8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/774b0cd8

Branch: refs/heads/branch-2
Commit: 774b0cd845a67bc0e2ef99b415ce1e5432769551
Parents: 38024eb
Author: Kihwal Lee <kihwal@apache.org>
Authored: Mon Nov 21 10:38:03 2016 -0600
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Mon Nov 21 10:39:04 2016 -0600

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 +++
 .../hadoop/hdfs/server/balancer/Balancer.java   |  5 ++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 34 ++++++++++++--------
 .../apache/hadoop/hdfs/server/mover/Mover.java  |  5 ++-
 .../src/main/resources/hdfs-default.xml         | 20 ++++++++++++
 .../hdfs/server/balancer/TestBalancer.java      |  2 ++
 6 files changed, 55 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/774b0cd8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c1e6de6..a3b9712 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -478,6 +478,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String  DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
   public static final String  DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
   public static final int     DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
+  public static final String  DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
+  public static final int    DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
@@ -486,6 +488,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
   public static final String  DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
   public static final int     DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
+  public static final String  DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
+  public static final int    DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
 
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int     DFS_DATANODE_DEFAULT_PORT = 50010;
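
For reference, a minimal sketch of how the two new keys resolve at runtime; this is illustrative only (the MaxNoMoveIntervalExample class is made up for this note), and the real wiring is in the Balancer and Mover changes below.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MaxNoMoveIntervalExample {
      public static void main(String[] args) {
        // Loads hdfs-default.xml / hdfs-site.xml, so site overrides apply.
        Configuration conf = new HdfsConfiguration();

        // Balancer key: falls back to 60*1000 ms when unset.
        int balancerInterval = conf.getInt(
            DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
            DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);

        // Mover key: same 60*1000 ms default.
        int moverInterval = conf.getInt(
            DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
            DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);

        System.out.println("balancer=" + balancerInterval + " ms, mover=" + moverInterval + " ms");
      }
    }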

http://git-wip-us.apache.org/repos/asf/hadoop/blob/774b0cd8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 68c27b6..94f84a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -283,13 +283,16 @@ public class Balancer {
     final int blockMoveTimeout = conf.getInt(
         DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
         DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
+    final int maxNoMoveInterval = conf.getInt(
+        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
 
     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-            getBlocksMinBlockSize, blockMoveTimeout, conf);
+            getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/774b0cd8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 9b4a5e3..5ab6778 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -122,6 +122,11 @@ public class Dispatcher {
   private final long getBlocksSize;
   private final long getBlocksMinBlockSize;
   private final long blockMoveTimeout;
+  /**
+   * If no block can be moved out of a {@link Source} after this configured
+   * amount of time, the Source should give up choosing the next possible move.
+   */
+  private final int maxNoMoveInterval;
 
   private final int ioFileBufferSize;
 
@@ -805,7 +810,7 @@ public class Dispatcher {
      */
     private void dispatchBlocks() {
       this.blocksToReceive = 2 * getScheduledSize();
-      int noPendingMoveIteration = 0;
+      long previousMoveTimestamp = Time.monotonicNow();
       while (getScheduledSize() > 0 && !isIterationOver()
           && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
         if (LOG.isTraceEnabled()) {
@@ -815,8 +820,8 @@ public class Dispatcher {
         }
         final PendingMove p = chooseNextMove();
         if (p != null) {
-          // Reset no pending move counter
-          noPendingMoveIteration=0;
+          // Reset previous move timestamp
+          previousMoveTimestamp = Time.monotonicNow();
           executePendingMove(p);
           continue;
         }
@@ -839,13 +844,11 @@ public class Dispatcher {
             return;
           }
         } else {
-          // source node cannot find a pending block to move, iteration +1
-          noPendingMoveIteration++;
-          // in case no blocks can be moved for source node's task,
-          // jump out of while-loop after 5 iterations.
-          if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
-            LOG.info("Failed to find a pending move "  + noPendingMoveIteration
-                + " times.  Skipping " + this);
+          // jump out of while-loop after the configured timeout.
+          long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp;
+          if (noMoveInterval > maxNoMoveInterval) {
+            LOG.info("Failed to find a pending move for "  + noMoveInterval
+                + " ms.  Skipping " + this);
             resetScheduledSize();
           }
         }
@@ -856,6 +859,9 @@ public class Dispatcher {
           synchronized (Dispatcher.this) {
             Dispatcher.this.wait(1000); // wait for targets/sources to be idle
           }
+          // Didn't find a possible move in this iteration of the while loop,
+          // adding a small delay before choosing next move again.
+          Thread.sleep(100);
         } catch (InterruptedException ignored) {
         }
       }
@@ -880,17 +886,18 @@ public class Dispatcher {
   /** Constructor called by Mover. */
   public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
-      int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
+      int dispatcherThreads, int maxConcurrentMovesPerNode,
+      int maxNoMoveInterval, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, 0,  conf);
+        0L, 0L, 0, maxNoMoveInterval, conf);
   }
 
   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
       long getBlocksSize, long getBlocksMinBlockSize,
-      int blockMoveTimeout, Configuration conf) {
+      int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -906,6 +913,7 @@ public class Dispatcher {
     this.getBlocksSize = getBlocksSize;
     this.getBlocksMinBlockSize = getBlocksMinBlockSize;
     this.blockMoveTimeout = blockMoveTimeout;
+    this.maxNoMoveInterval = maxNoMoveInterval;
 
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
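
To make the behavioral change easier to see outside the diff, here is a small self-contained sketch; the NoMoveTimeoutSketch class and its chooseNextMove() stand-in are hypothetical, not the actual Dispatcher API. It models the pattern the patch adopts: track the timestamp of the last successful move and give up once the elapsed no-move time exceeds maxNoMoveInterval, instead of counting a fixed number of no-pending-move iterations.

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class NoMoveTimeoutSketch {
      private final int maxNoMoveInterval;      // milliseconds; 60*1000 by default in the patch
      private final Deque<Runnable> pendingMoves;

      NoMoveTimeoutSketch(int maxNoMoveInterval, Deque<Runnable> pendingMoves) {
        this.maxNoMoveInterval = maxNoMoveInterval;
        this.pendingMoves = pendingMoves;
      }

      /** Stand-in for a chooseNextMove(): returns null when nothing is movable right now. */
      private Runnable chooseNextMove() {
        return pendingMoves.poll();
      }

      void dispatchBlocks() throws InterruptedException {
        long previousMoveTimestamp = monotonicMillis();
        while (true) {
          Runnable move = chooseNextMove();
          if (move != null) {
            previousMoveTimestamp = monotonicMillis();   // reset the no-move clock
            move.run();
            continue;
          }
          long noMoveInterval = monotonicMillis() - previousMoveTimestamp;
          if (noMoveInterval > maxNoMoveInterval) {
            return;                                      // nothing moved for too long; give up
          }
          Thread.sleep(100);                             // brief back-off before trying again
        }
      }

      private static long monotonicMillis() {
        return System.nanoTime() / 1_000_000;            // analogous to Hadoop's Time.monotonicNow()
      }

      public static void main(String[] args) throws InterruptedException {
        Deque<Runnable> moves = new ArrayDeque<>();
        moves.add(() -> System.out.println("moved one block"));
        new NoMoveTimeoutSketch(2_000, moves).dispatchBlocks();  // gives up ~2 s after the last move
      }
    }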

http://git-wip-us.apache.org/repos/asf/hadoop/blob/774b0cd8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 6c05f2f..2e1b8e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -123,13 +123,16 @@ public class Mover {
     final int maxConcurrentMovesPerNode = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    final int maxNoMoveInterval = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
+        DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);
     this.retryMaxAttempts = conf.getInt(
         DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
         DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
     this.retryCount = retryCount;
     this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
         Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
-        maxConcurrentMovesPerNode, conf);
+        maxConcurrentMovesPerNode, maxNoMoveInterval, conf);
     this.storages = new StorageMap();
     this.targetPaths = nnc.getTargetPaths();
     this.blockStoragePolicies = new BlockStoragePolicy[1 <<

http://git-wip-us.apache.org/repos/asf/hadoop/blob/774b0cd8/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 5161e7d..84d7ca5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -3142,6 +3142,16 @@
 </property>
 
 <property>
+  <name>dfs.balancer.max-no-move-interval</name>
+  <value>60000</value>
+  <description>
+    If the specified amount of time has elapsed and no block has been moved
+    out of a source DataNode, no more effort will be made to move blocks out
+    of this DataNode in the current Balancer iteration.
+  </description>
+</property>
+
+<property>
   <name>dfs.block.invalidate.limit</name>
   <value>1000</value>
   <description>
@@ -3693,6 +3703,16 @@
 </property>
 
 <property>
+  <name>dfs.mover.max-no-move-interval</name>
+  <value>60000</value>
+  <description>
+    If the specified amount of time has elapsed and no block has been moved
+    out of a source DataNode, no more effort will be made to move blocks out
+    of this DataNode in the current Mover iteration.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.audit.log.async</name>
   <value>false</value>
   <description>
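
Both properties take millisecond values. As an illustrative sketch only (the TuneNoMoveInterval class is made up), a client or test can also shorten them programmatically before constructing the Balancer or Mover, which is what the TestBalancer change below does for the balancer key:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class TuneNoMoveInterval {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Give up on an idle source after 30 s instead of the 60 s default.
        conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 30 * 1000);
        conf.setInt(DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY, 30 * 1000);
        System.out.println(conf.get(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY));
      }
    }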

http://git-wip-us.apache.org/repos/asf/hadoop/blob/774b0cd8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 8e9bd7b..5fdc066 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -233,6 +233,7 @@ public class TestBalancer {
 
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
   }
 
   static void initConfWithRamDisk(Configuration conf,
@@ -243,6 +244,7 @@ public class TestBalancer {
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
     LazyPersistTestCase.initCacheManipulator();
 
     conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);

