hadoop-common-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1410692 - in /hadoop/common/branches/branch-1-win: ./ src/core/org/apache/hadoop/net/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/namenode/
Date: Sat, 17 Nov 2012 03:42:37 GMT
Author: szetszwo
Date: Sat Nov 17 03:42:36 2012
New Revision: 1410692

URL: http://svn.apache.org/viewvc?rev=1410692&view=rev
Log:
svn merge -c 1410690 from branch-1 for HDFS-3941. Backport HDFS-3498 and HDFS-3601: Support
replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for
reusing code in subclasses, and add BlockPlacementPolicyWithNodeGroup to support block placement
with 4-layer network topology.
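For context on how the backported policy is meant to be switched on: block placement is pluggable in branch-1, so a deployment points the NameNode at the node-group-aware classes through configuration. A minimal sketch follows; the two key names ("dfs.block.replicator.classname" and "net.topology.impl") are assumptions based on the related branch-1 patches and should be verified against the actual build.

    import org.apache.hadoop.conf.Configuration;

    public class NodeGroupPolicyConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Plug in the node-group-aware placement policy (key name assumed).
        conf.set("dfs.block.replicator.classname",
            "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyWithNodeGroup");
        // Plug in the 4-layer topology implementation (key name assumed).
        conf.set("net.topology.impl",
            "org.apache.hadoop.net.NetworkTopologyWithNodeGroup");
        System.out.println(conf.get("dfs.block.replicator.classname"));
      }
    }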

Added:
    hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
      - copied, changed from r1410690, hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
      - copied unchanged from r1410690, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicyWithNodeGroup.java
Modified:
    hadoop/common/branches/branch-1-win/   (props changed)
    hadoop/common/branches/branch-1-win/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
    hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java

Propchange: hadoop/common/branches/branch-1-win/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1:r1410690

Modified: hadoop/common/branches/branch-1-win/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.txt?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.txt Sat Nov 17 03:42:36 2012
@@ -12,6 +12,12 @@ Release 1.2.0 - unreleased
     class pluggable and add NetworkTopologyWithNodeGroup, a 4-layer
     implementation of NetworkTopology.  (Junping Du and Jing Zhao via szetszwo)
 
+    HDFS-3941. Backport HDFS-3498 and HDFS-3601: Support replica removal in
+    BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for
+    reusing code in subclasses, and add BlockPlacementPolicyWithNodeGroup to
+    support block placement with 4-layer network topology.  (Junping Du and
+    Jing Zhao via szetszwo)
+
     HADOOP-7868. Hadoop native fails to compile when default linker
     option is -Wl,--as-needed. (Trevor Robinson via eli)
 

Propchange: hadoop/common/branches/branch-1-win/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1410690

Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/net/NetworkTopology.java Sat Nov 17 03:42:36 2012
@@ -742,6 +742,31 @@ public class NetworkTopology {
     }
     return tree.toString();
   }
+  
+  /**
+   * Divide a network location string into two parts by the last separator,
+   * and return the first part.
+   * 
+   * @param networkLocation a network location path such as /d1/r1/n1
+   * @return the substring before the last separator, e.g. /d1/r1
+   */
+  public static String getFirstHalf(String networkLocation) {
+    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+    return networkLocation.substring(0, index);
+  }
+
+  /**
+   * Divide a network location string into two parts by the last separator,
+   * and return the second part.
+   * 
+   * @param networkLocation a network location path such as /d1/r1/n1
+   * @return the substring from the last separator onward, e.g. /n1
+   */
+  public static String getLastHalf(String networkLocation) {
+    int index = networkLocation.lastIndexOf(NodeBase.PATH_SEPARATOR_STR);
+    return networkLocation.substring(index);
+  }
+
 
   /* swap two array items */
   static protected void swap(Node[] nodes, int i, int j) {

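To make the new helpers concrete: on a 4-layer location string, getFirstHalf yields everything up to the last separator (the rack for a /dc/rack/nodegroup path) and getLastHalf yields the trailing component (the node group). A small self-contained sketch mirroring the lastIndexOf logic above, with a made-up location (not from the commit):

    public class NetworkLocationSplit {
      private static final String PATH_SEPARATOR_STR = "/"; // as in NodeBase

      // mirrors NetworkTopology.getFirstHalf above
      static String getFirstHalf(String networkLocation) {
        int index = networkLocation.lastIndexOf(PATH_SEPARATOR_STR);
        return networkLocation.substring(0, index);
      }

      // mirrors NetworkTopology.getLastHalf above
      static String getLastHalf(String networkLocation) {
        int index = networkLocation.lastIndexOf(PATH_SEPARATOR_STR);
        return networkLocation.substring(index);
      }

      public static void main(String[] args) {
        String loc = "/d1/r1/n1";                // hypothetical location
        System.out.println(getFirstHalf(loc));   // prints /d1/r1
        System.out.println(getLastHalf(loc));    // prints /n1
      }
    }

Note that both methods assume the location contains at least one separator; an input without one would make lastIndexOf return -1 and substring throw.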
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Sat Nov 17 03:42:36 2012
@@ -216,6 +216,36 @@ public abstract class BlockPlacementPoli
                         excludedNodes,
                         blocksize);
   }
+  
+  /**
+   * Adjust rackMap, moreThanOne, and exactlyOne after removing the replica on cur.
+   *
+   * @param rackMap a map from rack to replica nodes
+   * @param moreThanOne the list of replica nodes on racks that have more 
+   *        than one replica
+   * @param exactlyOne the list of replica nodes on racks with only one replica
+   * @param cur the replica to remove
+   */
+  public void adjustSetsWithChosenReplica(final Map<String, 
+      List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) {
+    
+    String rack = getRack(cur);
+    final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
+    datanodes.remove(cur);
+    if (datanodes.isEmpty()) {
+      rackMap.remove(rack);
+    }
+    if (moreThanOne.remove(cur)) {
+      if (datanodes.size() == 1) {
+        moreThanOne.remove(datanodes.get(0));
+        exactlyOne.add(datanodes.get(0));
+      }
+    } else {
+      exactlyOne.remove(cur);
+    }
+  }
 
   /**
    * returns true if a block can be moved from source to 
@@ -236,5 +266,49 @@ public abstract class BlockPlacementPoli
     return false;
   }
 
-}
+  /**
+   * Get the rack string of a data node.
+   * @param datanode the data node
+   * @return the rack of the data node
+   */
+  protected String getRack(final DatanodeInfo datanode) {
+    return datanode.getNetworkLocation();
+  }
+  
+  /**
+   * Split the data nodes into two sets: one set contains nodes on racks with
+   * more than one replica, the other set contains the remaining nodes.
+   * 
+   * @param dataNodes the data nodes to be split
+   * @param rackMap a map from rack to datanodes
+   * @param moreThanOne contains nodes on racks with more than one replica
+   * @param exactlyOne contains the remaining nodes
+   */
+  public void splitNodesWithRack(
+      Collection<DatanodeDescriptor> dataNodes,
+      final Map<String, List<DatanodeDescriptor>> rackMap,
+      final List<DatanodeDescriptor> moreThanOne,
+      final List<DatanodeDescriptor> exactlyOne) {
+    for(DatanodeDescriptor node : dataNodes) {
+      final String rackName = getRack(node);
+      List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
+      if (datanodeList == null) {
+        datanodeList = new ArrayList<DatanodeDescriptor>();
+        rackMap.put(rackName, datanodeList);
+      }
+      datanodeList.add(node);
+    }
+    
+    // split nodes into two sets
+    for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
+      if (datanodeList.size() == 1) {
+        // exactlyOne contains nodes on rack with only one replica
+        exactlyOne.add(datanodeList.get(0));
+      } else {
+        // moreThanOne contains nodes on rack with more than one replica
+        moreThanOne.addAll(datanodeList);
+      }
+    }
+  }
 
+}

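The two new public helpers are designed to be used as a pair during excess-replica removal: splitNodesWithRack builds the rack partition once, and adjustSetsWithChosenReplica keeps it consistent after each deletion. A self-contained sketch of that flow, with plain strings standing in for DatanodeDescriptor (node and rack names invented for illustration):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RackSplitSketch {
      public static void main(String[] args) {
        // node -> rack, standing in for DatanodeDescriptor.getNetworkLocation()
        Map<String, String> rackOf = new HashMap<String, String>();
        rackOf.put("dn1", "/r1");
        rackOf.put("dn2", "/r1");
        rackOf.put("dn3", "/r2");

        // splitNodesWithRack: group nodes by rack, then partition the racks
        Map<String, List<String>> rackMap = new HashMap<String, List<String>>();
        List<String> moreThanOne = new ArrayList<String>();
        List<String> exactlyOne = new ArrayList<String>();
        for (String node : rackOf.keySet()) {
          List<String> list = rackMap.get(rackOf.get(node));
          if (list == null) {
            list = new ArrayList<String>();
            rackMap.put(rackOf.get(node), list);
          }
          list.add(node);
        }
        for (List<String> list : rackMap.values()) {
          if (list.size() == 1) exactlyOne.add(list.get(0));
          else moreThanOne.addAll(list);
        }
        // moreThanOne holds dn1 and dn2 (/r1); exactlyOne holds dn3 (/r2)

        // adjustSetsWithChosenReplica after deleting dn1: /r1 now holds one
        // replica, so dn2 migrates from moreThanOne to exactlyOne
        String cur = "dn1";
        List<String> datanodes = rackMap.get(rackOf.get(cur));
        datanodes.remove(cur);
        if (datanodes.isEmpty()) rackMap.remove(rackOf.get(cur));
        if (moreThanOne.remove(cur)) {
          if (datanodes.size() == 1) {
            moreThanOne.remove(datanodes.get(0));
            exactlyOne.add(datanodes.get(0));
          }
        } else {
          exactlyOne.remove(cur);
        }
        System.out.println(moreThanOne + " " + exactlyOne); // [] [dn3, dn2]
      }
    }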
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Sat Nov 17 03:42:36 2012
@@ -119,6 +119,7 @@ public class BlockPlacementPolicyDefault
       new ArrayList<DatanodeDescriptor>(chosenNodes);
     for (Node node:chosenNodes) {
       excludedNodes.put(node, node);
+      adjustExcludedNodes(excludedNodes, node);
     }
       
     if (!clusterMap.contains(writer)) {
@@ -197,12 +198,9 @@ public class BlockPlacementPolicyDefault
    * choose a node on the same rack
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalNode(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results)
     throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
@@ -231,12 +229,9 @@ public class BlockPlacementPolicyDefault
    * in the cluster.
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results)
     throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
@@ -283,7 +278,6 @@ public class BlockPlacementPolicyDefault
    * if not enough nodes are available, choose the remaining ones 
    * from the local rack
    */
-    
   protected void chooseRemoteRack(int numOfReplicas,
                                 DatanodeDescriptor localMachine,
                                 HashMap<Node, Node> excludedNodes,
@@ -306,8 +300,7 @@ public class BlockPlacementPolicyDefault
   /* Randomly choose one target from <i>nodes</i>.
    * @return the chosen node
    */
-  protected DatanodeDescriptor chooseRandom(
-                                          String nodes,
+  protected DatanodeDescriptor chooseRandom(String nodes,
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
@@ -320,10 +313,11 @@ public class BlockPlacementPolicyDefault
         (DatanodeDescriptor)(clusterMap.chooseRandom(nodes));
 
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
-      if (oldNode == null) { // choosendNode was not in the excluded list
+      if (oldNode == null) { // chosenNode was not in the excluded list
         numOfAvailableNodes--;
         if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
           results.add(chosenNode);
+          adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
         }
       }
@@ -355,6 +349,7 @@ public class BlockPlacementPolicyDefault
         if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
           numOfReplicas--;
           results.add(chosenNode);
+          adjustExcludedNodes(excludedNodes, chosenNode);
         }
       }
     }
@@ -364,7 +359,22 @@ public class BlockPlacementPolicyDefault
                                            "Not able to place enough replicas");
     }
   }
-    
+   
+  /**
+   * After choosing a node to place a replica, adjust excludedNodes
+   * accordingly. It does nothing here, since chosenNode is already put into 
+   * excludedNodes, but it can be overridden in a subclass to put more 
+   * related nodes into excludedNodes.
+   * 
+   * @param excludedNodes the map of excluded nodes
+   * @param chosenNode the chosen target node
+   */
+  protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
+      Node chosenNode) {
+    // do nothing
+  }
+  
   /* judge if a node is a good target.
    * return true if <i>node</i> has enough space, 
    * does not have too much load, and the rack does not have too many nodes
@@ -375,7 +385,23 @@ public class BlockPlacementPolicyDefault
     return isGoodTarget(node, blockSize, maxTargetPerLoc,
                         this.considerLoad, results);
   }
-    
+   
+  /**
+   * Determine if a node is a good target. 
+   * 
+   * @param node The target node
+   * @param blockSize Size of block
+   * @param maxTargetPerLoc Maximum number of targets per rack. The value of 
+   *                        this parameter depends on the number of racks in 
+   *                        the cluster and total number of replicas for a block
+   * @param considerLoad whether or not to consider load of the target node
+   * @param results A list containing currently chosen nodes. Used to check if 
+   *                too many nodes have been chosen in the target rack.
+   * @return true if <i>node</i> has enough space, 
+   *         does not have too much load, 
+   *         and the rack does not have too many nodes.
+   */
   protected boolean isGoodTarget(DatanodeDescriptor node,
                                long blockSize, int maxTargetPerLoc,
                                boolean considerLoad,
@@ -499,8 +525,7 @@ public class BlockPlacementPolicyDefault
 
     // pick replica from the first Set. If first is empty, then pick replicas
     // from second set.
-    Iterator<DatanodeDescriptor> iter =
-          first.isEmpty() ? second.iterator() : first.iterator();
+    Iterator<DatanodeDescriptor> iter = pickupReplicaSet(first, second);
 
     // pick node with least free space
     while (iter.hasNext() ) {
@@ -513,6 +538,20 @@ public class BlockPlacementPolicyDefault
     }
     return cur;
   }
+  
+  /**
+   * Pick the replica node set from which to delete an over-replicated replica.
+   * The first set contains replica nodes on racks with more than one replica,
+   * while the second set contains the remaining replica nodes. Pick from the
+   * first set if it is not empty; otherwise, pick from the second.
+   */
+  protected Iterator<DatanodeDescriptor> pickupReplicaSet(
+      Collection<DatanodeDescriptor> first,
+      Collection<DatanodeDescriptor> second) {
+    Iterator<DatanodeDescriptor> iter = first.isEmpty() ? second.iterator()
+        : first.iterator();
+    return iter;
+  }
 
   /** {@inheritDoc} */
   @Override

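Taken together, adjustExcludedNodes and pickupReplicaSet are the extension points that let BlockPlacementPolicyWithNodeGroup reuse this class. A sketch of a subclass using them, under the assumptions that the branch-1 classes in this commit are on the classpath and that BlockPlacementPolicyDefault exposes a constructor accessible from the same package (hypothetical class, not part of the commit):

    package org.apache.hadoop.hdfs.server.namenode;

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Iterator;

    import org.apache.hadoop.net.Node;

    /** Illustrative only; see BlockPlacementPolicyWithNodeGroup for the real one. */
    public class ExampleGroupAwarePolicy extends BlockPlacementPolicyDefault {

      @Override
      protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
          Node chosenNode) {
        // A real subclass would add every node in chosenNode's node group to
        // excludedNodes here, as BlockPlacementPolicyWithNodeGroup does via
        // its addNodeGroupToExcludedNodes helper; omitted in this sketch.
      }

      @Override
      protected Iterator<DatanodeDescriptor> pickupReplicaSet(
          Collection<DatanodeDescriptor> first,
          Collection<DatanodeDescriptor> second) {
        // Same preference as the default: delete from a rack that still keeps
        // another replica, so rack diversity is preserved.
        return first.isEmpty() ? second.iterator() : first.iterator();
      }
    }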
Copied: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java (from r1410690, hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java)
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java?p2=hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java&p1=hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java&r1=1410690&r2=1410692&rev=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyWithNodeGroup.java Sat Nov 17 03:42:36 2012
@@ -66,18 +66,18 @@ public class BlockPlacementPolicyWithNod
   @Override
   protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      List<DatanodeDescriptor> results)
         throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          blocksize, maxNodesPerRack, results);
 
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
       if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results, avoidStaleNodes)) {
+          maxNodesPerRack, false, results)) {
         results.add(localMachine);
         // Nodes under same nodegroup should be excluded.
         addNodeGroupToExcludedNodes(excludedNodes,
@@ -89,13 +89,13 @@ public class BlockPlacementPolicyWithNod
     // try a node on local node group
     DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        blocksize, maxNodesPerRack, results);
     if (chosenNode != null) {
       return chosenNode;
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        blocksize, maxNodesPerRack, results);
   }
 
   @Override
@@ -118,13 +118,12 @@ public class BlockPlacementPolicyWithNod
   @Override
   protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      List<DatanodeDescriptor> results)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results, 
-                          avoidStaleNodes);
+                          blocksize, maxNodesPerRack, results);
     }
 
     // choose one from the local rack, but off-nodegroup
@@ -132,8 +131,7 @@ public class BlockPlacementPolicyWithNod
       return chooseRandom(NetworkTopology.getFirstHalf(
                               localMachine.getNetworkLocation()),
                           excludedNodes, blocksize, 
-                          maxNodesPerRack, results, 
-                          avoidStaleNodes);
+                          maxNodesPerRack, results);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -149,16 +147,16 @@ public class BlockPlacementPolicyWithNod
         try {
           return chooseRandom(
               clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes);
+              blocksize, maxNodesPerRack, results);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results);
       }
     }
   }
@@ -166,20 +164,19 @@ public class BlockPlacementPolicyWithNod
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
-      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
-      boolean avoidStaleNodes) throws NotEnoughReplicasException {
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results
+      ) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
       chooseRandom(
           numOfReplicas,
           "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
-          excludedNodes, blocksize, maxReplicasPerRack, results,
-          avoidStaleNodes);
+          excludedNodes, blocksize, maxReplicasPerRack, results);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           localMachine.getNetworkLocation(), excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes);
+          maxReplicasPerRack, results);
     }
   }
 
@@ -192,19 +189,19 @@ public class BlockPlacementPolicyWithNod
   private DatanodeDescriptor chooseLocalNodeGroup(
       NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
       HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      List<DatanodeDescriptor> results)
       throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-      blocksize, maxNodesPerRack, results, avoidStaleNodes);
+      blocksize, maxNodesPerRack, results);
     }
 
     // choose one from the local node group
     try {
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
-          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          excludedNodes, blocksize, maxNodesPerRack, results);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -220,17 +217,16 @@ public class BlockPlacementPolicyWithNod
         try {
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
-              excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes);
+              excludedNodes, blocksize, maxNodesPerRack, results);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results);
       }
     }
   }
@@ -294,4 +290,4 @@ public class BlockPlacementPolicyWithNod
         moreThanOne.isEmpty() ? exactlyOne.iterator() : moreThanOne.iterator();
     return iter;
   } 
-}
\ No newline at end of file
+}

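Throughout this class the control flow is a cascade of chooseRandom calls over progressively wider scopes, each falling back on NotEnoughReplicasException: local machine, then local node group, then local rack, then the whole cluster. A toy standalone sketch of that pattern (scope strings and behavior invented; the real method consults the cluster topology):

    public class FallbackSketch {
      static class NotEnoughReplicasException extends Exception {}

      // Stand-in for chooseRandom(scope, ...): here only the widest scope
      // succeeds, to force the fallback path.
      static String chooseRandom(String scope) throws NotEnoughReplicasException {
        if (scope.equals("/root")) return "someNode";
        throw new NotEnoughReplicasException();
      }

      // Mirrors the cascade in chooseLocalNodeGroup, minus the intermediate
      // "node group of an already-chosen replica" step.
      static String choose(String nodeGroupScope) {
        try {
          return chooseRandom(nodeGroupScope);   // 1. try the local node group
        } catch (NotEnoughReplicasException e1) {
          try {
            return chooseRandom("/root");        // 2. fall back to the cluster
          } catch (NotEnoughReplicasException e2) {
            return null;                         // no target available
          }
        }
      }

      public static void main(String[] args) {
        System.out.println(choose("/d1/r1/n1")); // falls back; prints someNode
      }
    }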
Modified: hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1-win/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Nov 17 03:42:36 2012
@@ -3878,35 +3878,15 @@ public class FSNamesystem implements FSC
     INodeFile inode = blocksMap.getINode(b);
     
     // first form a rack to datanodes map and
-    HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
-      new HashMap<String, ArrayList<DatanodeDescriptor>>();
-    for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
-         iter.hasNext();) {
-      DatanodeDescriptor node = iter.next();
-      String rackName = node.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
-      if(datanodeList==null) {
-        datanodeList = new ArrayList<DatanodeDescriptor>();
-      }
-      datanodeList.add(node);
-      rackMap.put(rackName, datanodeList);
-    }
-    
+    final Map<String, List<DatanodeDescriptor>> rackMap = 
+        new HashMap<String, List<DatanodeDescriptor>>();
+    final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>();
+    final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>();
+
     // split nodes into two sets
-    // priSet contains nodes on rack with more than one replica
-    // remains contains the remaining nodes
-    ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
-    ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
-    for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter =
-      rackMap.entrySet().iterator(); iter.hasNext(); ) {
-      Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
-      ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue(); 
-      if( datanodeList.size() == 1 ) {
-        remains.add(datanodeList.get(0));
-      } else {
-        priSet.addAll(datanodeList);
-      }
-    }
+    // moreThanOne contains nodes on rack with more than one replica
+    // exactlyOne contains the remaining nodes
+    replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
     
     // pick one node to delete that favors the delete hint
     // otherwise pick one with least space from priSet if it is not empty
@@ -3916,29 +3896,19 @@ public class FSNamesystem implements FSC
       DatanodeInfo cur = null;
 
       // check if we can del delNodeHint
-      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
-            (priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
+      if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
+          && (moreThanOne.contains(delNodeHint)
+                || (addedNode != null && !moreThanOne.contains(addedNode))) ) {
           cur = delNodeHint;
       } else { // regular excessive replica removal
-        cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
+        cur = replicator.chooseReplicaToDelete(inode, b, replication,
+                       moreThanOne, exactlyOne);
       }
 
       firstOne = false;
-      // adjust rackmap, priSet, and remains
-      String rack = cur.getNetworkLocation();
-      ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
-      datanodes.remove(cur);
-      if(datanodes.isEmpty()) {
-        rackMap.remove(rack);
-      }
-      if( priSet.remove(cur) ) {
-        if (datanodes.size() == 1) {
-          priSet.remove(datanodes.get(0));
-          remains.add(datanodes.get(0));
-        }
-      } else {
-        remains.remove(cur);
-      }
+      // adjust rackmap, moreThanOne, and exactlyOne
+      replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
+          exactlyOne, cur);
 
       nonExcess.remove(cur);
 

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java Sat Nov 17 03:42:36 2012
@@ -31,21 +31,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
@@ -372,4 +375,9 @@ public class DFSTestUtil {
 
     return result;
   }  
+  
+  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
+      String rackLocation) {
+    return new DatanodeDescriptor(new DatanodeID(ipAddr), rackLocation);
+  }
 }

Modified: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java?rev=1410692&r1=1410691&r2=1410692&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/hdfs/server/namenode/TestReplicationPolicy.java Sat Nov 17 03:42:36 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetworkTopology;
@@ -466,4 +467,45 @@ public class TestReplicationPolicy exten
         blockLocations));
 
   }
+  
+  /**
+   * Test that chooseReplicaToDelete picks replicas based on block locality
+   * and free space.
+   */
+  public void testChooseReplicaToDelete() throws Exception {
+    List<DatanodeDescriptor> replicaNodeList = new ArrayList<DatanodeDescriptor>();
+    final Map<String, List<DatanodeDescriptor>> rackMap = new HashMap<String, List<DatanodeDescriptor>>();
+
+    dataNodes[0].setRemaining(4 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[0]);
+
+    dataNodes[1].setRemaining(3 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[1]);
+
+    dataNodes[2].setRemaining(2 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[2]);
+
+    dataNodes[5].setRemaining(1 * 1024 * 1024);
+    replicaNodeList.add(dataNodes[5]);
+
+    List<DatanodeDescriptor> first = new ArrayList<DatanodeDescriptor>();
+    List<DatanodeDescriptor> second = new ArrayList<DatanodeDescriptor>();
+    replicator.splitNodesWithRack(replicaNodeList, rackMap, first, second);
+    // dataNodes[0] and dataNodes[1] are in the first set as their rack has two
+    // replica nodes, while dataNodes[2] and dataNodes[5] are in the second set.
+    assertEquals(2, first.size());
+    assertEquals(2, second.size());
+    DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete(null,
+        null, (short) 3, first, second);
+    // Within the first set, dataNodes[1] has the least free space.
+    assertEquals(chosenNode, dataNodes[1]);
+
+    replicator.adjustSetsWithChosenReplica(rackMap, first, second, chosenNode);
+    assertEquals(0, first.size());
+    assertEquals(3, second.size());
+    // Within the second set, dataNodes[5] has the least free space.
+    chosenNode = replicator.chooseReplicaToDelete(null, null, (short) 2, first,
+        second);
+    assertEquals(chosenNode, dataNodes[5]);
+  }
 }


