hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject hadoop git commit: HDFS-11384. Balancer disperses getBlocks calls to avoid NameNode's rpc queue saturation. Contributed by Konstantin V Shvachko.
Date Thu, 27 Apr 2017 19:20:04 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 bec23593b -> cf5abf8a7


HDFS-11384. Balancer disperses getBlocks calls to avoid NameNode's rpc queue saturation. Contributed
by Konstantin V Shvachko.

(cherry picked from commit 28eb2aabebd15c15a357d86e23ca407d3c85211c)
# Conflicts:
#	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
#	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf5abf8a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf5abf8a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf5abf8a

Branch: refs/heads/branch-2.7
Commit: cf5abf8a7c06ae19ac97094ffba8562462fc72d7
Parents: bec2359
Author: Konstantin V Shvachko <shv@apache.org>
Authored: Wed Apr 26 17:28:49 2017 -0700
Committer: Konstantin V Shvachko <shv@apache.org>
Committed: Thu Apr 27 11:57:36 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 41 ++++++++++++++++-
 .../hdfs/server/balancer/TestBalancer.java      | 47 ++++++++++++++++----
 .../server/balancer/TestBalancerRPCDelay.java   | 32 +++++++++++++
 .../blockmanagement/BlockManagerTestUtil.java   |  5 +++
 .../hdfs/server/namenode/NameNodeAdapter.java   | 31 ++++++++++++-
 5 files changed, 145 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf5abf8a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 60919b8..cb55d6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -795,8 +796,11 @@ public class Dispatcher {
      * namenode for more blocks. It terminates when it has dispatch enough block
      * move tasks or it has received enough blocks from the namenode, or the
      * elapsed time of the iteration has exceeded the max time limit.
+     *
+     * @param delay - time to sleep before sending getBlocks. Intended to
+     * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384.
      */
-    private void dispatchBlocks() {
+    private void dispatchBlocks(long delay) {
       this.blocksToReceive = 2 * getScheduledSize();
       long previousMoveTimestamp = Time.monotonicNow();
       while (getScheduledSize() > 0 && !isIterationOver()
@@ -821,11 +825,21 @@ public class Dispatcher {
         if (shouldFetchMoreBlocks()) {
           // fetch new blocks
           try {
+            if(delay > 0) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sleeping " + delay + "  msec.");
+              }
+              Thread.sleep(delay);
+            }
             blocksToReceive -= getBlockList();
             continue;
+          } catch (InterruptedException ignored) {
+            // nothing to do
           } catch (IOException e) {
             LOG.warn("Exception while getting block list", e);
             return;
+          } finally {
+            delay = 0L;
           }
         } else {
           // jump out of while-loop after the configured timeout.
@@ -1009,6 +1023,12 @@ public class Dispatcher {
   }
 
   /**
+   * The best-effort limit on the number of RPCs per second
+   * the Balancer will send to the NameNode.
+   */
+  final static int BALANCER_NUM_RPC_PER_SEC = 20;
+
+  /**
    * Dispatch block moves for each source. The thread selects blocks to move &
    * sends request to proxy source to initiate block move. The process is flow
    * controlled. Block selection is blocked if there are too many un-confirmed
@@ -1020,15 +1040,32 @@ public class Dispatcher {
     final long bytesLastMoved = getBytesMoved();
     final Future<?>[] futures = new Future<?>[sources.size()];
 
+    int concurrentThreads = Math.min(sources.size(),
+        ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
+    assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC);
+      LOG.debug("Balancer concurrent threads = " + concurrentThreads);
+      LOG.debug("Disperse Interval sec = " +
+          concurrentThreads / BALANCER_NUM_RPC_PER_SEC);
+    }
+    long dSec = 0;
     final Iterator<Source> i = sources.iterator();
     for (int j = 0; j < futures.length; j++) {
       final Source s = i.next();
+      final long delay = dSec * 1000;
       futures[j] = dispatchExecutor.submit(new Runnable() {
         @Override
         public void run() {
-          s.dispatchBlocks();
+          s.dispatchBlocks(delay);
         }
       });
+      // Calculate delay in seconds for the next iteration
+      if(j >= concurrentThreads) {
+        dSec = 0;
+      } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) {
+        dSec++;
+      }
     }
 
     // wait for all dispatcher threads to finish

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf5abf8a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 5052207..398ad5a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -94,6 +94,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Result;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.http.HttpConfig;
@@ -118,6 +119,7 @@ public class TestBalancer {
 
   static {
     ((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)Dispatcher.LOG).getLogger().setLevel(Level.DEBUG);
   }
 
   final static long CAPACITY = 5000L;
@@ -651,6 +653,7 @@ public class TestBalancer {
    *   parsing, etc.   Otherwise invoke balancer API directly.
    * @param useFile - if true, the hosts to included or excluded will be stored in a
    *   file and then later read from the file.
+   * @param useNamesystemSpy - spy on FSNamesystem if true
    * @throws Exception
    */
   private void doTest(Configuration conf, long[] capacities,
@@ -663,15 +666,21 @@ public class TestBalancer {
     LOG.info("useTool    = " +  useTool);
     assertEquals(capacities.length, racks.length);
     int numOfDatanodes = capacities.length;
-    cluster = new MiniDFSCluster.Builder(conf)
-                                .numDataNodes(capacities.length)
-                                .racks(racks)
-                                .simulatedCapacities(capacities)
-                                .build();
+
     try {
+      cluster = new MiniDFSCluster.Builder(conf)
+                                  .numDataNodes(0)
+                                  .build();
+      cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      cluster.startDataNodes(conf, numOfDatanodes, true,
+          StartupOption.REGULAR, racks, null, capacities, false);
+      cluster.waitClusterUp();
       cluster.waitActive();
-      client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0).getUri(),
-          ClientProtocol.class).getProxy();
+      client = NameNodeProxies.createProxy(conf,
+          cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
 
       long totalCapacity = sum(capacities);
       
@@ -751,7 +760,9 @@ public class TestBalancer {
         runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
       }
     } finally {
-      cluster.shutdown();
+      if(cluster != null) {
+        cluster.shutdown();
+      }
     }
   }
 
@@ -1594,6 +1605,26 @@ public class TestBalancer {
   }
 
   /**
+   * Test that makes the Balancer disperse RPCs to the NameNode
+   * in order to avoid saturating the NN's RPC queue.
+   */
+  void testBalancerRPCDelay() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30);
+
+    int numDNs = 40;
+    long[] capacities = new long[numDNs];
+    String[] racks = new String[numDNs];
+    for(int i = 0; i < numDNs; i++) {
+      capacities[i] = CAPACITY;
+      racks[i] = (i < numDNs/2 ? RACK0 : RACK1);
+    }
+    doTest(conf, capacities, racks, CAPACITY, RACK2,
+        new PortNumberBasedNodes(3, 0, 0), false, false);
+  }
+
+  /**
    * @param args
    */
   public static void main(String[] args) throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf5abf8a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
new file mode 100644
index 0000000..960ad25
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.balancer;
+
+import org.junit.Test;
+
+/**
+ * The Balancer ensures that it disperses RPCs to the NameNode
+ * in order to avoid NN's RPC queue saturation.
+ */
+public class TestBalancerRPCDelay {
+
+  @Test(timeout=100000)
+  public void testBalancerRPCDelay() throws Exception {
+    new TestBalancer().testBalancerRPCDelay();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf5abf8a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index a67d245..497ad58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -140,6 +140,11 @@ public class BlockManagerTestUtil {
     }
   }
 
+  public static HeartbeatManager getHeartbeatManager(
+      final BlockManager blockManager) {
+    return blockManager.getDatanodeManager().getHeartbeatManager();
+  }
+
   /**
    * @return corruptReplicas from block manager
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf5abf8a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index fa23fbf..f96b545 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -172,7 +173,35 @@ public class NameNodeAdapter {
   public static long[] getStats(final FSNamesystem fsn) {
     return fsn.getStats();
   }
-  
+
+  public static FSNamesystem spyOnNamesystem(NameNode nn) {
+    FSNamesystem fsnSpy = Mockito.spy(nn.getNamesystem());
+    FSNamesystem fsnOld = nn.namesystem;
+    fsnOld.writeLock();
+    fsnSpy.writeLock();
+    nn.namesystem = fsnSpy;
+    try {
+      FieldUtils.writeDeclaredField(
+          (NameNodeRpcServer)nn.getRpcServer(), "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getBlockManager(), "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getLeaseManager(), "fsnamesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          fsnSpy.getBlockManager().getDatanodeManager(),
+          "namesystem", fsnSpy, true);
+      FieldUtils.writeDeclaredField(
+          BlockManagerTestUtil.getHeartbeatManager(fsnSpy.getBlockManager()),
+          "namesystem", fsnSpy, true);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Cannot set spy FSNamesystem", e);
+    } finally {
+      fsnSpy.writeUnlock();
+      fsnOld.writeUnlock();
+    }
+    return fsnSpy;
+  }
+
   public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
     ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests());
     fsn.setFsLockForTests(spy);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message