hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ayushsax...@apache.org
Subject [hadoop] branch branch-3.1 updated: HDFS-14578. AvailableSpaceBlockPlacementPolicy always prefers local node. Contributed by Ayush Saxena.
Date Sat, 11 Jan 2020 05:13:44 GMT
This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 12b2581  HDFS-14578. AvailableSpaceBlockPlacementPolicy always prefers local node. Contributed by Ayush Saxena.
12b2581 is described below

commit 12b258115b49bf781615194429da7dd19d3e916e
Author: Ayush Saxena <ayushsaxena@apache.org>
AuthorDate: Sat Jan 11 10:29:11 2020 +0530

    HDFS-14578. AvailableSpaceBlockPlacementPolicy always prefers local node. Contributed by Ayush Saxena.
---
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   6 +
 .../AvailableSpaceBlockPlacementPolicy.java        |  81 +++++++++-
 .../src/main/resources/hdfs-default.xml            |  12 ++
 .../TestAvailableSpaceBPPBalanceLocal.java         | 164 +++++++++++++++++++++
 4 files changed, 256 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 161581b..5c018d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -939,6 +939,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.available-space-block-placement-policy.balanced-space-preference-fraction";
   public static final float   DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT =
       0.6f;
+  public static final String
+      DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_KEY =
+      "dfs.namenode.available-space-block-placement-policy.balance-local-node";
+  public static final boolean
+      DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_DEFAULT =
+      false;
   public static final String  DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_KEY =
       "dfs.namenode.block-placement-policy.default.prefer-local-node";
   public static final boolean  DFS_NAMENODE_BLOCKPLACEMENTPOLICY_DEFAULT_PREFER_LOCAL_NODE_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java
index 8435b46..6f395dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/AvailableSpaceBlockPlacementPolicy.java
@@ -22,7 +22,10 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT;
 
 import java.util.Collection;
+import java.util.EnumMap;
+import java.util.List;
 import java.util.Random;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
@@ -44,6 +47,7 @@ public class AvailableSpaceBlockPlacementPolicy extends
   private static final Random RAND = new Random();
   private int balancedPreference =
       (int) (100 * DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT);
+  private boolean optimizeLocal;
 
   @Override
   public void initialize(Configuration conf, FSClusterStats stats,
@@ -58,6 +62,10 @@ public class AvailableSpaceBlockPlacementPolicy extends
         + DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
         + " = " + balancedPreferencePercent);
 
+    optimizeLocal = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_DEFAULT);
+
     if (balancedPreferencePercent > 1.0) {
       LOG.warn("The value of "
           + DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY
@@ -82,7 +90,65 @@ public class AvailableSpaceBlockPlacementPolicy extends
         .chooseRandomWithStorageType(scope, excludedNode, type);
     DatanodeDescriptor b = (DatanodeDescriptor) dfsClusterMap
         .chooseRandomWithStorageType(scope, excludedNode, type);
-    return select(a, b);
+    return select(a, b, false);
+  }
+
+  @Override
+  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
+      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
+      throws NotEnoughReplicasException {
+    if (!optimizeLocal) {
+      return super.chooseLocalStorage(localMachine, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageTypes,
+          fallbackToLocalRack);
+    }
+    final EnumMap<StorageType, Integer> initialStorageTypesLocal =
+        storageTypes.clone();
+    final EnumMap<StorageType, Integer> initialStorageTypesLocalRack =
+        storageTypes.clone();
+    DatanodeStorageInfo local =
+        chooseLocalStorage(localMachine, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes,
+            initialStorageTypesLocal);
+    if (!fallbackToLocalRack) {
+      return local;
+    }
+    if (local != null) {
+      results.remove(local);
+    }
+    DatanodeStorageInfo localRack =
+        chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack,
+            results, avoidStaleNodes, initialStorageTypesLocalRack);
+    if (local != null && localRack != null) {
+      if (select(local.getDatanodeDescriptor(),
+          localRack.getDatanodeDescriptor(), true) == local
+          .getDatanodeDescriptor()) {
+        results.remove(localRack);
+        results.add(local);
+        swapStorageTypes(initialStorageTypesLocal, storageTypes);
+        excludedNodes.remove(localRack.getDatanodeDescriptor());
+        return local;
+      } else {
+        swapStorageTypes(initialStorageTypesLocalRack, storageTypes);
+        excludedNodes.remove(local.getDatanodeDescriptor());
+        return localRack;
+      }
+    } else if (localRack == null && local != null) {
+      results.add(local);
+      swapStorageTypes(initialStorageTypesLocal, storageTypes);
+      return local;
+    } else {
+      swapStorageTypes(initialStorageTypesLocalRack, storageTypes);
+      return localRack;
+    }
+  }
+
+  private void swapStorageTypes(EnumMap<StorageType, Integer> fromStorageTypes,
+      EnumMap<StorageType, Integer> toStorageTypes) {
+    toStorageTypes.clear();
+    toStorageTypes.putAll(fromStorageTypes);
   }
 
   @Override
@@ -92,13 +158,13 @@ public class AvailableSpaceBlockPlacementPolicy extends
         (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
     DatanodeDescriptor b =
         (DatanodeDescriptor) clusterMap.chooseRandom(scope, excludedNode);
-    return select(a, b);
+    return select(a, b, false);
   }
 
-  private DatanodeDescriptor select(
-      DatanodeDescriptor a, DatanodeDescriptor b) {
+  private DatanodeDescriptor select(DatanodeDescriptor a, DatanodeDescriptor b,
+      boolean isBalanceLocal) {
     if (a != null && b != null){
-      int ret = compareDataNode(a, b);
+      int ret = compareDataNode(a, b, isBalanceLocal);
       if (ret == 0) {
         return a;
       } else if (ret < 0) {
@@ -115,9 +181,10 @@ public class AvailableSpaceBlockPlacementPolicy extends
    * Compare the two data nodes.
    */
   protected int compareDataNode(final DatanodeDescriptor a,
-      final DatanodeDescriptor b) {
+      final DatanodeDescriptor b, boolean isBalanceLocal) {
     if (a.equals(b)
-        || Math.abs(a.getDfsUsedPercent() - b.getDfsUsedPercent()) < 5) {
+        || Math.abs(a.getDfsUsedPercent() - b.getDfsUsedPercent()) < 5 || ((
+        isBalanceLocal && a.getDfsUsedPercent() < 50))) {
       return 0;
     }
     return a.getDfsUsedPercent() < b.getDfsUsedPercent() ? -1 : 1;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0d804af..62c56d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4316,6 +4316,18 @@
 </property>
 
 <property>
+  <name>
+    dfs.namenode.available-space-block-placement-policy.balance-local-node
+  </name>
+  <value>false</value>
+  <description>
+    Only used when the dfs.block.replicator.classname is set to
+    org.apache.hadoop.hdfs.server.blockmanagement.AvailableSpaceBlockPlacementPolicy.
+    If true, balances the local node too.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.backup.dnrpc-address</name>
   <value></value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java
new file mode 100644
index 0000000..a5920c4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestAvailableSpaceBPPBalanceLocal.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_KEY;
+
+/**
+ * Tests AvailableSpaceBlockPlacementPolicy with balance local.
+ */
+public class TestAvailableSpaceBPPBalanceLocal {
+  private final static int NUM_RACKS = 2;
+  private final static int NODES_PER_RACK = 3;
+  final static int BLOCK_SIZE = 1024;
+  final static int CHOOSE_TIMES = 10000;
+  final static String FILE = "/tobers/test";
+
+  private static DatanodeStorageInfo[] storages;
+  private static DatanodeDescriptor[] dataNodes;
+  private static Configuration conf;
+  private static NameNode namenode;
+  private static NetworkTopology cluster;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setFloat(
+        DFSConfigKeys.DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_KEY,
+        0.6f);
+    conf.setBoolean(
+        DFS_NAMENODE_AVAILABLE_SPACE_BLOCK_PLACEMENT_POLICY_BALANCE_LOCAL_NODE_KEY,
+        true);
+    String[] racks = new String[NUM_RACKS];
+    for (int i = 0; i < NUM_RACKS; i++) {
+      racks[i] = "/rack" + i;
+    }
+
+    String[] ownerRackOfNodes = new String[NUM_RACKS * NODES_PER_RACK];
+    for (int i = 0; i < NUM_RACKS; i++) {
+      for (int j = 0; j < NODES_PER_RACK; j++) {
+        ownerRackOfNodes[i * NODES_PER_RACK + j] = racks[i];
+      }
+    }
+
+    storages = DFSTestUtil.createDatanodeStorageInfos(ownerRackOfNodes);
+    dataNodes = DFSTestUtil.toDatanodeDescriptor(storages);
+
+    FileSystem.setDefaultUri(conf, "hdfs://localhost:0");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    File baseDir =
+        PathUtils.getTestDir(AvailableSpaceBlockPlacementPolicy.class);
+    conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY,
+        new File(baseDir, "name").getPath());
+    conf.set(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
+        AvailableSpaceBlockPlacementPolicy.class.getName());
+
+    DFSTestUtil.formatNameNode(conf);
+    namenode = new NameNode(conf);
+
+    final BlockManager bm = namenode.getNamesystem().getBlockManager();
+    cluster = bm.getDatanodeManager().getNetworkTopology();
+    for (int i = 0; i < NODES_PER_RACK * NUM_RACKS; i++) {
+      cluster.add(dataNodes[i]);
+    }
+
+    setupDataNodeCapacity();
+  }
+
+  protected static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
+      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
+      long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
+      int volFailures) {
+    dn.getStorageInfos()[0]
+        .setUtilizationForTesting(capacity, dfsUsed, remaining, blockPoolUsed);
+    dn.updateHeartbeat(BlockManagerTestUtil.getStorageReportsForDatanode(dn),
+        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures, null);
+  }
+
+  protected static void setupDataNodeCapacity() {
+    for (int i = 0; i < NODES_PER_RACK * NUM_RACKS; i++) {
+      if ((i % 2) == 0) {
+        // remaining 100%
+        updateHeartbeatWithUsage(dataNodes[i],
+            4 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L,
+            4 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L,
+            0L, 0, 0);
+      } else {
+        // remaining 25%
+        updateHeartbeatWithUsage(dataNodes[i],
+            4 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
+            3 * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE,
+            HdfsServerConstants.MIN_BLOCKS_FOR_WRITE * BLOCK_SIZE, 0L, 0L, 0L,
+            0, 0);
+      }
+    }
+  }
+
+  @Test
+  public void testChooseLocalNode() {
+    // Choosing datanode with zero usage.
+    DatanodeDescriptor localNode = dataNodes[0];
+    for (int i = 0; i < CHOOSE_TIMES; i++) {
+      DatanodeStorageInfo[] targets =
+          namenode.getNamesystem().getBlockManager().getBlockPlacementPolicy()
+              .chooseTarget(FILE, 1, localNode,
+                  new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+                  TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+      Assert.assertEquals(1, targets.length);
+      Assert.assertEquals(localNode, targets[0].getDatanodeDescriptor());
+    }
+  }
+
+  @Test
+  public void testChooseLocalNodeWithLocalNodeLoaded() {
+    // Choosing datanode with 75 percent usage.
+    DatanodeDescriptor localNode = dataNodes[1];
+    int numLocalChosen = 0;
+    for (int i = 0; i < CHOOSE_TIMES; i++) {
+      DatanodeStorageInfo[] targets =
+          namenode.getNamesystem().getBlockManager().getBlockPlacementPolicy()
+              .chooseTarget(FILE, 1, localNode,
+                  new ArrayList<DatanodeStorageInfo>(), false, null, BLOCK_SIZE,
+                  TestBlockStoragePolicy.DEFAULT_STORAGE_POLICY, null);
+
+      Assert.assertEquals(1, targets.length);
+      if (localNode == targets[0].getDatanodeDescriptor()) {
+        numLocalChosen++;
+      }
+    }
+    Assert.assertTrue(numLocalChosen < (CHOOSE_TIMES - numLocalChosen));
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message