hadoop-common-commits mailing list archives

From szets...@apache.org
Subject [2/2] hadoop git commit: HDFS-8824. Do not use small blocks for balancing the cluster.
Date Fri, 14 Aug 2015 20:09:18 GMT
HDFS-8824. Do not use small blocks for balancing the cluster.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61b9e5f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61b9e5f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61b9e5f7

Branch: refs/heads/branch-2
Commit: 61b9e5f7ff15daa0efd09e98efd70351f474c8cb
Parents: b094f8b
Author: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Authored: Fri Aug 14 13:03:19 2015 -0700
Committer: Tsz-Wo Nicholas Sze <szetszwo@hortonworks.com>
Committed: Fri Aug 14 13:08:57 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  2 ++
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  4 +++
 .../hadoop/hdfs/server/balancer/Balancer.java   |  9 ++++-
 .../hadoop/hdfs/server/balancer/Dispatcher.java | 38 ++++++++++++--------
 .../hdfs/server/balancer/TestBalancer.java      | 11 ++++--
 5 files changed, 47 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/61b9e5f7/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0565b17..b812c21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -443,6 +443,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-7649. Multihoming docs should emphasize using hostnames in
     configurations. (Brahma Reddy Battula via Arpit Agarwal)
 
+    HDFS-8824. Do not use small blocks for balancing the cluster.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61b9e5f7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f44f0d7..eedebad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -349,6 +349,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200;
   public static final String  DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY = "dfs.balancer.max-size-to-move";
   public static final long    DFS_BALANCER_MAX_SIZE_TO_MOVE_DEFAULT = 10L*1024*1024*1024;
+  public static final String  DFS_BALANCER_GETBLOCKS_SIZE_KEY = "dfs.balancer.getBlocks.size";
+  public static final long    DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT = 2L*1024*1024*1024; // 2GB
+  public static final String  DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY = "dfs.balancer.getBlocks.min-block-size";
+  public static final long    DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT = 10L*1024*1024; // 10MB
 
 
   public static final String  DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
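
The two new keys above drive the Balancer's getBlocks() behaviour: dfs.balancer.getBlocks.size caps the total block size requested from the NameNode per call (default 2GB), and dfs.balancer.getBlocks.min-block-size is the size below which blocks are ignored for balancing (default 10MB). A minimal sketch of setting them programmatically, assuming only the standard org.apache.hadoop.conf.Configuration API; the values shown are illustrative, not recommendations:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  public class BalancerGetBlocksTuning {
    /** Returns a Configuration with the two new balancer keys set to illustrative values. */
    public static Configuration tunedConf() {
      final Configuration conf = new Configuration();
      // Cap on the total size of blocks fetched per getBlocks() call; default is 2GB.
      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY, 2L * 1024 * 1024 * 1024);
      // Blocks smaller than this are skipped when balancing; default is 10MB.
      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 10L * 1024 * 1024);
      return conf;
    }
  }

As with other dfs.balancer.* settings, these keys would normally also be settable in hdfs-site.xml on the node that runs the Balancer.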

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61b9e5f7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index e96664f..4f3f18e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -252,10 +252,17 @@ public class Balancer {
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
 
+    final long getBlocksSize = getLong(conf,
+        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
+        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
+    final long getBlocksMinBlockSize = getLong(conf,
+        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
+        DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+
     this.nnc = theblockpool;
     this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded,
         p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads,
-        maxConcurrentMovesPerNode, conf);
+        maxConcurrentMovesPerNode, getBlocksSize, getBlocksMinBlockSize, conf);
     this.threshold = p.threshold;
     this.policy = p.policy;
     this.runDuringUpgrade = p.runDuringUpgrade;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61b9e5f7/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index a4adf7f..b06219c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -84,9 +84,6 @@ import com.google.common.base.Preconditions;
 public class Dispatcher {
   static final Log LOG = LogFactory.getLog(Dispatcher.class);
 
-  private static final long GB = 1L << 30; // 1GB
-  private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2 * GB;
-
   private static final int MAX_NO_PENDING_MOVE_ITERATIONS = 5;
   /**
    * the period of time to delay the usage of a DataNode after hitting
@@ -121,6 +118,9 @@ public class Dispatcher {
   /** The maximum number of concurrent blocks moves at a datanode */
   private final int maxConcurrentMovesPerNode;
 
+  private final long getBlocksSize;
+  private final long getBlocksMinBlockSize;
+
   private final int ioFileBufferSize;
 
   static class Allocator {
@@ -652,8 +652,9 @@ public class Dispatcher {
      * @return the total size of the received blocks in the number of bytes.
      */
     private long getBlockList() throws IOException {
-      final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
+      final long size = Math.min(getBlocksSize, blocksToReceive);
       final BlocksWithLocations newBlocks = nnc.getBlocks(getDatanodeInfo(), size);
+
       if (LOG.isTraceEnabled()) {
         LOG.trace("getBlocks(" + getDatanodeInfo() + ", "
             + StringUtils.TraditionalBinaryPrefix.long2String(size, "B", 2)
@@ -662,6 +663,11 @@ public class Dispatcher {
 
       long bytesReceived = 0;
       for (BlockWithLocations blk : newBlocks.getBlocks()) {
+        // Skip small blocks.
+        if (blk.getBlock().getNumBytes() < getBlocksMinBlockSize) {
+          continue;
+        }
+
         bytesReceived += blk.getBlock().getNumBytes();
         synchronized (globalBlocks) {
           final DBlock block = globalBlocks.get(blk.getBlock());
@@ -840,9 +846,19 @@ public class Dispatcher {
     }
   }
 
+  /** Constructor called by Mover. */
   public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
+    this(nnc, includedNodes, excludedNodes, movedWinWidth,
+        moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
+        0L, 0L, conf);
+  }
+
+  Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
+      Set<String> excludedNodes, long movedWinWidth, int moverThreads,
+      int dispatcherThreads, int maxConcurrentMovesPerNode,
+      long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -855,6 +871,9 @@ public class Dispatcher {
     this.moverThreadAllocator = new Allocator(moverThreads);
     this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode;
 
+    this.getBlocksSize = getBlocksSize;
+    this.getBlocksMinBlockSize = getBlocksMinBlockSize;
+
     this.saslClient = new SaslDataTransferClient(conf,
         DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnc.fallbackToSimpleAuth);
@@ -1003,9 +1022,6 @@ public class Dispatcher {
     return getBytesMoved() - bytesLastMoved;
   }
 
-  /** The sleeping period before checking if block move is completed again */
-  static private long blockMoveWaitTime = 30000L;
-
   /**
    * Wait for all block move confirmations.
    * @return true if there is failed move execution
@@ -1027,7 +1043,7 @@ public class Dispatcher {
         return hasFailure; // all pending queues are empty
       }
       try {
-        Thread.sleep(blockMoveWaitTime);
+        Thread.sleep(1000);
       } catch (InterruptedException ignored) {
       }
     }
@@ -1154,12 +1170,6 @@ public class Dispatcher {
     movedBlocks.cleanup();
   }
 
-  /** set the sleeping period for block move completion check */
-  @VisibleForTesting
-  public static void setBlockMoveWaitTime(long time) {
-    blockMoveWaitTime = time;
-  }
-
   @VisibleForTesting
   public static void setDelayAfterErrors(long time) {
     delayAfterErrors = time;
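
Net effect of the Dispatcher changes: the previously hard-coded MAX_BLOCKS_SIZE_TO_FETCH (2GB) becomes the configurable getBlocksSize, and getBlockList() now skips any block smaller than getBlocksMinBlockSize when accumulating candidates, so the Balancer no longer spends moves on tiny blocks. A self-contained sketch of that filtering step, with a plain list of block sizes standing in for the BlockWithLocations returned by the NameNode (class and method names here are hypothetical):

  import java.util.List;

  class SmallBlockFilterSketch {
    /**
     * Sums the sizes of candidate blocks, skipping any block smaller than
     * minBlockSize, mirroring the new loop in Dispatcher#getBlockList().
     */
    static long accumulate(List<Long> blockSizes, long minBlockSize) {
      long bytesReceived = 0;
      for (long numBytes : blockSizes) {
        if (numBytes < minBlockSize) {
          continue; // small blocks are not worth the per-move overhead
        }
        bytesReceived += numBytes;
      }
      return bytesReceived;
    }
  }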

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61b9e5f7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index bb8a45b..5e1b45b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -118,8 +118,6 @@ public class TestBalancer {
   }
 
   public static void initTestSetup() {
-    Dispatcher.setBlockMoveWaitTime(1000L) ;
-
     // do not create id file since it occupies the disk space
     NameNodeConnector.setWrite2IdFile(false);
   }
@@ -128,9 +126,12 @@ public class TestBalancer {
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
     SimulatedFSDataset.setFactory(conf);
+
     conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
   }
 
   static void initConfWithRamDisk(Configuration conf,
@@ -142,6 +143,8 @@ public class TestBalancer {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
     LazyPersistTestCase.initCacheManipulator();
+
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
   }
 
   /* create a file with a length of <code>fileLen</code> */
@@ -1336,6 +1339,8 @@ public class TestBalancer {
     conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
 
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+
     final int BLOCK_SIZE = 1024*1024;
     cluster = new MiniDFSCluster
         .Builder(conf)
@@ -1410,6 +1415,8 @@ public class TestBalancer {
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1L);
 
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
+
     int numOfDatanodes =2;
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(2)
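
The test changes set dfs.balancer.getBlocks.min-block-size to 1 byte because the MiniDFSCluster-based tests create blocks far smaller than the 10MB default; without the override every test block would be skipped as too small and the Balancer would move nothing. A hedged sketch of that test-side setup; DEFAULT_BLOCK_SIZE is a hypothetical stand-in for the small constant TestBalancer actually uses:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hdfs.DFSConfigKeys;

  class TestBalancerConfSketch {
    static final long DEFAULT_BLOCK_SIZE = 100; // hypothetical small block size for tests

    static Configuration newTestConf() {
      final Configuration conf = new Configuration();
      conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
      // Without this override, the new Dispatcher logic would treat every
      // test block as "small" and skip it.
      conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
      return conf;
    }
  }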

