hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r512924 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/net/ src/test/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/net/
Date Wed, 28 Feb 2007 19:36:33 GMT
Author: cutting
Date: Wed Feb 28 11:36:33 2007
New Revision: 512924

URL: http://svn.apache.org/viewvc?view=rev&rev=512924
Log:
HADOOP-972.  Optimize HDFS's rack-aware block placement algorithm.  Contributed by Hairong.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=512924&r1=512923&r2=512924
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Feb 28 11:36:33 2007
@@ -156,6 +156,9 @@
 46. HADOOP-1044.  Fix HDFS's TestDecommission to not spuriously fail.
     (Wendy Chien via cutting)
 
+47. HADOOP-972.  Optimize HDFS's rack-aware block placement algorithm.
+    (Hairong Kuang via cutting)
+
 
 Release 0.11.2 - 2007-02-16
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=512924&r1=512923&r2=512924
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Feb 28 11:36:33 2007
@@ -203,7 +203,7 @@
     // datanode networktoplogy
     NetworkTopology clusterMap = new NetworkTopology();
     // for block replicas placement
-    Replicator replicator = new Replicator();
+    ReplicationTargetChooser replicator = new ReplicationTargetChooser();
 
     private HostsFileReader hostsReader; 
     private Daemon dnthread = null;
@@ -2691,7 +2691,7 @@
      * @author hairong
      *
      */
-    class Replicator {
+    class ReplicationTargetChooser {
       private class NotEnoughReplicasException extends Exception {
         NotEnoughReplicasException( String msg ) {
           super( msg );
@@ -2722,21 +2722,19 @@
             new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
       }
       
-      /*
-       *  re-replicate <i>numOfReplicas</i>
-       /**
-        * choose <i>numOfReplicas</i> data nodes for <i>writer</i>

-        * to re-replicate a block with size <i>blocksize</i> 
-        * If not, return as many as we can.
-        * 
-        * @param numOfReplicas: additional number of replicas wanted.
-        * @param writer: the writer's machine, null if not in the cluster.
-        * @param choosenNodes: datanodes that have been choosen as targets.
-        * @param excludedNodes: datanodesthat should not be considered targets.
-        * @param blocksize: size of the data to be written.
-        * @return array of DatanodeDescriptor instances chosen as target 
-        * and sorted as a pipeline.
-        */
+      /**
+       * choose <i>numOfReplicas</i> data nodes for <i>writer</i>

+       * to re-replicate a block with size <i>blocksize</i> 
+       * If not, return as many as we can.
+       * 
+       * @param numOfReplicas: additional number of replicas wanted.
+       * @param writer: the writer's machine, null if not in the cluster.
+       * @param choosenNodes: datanodes that have been choosen as targets.
+       * @param excludedNodes: datanodesthat should not be considered targets.
+       * @param blocksize: size of the data to be written.
+       * @return array of DatanodeDescriptor instances chosen as target 
+       * and sorted as a pipeline.
+       */
       DatanodeDescriptor[] chooseTarget(int numOfReplicas,
           DatanodeDescriptor writer,
           List<DatanodeDescriptor> choosenNodes,
@@ -2767,7 +2765,6 @@
           writer=null;
         
         DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-            clusterMap.getLeaves(NodeBase.ROOT),
             excludedNodes, blocksize, maxNodesPerRack, results );
         
         results.removeAll(choosenNodes);
@@ -2776,16 +2773,17 @@
         return getPipeline((writer==null)?localNode:writer, results);
       }
       
-      /* choose <i>numOfReplicas</i> from <i>clusterNodes</i> */
+      /* choose <i>numOfReplicas</i> from all data nodes */
       private DatanodeDescriptor chooseTarget(int numOfReplicas,
           DatanodeDescriptor writer,
-          DatanodeDescriptor[] clusterNodes,
           List<DatanodeDescriptor> excludedNodes,
           long blocksize,
           int maxNodesPerRack,
           List<DatanodeDescriptor> results) {
         
-        if( numOfReplicas == 0 ) return writer;
+        if( numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0 ) {
+            return writer;
+        }
         
         int numOfResults = results.size();
         if(writer == null && (numOfResults==1 || numOfResults==2) ) {
@@ -2795,28 +2793,28 @@
         try {
           switch( numOfResults ) {
           case 0:
-            writer = chooseLocalNode(writer, clusterNodes, excludedNodes, 
+            writer = chooseLocalNode(writer, excludedNodes, 
                 blocksize, maxNodesPerRack, results);
             if(--numOfReplicas == 0) break;
           case 1:
-            chooseRemoteRack(1, writer, clusterNodes, excludedNodes, 
+            chooseRemoteRack(1, writer, excludedNodes, 
                 blocksize, maxNodesPerRack, results);
             if(--numOfReplicas == 0) break;
           case 2:
             if(clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-              chooseRemoteRack(1, writer, clusterNodes, excludedNodes,
+              chooseRemoteRack(1, writer, excludedNodes,
                   blocksize, maxNodesPerRack, results);
             } else {
-              chooseLocalRack(writer, clusterNodes, excludedNodes, 
+              chooseLocalRack(writer, excludedNodes, 
                   blocksize, maxNodesPerRack, results);
             }
             if(--numOfReplicas == 0) break;
           default:
-            chooseRandom(numOfReplicas, clusterNodes, excludedNodes, 
+            chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
                 blocksize, maxNodesPerRack, results);
           }
         } catch (NotEnoughReplicasException e) {
-          LOG.warn("Not be able to place enough replicas, still in need of "
+          LOG.warn("Not able to place enough replicas, still in need of "
               + numOfReplicas );
         }
         return writer;
@@ -2829,7 +2827,6 @@
        */
       private DatanodeDescriptor chooseLocalNode(
           DatanodeDescriptor localMachine,
-          DatanodeDescriptor[] nodes,
           List<DatanodeDescriptor> excludedNodes,
           long blocksize,
           int maxNodesPerRack,
@@ -2837,20 +2834,21 @@
       throws NotEnoughReplicasException {
         // if no local machine, randomly choose one node
         if(localMachine == null)
-          return chooseRandom(nodes, excludedNodes, 
+          return chooseRandom(NodeBase.ROOT, excludedNodes, 
               blocksize, maxNodesPerRack, results);
         
         // otherwise try local machine first
         if(!excludedNodes.contains(localMachine)) {
           excludedNodes.add(localMachine);
-          if( isGoodTarget(localMachine, blocksize, maxNodesPerRack, results)) {
+          if( isGoodTarget(localMachine, blocksize,
+                  maxNodesPerRack, false, results)) {
             results.add(localMachine);
             return localMachine;
           }
         } 
         
         // try a node on local rack
-        return chooseLocalRack(localMachine, nodes, excludedNodes, 
+        return chooseLocalRack(localMachine, excludedNodes, 
             blocksize, maxNodesPerRack, results);
       }
       
@@ -2858,12 +2856,11 @@
        * if no such node is availabe, choose one node from the rack where
        * a second replica is on.
        * if still no such node is available, choose a random node 
-       * in the cluster <i>nodes</i>.
+       * in the cluster.
        * @return the choosen node
        */
       private DatanodeDescriptor chooseLocalRack(
           DatanodeDescriptor localMachine,
-          DatanodeDescriptor[] nodes,
           List<DatanodeDescriptor> excludedNodes,
           long blocksize,
           int maxNodesPerRack,
@@ -2871,14 +2868,14 @@
       throws NotEnoughReplicasException {
         // no local machine, so choose a random machine
         if( localMachine == null ) {
-          return chooseRandom(nodes, excludedNodes, 
+          return chooseRandom(NodeBase.ROOT, excludedNodes, 
               blocksize, maxNodesPerRack, results );
         }
         
         // choose one from the local rack
         try {
           return chooseRandom(
-              clusterMap.getLeaves( localMachine.getNetworkLocation() ),
+              localMachine.getNetworkLocation(),
               excludedNodes, blocksize, maxNodesPerRack, results);
         } catch (NotEnoughReplicasException e1) {
           // find the second replica
@@ -2894,16 +2891,16 @@
           if( newLocal != null ) {
             try {
               return chooseRandom(
-                  clusterMap.getLeaves( newLocal.getNetworkLocation() ),
+                  newLocal.getNetworkLocation(),
                   excludedNodes, blocksize, maxNodesPerRack, results);
             } catch( NotEnoughReplicasException e2 ) {
               //otherwise randomly choose one from the network
-              return chooseRandom(nodes, excludedNodes,
+              return chooseRandom(NodeBase.ROOT, excludedNodes,
                   blocksize, maxNodesPerRack, results);
             }
           } else {
             //otherwise randomly choose one from the network
-            return chooseRandom(nodes, excludedNodes,
+            return chooseRandom(NodeBase.ROOT, excludedNodes,
                 blocksize, maxNodesPerRack, results);
           }
         }
@@ -2917,38 +2914,19 @@
       
       private void chooseRemoteRack( int numOfReplicas,
           DatanodeDescriptor localMachine,
-          DatanodeDescriptor[] nodes,
           List<DatanodeDescriptor> excludedNodes,
           long blocksize,
           int maxReplicasPerRack,
           List<DatanodeDescriptor> results)
       throws NotEnoughReplicasException {
-        // get all the nodes on the local rack
-        DatanodeDescriptor[] nodesOnRack = clusterMap.getLeaves(
-            localMachine.getNetworkLocation() );
-        
-        // can we speed up this??? using hashing sets?
-        DatanodeDescriptor[] nodesOnRemoteRack 
-        = new DatanodeDescriptor[nodes.length-nodesOnRack.length];
-        HashSet<DatanodeDescriptor> set1 = new HashSet<DatanodeDescriptor>(nodes.length);
-        HashSet<DatanodeDescriptor> set2 = new HashSet<DatanodeDescriptor>(nodesOnRack.length);
-        for(int i=0; i<nodes.length; i++) {
-          set1.add(nodes[i]);
-        }
-        for(int i=0; i<nodesOnRack.length; i++) {
-          set2.add(nodesOnRack[i]);
-        }
-        set1.removeAll(set2);
-        nodesOnRemoteRack = set1.toArray(nodesOnRemoteRack);
-        
         int oldNumOfReplicas = results.size();
         // randomly choose one node from remote racks
         try {
-          chooseRandom( numOfReplicas, nodesOnRemoteRack, excludedNodes, 
-              blocksize, maxReplicasPerRack, results );
+          chooseRandom( numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                  excludedNodes, blocksize, maxReplicasPerRack, results );
         } catch (NotEnoughReplicasException e) {
           chooseRandom( numOfReplicas-(results.size()-oldNumOfReplicas),
-              nodesOnRack, excludedNodes, blocksize, 
+              localMachine.getNetworkLocation(), excludedNodes, blocksize, 
               maxReplicasPerRack, results);
         }
       }
@@ -2957,7 +2935,7 @@
        * @return the choosen node
        */
       private DatanodeDescriptor chooseRandom(
-          DatanodeDescriptor[] nodes,
+          String nodes,
           List<DatanodeDescriptor> excludedNodes,
           long blocksize,
           int maxNodesPerRack,
@@ -2980,7 +2958,7 @@
       /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
        */
       private void chooseRandom(int numOfReplicas,
-          DatanodeDescriptor[] nodes,
+          String nodes,
           List<DatanodeDescriptor> excludedNodes,
           long blocksize,
           int maxNodesPerRack,
@@ -3008,24 +2986,20 @@
         }
       }
       
-      /* Randomly choose one node from <i>nodes</i>.
-       * @return the choosen node
+      /* Randomly choose <i>numOfNodes</i> nodes from <i>scope</i>.
+       * @return the choosen nodes
        */
       private DatanodeDescriptor[] chooseRandom(int numOfReplicas, 
-          DatanodeDescriptor[] nodes,
+          String nodes,
           List<DatanodeDescriptor> excludedNodes) {
         List<DatanodeDescriptor> results = 
           new ArrayList<DatanodeDescriptor>();
-        int numOfAvailableNodes = 0;
-        for(int i=0; i<nodes.length; i++) {
-          if( !excludedNodes.contains(nodes[i]) ) {
-            numOfAvailableNodes++;
-          }
-        }
+        int numOfAvailableNodes =
+            clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
         numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
             numOfAvailableNodes:numOfReplicas;
         while( numOfReplicas > 0 ) {
-          DatanodeDescriptor choosenNode = nodes[r.nextInt(nodes.length)];
+          DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes);
           if(!excludedNodes.contains(choosenNode)) {
             results.add( choosenNode );
             excludedNodes.add(choosenNode);
@@ -3041,27 +3015,42 @@
        * does not have too much load, and the rack does not have too many nodes
        */
       private boolean isGoodTarget( DatanodeDescriptor node,
+              long blockSize, int maxTargetPerLoc,
+              List<DatanodeDescriptor> results) {
+          return isGoodTarget(node, blockSize, maxTargetPerLoc, true, results);
+      }
+      
+      private boolean isGoodTarget( DatanodeDescriptor node,
           long blockSize, int maxTargetPerLoc,
+          boolean considerLoad,
           List<DatanodeDescriptor> results) {
         
         // check if the node is (being) decommissed
         if(node.isDecommissionInProgress() || node.isDecommissioned()) {
+          LOG.debug("Node "+node.getPath()+
+             " is not chosen because the node is (being) decommissioned");
           return false;
         }
 
         // check the remaining capacity of the target machine
         if(blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining() ) {
+            LOG.debug("Node "+node.getPath()+
+               " is not chosen because the node does not have enough space");
           return false;
         }
         
         // check the communication traffic of the target machine
-        double avgLoad = 0;
-        int size = clusterMap.getNumOfLeaves();
-        if( size != 0 ) {
-          avgLoad = (double)totalLoad()/size;
-        }
-        if(node.getXceiverCount() > (2.0 * avgLoad)) {
-          return false;
+        if(considerLoad) {
+          double avgLoad = 0;
+          int size = clusterMap.getNumOfLeaves();
+          if( size != 0 ) {
+            avgLoad = (double)totalLoad()/size;
+          }
+          if(node.getXceiverCount() > (2.0 * avgLoad)) {
+            LOG.debug("Node "+node.getPath()+
+               " is not chosen because the node is too busy");
+            return false;
+          }
         }
         
         // check if the target rack has chosen too many nodes
@@ -3075,6 +3064,8 @@
           }
         }
         if(counter>maxTargetPerLoc) {
+          LOG.debug("Node "+node.getPath()+
+             " is not chosen because the rack has too many chosen nodes");
           return false;
         }
         return true;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java?view=diff&rev=512924&r1=512923&r2=512924
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetworkTopology.java Wed Feb 28 11:36:33 2007
@@ -19,8 +19,8 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,8 +46,8 @@
      * Different from a leave node, it has non-null children.
      */
     private class InnerNode extends NodeBase {
-        private HashMap<String, Node> children = 
-            new HashMap<String, Node>(); // maps a name to a node 
+        private ArrayList<Node> children=new ArrayList<Node>();
+        private int numOfLeaves;
         
         /** Construct an InnerNode from a path-like string */
         InnerNode( String path ) {
@@ -60,7 +60,7 @@
         }
         
         /** Get its children */
-        HashMap<String, Node> getChildren() {return children;}
+        Collection<Node> getChildren() {return children;}
         
         /** Return the number of children this node has */
         int getNumOfChildren() {
@@ -75,7 +75,7 @@
                 return true;
             }
             
-            Node firstChild = children.values().iterator().next();
+            Node firstChild = children.get(0);
             if(firstChild instanceof InnerNode) {
                 return false;
             }
@@ -89,7 +89,9 @@
          * @return true if this node is an ancestor of <i>n</i>
          */
         boolean isAncestor(Node n) {
-            return n.getNetworkLocation().startsWith(getPath());
+            return getPath().equals(NodeBase.PATH_SEPARATOR_STR) ||
+                   (n.getNetworkLocation()+NodeBase.PATH_SEPARATOR_STR).
+                    startsWith(getPath()+NodeBase.PATH_SEPARATOR_STR);
         }
         
         /** Judge if this node is the parent of node <i>n</i>
@@ -121,7 +123,7 @@
          * @param n node to be added
          * @return true if the node is added; false otherwise
          */
-        boolean add( Node n ) {
+        boolean add( DatanodeDescriptor n ) {
             String parent = n.getNetworkLocation();
             String currentPath = getPath();
             if( !isAncestor( n ) )
@@ -129,18 +131,36 @@
                         +parent+", is not a decendent of "+currentPath);
             if( isParent( n ) ) {
                 // this node is the parent of n; add n directly
-                return (null == children.put(n.getName(), n) );
+                for(int i=0; i<children.size(); i++) {
+                    if(children.get(i).getName().equals(n.getName())) {
+                        children.set(i, n);
+                        return false;
+                    }
+                }
+                children.add(n);
+                numOfLeaves++;
+                return true;
             } else {
                 // find the next ancestor node
                 String parentName = getNextAncestorName( n );
-                InnerNode parentNode = (InnerNode)children.get(parentName);
+                InnerNode parentNode = null;
+                for(int i=0; i<children.size(); i++) {
+                    if(children.get(i).getName().equals(parentName)) {
+                        parentNode = (InnerNode)children.get(i);
+                    }
+                }
                 if( parentNode == null ) {
                     // create a new InnerNode
                     parentNode = new InnerNode( parentName, currentPath );
-                    children.put(parentName, parentNode);
+                    children.add(parentNode);
                 }
                 // add n to the subtree of the next ancestor node
-                return parentNode.add(n);
+                if( parentNode.add(n) ) {
+                    numOfLeaves++;
+                    return true;
+                } else {
+                    return false;
+                }
             }
         }
         
@@ -148,19 +168,34 @@
          * @parameter n node to be deleted 
          * @return true if the node is deleted; false otherwise
          */
-        boolean remove( Node n ) {
+        boolean remove( DatanodeDescriptor n ) {
             String parent = n.getNetworkLocation();
             String currentPath = getPath();
             if(!isAncestor(n))
-                throw new IllegalArgumentException( n.getName()+", which is located at "
+                throw new IllegalArgumentException( n.getName()
+                        +", which is located at "
                         +parent+", is not a decendent of "+currentPath);
             if( isParent(n) ) {
                 // this node is the parent of n; remove n directly
-                return (n == children.remove(n.getName()));
+                for(int i=0; i<children.size(); i++) {
+                    if(children.get(i).getName().equals(n.getName())) {
+                        children.remove(i);
+                        numOfLeaves--;
+                        return true;
+                    }
+                }
+                return false;
             } else {
                 // find the next ancestor node: the parent node
                 String parentName = getNextAncestorName( n );
-                InnerNode parentNode = (InnerNode)children.get(parentName);
+                InnerNode parentNode = null;
+                int i;
+                for(i=0; i<children.size(); i++) {
+                    if(children.get(i).getName().equals(parentName)) {
+                        parentNode = (InnerNode)children.get(i);
+                        break;
+                    }
+                }
                 if(parentNode==null) {
                     throw new IllegalArgumentException( n.getName()
                             + ", which is located at "
@@ -169,8 +204,11 @@
                 // remove n from the parent node
                 boolean isRemoved = parentNode.remove( n );
                 // if the parent node has no children, remove the parent node too
-                if(parentNode.getNumOfChildren() == 0 ) {
-                    children.remove(parentName);
+                if(isRemoved) {
+                    if(parentNode.getNumOfChildren() == 0 ) {
+                        children.remove(i);
+                    }
+                    numOfLeaves--;
                 }
                 return isRemoved;
             }
@@ -179,8 +217,14 @@
         /** Given a node's string representation, return a reference to the node */ 
         Node getLoc( String loc ) {
             if( loc == null || loc.length() == 0 ) return this;
+            
             String[] path = loc.split(PATH_SEPARATOR_STR, 2);
-            Node childnode = children.get( path[0] );
+            Node childnode = null;
+            for(int i=0; i<children.size(); i++) {
+                if(children.get(i).getName().equals(path[0])) {
+                    childnode = children.get(i);
+                }
+            }
             if(childnode == null ) return null; // non-existing node
             if( path.length == 1 ) return childnode;
             if( childnode instanceof InnerNode ) {
@@ -190,22 +234,62 @@
             }
         }
         
-        /** Get all the data nodes belonged to the subtree of this node */
-        void getLeaves( Collection<DatanodeDescriptor> results ) {
-            for( Iterator<Node> iter = children.values().iterator();
-            iter.hasNext(); ) {
-                Node childNode = iter.next();
-                if( childNode instanceof InnerNode ) {
-                    ((InnerNode)childNode).getLeaves(results);
-                } else {
-                    results.add( (DatanodeDescriptor)childNode );
+        /** get <i>leaveIndex</i> leaf of this subtree 
+         * if it is not in the <i>excludedNode</i>*/
+        private DatanodeDescriptor getLeaf(int leaveIndex, Node excludedNode) {
+            int count=0;
+            int numOfExcludedLeaves = 1;
+            if( excludedNode instanceof InnerNode ) {
+                numOfExcludedLeaves = ((InnerNode)excludedNode).getNumOfLeaves();
+            }
+            if( isRack() ) { // children are leaves
+                // range check
+                if(leaveIndex<0 || leaveIndex>=this.getNumOfChildren()) {
+                    return null;
                 }
+                DatanodeDescriptor child =
+                    (DatanodeDescriptor)children.get(leaveIndex);
+                if(excludedNode == null || excludedNode != child) {
+                    // child is not the excludedNode
+                    return child;
+                } else { // child is the excludedNode so return the next child
+                    if(leaveIndex+1>=this.getNumOfChildren()) {
+                        return null;
+                    } else {
+                        return (DatanodeDescriptor)children.get(leaveIndex+1);
+                    }
+                }
+            } else {
+                for( int i=0; i<children.size(); i++ ) {
+                    InnerNode child = (InnerNode)children.get(i);
+                    if(excludedNode == null || excludedNode != child) {
+                        // not the excludedNode
+                        int numOfLeaves = child.getNumOfLeaves();
+                        if( excludedNode != null && child.isAncestor(excludedNode) ) {
+                            numOfLeaves -= numOfExcludedLeaves;
+                        }
+                        if( count+numOfLeaves > leaveIndex ) {
+                            // the leaf is in the child subtree
+                            return child.getLeaf(leaveIndex-count, excludedNode);
+                        } else {
+                            // go to the next child
+                            count = count+numOfLeaves;
+                        }
+                    } else { // it is the excluededNode
+                        // skip it and set the excludedNode to be null
+                        excludedNode = null;
+                    }
+                }
+                return null;
             }
         }
+        
+        int getNumOfLeaves() {
+            return numOfLeaves;
+        }
     } // end of InnerNode
     
-    InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); //the root of the tree
-    private int numOfLeaves = 0; // data nodes counter
+    InnerNode clusterMap = new InnerNode( InnerNode.ROOT ); // the root
     private int numOfRacks = 0;  // rack counter
     
     public NetworkTopology() {
@@ -215,7 +299,7 @@
      * Update data node counter & rack counter if neccessary
      * @param node
      *          data node to be added
-     * @exception IllegalArgumentException if add a data node to an existing leave
+     * @exception IllegalArgumentException if add a data node to a leave
      */
     public synchronized void add( DatanodeDescriptor node ) {
         if( node==null ) return;
@@ -227,7 +311,6 @@
                     + " at an illegal network location");
         }
         if( clusterMap.add( node) ) {
-            numOfLeaves++;
             if( rack == null ) {
                 numOfRacks++;
             }
@@ -244,7 +327,6 @@
         if( node==null ) return;
         LOG.info("Removing a node: "+node.getPath());
         if( clusterMap.remove( node ) ) {
-            numOfLeaves--;
             InnerNode rack = (InnerNode)getNode(node.getNetworkLocation());
             if(rack == null) {
                 numOfRacks--;
@@ -265,7 +347,7 @@
         return (rNode == node); 
     }
     
-    /** Given a string representation of a node, return the reference to the node
+    /** Given a string representation of a node, return its reference
      * 
      * @param loc
      *          a path-like string representation of a node
@@ -278,54 +360,14 @@
         return clusterMap.getLoc( loc );
     }
     
-    /* Add all the data nodes that belong to 
-     * the subtree of the node <i>loc</i> to <i>results</i>*/
-    private synchronized void getLeaves( String loc,
-            Collection<DatanodeDescriptor> results ) {
-        Node node = getNode(loc);
-        if( node instanceof InnerNode )
-            ((InnerNode)node).getLeaves(results);
-        else {
-            results.add((DatanodeDescriptor)node);
-        }
-    }
-    
-    /** Return all the data nodes that belong to the subtree of <i>loc</i>
-     * @param loc
-     *          a path-like string representation of a node
-     * @return an array of data nodes that belong to the subtree of <i>loc</i>
-     */
-    public synchronized DatanodeDescriptor[] getLeaves( String loc ) {
-        Collection<DatanodeDescriptor> results = new ArrayList<DatanodeDescriptor>();
-        getLeaves(loc, results);
-        return results.toArray(new DatanodeDescriptor[results.size()]);
-    }
-    
-    /** Return all the data nodes that belong to the subtrees of <i>locs</i>
-     * @param locs
-     *          a collection of strings representing nodes
-     * @return an array of data nodes that belong to subtrees of <i>locs</i>
-     */
-    public synchronized DatanodeDescriptor[] getLeaves(
-            Collection<String> locs ) {
-        Collection<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>();
-        if( locs != null ) { 
-            Iterator<String> iter = locs.iterator();
-            while(iter.hasNext()) {
-                getLeaves( iter.next(), nodes );
-            }
-        }
-        return nodes.toArray(new DatanodeDescriptor[nodes.size()]);
-    }
-    
     /** Return the total number of racks */
-    public int getNumOfRacks( ) {
+    public synchronized int getNumOfRacks( ) {
         return numOfRacks;
     }
     
     /** Return the total number of data nodes */
-    public int getNumOfLeaves() {
-        return numOfLeaves;
+    public synchronized int getNumOfLeaves() {
+        return clusterMap.getNumOfLeaves();
     }
     
     private void checkArgument( DatanodeDescriptor node ) {
@@ -354,11 +396,7 @@
     public int getDistance(DatanodeDescriptor node1, DatanodeDescriptor node2 ) {
         checkArgument( node1 );
         checkArgument( node2 );
-        /*
-        if( !contains(node1) || !contains(node2) ) {
-            return Integer.MAX_VALUE;
-        }
-        */
+
         if( node1 == node2 || node1.equals(node2)) {
             return 0;
         }
@@ -398,6 +436,85 @@
         return location1.equals(location2);
     }
     
+    final private static Random r = new Random();
+    /** Randomly choose one node from <i>scope</i>.
+     * If scope starts with ~, choose one from all datanodes except for the
+     * ones in <i>scope</i>; otherwise, choose one from <i>scope</i>.
+     * @param scope range of datanodes from which a node will be chosen
+     * @return the chosen data node
+    public DatanodeDescriptor chooseRandom(String scope) {
+        if(scope.startsWith("~")) {
+            return chooseRandom(NodeBase.ROOT, scope.substring(1));
+        } else {
+            return chooseRandom(scope, null);
+        }
+    }
+    
+    private DatanodeDescriptor chooseRandom(String scope, String excludedScope){
+        if(excludedScope != null) {
+            if(scope.startsWith(excludedScope)) {
+                return null;
+            }
+            if(!excludedScope.startsWith(scope)) {
+                excludedScope = null;
+            }
+        }
+        Node node = getNode(scope);
+        if(node instanceof DatanodeDescriptor) {
+            return (DatanodeDescriptor)node;
+        }
+       InnerNode innerNode = (InnerNode)node;
+       int numOfDatanodes = innerNode.getNumOfLeaves();
+       if(excludedScope == null) {
+           node = null;
+       } else {
+           node = getNode(excludedScope);
+           if(node instanceof DatanodeDescriptor) {
+               numOfDatanodes -= 1;
+           } else {
+               numOfDatanodes -= ((InnerNode)node).getNumOfLeaves();
+           }
+       }
+       int leaveIndex = r.nextInt(numOfDatanodes);
+       return innerNode.getLeaf(leaveIndex, node);
+    }
+       
+    /** Return the number of leaves in <i>scope</i> but not in <i>excludedNodes</i>.
+     * If scope starts with ~, return the number of datanodes that are in
+     * neither <i>scope</i> nor <i>excludedNodes</i>.
+     * @param scope a path string that may start with ~
+     * @param excludedNodes a list of data nodes
+     * @return number of available data nodes
+     */
+    public int countNumOfAvailableNodes(String scope,
+            List<DatanodeDescriptor> excludedNodes) {
+        boolean isExcluded=false;
+        if(scope.startsWith("~")) {
+            isExcluded=true;
+            scope=scope.substring(1);
+        }
+        scope = NodeBase.normalize(scope);
+        int count=0; // the number of nodes in both scope & excludedNodes
+        for( DatanodeDescriptor node:excludedNodes) {
+            if( (node.getPath()+NodeBase.PATH_SEPARATOR_STR).
+                    startsWith(scope+NodeBase.PATH_SEPARATOR_STR)) {
+                count++;
+            }
+        }
+        Node n=getNode(scope);
+        int scopeNodeCount=1;
+        if(n instanceof InnerNode) {
+            scopeNodeCount=((InnerNode)n).getNumOfLeaves();
+        }
+        if(isExcluded) {
+            return clusterMap.getNumOfLeaves()-
+                scopeNodeCount-excludedNodes.size()+count;
+        } else {
+            return scopeNodeCount-count;
+        }
+    }
+    
     /** convert a network tree to a string */
     public String toString() {
         // print the number of racks
@@ -406,18 +523,13 @@
         tree.append( numOfRacks );
         tree.append( "\n" );
         // print the number of leaves
+        int numOfLeaves = getNumOfLeaves();
         tree.append( "Expected number of leaves:" );
         tree.append( numOfLeaves );
         tree.append( "\n" );
-        // get all datanodes
-        DatanodeDescriptor[] datanodes = getLeaves( NodeBase.ROOT );
-        // print the number of leaves
-        tree.append( "Actual number of leaves:" );
-        tree.append( datanodes.length );
-        tree.append( "\n" );
         // print datanodes
-        for( int i=0; i<datanodes.length; i++ ) {
-            tree.append( datanodes[i].getPath() );
+        for( int i=0; i<numOfLeaves; i++ ) {
+            tree.append( clusterMap.getLeaf(i, null).getPath() );
             tree.append( "\n");
         }
         return tree.toString();

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java?view=diff&rev=512924&r1=512923&r2=512924
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java Wed Feb
28 11:36:33 2007
@@ -1,6 +1,5 @@
 package org.apache.hadoop.dfs;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -16,7 +15,7 @@
   private static final Configuration CONF = new Configuration();
   private static final NetworkTopology cluster;
   private static NameNode namenode;
-  private static FSNamesystem.Replicator replicator;
+  private static FSNamesystem.ReplicationTargetChooser replicator;
   private static DatanodeDescriptor dataNodes[] = 
          new DatanodeDescriptor[] {
     new DatanodeDescriptor(new DatanodeID("h1:5020", "0", -1), "/d1/r1"),
@@ -33,6 +32,7 @@
   static {
     try {
       CONF.set("fs.default.name", "localhost:8020");
+      NameNode.format(CONF);
       namenode = new NameNode(CONF);
     } catch (IOException e) {
       // TODO Auto-generated catch block
@@ -61,6 +61,10 @@
    * @throws Exception
    */
   public void testChooseTarget1() throws Exception {
+    dataNodes[0].updateHeartbeat(
+              2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+              FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
+
     DatanodeDescriptor[] targets;
     targets = replicator.chooseTarget(
         0, dataNodes[0], null, BLOCK_SIZE);
@@ -91,6 +95,10 @@
     assertTrue(cluster.isOnSameRack(targets[0], targets[1]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[2]));
     assertFalse(cluster.isOnSameRack(targets[0], targets[3]));
+
+    dataNodes[0].updateHeartbeat(
+        2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
+        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0); 
   }
 
   /**
@@ -161,8 +169,8 @@
     // make data node 0 to be not qualified to choose
     dataNodes[0].updateHeartbeat(
         2*FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 
-        FSConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 4); // overloaded
-      
+        (FSConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0); // no space
+        
     DatanodeDescriptor[] targets;
     targets = replicator.chooseTarget(
         0, dataNodes[0], null, BLOCK_SIZE);

Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java?view=diff&rev=512924&r1=512923&r2=512924
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/net/TestNetworkTopology.java Wed Feb 28
11:36:33 2007
@@ -1,6 +1,6 @@
 package org.apache.hadoop.net;
 
-import java.util.HashSet;
+
 import org.apache.hadoop.dfs.DatanodeDescriptor;
 import org.apache.hadoop.dfs.DatanodeID;
 import junit.framework.TestCase;
@@ -40,20 +40,6 @@
     assertEquals(cluster.getNumOfRacks(), 3);
   }
   
-  public void testGetLeaves() throws Exception {
-    DatanodeDescriptor [] leaves = cluster.getLeaves(NodeBase.ROOT);
-    assertEquals(leaves.length, dataNodes.length);
-    HashSet<DatanodeDescriptor> set1 = 
-      new HashSet<DatanodeDescriptor>(leaves.length);
-    HashSet<DatanodeDescriptor> set2 = 
-      new HashSet<DatanodeDescriptor>(dataNodes.length);
-    for(int i=0; i<leaves.length; i++) {
-      set1.add(leaves[i]);
-      set2.add(dataNodes[i]);
-    }
-    assertTrue(set1.equals(set2));
-  }
-  
   public void testGetDistance() throws Exception {
     assertEquals(cluster.getDistance(dataNodes[0], dataNodes[0]), 0);
     assertEquals(cluster.getDistance(dataNodes[0], dataNodes[1]), 2);
@@ -73,6 +59,4 @@
       cluster.add( dataNodes[i] );
     }
   }
-
-
 }



Mime
View raw message