hadoop-common-commits mailing list archives

From tomwh...@apache.org
Subject svn commit: r533966 [1/2] - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Tue, 01 May 2007 08:39:51 GMT
Author: tomwhite
Date: Tue May  1 01:39:50 2007
New Revision: 533966

URL: http://svn.apache.org/viewvc?view=rev&rev=533966
Log:
HADOOP-1272.  Extract inner classes from FSNamesystem into separate classes.  Contributed by Dhruba Borthakur.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FsckServlet.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/GetImageServlet.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UnderReplicatedBlocks.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestHost2NodesMap.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestReplicationPolicy.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=533966&r1=533965&r2=533966
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May  1 01:39:50 2007
@@ -294,6 +294,9 @@
 87. HADOOP-1290.  Move contrib/abacus into mapred/lib/aggregate.
     (Runping Qi via cutting)
 
+88. HADOOP-1272.  Extract inner classes from FSNamesystem into separate 
+    classes.  (Dhruba Borthakur via tomwhite)
+
 
 Release 0.12.3 - 2007-04-06
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlocksMap.java Tue May  1 01:39:50 2007
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+/**
+ * This class maintains the map from a block to its metadata.
+ * block's metadata currently includes INode it belongs to and
+ * the datanodes that store the block.
+ */
+class BlocksMap {
+        
+  /**
+   * Internal class for block metadata.
+   */
+  class BlockInfo {
+    private FSDirectory.INode              inode;
+      
+    /** nodes could contain some null entries at the end, so 
+     *  nodes.length >= number of datanodes. 
+     *  if nodes != null then nodes[0] != null.
+     */
+    private DatanodeDescriptor[]           nodes;
+    private Block                          block; //block that was inserted.   
+  }
+      
+  private class NodeIterator implements Iterator<DatanodeDescriptor> {
+    NodeIterator(DatanodeDescriptor[] nodes) {
+      arr = nodes;
+    }
+    private DatanodeDescriptor[] arr;
+    private int nextIdx = 0;
+      
+    public boolean hasNext() {
+      return arr != null && nextIdx < arr.length && arr[nextIdx] != null;
+    }
+      
+    public DatanodeDescriptor next() {
+      return arr[nextIdx++];
+    }
+      
+    public void remove()  {
+      throw new UnsupportedOperationException("Sorry. can't remove.");
+    }
+  }
+      
+  private Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
+      
+  /** add BlockInfo if mapping does not exist. */
+  private BlockInfo checkBlockInfo(Block b) {
+    BlockInfo info = map.get(b);
+    if (info == null) {
+      info = new BlockInfo();
+      info.block = b;
+      map.put(b, info);
+    }
+    return info;
+  }
+      
+  public FSDirectory.INode getINode(Block b) {
+    BlockInfo info = map.get(b);
+    return (info != null) ? info.inode : null;
+  }
+          
+  public void addINode(Block b, FSDirectory.INode iNode) {
+    BlockInfo info = checkBlockInfo(b);
+    info.inode = iNode;
+  }
+    
+  public void removeINode(Block b) {
+    BlockInfo info = map.get(b);
+    if (info != null) {
+      info.inode = null;
+      if (info.nodes == null) {
+        map.remove(b);
+      }
+    }
+  }
+      
+  /** Returns the block object if it exists in the map. */
+  public Block getStoredBlock(Block b) {
+    BlockInfo info = map.get(b);
+    return (info != null) ? info.block : null;
+  }
+    
+  /** The returned Iterator does not support remove(). */
+  public Iterator<DatanodeDescriptor> nodeIterator(Block b) {
+    BlockInfo info = map.get(b);
+    return new NodeIterator((info != null) ? info.nodes : null);
+  }
+    
+  /** Counts the number of nodes containing the block. Faster than using the iterator. */
+  public int numNodes(Block b) {
+    int count = 0;
+    BlockInfo info = map.get(b);
+    if (info != null && info.nodes != null) {
+      count = info.nodes.length;
+      while (info.nodes[ count-1 ] == null) {// mostly false
+        count--;
+      }
+    }
+    return count;
+  }
+      
+  /** Returns true if the node did not already exist and was added;
+   * false if the node already exists. */
+  public boolean addNode(Block b, 
+                         DatanodeDescriptor node,
+                         int replicationHint) {
+    BlockInfo info = checkBlockInfo(b);
+    if (info.nodes == null) {
+      info.nodes = new DatanodeDescriptor[ replicationHint ];
+    }
+      
+    DatanodeDescriptor[] arr = info.nodes;
+    for(int i=0; i < arr.length; i++) {
+      if (arr[i] == null) {
+        arr[i] = node;
+        return true;
+      }
+      if (arr[i] == node) {
+        return false;
+      }
+    }
+
+    /* Not enough space left. Create a new array. Should normally 
+     * happen only when replication is manually increased by the user. */
+    info.nodes = new DatanodeDescriptor[ arr.length + 1 ];
+    for(int i=0; i < arr.length; i++) {
+      info.nodes[i] = arr[i];
+    }
+    info.nodes[ arr.length ] = node;
+    return true;
+  }
+    
+  public boolean removeNode(Block b, DatanodeDescriptor node) {
+    BlockInfo info = map.get(b);
+    if (info == null || info.nodes == null) {
+      return false;
+    }
+
+    boolean removed = false;
+    // swap lastNode and node's location. set lastNode to null.
+    DatanodeDescriptor[] arr = info.nodes;
+    int lastNode = -1;
+    for(int i=arr.length-1; i >= 0; i--) {
+      if (lastNode < 0 && arr[i] != null) {
+        lastNode = i;
+      }
+      if (arr[i] == node) {
+        arr[i] = arr[ lastNode ];
+        arr[ lastNode ] = null;
+        removed = true;
+        break;
+      }
+    }
+        
+    /*
+     * if ((lastNode + 1) < arr.length/4) {
+     *    we could trim the array.
+     * } 
+     */
+    if (arr[0] == null) { // no datanodes left.
+      info.nodes = null;
+      if (info.inode == null) {
+        map.remove(b);
+      }
+    }
+    return removed;
+  }
+
+  public int size() {
+    return map.size();
+  }
+}
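
For reference, a minimal sketch of how the extracted BlocksMap is driven. It is a fragment, not a full program: "b" stands for a Block reported by a datanode and "node" for its DatanodeDescriptor, both placeholders rather than values from this commit.

    BlocksMap blocksMap = new BlocksMap();
    // Record that 'node' holds 'b'; the replication hint (3 here) sizes
    // the initial nodes[] array.
    blocksMap.addNode(b, node, 3);
    int live = blocksMap.numNodes(b);          // counts non-null entries: 1
    Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b);
    while (it.hasNext()) {
      DatanodeDescriptor d = it.next();        // stops at the first null slot
    }
    blocksMap.removeNode(b, node);             // swap-with-last, then null it out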

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=533966&r1=533965&r2=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Tue May  1 01:39:50 2007
@@ -32,12 +32,6 @@
 import java.util.*;
 import java.lang.UnsupportedOperationException;
 
-import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
  * DataNode.
@@ -199,6 +193,7 @@
   private String localMachine;
   private int port;
   private SafeModeInfo safeMode;  // safe mode information
+  private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
     
   // datanode networktoplogy
   NetworkTopology clusterMap = new NetworkTopology();
@@ -217,7 +212,10 @@
                       NameNode nn, Configuration conf) throws IOException {
     fsNamesystemObject = this;
     this.replicator = new ReplicationTargetChooser(
-                                                   conf.getBoolean("dfs.replication.considerLoad", true));
+                                                   conf.getBoolean("dfs.replication.considerLoad", true),
+                                                   this,
+                                                   clusterMap,
+                                                   LOG);
     this.defaultReplication = conf.getInt("dfs.replication", 3);
     this.maxReplication = conf.getInt("dfs.replication.max", 512);
     this.minReplication = conf.getInt("dfs.replication.min", 1);
@@ -406,193 +404,15 @@
     }
   }
 
-  /* Class for keeping track of under-replicated blocks.
-   * Blocks have a replication priority, with priority 0 indicating the highest;
-   * blocks that have only one replica get the highest priority.
-   */
-  private class UnderReplicatedBlocks {
-    private static final int LEVEL = 3;
-    List<TreeSet<Block>> priorityQueues = new ArrayList<TreeSet<Block>>();
-        
-    /* constructor */
-    UnderReplicatedBlocks() {
-      for(int i=0; i<LEVEL; i++) {
-        priorityQueues.add(new TreeSet<Block>());
-      }
-    }
-        
-    /* Return the total number of under-replicated blocks */
-    synchronized int size() {
-      int size = 0;
-      for(int i=0; i<LEVEL; i++) {
-        size += priorityQueues.get(i).size();
-      }
-      return size;
-    }
-        
-    /* Check if a block is in the neededReplication queue */
-    synchronized boolean contains(Block block) {
-      for(TreeSet<Block> set:priorityQueues) {
-        if (set.contains(block)) return true;
-      }
-      return false;
-    }
-        
-    /* Return the priority of a block
-     * @param block an under-replicated block
-     * @param curReplicas current number of replicas of the block
-     * @param expectedReplicas expected number of replicas of the block
-     */
-    private int getPriority(Block block, 
-                            int curReplicas, int expectedReplicas) {
-      if (curReplicas<=0 || curReplicas>=expectedReplicas) {
-        return LEVEL; // no need to replicate
-      } else if (curReplicas==1) {
-        return 0; // highest priority
-      } else if (curReplicas*3<expectedReplicas) {
-        return 1;
-      } else {
-        return 2;
-      }
-    }
-        
-    /* add a block to an under-replication queue according to its priority
-     * @param block an under-replicated block
-     * @param curReplicas current number of replicas of the block
-     * @param expectedReplicas expected number of replicas of the block
-     */
-    synchronized boolean add(
-                             Block block, int curReplicas, int expectedReplicas) {
-      if (curReplicas<=0 || expectedReplicas <= curReplicas) {
-        return false;
-      }
-      int priLevel = getPriority(block, curReplicas, expectedReplicas);
-      if (priorityQueues.get(priLevel).add(block)) {
-        NameNode.stateChangeLog.debug(
-                                      "BLOCK* NameSystem.UnderReplicationBlock.add:"
-                                      + block.getBlockName()
-                                      + " has only "+curReplicas
-                                      + " replicas and need " + expectedReplicas
-                                      + " replicas so is added to neededReplications"
-                                      + " at priority level " + priLevel);
-        return true;
-      }
-      return false;
-    }
-
-    /* add a block to an under-replication queue */
-    synchronized boolean add(Block block) {
-      int expectedReplicas = getReplication(block);
-      return add(block,
-                 countContainingNodes(block),
-                 expectedReplicas);
-    }
-        
-    /* remove a block from an under-replication queue */
-    synchronized boolean remove(Block block, 
-                                int oldReplicas, int oldExpectedReplicas) {
-      int priLevel = getPriority(block, oldReplicas, oldExpectedReplicas);
-      return remove(block, priLevel);
-    }
-        
-    /* remove a block from an under-replication queue given a priority*/
-    private boolean remove(Block block, int priLevel) {
-      if (priLevel >= 0 && priLevel < LEVEL 
-          && priorityQueues.get(priLevel).remove(block)) {
-        NameNode.stateChangeLog.debug(
-                                      "BLOCK* NameSystem.UnderReplicationBlock.remove: "
-                                      + "Removing block " + block.getBlockName()
-                                      + " from priority queue "+ priLevel);
-        return true;
-      } else {
-        for(int i=0; i<LEVEL; i++) {
-          if (i!=priLevel && priorityQueues.get(i).remove(block)) {
-            NameNode.stateChangeLog.debug(
-                                          "BLOCK* NameSystem.UnderReplicationBlock.remove: "
-                                          + "Removing block " + block.getBlockName()
-                                          + " from priority queue "+ i);
-            return true;
-          }
-        }
-      }
-      return false;
-    }
-        
-    /* remove a block from an under-replication queue */
-    synchronized boolean remove(Block block) {
-      int curReplicas = countContainingNodes(block);
-      int expectedReplicas = getReplication(block);
-      return remove(block, curReplicas, expectedReplicas);
-    }
-        
-    /* update the priority level of a block */
-    synchronized void update(Block block,
-                             int curReplicasDelta, int expectedReplicasDelta) {
-      int curReplicas = countContainingNodes(block);
-      int curExpectedReplicas = getReplication(block);
-      int oldReplicas = curReplicas-curReplicasDelta;
-      int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-      int curPri = getPriority(block, curReplicas, curExpectedReplicas);
-      int oldPri = getPriority(block, oldReplicas, oldExpectedReplicas);
-      NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
-                                    block +
-                                    " curReplicas " + curReplicas +
-                                    " curExpectedReplicas " + curExpectedReplicas +
-                                    " oldReplicas " + oldReplicas +
-                                    " oldExpectedReplicas  " + oldExpectedReplicas +
-                                    " curPri  " + curPri +
-                                    " oldPri  " + oldPri);
-      if (oldPri != LEVEL && oldPri != curPri) {
-        remove(block, oldPri);
-      }
-      if (curPri != LEVEL && oldPri != curPri 
-          && priorityQueues.get(curPri).add(block)) {
-        NameNode.stateChangeLog.debug(
-                                      "BLOCK* NameSystem.UnderReplicationBlock.update:"
-                                      + block.getBlockName()
-                                      + " has only "+curReplicas
-                                      + " replicas and need " + curExpectedReplicas
-                                      + " replicas so is added to neededReplications"
-                                      + " at priority level " + curPri);
-      }
-    }
-        
-    /* return an iterator over all the under-replicated blocks */
-    synchronized Iterator<Block> iterator() {
-      return new Iterator<Block>() {
-        int level;
-        List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
-                
-        {
-          level=0;
-          for(int i=0; i<LEVEL; i++) {
-            iterators.add(priorityQueues.get(i).iterator());
-          }
-        }
-                
-        private void update() {
-          while(level< LEVEL-1 && !iterators.get(level).hasNext() ) {
-            level++;
-          }
-        }
-                
-        public Block next() {
-          update();
-          return iterators.get(level).next();
-        }
-                
-        public boolean hasNext() {
-          update();
-          return iterators.get(level).hasNext();
-        }
-                
-        public void remove() {
-          iterators.get(level).remove();
-        }
-      };
-    }
+  /* Updates a block in the under-replication queue. */
+  synchronized void updateNeededReplications(Block block,
+                        int curReplicasDelta, int expectedReplicasDelta) {
+    int curReplicas = countContainingNodes( block );
+    int curExpectedReplicas = getReplication(block);
+    neededReplications.update(block, curReplicas, curExpectedReplicas,
+                                     curReplicasDelta, expectedReplicasDelta);
   }
-    
+
   /////////////////////////////////////////////////////////
   //
   // These methods are called by HadoopFS clients
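
The priority computation used by the deleted UnderReplicatedBlocks queue (the class itself moves to UnderReplicatedBlocks.java, presumably diffed in part 2 of this commit) can be restated as a standalone method. This is a paraphrase of the getPriority() removed above, not new behavior:

    // Paraphrase of the removed getPriority(); LEVEL (3) means
    // "no replication needed". Same branches, same ordering.
    static int getPriority(int curReplicas, int expectedReplicas) {
      if (curReplicas <= 0 || curReplicas >= expectedReplicas) {
        return 3;   // LEVEL: fully replicated, or nothing to replicate from
      } else if (curReplicas == 1) {
        return 0;   // a single replica left: highest priority
      } else if (curReplicas * 3 < expectedReplicas) {
        return 1;   // below a third of the expected replication
      } else {
        return 2;   // mildly under-replicated
      }
    }
    // e.g. getPriority(1, 3) == 0, getPriority(2, 10) == 1, getPriority(2, 3) == 2.
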
@@ -670,7 +490,7 @@
     LOG.info("Increasing replication for file " + src 
              + ". New replication is " + replication);
     for(int idx = 0; idx < fileBlocks.length; idx++)
-      neededReplications.update(fileBlocks[idx], 0, replication-oldRepl);
+      updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
       
     if (oldRepl > replication) {  
       // old replication > the new one; need to remove copies
@@ -1929,7 +1749,9 @@
     if (timedOutItems != null) {
       synchronized (this) {
         for (int i = 0; i < timedOutItems.length; i++) {
-          neededReplications.add(timedOutItems[i]);
+          neededReplications.add(timedOutItems[i], 
+                                 countContainingNodes(timedOutItems[i]),
+                                 getReplication(timedOutItems[i]));
         }
       }
     }
@@ -2254,9 +2076,9 @@
     // handle underReplication/overReplication
     short fileReplication = fileINode.getReplication();
     if (numCurrentReplica >= fileReplication) {
-      neededReplications.remove(block);
+      neededReplications.remove(block, numCurrentReplica, fileReplication);
     } else {
-      neededReplications.update(block, curReplicaDelta, 0);
+      updateNeededReplications(block, curReplicaDelta, 0);
     }
     if (numCurrentReplica > fileReplication) {
       proccessOverReplicatedBlock(block, fileReplication);
@@ -2363,7 +2185,7 @@
     //
     FSDirectory.INode fileINode = blocksMap.getINode(block);
     if (fileINode != null) {
-      neededReplications.update(block, -1, 0);
+      updateNeededReplications(block, -1, 0);
     }
 
     //
@@ -2496,7 +2318,7 @@
       // replicated.
       Block decommissionBlocks[] = node.getBlocks();
       for (int j = 0; j < decommissionBlocks.length; j++) {
-        neededReplications.update(decommissionBlocks[j], -1, 0);
+        updateNeededReplications(decommissionBlocks[j], -1, 0);
       }
     }
   }
@@ -2852,447 +2674,7 @@
     }
   }
   
-  /** The class is responsible for choosing the desired number of targets
-   * for placing block replicas.
-   * The replica placement strategy is that if the writer is on a datanode,
-   * the 1st replica is placed on the local machine, 
-   * otherwise a random datanode. The 2nd replica is placed on a datanode
-   * that is on a different rack. The 3rd replica is placed on a datanode
-   * which is on the same rack as the first replica.
-   * @author hairong
-   *
-   */
-  class ReplicationTargetChooser {
-    final boolean considerLoad; 
-      
-    ReplicationTargetChooser(boolean considerLoad) {
-      this.considerLoad = considerLoad;
-    }
-      
-    private class NotEnoughReplicasException extends Exception {
-      NotEnoughReplicasException(String msg) {
-        super(msg);
-      }
-    }
-      
-    /**
-     * choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
-     * a block with size <i>blocksize</i>.
-     * If that many cannot be found, return as many as we can.
-     * 
-     * @param numOfReplicas: number of replicas wanted.
-     * @param writer: the writer's machine, null if not in the cluster.
-     * @param excludedNodes: datanodes that should not be considered targets.
-     * @param blocksize: size of the data to be written.
-     * @return array of DatanodeDescriptor instances chosen as targets
-     * and sorted as a pipeline.
-     */
-    DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                      DatanodeDescriptor writer,
-                                      List<DatanodeDescriptor> excludedNodes,
-                                      long blocksize) {
-      if (excludedNodes == null) {
-        excludedNodes = new ArrayList<DatanodeDescriptor>();
-      }
-        
-      return chooseTarget(numOfReplicas, writer, 
-                          new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
-    }
-      
-    /**
-     * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
-     * to re-replicate a block with size <i>blocksize</i>.
-     * If that many cannot be found, return as many as we can.
-     * 
-     * @param numOfReplicas: additional number of replicas wanted.
-     * @param writer: the writer's machine, null if not in the cluster.
-     * @param choosenNodes: datanodes that have been chosen as targets.
-     * @param excludedNodes: datanodes that should not be considered targets.
-     * @param blocksize: size of the data to be written.
-     * @return array of DatanodeDescriptor instances chosen as target 
-     * and sorted as a pipeline.
-     */
-    DatanodeDescriptor[] chooseTarget(int numOfReplicas,
-                                      DatanodeDescriptor writer,
-                                      List<DatanodeDescriptor> choosenNodes,
-                                      List<DatanodeDescriptor> excludedNodes,
-                                      long blocksize) {
-      if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-        return new DatanodeDescriptor[0];
-      }
-        
-      if (excludedNodes == null) {
-        excludedNodes = new ArrayList<DatanodeDescriptor>();
-      }
-        
-      int clusterSize = clusterMap.getNumOfLeaves();
-      int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
-      if (totalNumOfReplicas > clusterSize) {
-        numOfReplicas -= (totalNumOfReplicas-clusterSize);
-        totalNumOfReplicas = clusterSize;
-      }
-        
-      int maxNodesPerRack = 
-        (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
-        
-      List<DatanodeDescriptor> results = 
-        new ArrayList<DatanodeDescriptor>(choosenNodes);
-      excludedNodes.addAll(choosenNodes);
-        
-      if (!clusterMap.contains(writer))
-        writer=null;
-        
-      DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                  excludedNodes, blocksize, maxNodesPerRack, results);
-        
-      results.removeAll(choosenNodes);
-        
-      // sorting nodes to form a pipeline
-      return getPipeline((writer==null)?localNode:writer,
-                         results.toArray(new DatanodeDescriptor[results.size()]));
-    }
-      
-    /* choose <i>numOfReplicas</i> from all data nodes */
-    private DatanodeDescriptor chooseTarget(int numOfReplicas,
-                                            DatanodeDescriptor writer,
-                                            List<DatanodeDescriptor> excludedNodes,
-                                            long blocksize,
-                                            int maxNodesPerRack,
-                                            List<DatanodeDescriptor> results) {
-        
-      if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
-        return writer;
-      }
-        
-      int numOfResults = results.size();
-      if (writer == null && (numOfResults==1 || numOfResults==2)) {
-        writer = results.get(0);
-      }
-        
-      try {
-        switch(numOfResults) {
-        case 0:
-          writer = chooseLocalNode(writer, excludedNodes, 
-                                   blocksize, maxNodesPerRack, results);
-          if (--numOfReplicas == 0) break;
-        case 1:
-          chooseRemoteRack(1, writer, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
-          if (--numOfReplicas == 0) break;
-        case 2:
-          if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
-            chooseRemoteRack(1, writer, excludedNodes,
-                             blocksize, maxNodesPerRack, results);
-          } else {
-            chooseLocalRack(writer, excludedNodes, 
-                            blocksize, maxNodesPerRack, results);
-          }
-          if (--numOfReplicas == 0) break;
-        default:
-          chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                       blocksize, maxNodesPerRack, results);
-        }
-      } catch (NotEnoughReplicasException e) {
-        LOG.warn("Not able to place enough replicas, still in need of "
-                 + numOfReplicas);
-      }
-      return writer;
-    }
-      
-    /* choose <i>localMachine</i> as the target.
-     * if <i>localMachine</i> is not available, 
-     * choose a node on the same rack
-     * @return the chosen node
-     */
-    private DatanodeDescriptor chooseLocalNode(
-                                               DatanodeDescriptor localMachine,
-                                               List<DatanodeDescriptor> excludedNodes,
-                                               long blocksize,
-                                               int maxNodesPerRack,
-                                               List<DatanodeDescriptor> results)
-      throws NotEnoughReplicasException {
-      // if no local machine, randomly choose one node
-      if (localMachine == null)
-        return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                            blocksize, maxNodesPerRack, results);
-        
-      // otherwise try local machine first
-      if (!excludedNodes.contains(localMachine)) {
-        excludedNodes.add(localMachine);
-        if (isGoodTarget(localMachine, blocksize,
-                         maxNodesPerRack, false, results)) {
-          results.add(localMachine);
-          return localMachine;
-        }
-      } 
-        
-      // try a node on local rack
-      return chooseLocalRack(localMachine, excludedNodes, 
-                             blocksize, maxNodesPerRack, results);
-    }
-      
-    /* choose one node from the rack that <i>localMachine</i> is on.
-     * if no such node is available, choose one node from the rack where
-     * a second replica is on.
-     * if still no such node is available, choose a random node 
-     * in the cluster.
-     * @return the chosen node
-     */
-    private DatanodeDescriptor chooseLocalRack(
-                                               DatanodeDescriptor localMachine,
-                                               List<DatanodeDescriptor> excludedNodes,
-                                               long blocksize,
-                                               int maxNodesPerRack,
-                                               List<DatanodeDescriptor> results)
-      throws NotEnoughReplicasException {
-      // no local machine, so choose a random machine
-      if (localMachine == null) {
-        return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                            blocksize, maxNodesPerRack, results);
-      }
-        
-      // choose one from the local rack
-      try {
-        return chooseRandom(
-                            localMachine.getNetworkLocation(),
-                            excludedNodes, blocksize, maxNodesPerRack, results);
-      } catch (NotEnoughReplicasException e1) {
-        // find the second replica
-        DatanodeDescriptor newLocal=null;
-        for(Iterator<DatanodeDescriptor> iter=results.iterator();
-            iter.hasNext();) {
-          DatanodeDescriptor nextNode = iter.next();
-          if (nextNode != localMachine) {
-            newLocal = nextNode;
-            break;
-          }
-        }
-        if (newLocal != null) {
-          try {
-            return chooseRandom(
-                                newLocal.getNetworkLocation(),
-                                excludedNodes, blocksize, maxNodesPerRack, results);
-          } catch(NotEnoughReplicasException e2) {
-            //otherwise randomly choose one from the network
-            return chooseRandom(NodeBase.ROOT, excludedNodes,
-                                blocksize, maxNodesPerRack, results);
-          }
-        } else {
-          //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
-        }
-      }
-    }
-      
-    /* choose <i>numOfReplicas</i> nodes from the racks 
-     * that <i>localMachine</i> is NOT on.
-     * if not enough nodes are available, choose the remaining ones 
-     * from the local rack
-     */
-      
-    private void chooseRemoteRack(int numOfReplicas,
-                                  DatanodeDescriptor localMachine,
-                                  List<DatanodeDescriptor> excludedNodes,
-                                  long blocksize,
-                                  int maxReplicasPerRack,
-                                  List<DatanodeDescriptor> results)
-      throws NotEnoughReplicasException {
-      int oldNumOfReplicas = results.size();
-      // randomly choose one node from remote racks
-      try {
-        chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                     excludedNodes, blocksize, maxReplicasPerRack, results);
-      } catch (NotEnoughReplicasException e) {
-        chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-                     localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                     maxReplicasPerRack, results);
-      }
-    }
-      
-    /* Randomly choose one target from <i>nodes</i>.
-     * @return the chosen node
-     */
-    private DatanodeDescriptor chooseRandom(
-                                            String nodes,
-                                            List<DatanodeDescriptor> excludedNodes,
-                                            long blocksize,
-                                            int maxNodesPerRack,
-                                            List<DatanodeDescriptor> results) 
-      throws NotEnoughReplicasException {
-      DatanodeDescriptor result;
-      do {
-        DatanodeDescriptor[] selectedNodes = 
-          chooseRandom(1, nodes, excludedNodes);
-        if (selectedNodes.length == 0) {
-          throw new NotEnoughReplicasException(
-                                               "Not able to place enough replicas");
-        }
-        result = (DatanodeDescriptor)(selectedNodes[0]);
-      } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results));
-      results.add(result);
-      return result;
-    }
-      
-    /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
-     */
-    private void chooseRandom(int numOfReplicas,
-                              String nodes,
-                              List<DatanodeDescriptor> excludedNodes,
-                              long blocksize,
-                              int maxNodesPerRack,
-                              List<DatanodeDescriptor> results)
-      throws NotEnoughReplicasException {
-      boolean toContinue = true;
-      do {
-        DatanodeDescriptor[] selectedNodes = 
-          chooseRandom(numOfReplicas, nodes, excludedNodes);
-        if (selectedNodes.length < numOfReplicas) {
-          toContinue = false;
-        }
-        for(int i=0; i<selectedNodes.length; i++) {
-          DatanodeDescriptor result = (DatanodeDescriptor)(selectedNodes[i]);
-          if (isGoodTarget(result, blocksize, maxNodesPerRack, results)) {
-            numOfReplicas--;
-            results.add(result);
-          }
-        } // end of for
-      } while (numOfReplicas>0 && toContinue);
-        
-      if (numOfReplicas>0) {
-        throw new NotEnoughReplicasException(
-                                             "Not able to place enough replicas");
-      }
-    }
-      
-    /* Randomly choose <i>numOfReplicas</i> nodes from <i>nodes</i>.
-     * @return the chosen nodes
-     */
-    private DatanodeDescriptor[] chooseRandom(int numOfReplicas, 
-                                              String nodes,
-                                              List<DatanodeDescriptor> excludedNodes) {
-      List<DatanodeDescriptor> results = 
-        new ArrayList<DatanodeDescriptor>();
-      int numOfAvailableNodes =
-        clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
-      numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
-        numOfAvailableNodes:numOfReplicas;
-      while(numOfReplicas > 0) {
-        DatanodeDescriptor choosenNode = clusterMap.chooseRandom(nodes);
-        if (!excludedNodes.contains(choosenNode)) {
-          results.add(choosenNode);
-          excludedNodes.add(choosenNode);
-          numOfReplicas--;
-        }
-      }
-      return (DatanodeDescriptor[])results.toArray(
-                                                   new DatanodeDescriptor[results.size()]);    
-    }
-      
-    /* judge if a node is a good target.
-     * return true if <i>node</i> has enough space, 
-     * does not have too much load, and the rack does not have too many nodes
-     */
-    private boolean isGoodTarget(DatanodeDescriptor node,
-                                 long blockSize, int maxTargetPerLoc,
-                                 List<DatanodeDescriptor> results) {
-      return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                          this.considerLoad, results);
-    }
-      
-    private boolean isGoodTarget(DatanodeDescriptor node,
-                                 long blockSize, int maxTargetPerLoc,
-                                 boolean considerLoad,
-                                 List<DatanodeDescriptor> results) {
-        
-      // check if the node is (being) decommissioned
-      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-        LOG.debug("Node "+node.getPath()+
-                  " is not chosen because the node is (being) decommissioned");
-        return false;
-      }
-
-      // check the remaining capacity of the target machine
-      if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) {
-        LOG.debug("Node "+node.getPath()+
-                  " is not chosen because the node does not have enough space");
-        return false;
-      }
-        
-      // check the communication traffic of the target machine
-      if (considerLoad) {
-        double avgLoad = 0;
-        int size = clusterMap.getNumOfLeaves();
-        if (size != 0) {
-          avgLoad = (double)totalLoad()/size;
-        }
-        if (node.getXceiverCount() > (2.0 * avgLoad)) {
-          LOG.debug("Node "+node.getPath()+
-                    " is not chosen because the node is too busy");
-          return false;
-        }
-      }
-        
-      // check if the target rack has chosen too many nodes
-      String rackname = node.getNetworkLocation();
-      int counter=1;
-      for(Iterator<DatanodeDescriptor> iter = results.iterator();
-          iter.hasNext();) {
-        DatanodeDescriptor result = iter.next();
-        if (rackname.equals(result.getNetworkLocation())) {
-          counter++;
-        }
-      }
-      if (counter>maxTargetPerLoc) {
-        LOG.debug("Node "+node.getPath()+
-                  " is not chosen because the rack has too many chosen nodes");
-        return false;
-      }
-      return true;
-    }
-      
-    /* Return a pipeline of nodes.
-     * The pipeline is formed by finding a shortest path that
-     * starts from the writer and traverses all <i>nodes</i>.
-     * This is basically a traveling salesman problem.
-     */
-    private DatanodeDescriptor[] getPipeline(
-                                             DatanodeDescriptor writer,
-                                             DatanodeDescriptor[] nodes) {
-      if (nodes.length==0) return nodes;
-        
-      synchronized(clusterMap) {
-        int index=0;
-        if (writer == null || !clusterMap.contains(writer)) {
-          writer = nodes[0];
-        }
-        for(;index<nodes.length; index++) {
-          DatanodeDescriptor shortestNode = null;
-          int shortestDistance = Integer.MAX_VALUE;
-          int shortestIndex = index;
-          for(int i=index; i<nodes.length; i++) {
-            DatanodeDescriptor currentNode = nodes[i];
-            int currentDistance = clusterMap.getDistance(writer, currentNode);
-            if (shortestDistance>currentDistance) {
-              shortestDistance = currentDistance;
-              shortestNode = currentNode;
-              shortestIndex = i;
-            }
-          }
-          //switch position index & shortestIndex
-          if (index != shortestIndex) {
-            nodes[shortestIndex] = nodes[index];
-            nodes[index] = shortestNode;
-          }
-          writer = shortestNode;
-        }
-      }
-      return nodes;
-    }
-  } //end of Replicator
-
-    // Keeps track of which datanodes are allowed to connect to the namenode.
-        
+  // Keeps track of which datanodes are allowed to connect to the namenode.
   private boolean inHostsList(DatanodeID node) {
     Set<String> hostsList = hostsReader.getHosts();
     return (hostsList.isEmpty() || 
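
The getPipeline() routine removed in the hunk above orders the chosen targets with a greedy nearest-neighbor pass. Below is a self-contained illustration of that ordering, with a plain integer matrix standing in for clusterMap.getDistance(); the distances are made up for the example:

    import java.util.Arrays;

    public class PipelineSketch {
      public static void main(String[] args) {
        // dist[i][j]: assumed network distance between nodes i and j;
        // node 0 plays the writer, nodes 1..3 are the chosen targets.
        int[][] dist = { {0, 2, 4, 6}, {2, 0, 4, 6}, {4, 4, 0, 2}, {6, 6, 2, 0} };
        int[] nodes = {3, 1, 2};      // candidate targets, initially unordered
        int writer = 0;
        for (int index = 0; index < nodes.length; index++) {
          int shortestIndex = index;
          for (int i = index; i < nodes.length; i++) {
            if (dist[writer][nodes[i]] < dist[writer][nodes[shortestIndex]]) {
              shortestIndex = i;
            }
          }
          // swap, as in the removed code, then measure from the node just placed
          int tmp = nodes[index];
          nodes[index] = nodes[shortestIndex];
          nodes[shortestIndex] = tmp;
          writer = nodes[index];
        }
        System.out.println(Arrays.toString(nodes)); // [1, 2, 3]: nearest-first chain
      }
    }
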
@@ -3412,72 +2794,6 @@
     }
   }
     
-
-  /**
-   * Information about the file while it is being written to.
-   * Note that at that time the file is not visible to the outside.
-   * 
-   * This class contains a <code>Collection</code> of {@link Block}s that have
-   * been written into the file so far, along with the file replication.
-   * 
-   * @author shv
-   */
-  private class FileUnderConstruction {
-    private short blockReplication; // file replication
-    private long blockSize;
-    private Collection<Block> blocks;
-    private UTF8 clientName;         // lease holder
-    private UTF8 clientMachine;
-    private DatanodeDescriptor clientNode; // if client is a cluster node too.
-      
-    FileUnderConstruction(short replication,
-                          long blockSize,
-                          UTF8 clientName,
-                          UTF8 clientMachine,
-                          DatanodeDescriptor clientNode) throws IOException {
-      this.blockReplication = replication;
-      this.blockSize = blockSize;
-      this.blocks = new ArrayList<Block>();
-      this.clientName = clientName;
-      this.clientMachine = clientMachine;
-      this.clientNode = clientNode;
-    }
-      
-    public short getReplication() {
-      return this.blockReplication;
-    }
-      
-    public long getBlockSize() {
-      return blockSize;
-    }
-      
-    public Collection<Block> getBlocks() {
-      return blocks;
-    }
-      
-    public UTF8 getClientName() {
-      return clientName;
-    }
-      
-    public UTF8 getClientMachine() {
-      return clientMachine;
-    }
-
-    public DatanodeDescriptor getClientNode() {
-      return clientNode;
-    }
-
-    /**
-     * Return the penultimate allocated block for this file
-     */
-    public Block getPenultimateBlock() {
-      if (blocks.size() <= 1) {
-        return null;
-      }
-      return ((ArrayList<Block>)blocks).get(blocks.size() - 2);
-    }
-  }
-
   /**
    * Get data node by storage ID.
    * 
@@ -3499,136 +2815,6 @@
     return node;
   }
     
-  static class Host2NodesMap {
-    private HashMap<String, DatanodeDescriptor[]> map
-      = new HashMap<String, DatanodeDescriptor[]>();
-    private Random r = new Random();
-                        
-    /** Check if node is already in the map */
-    synchronized boolean contains(DatanodeDescriptor node) {
-      if (node==null) return false;
-        
-      String host = node.getHost();
-      DatanodeDescriptor[] nodes = map.get(host);
-      if (nodes != null) {
-        for(DatanodeDescriptor containedNode:nodes) {
-          if (node==containedNode)
-            return true;
-        }
-      }
-      return false;
-    }
-      
-    /** add <node.getHost(), node> to the map 
-     * return true if the node is added; false otherwise
-     */
-    synchronized boolean add(DatanodeDescriptor node) {
-      if (node==null || contains(node)) return false;
-        
-      String host = node.getHost();
-      DatanodeDescriptor[] nodes = map.get(host);
-      DatanodeDescriptor[] newNodes;
-      if (nodes==null) {
-        newNodes = new DatanodeDescriptor[1];
-        newNodes[0]=node;
-      } else { // rare case: more than one datanode on the host
-        newNodes = new DatanodeDescriptor[nodes.length+1];
-        System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
-        newNodes[nodes.length] = node;
-      }
-      map.put(host, newNodes);
-      return true;
-    }
-      
-    /** remove node from the map 
-     * return true if the node is removed; false otherwise
-     */
-    synchronized boolean remove(DatanodeDescriptor node) {
-      if (node==null) return false;
-        
-      String host = node.getHost();
-      DatanodeDescriptor[] nodes = map.get(host);
-      if (nodes==null) {
-        return false;
-      }
-      if (nodes.length==1) {
-        if (nodes[0]==node) {
-          map.remove(host);
-          return true;
-        } else {
-          return false;
-        }
-      }
-      //rare case
-      int i=0;
-      for(; i<nodes.length; i++) {
-        if (nodes[i]==node) {
-          break;
-        }
-      }
-      if (i==nodes.length) {
-        return false;
-      } else {
-        DatanodeDescriptor[] newNodes;
-        newNodes = new DatanodeDescriptor[nodes.length-1];
-        System.arraycopy(nodes, 0, newNodes, 0, i);
-        System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
-        map.put(host, newNodes);
-        return true;
-      }
-    }
-      
-    /** get a data node by its host
-     * @return DatanodeDescriptor if found; otherwise null
-     */
-    synchronized DatanodeDescriptor getDatanodeByHost(String host) {
-      if (host==null) return null;
-        
-      DatanodeDescriptor[] nodes = map.get(host);
-      // no entry
-      if (nodes== null) {
-        return null;
-      }
-      // one node
-      if (nodes.length == 1) {
-        return nodes[0];
-      }
-      // more than one node
-      return nodes[r.nextInt(nodes.length)];
-    }
-      
-    /**
-     * Find data node by its name.
-     * 
-     * @return DatanodeDescriptor if found or null otherwise 
-     */
-    public DatanodeDescriptor getDatanodeByName(String name) {
-      if (name==null) return null;
-        
-      int colon = name.indexOf(":");
-      String host;
-      if (colon < 0) {
-        host = name;
-      } else {
-        host = name.substring(0, colon);
-      }
-
-      DatanodeDescriptor[] nodes = map.get(host);
-      // no entry
-      if (nodes== null) {
-        return null;
-      }
-      for(DatanodeDescriptor containedNode:nodes) {
-        if (name.equals(containedNode.getName())) {
-          return containedNode;
-        }
-      }
-      return null;
-    }
-  }
-    
-  private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
-       
   /** Stop at and return the datanode at index (used for content browsing)*/
   private DatanodeDescriptor getDatanodeByIndex(int index) {
     int i = 0;
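
Host2NodesMap, removed above and promoted to its own file, keys datanodes by host and hands back a random one when several datanodes share a host. A short usage sketch against the methods shown; node1 and node2 are assumed DatanodeDescriptor instances on the same host, and "host1:50010" is a placeholder name:

    Host2NodesMap hostMap = new Host2NodesMap();
    hostMap.add(node1);                         // first entry for the host
    hostMap.add(node2);                         // rare case: per-host array grows to 2
    boolean present = hostMap.contains(node1);  // true; identity comparison
    DatanodeDescriptor any =
        hostMap.getDatanodeByHost("host1");     // random pick between the two
    DatanodeDescriptor exact =
        hostMap.getDatanodeByName("host1:50010"); // matches on the full name
    hostMap.remove(node1);                      // shrinks the array back to one entry
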
@@ -3899,7 +3085,7 @@
       if (blockTotal == -1 && blockSafe == -1) {
         return true; // manual safe mode
       }
-      int activeBlocks = blocksMap.map.size();
+      int activeBlocks = blocksMap.size();
       for(Iterator<Collection<Block>> it = 
             recentInvalidateSets.values().iterator(); it.hasNext();) {
         activeBlocks -= it.next().size();
@@ -3978,7 +3164,7 @@
   void setBlockTotal() {
     if (safeMode == null)
       return;
-    safeMode.setBlockTotal(blocksMap.map.size());
+    safeMode.setBlockTotal(blocksMap.size());
   }
 
   /**
@@ -4040,238 +3226,9 @@
   }
     
   /**
-   * This class is used in Namesystem's jetty to do fsck on namenode
-   * @author Milind Bhandarkar
-   */
-  public static class FsckServlet extends HttpServlet {
-    @SuppressWarnings("unchecked")
-    public void doGet(HttpServletRequest request,
-                      HttpServletResponse response
-                      ) throws ServletException, IOException {
-      Map<String,String[]> pmap = request.getParameterMap();
-      try {
-        ServletContext context = getServletContext();
-        NameNode nn = (NameNode) context.getAttribute("name.node");
-        Configuration conf = (Configuration) context.getAttribute("name.conf");
-        NamenodeFsck fscker = new NamenodeFsck(conf, nn, pmap, response);
-        fscker.fsck();
-      } catch (IOException ie) {
-        StringUtils.stringifyException(ie);
-        LOG.warn(ie);
-        String errMsg = "Fsck on path " + pmap.get("path") + " failed.";
-        response.sendError(HttpServletResponse.SC_GONE, errMsg);
-        throw ie;
-      }
-    }
-  }
-
-  /**
-   * This class is used in Namesystem's jetty to retrieve a file.
-   * Typically used by the Secondary NameNode to retrieve image and
-   * edit file for periodic checkpointing.
-   * @author Dhruba Borthakur
-   */
-  public static class GetImageServlet extends HttpServlet {
-    @SuppressWarnings("unchecked")
-    public void doGet(HttpServletRequest request,
-                      HttpServletResponse response
-                      ) throws ServletException, IOException {
-      Map<String,String[]> pmap = request.getParameterMap();
-      try {
-        ServletContext context = getServletContext();
-        NameNode nn = (NameNode) context.getAttribute("name.node");
-        TransferFsImage ff = new TransferFsImage(pmap, request, response);
-        if (ff.getImage()) {
-          // send fsImage to Secondary
-          TransferFsImage.getFileServer(response.getOutputStream(),
-                                        nn.getFsImageName()); 
-        } else if (ff.getEdit()) {
-          // send old edits to Secondary
-          TransferFsImage.getFileServer(response.getOutputStream(),
-                                        nn.getFsEditName());
-        } else if (ff.putImage()) {
-          // issue a HTTP get request to download the new fsimage 
-          TransferFsImage.getFileClient(ff.getInfoServer(), "getimage=1", 
-                                        nn.getFsImageNameCheckpoint());
-        }
-      } catch (IOException ie) {
-        StringUtils.stringifyException(ie);
-        LOG.warn(ie);
-        String errMsg = "GetImage failed.";
-        response.sendError(HttpServletResponse.SC_GONE, errMsg);
-        throw ie;
-      }
-    }
-  }
-    
-  /**
    * Returns whether the given block is one pointed-to by a file.
    */
   public boolean isValidBlock(Block b) {
     return blocksMap.getINode(b) != null;
-  }
-    
-  /**
-   * This class maintains the map from a block to its metadata.
-   * block's metadata currently includes INode it belongs to and
-   * the datanodes that store the block.
-   */
-  class BlocksMap {
-        
-    /**
-     * Internal class for block metadata
-     */
-    private class BlockInfo {
-      FSDirectory.INode              inode;
-        
-      /** nodes could contain some null entries at the end, so 
-       *  nodes.length >= number of datanodes. 
-       *  if nodes != null then nodes[0] != null.
-       */
-      DatanodeDescriptor             nodes[];
-      Block                          block; //block that was inserted.   
-    }
-      
-    private class NodeIterator implements Iterator<DatanodeDescriptor> {
-      NodeIterator(DatanodeDescriptor[] nodes) {
-        arr = nodes;
-      }
-      DatanodeDescriptor[] arr;
-      int nextIdx = 0;
-        
-      public boolean hasNext() {
-        return arr != null && nextIdx < arr.length && arr[nextIdx] != null;
-      }
-        
-      public DatanodeDescriptor next() {
-        return arr[nextIdx++];
-      }
-        
-      public void remove()  {
-        throw new UnsupportedOperationException("Sorry. can't remove.");
-      }
-    }
-      
-    Map<Block, BlockInfo> map = new HashMap<Block, BlockInfo>();
-      
-    /** add BlockInfo if mapping does not exist */
-    private BlockInfo checkBlockInfo(Block b) {
-      BlockInfo info = map.get(b);
-      if (info == null) {
-        info = new BlockInfo();
-        info.block = b;
-        map.put(b, info);
-      }
-      return info;
-    }
-      
-    public FSDirectory.INode getINode(Block b) {
-      BlockInfo info = map.get(b);
-      return (info != null) ? info.inode : null;
-    }
-            
-    public void addINode(Block b, FSDirectory.INode iNode) {
-      BlockInfo info = checkBlockInfo(b);
-      info.inode = iNode;
-    }
-      
-    public void removeINode(Block b) {
-      BlockInfo info = map.get(b);
-      if (info != null) {
-        info.inode = null;
-        if (info.nodes == null) {
-          map.remove(b);
-        }
-      }
-    }
-      
-    /** Returns the block object if it exists in the map */
-    public Block getStoredBlock(Block b) {
-      BlockInfo info = map.get(b);
-      return (info != null) ? info.block : null;
-    }
-      
-    /** The returned Iterator does not support remove(). */
-    public Iterator<DatanodeDescriptor> nodeIterator(Block b) {
-      BlockInfo info = map.get(b);
-      return new NodeIterator((info != null) ? info.nodes : null);
-    }
-      
-    /** Counts the number of nodes containing the block. Faster than using the iterator. */
-    public int numNodes(Block b) {
-      int count = 0;
-      BlockInfo info = map.get(b);
-      if (info != null && info.nodes != null) {
-        count = info.nodes.length;
-        while (info.nodes[ count-1 ] == null) // mostly false
-          count--;
-      }
-      return count;
-    }
-      
-    /** Returns true if the node did not already exist and was added;
-     * false if the node already exists. */
-    public boolean addNode(Block b, 
-                           DatanodeDescriptor node,
-                           int replicationHint) {
-      BlockInfo info = checkBlockInfo(b);
-      if (info.nodes == null) {
-        info.nodes = new DatanodeDescriptor[ replicationHint ];
-      }
-        
-      DatanodeDescriptor[] arr = info.nodes;
-      for(int i=0; i < arr.length; i++) {
-        if (arr[i] == null) {
-          arr[i] = node;
-          return true;
-        }
-        if (arr[i] == node) {
-          return false;
-        }
-      }
-
-      /* Not enough space left. Create a new array. Should normally 
-       * happen only when replication is manually increased by the user. */
-      info.nodes = new DatanodeDescriptor[ arr.length + 1 ];
-      for(int i=0; i < arr.length; i++) {
-        info.nodes[i] = arr[i];
-      }
-      info.nodes[ arr.length ] = node;
-      return true;
-    }
-      
-    public boolean removeNode(Block b, DatanodeDescriptor node) {
-      BlockInfo info = map.get(b);
-      if (info == null || info.nodes == null)
-        return false;
-
-      boolean removed = false;
-      // swap lastNode and node's location. set lastNode to null.
-      DatanodeDescriptor[] arr = info.nodes;
-      int lastNode = -1;
-      for(int i=arr.length-1; i >= 0; i--) {
-        if (lastNode < 0 && arr[i] != null)
-          lastNode = i;
-        if (arr[i] == node) {
-          arr[i] = arr[ lastNode ];
-          arr[ lastNode ] = null;
-          removed = true;
-          break;
-        }
-      }
-          
-      /*
-       * if ((lastNode + 1) < arr.length/4) {
-       *    we could trim the array.
-       * } 
-       */
-      if (arr[0] == null) { // no datanodes left.
-        info.nodes = null;
-        if (info.inode == null) {
-          map.remove(b);
-        }
-      }
-      return removed;
-    }
   }
 }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FileUnderConstruction.java Tue May  1 01:39:50 2007
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.io.UTF8;
+import java.io.*;
+import java.util.*;
+
+/**
+ * Information about the file while it is being written to.
+ * Note that at that time the file is not visible to the outside.
+ * 
+ * This class contains a <code>Collection</code> of blocks that have
+ * been written into the file so far, along with the file replication.
+ * 
+ * @author shv
+ */
+class FileUnderConstruction {
+  private short blockReplication; // file replication
+  private long blockSize;
+  private Collection<Block> blocks;
+  private UTF8 clientName;         // lease holder
+  private UTF8 clientMachine;
+  private DatanodeDescriptor clientNode; // if client is a cluster node too.
+    
+  FileUnderConstruction(short replication,
+                        long blockSize,
+                        UTF8 clientName,
+                        UTF8 clientMachine,
+                        DatanodeDescriptor clientNode) throws IOException {
+    this.blockReplication = replication;
+    this.blockSize = blockSize;
+    this.blocks = new ArrayList<Block>();
+    this.clientName = clientName;
+    this.clientMachine = clientMachine;
+    this.clientNode = clientNode;
+  }
+    
+  public short getReplication() {
+    return this.blockReplication;
+  }
+    
+  public long getBlockSize() {
+    return blockSize;
+  }
+    
+  public Collection<Block> getBlocks() {
+    return blocks;
+  }
+    
+  public UTF8 getClientName() {
+    return clientName;
+  }
+    
+  public UTF8 getClientMachine() {
+    return clientMachine;
+  }
+
+  public DatanodeDescriptor getClientNode() {
+    return clientNode;
+  }
+
+  /**
+   * Return the penultimate allocated block for this file.
+   */
+  public Block getPenultimateBlock() {
+    if (blocks.size() <= 1) {
+      return null;
+    }
+    return ((ArrayList<Block>)blocks).get(blocks.size() - 2);
+  }
+}

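The penultimate block matters because the last block of an open file is
usually still being filled by the client, so the most recently completed
block is the natural place to check replication progress. A standalone
sketch of the same bookkeeping, with simplified stand-in types (long block
ids instead of Block; illustrative only, not the class above):

  import java.util.ArrayList;
  import java.util.List;

  // Minimal sketch of the pending-file bookkeeping pattern.
  class PendingFileSketch {
    private final List<Long> blocks = new ArrayList<Long>();

    void addBlock(long blockId) { blocks.add(blockId); }

    // The penultimate block: the last fully written block while the
    // final block is still being filled.
    Long getPenultimateBlock() {
      return blocks.size() <= 1 ? null : blocks.get(blocks.size() - 2);
    }

    public static void main(String[] args) {
      PendingFileSketch f = new PendingFileSketch();
      f.addBlock(1L);
      System.out.println(f.getPenultimateBlock()); // null: only one block
      f.addBlock(2L);
      System.out.println(f.getPenultimateBlock()); // 1
    }
  }
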
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FsckServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FsckServlet.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FsckServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FsckServlet.java Tue May  1 01:39:50 2007
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.*;
+import org.apache.commons.logging.*;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * This servlet runs in the namesystem's Jetty server and performs fsck on the namenode.
+ * @author Milind Bhandarkar
+ */
+public class FsckServlet extends HttpServlet {
+
+  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.FSNamesystem");
+
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request,
+                    HttpServletResponse response
+                    ) throws ServletException, IOException {
+    Map<String,String[]> pmap = request.getParameterMap();
+    try {
+      ServletContext context = getServletContext();
+      NameNode nn = (NameNode) context.getAttribute("name.node");
+      Configuration conf = (Configuration) context.getAttribute("name.conf");
+      NamenodeFsck fscker = new NamenodeFsck(conf, nn, pmap, response);
+      fscker.fsck();
+    } catch (IOException ie) {
+      // log the full stack trace, not just the exception message
+      LOG.warn(StringUtils.stringifyException(ie));
+      String errMsg = "Fsck on path " + pmap.get("path") + " failed.";
+      response.sendError(HttpServletResponse.SC_GONE, errMsg);
+      throw ie;
+    }
+  }
+}

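A client drives this servlet with a plain HTTP GET and reads the fsck
report off the response body. A hedged sketch, assuming the servlet is
mounted at /fsck on the namenode's status server (the host, port, and
query string here are illustrative):

  import java.io.BufferedReader;
  import java.io.InputStreamReader;
  import java.net.URL;

  // Illustrative client for the fsck servlet; the endpoint is assumed.
  public class FsckClientSketch {
    public static void main(String[] args) throws Exception {
      URL url = new URL("http://namenode.example.com:50070/fsck?path=%2F");
      BufferedReader in = new BufferedReader(
          new InputStreamReader(url.openStream()));
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // the fsck report, line by line
      }
      in.close();
    }
  }
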
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/GetImageServlet.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/GetImageServlet.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/GetImageServlet.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/GetImageServlet.java Tue May  1 01:39:50 2007
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.commons.logging.*;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+
+/**
+ * This servlet runs in the namesystem's Jetty server to serve a file.
+ * It is typically used by the Secondary NameNode to retrieve the image
+ * and edits files for periodic checkpointing.
+ * @author Dhruba Borthakur
+ */
+public class GetImageServlet extends HttpServlet {
+
+  private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.FSNamesystem");
+
+  @SuppressWarnings("unchecked")
+  public void doGet(HttpServletRequest request,
+                    HttpServletResponse response
+                    ) throws ServletException, IOException {
+    Map<String,String[]> pmap = request.getParameterMap();
+    try {
+      ServletContext context = getServletContext();
+      NameNode nn = (NameNode) context.getAttribute("name.node");
+      TransferFsImage ff = new TransferFsImage(pmap, request, response);
+      if (ff.getImage()) {
+        // send fsImage to Secondary
+        TransferFsImage.getFileServer(response.getOutputStream(),
+                                      nn.getFsImageName()); 
+      } else if (ff.getEdit()) {
+        // send old edits to Secondary
+        TransferFsImage.getFileServer(response.getOutputStream(),
+                                      nn.getFsEditName());
+      } else if (ff.putImage()) {
+        // issue an HTTP GET request to download the new fsimage 
+        TransferFsImage.getFileClient(ff.getInfoServer(), "getimage=1", 
+                                      nn.getFsImageNameCheckpoint());
+      }
+    } catch (IOException ie) {
+      // log the full stack trace, not just the exception message
+      LOG.warn(StringUtils.stringifyException(ie));
+      String errMsg = "GetImage failed.";
+      response.sendError(HttpServletResponse.SC_GONE, errMsg);
+      throw ie;
+    }
+  }
+}

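The three branches mirror the secondary namenode's checkpoint cycle: pull
the current image (the getimage=1 query shown above), pull the rolled
edits, and on a put request the namenode turns around and downloads the
merged checkpoint from the secondary's info server. A hedged sketch of
the pull side, assuming the servlet is mounted at /getimage (the host,
port, and local file name are illustrative):

  import java.io.FileOutputStream;
  import java.io.InputStream;
  import java.net.URL;

  // Illustrative image pull; endpoint and parameter assumed from the code above.
  public class CheckpointPullSketch {
    public static void main(String[] args) throws Exception {
      URL url = new URL("http://namenode.example.com:50070/getimage?getimage=1");
      InputStream in = url.openStream();
      FileOutputStream out = new FileOutputStream("fsimage.ckpt");
      byte[] buf = new byte[4096];
      int n;
      while ((n = in.read(buf)) > 0) {
        out.write(buf, 0, n); // stream the image to local disk
      }
      out.close();
      in.close();
    }
  }
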
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Host2NodesMap.java Tue May  1 01:39:50 2007
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.*;
+
+class Host2NodesMap {
+  private HashMap<String, DatanodeDescriptor[]> map
+    = new HashMap<String, DatanodeDescriptor[]>();
+  private Random r = new Random();
+                      
+  /** Check if node is already in the map. */
+  synchronized boolean contains(DatanodeDescriptor node) {
+    if (node==null) {
+      return false;
+    }
+      
+    String host = node.getHost();
+    DatanodeDescriptor[] nodes = map.get(host);
+    if (nodes != null) {
+      for(DatanodeDescriptor containedNode:nodes) {
+        if (node==containedNode) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+    
+  /** Add node to the map. 
+   * Return true if the node is added; false otherwise.
+   */
+  synchronized boolean add(DatanodeDescriptor node) {
+    if (node==null || contains(node)) {
+      return false;
+    }
+      
+    String host = node.getHost();
+    DatanodeDescriptor[] nodes = map.get(host);
+    DatanodeDescriptor[] newNodes;
+    if (nodes==null) {
+      newNodes = new DatanodeDescriptor[1];
+      newNodes[0]=node;
+    } else { // rare case: more than one datanode on the host
+      newNodes = new DatanodeDescriptor[nodes.length+1];
+      System.arraycopy(nodes, 0, newNodes, 0, nodes.length);
+      newNodes[nodes.length] = node;
+    }
+    map.put(host, newNodes);
+    return true;
+  }
+    
+  /** Remove node from the map. 
+   * Return true if the node is removed; false otherwise.
+   */
+  synchronized boolean remove(DatanodeDescriptor node) {
+    if (node==null) {
+      return false;
+    }
+      
+    String host = node.getHost();
+    DatanodeDescriptor[] nodes = map.get(host);
+    if (nodes==null) {
+      return false;
+    }
+    if (nodes.length==1) {
+      if (nodes[0]==node) {
+        map.remove(host);
+        return true;
+      } else {
+        return false;
+      }
+    }
+    //rare case
+    int i=0;
+    for(; i<nodes.length; i++) {
+      if (nodes[i]==node) {
+        break;
+      }
+    }
+    if (i==nodes.length) {
+      return false;
+    } else {
+      DatanodeDescriptor[] newNodes;
+      newNodes = new DatanodeDescriptor[nodes.length-1];
+      System.arraycopy(nodes, 0, newNodes, 0, i);
+      System.arraycopy(nodes, i+1, newNodes, i, nodes.length-i-1);
+      map.put(host, newNodes);
+      return true;
+    }
+  }
+    
+  /** get a data node by its host.
+   * @return DatanodeDescriptor if found; otherwise null.
+   */
+  synchronized DatanodeDescriptor getDatanodeByHost(String host) {
+    if (host==null) {
+      return null;
+    }
+      
+    DatanodeDescriptor[] nodes = map.get(host);
+    // no entry
+    if (nodes== null) {
+      return null;
+    }
+    // one node
+    if (nodes.length == 1) {
+      return nodes[0];
+    }
+    // more than one node
+    return nodes[r.nextInt(nodes.length)];
+  }
+    
+  /**
+   * Find data node by its name.
+   * 
+   * @return DatanodeDescriptor if found or null otherwise 
+   */
+  public DatanodeDescriptor getDatanodeByName(String name) {
+    if (name==null) {
+      return null;
+    }
+      
+    int colon = name.indexOf(":");
+    String host;
+    if (colon < 0) {
+      host = name;
+    } else {
+      host = name.substring(0, colon);
+    }
+
+    DatanodeDescriptor[] nodes = map.get(host);
+    // no entry
+    if (nodes== null) {
+      return null;
+    }
+    for(DatanodeDescriptor containedNode:nodes) {
+      if (name.equals(containedNode.getName())) {
+        return containedNode;
+      }
+    }
+    return null;
+  }
+}

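Each host maps to a small array that is rebuilt on every add or remove;
for the common case of one datanode per host this is leaner than keeping
a per-host list. A standalone sketch of the copy-on-write-array idiom,
with String values standing in for DatanodeDescriptor (illustrative only):

  import java.util.HashMap;
  import java.util.Map;

  // Minimal copy-on-write array multimap, as in Host2NodesMap.
  class ArrayMultimapSketch {
    private final Map<String, String[]> map =
        new HashMap<String, String[]>();

    synchronized void add(String host, String node) {
      String[] old = map.get(host);
      if (old == null) {
        map.put(host, new String[] { node }); // common case: one node
      } else {                                // rare case: several nodes
        String[] grown = new String[old.length + 1];
        System.arraycopy(old, 0, grown, 0, old.length);
        grown[old.length] = node;
        map.put(host, grown);
      }
    }

    synchronized String first(String host) {
      String[] nodes = map.get(host);
      return nodes == null ? null : nodes[0];
    }

    public static void main(String[] args) {
      ArrayMultimapSketch m = new ArrayMultimapSketch();
      m.add("host1", "host1:50010");
      m.add("host1", "host1:50011");
      System.out.println(m.first("host1")); // host1:50010
    }
  }
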
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java?view=auto&rev=533966
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ReplicationTargetChooser.java Tue May  1 01:39:50 2007
@@ -0,0 +1,477 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.commons.logging.*;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
+import java.util.*;
+
+/** The class is responsible for choosing the desired number of targets
+ * for placing block replicas.
+ * The replica placement strategy is that if the writer is on a datanode,
+ * the 1st replica is placed on the local machine, 
+ * otherwise a random datanode. The 2nd replica is placed on a datanode
+ * that is on a different rack. The 3rd replica is placed on a datanode
+ * which is on the same rack as the first replica.
+ * @author hairong
+ *
+ */
+class ReplicationTargetChooser {
+  private final boolean considerLoad; 
+  private NetworkTopology clusterMap;
+  private Log logr;
+  private FSNamesystem fs;
+    
+  ReplicationTargetChooser(boolean considerLoad,  FSNamesystem fs,
+                           NetworkTopology clusterMap, Log logr) {
+    this.considerLoad = considerLoad;
+    this.fs = fs;
+    this.clusterMap = clusterMap;
+    this.logr = logr;
+  }
+    
+  private class NotEnoughReplicasException extends Exception {
+    NotEnoughReplicasException(String msg) {
+      super(msg);
+    }
+  }
+    
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>. 
+   * If not enough targets are available, return as many as possible.
+   * 
+   * @param numOfReplicas: number of replicas wanted.
+   * @param writer: the writer's machine, null if not in the cluster.
+   * @param excludedNodes: datanodes that should not be considered targets.
+   * @param blocksize: size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> excludedNodes,
+                                    long blocksize) {
+    if (excludedNodes == null) {
+      excludedNodes = new ArrayList<DatanodeDescriptor>();
+    }
+      
+    return chooseTarget(numOfReplicas, writer, 
+                        new ArrayList<DatanodeDescriptor>(), excludedNodes, blocksize);
+  }
+    
+  /**
+   * Choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
+   * to re-replicate a block with size <i>blocksize</i>. 
+   * If not enough targets are available, return as many as possible.
+   * 
+   * @param numOfReplicas: additional number of replicas wanted.
+   * @param writer: the writer's machine, null if not in the cluster.
+   * @param choosenNodes: datanodes that have already been chosen as targets.
+   * @param excludedNodes: datanodes that should not be considered targets.
+   * @param blocksize: size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target 
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> choosenNodes,
+                                    List<DatanodeDescriptor> excludedNodes,
+                                    long blocksize) {
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return new DatanodeDescriptor[0];
+    }
+      
+    if (excludedNodes == null) {
+      excludedNodes = new ArrayList<DatanodeDescriptor>();
+    }
+      
+    int clusterSize = clusterMap.getNumOfLeaves();
+    int totalNumOfReplicas = choosenNodes.size()+numOfReplicas;
+    if (totalNumOfReplicas > clusterSize) {
+      numOfReplicas -= (totalNumOfReplicas-clusterSize);
+      totalNumOfReplicas = clusterSize;
+    }
+      
+    int maxNodesPerRack = 
+      (totalNumOfReplicas-1)/clusterMap.getNumOfRacks()+2;
+      
+    List<DatanodeDescriptor> results = 
+      new ArrayList<DatanodeDescriptor>(choosenNodes);
+    excludedNodes.addAll(choosenNodes);
+      
+    if (!clusterMap.contains(writer)) {
+      writer=null;
+    }
+      
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
+                                                excludedNodes, blocksize, maxNodesPerRack, results);
+      
+    results.removeAll(choosenNodes);
+      
+    // sorting nodes to form a pipeline
+    return getPipeline((writer==null)?localNode:writer,
+                       results.toArray(new DatanodeDescriptor[results.size()]));
+  }
+    
+  /* choose <i>numOfReplicas</i> from all data nodes */
+  private DatanodeDescriptor chooseTarget(int numOfReplicas,
+                                          DatanodeDescriptor writer,
+                                          List<DatanodeDescriptor> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) {
+      
+    if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
+      return writer;
+    }
+      
+    int numOfResults = results.size();
+    if (writer == null && (numOfResults==1 || numOfResults==2)) {
+      writer = results.get(0);
+    }
+      
+    try {
+      switch(numOfResults) {
+      case 0:
+        writer = chooseLocalNode(writer, excludedNodes, 
+                                 blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          break;
+        }
+      case 1:
+        chooseRemoteRack(1, writer, excludedNodes, 
+                         blocksize, maxNodesPerRack, results);
+        if (--numOfReplicas == 0) {
+          break;
+        }
+      case 2:
+        if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
+          chooseRemoteRack(1, writer, excludedNodes,
+                           blocksize, maxNodesPerRack, results);
+        } else {
+          chooseLocalRack(writer, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+        }
+        if (--numOfReplicas == 0) {
+          break;
+        }
+      default:
+        chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
+                     blocksize, maxNodesPerRack, results);
+      }
+    } catch (NotEnoughReplicasException e) {
+      logr.warn("Not able to place enough replicas, still in need of "
+               + numOfReplicas);
+    }
+    return writer;
+  }
+    
+  /* Choose <i>localMachine</i> as the target.
+   * If <i>localMachine</i> is not available, 
+   * choose a node on the same rack.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalNode(
+                                             DatanodeDescriptor localMachine,
+                                             List<DatanodeDescriptor> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // if no local machine, randomly choose one node
+    if (localMachine == null)
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+      
+    // otherwise try local machine first
+    if (!excludedNodes.contains(localMachine)) {
+      excludedNodes.add(localMachine);
+      if (isGoodTarget(localMachine, blocksize,
+                       maxNodesPerRack, false, results)) {
+        results.add(localMachine);
+        return localMachine;
+      }
+    } 
+      
+    // try a node on local rack
+    return chooseLocalRack(localMachine, excludedNodes, 
+                           blocksize, maxNodesPerRack, results);
+  }
+    
+  /* Choose one node from the rack that <i>localMachine</i> is on.
+   * If no such node is available, choose one node from the rack where
+   * a second replica resides.
+   * If still no such node is available, choose a random node 
+   * in the cluster.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseLocalRack(
+                                             DatanodeDescriptor localMachine,
+                                             List<DatanodeDescriptor> excludedNodes,
+                                             long blocksize,
+                                             int maxNodesPerRack,
+                                             List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    // no local machine, so choose a random machine
+    if (localMachine == null) {
+      return chooseRandom(NodeBase.ROOT, excludedNodes, 
+                          blocksize, maxNodesPerRack, results);
+    }
+      
+    // choose one from the local rack
+    try {
+      return chooseRandom(
+                          localMachine.getNetworkLocation(),
+                          excludedNodes, blocksize, maxNodesPerRack, results);
+    } catch (NotEnoughReplicasException e1) {
+      // find the second replica
+      DatanodeDescriptor newLocal=null;
+      for(Iterator<DatanodeDescriptor> iter=results.iterator();
+          iter.hasNext();) {
+        DatanodeDescriptor nextNode = iter.next();
+        if (nextNode != localMachine) {
+          newLocal = nextNode;
+          break;
+        }
+      }
+      if (newLocal != null) {
+        try {
+          return chooseRandom(
+                              newLocal.getNetworkLocation(),
+                              excludedNodes, blocksize, maxNodesPerRack, results);
+        } catch(NotEnoughReplicasException e2) {
+          //otherwise randomly choose one from the network
+          return chooseRandom(NodeBase.ROOT, excludedNodes,
+                              blocksize, maxNodesPerRack, results);
+        }
+      } else {
+        //otherwise randomly choose one from the network
+        return chooseRandom(NodeBase.ROOT, excludedNodes,
+                            blocksize, maxNodesPerRack, results);
+      }
+    }
+  }
+    
+  /* Choose <i>numOfReplicas</i> nodes from the racks 
+   * that <i>localMachine</i> is NOT on.
+   * If not enough nodes are available, choose the remaining ones 
+   * from the local rack.
+   */
+    
+  private void chooseRemoteRack(int numOfReplicas,
+                                DatanodeDescriptor localMachine,
+                                List<DatanodeDescriptor> excludedNodes,
+                                long blocksize,
+                                int maxReplicasPerRack,
+                                List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    int oldNumOfReplicas = results.size();
+    // randomly choose one node from remote racks
+    try {
+      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
+                   excludedNodes, blocksize, maxReplicasPerRack, results);
+    } catch (NotEnoughReplicasException e) {
+      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
+                   localMachine.getNetworkLocation(), excludedNodes, blocksize, 
+                   maxReplicasPerRack, results);
+    }
+  }
+    
+  /* Randomly choose one target from <i>nodes</i>.
+   * @return the chosen node
+   */
+  private DatanodeDescriptor chooseRandom(
+                                          String nodes,
+                                          List<DatanodeDescriptor> excludedNodes,
+                                          long blocksize,
+                                          int maxNodesPerRack,
+                                          List<DatanodeDescriptor> results) 
+    throws NotEnoughReplicasException {
+    DatanodeDescriptor result;
+    do {
+      DatanodeDescriptor[] selectedNodes = 
+        chooseRandom(1, nodes, excludedNodes);
+      if (selectedNodes.length == 0) {
+        throw new NotEnoughReplicasException(
+                                             "Not able to place enough replicas");
+      }
+      result = (DatanodeDescriptor)(selectedNodes[0]);
+    } while(!isGoodTarget(result, blocksize, maxNodesPerRack, results));
+    results.add(result);
+    return result;
+  }
+    
+  /* Randomly choose <i>numOfReplicas</i> targets from <i>nodes</i>.
+   */
+  private void chooseRandom(int numOfReplicas,
+                            String nodes,
+                            List<DatanodeDescriptor> excludedNodes,
+                            long blocksize,
+                            int maxNodesPerRack,
+                            List<DatanodeDescriptor> results)
+    throws NotEnoughReplicasException {
+    boolean toContinue = true;
+    do {
+      DatanodeDescriptor[] selectedNodes = 
+        chooseRandom(numOfReplicas, nodes, excludedNodes);
+      if (selectedNodes.length < numOfReplicas) {
+        toContinue = false;
+      }
+      for(int i=0; i<selectedNodes.length; i++) {
+        DatanodeDescriptor result = (DatanodeDescriptor)(selectedNodes[i]);
+        if (isGoodTarget(result, blocksize, maxNodesPerRack, results)) {
+          numOfReplicas--;
+          results.add(result);
+        }
+      } // end of for
+    } while (numOfReplicas>0 && toContinue);
+      
+    if (numOfReplicas>0) {
+      throw new NotEnoughReplicasException(
+                                           "Not able to place enough replicas");
+    }
+  }
+    
+  /* Randomly choose <i>numOfReplicas</i> nodes from the scope <i>nodes</i>.
+   * @return the chosen nodes
+   */
+  private DatanodeDescriptor[] chooseRandom(int numOfReplicas, 
+                                            String nodes,
+                                            List<DatanodeDescriptor> excludedNodes) {
+    List<DatanodeDescriptor> results = 
+      new ArrayList<DatanodeDescriptor>();
+    int numOfAvailableNodes =
+      clusterMap.countNumOfAvailableNodes(nodes, excludedNodes);
+    numOfReplicas = (numOfAvailableNodes<numOfReplicas)?
+      numOfAvailableNodes:numOfReplicas;
+    while(numOfReplicas > 0) {
+      DatanodeDescriptor chosenNode = clusterMap.chooseRandom(nodes);
+      if (!excludedNodes.contains(chosenNode)) {
+        results.add(chosenNode);
+        excludedNodes.add(chosenNode);
+        numOfReplicas--;
+      }
+    }
+    return (DatanodeDescriptor[])results.toArray(
+                                                 new DatanodeDescriptor[results.size()]);    
+  }
+    
+  /* Judge whether a node is a good target.
+   * Return true if <i>node</i> has enough space, 
+   * is not too heavily loaded, and its rack does not have too many chosen nodes.
+   */
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               List<DatanodeDescriptor> results) {
+    return isGoodTarget(node, blockSize, maxTargetPerLoc,
+                        this.considerLoad, results);
+  }
+    
+  private boolean isGoodTarget(DatanodeDescriptor node,
+                               long blockSize, int maxTargetPerLoc,
+                               boolean considerLoad,
+                               List<DatanodeDescriptor> results) {
+      
+    // check if the node is (being) decommissioned
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      logr.debug("Node "+node.getPath()+
+                " is not chosen because the node is (being) decommissioned");
+      return false;
+    }
+
+    // check the remaining capacity of the target machine
+    if (blockSize* FSConstants.MIN_BLOCKS_FOR_WRITE>node.getRemaining()) {
+      logr.debug("Node "+node.getPath()+
+                " is not chosen because the node does not have enough space");
+      return false;
+    }
+      
+    // check the communication traffic of the target machine
+    if (considerLoad) {
+      double avgLoad = 0;
+      int size = clusterMap.getNumOfLeaves();
+      if (size != 0) {
+        avgLoad = (double)fs.totalLoad()/size;
+      }
+      if (node.getXceiverCount() > (2.0 * avgLoad)) {
+        logr.debug("Node "+node.getPath()+
+                  " is not chosen because the node is too busy");
+        return false;
+      }
+    }
+      
+    // check if the target rack has chosen too many nodes
+    String rackname = node.getNetworkLocation();
+    int counter=1;
+    for(Iterator<DatanodeDescriptor> iter = results.iterator();
+        iter.hasNext();) {
+      DatanodeDescriptor result = iter.next();
+      if (rackname.equals(result.getNetworkLocation())) {
+        counter++;
+      }
+    }
+    if (counter>maxTargetPerLoc) {
+      logr.debug("Node "+node.getPath()+
+                " is not chosen because the rack has too many chosen nodes");
+      return false;
+    }
+    return true;
+  }
+    
+  /* Return a pipeline of nodes.
+   * The pipeline is formed by finding a short path that 
+   * starts from the writer and traverses all <i>nodes</i>.
+   * This is basically a traveling salesman problem, solved greedily here.
+   */
+  private DatanodeDescriptor[] getPipeline(
+                                           DatanodeDescriptor writer,
+                                           DatanodeDescriptor[] nodes) {
+    if (nodes.length==0) return nodes;
+      
+    synchronized(clusterMap) {
+      int index=0;
+      if (writer == null || !clusterMap.contains(writer)) {
+        writer = nodes[0];
+      }
+      for(;index<nodes.length; index++) {
+        DatanodeDescriptor shortestNode = null;
+        int shortestDistance = Integer.MAX_VALUE;
+        int shortestIndex = index;
+        for(int i=index; i<nodes.length; i++) {
+          DatanodeDescriptor currentNode = nodes[i];
+          int currentDistance = clusterMap.getDistance(writer, currentNode);
+          if (shortestDistance>currentDistance) {
+            shortestDistance = currentDistance;
+            shortestNode = currentNode;
+            shortestIndex = i;
+          }
+        }
+        //switch position index & shortestIndex
+        if (index != shortestIndex) {
+          nodes[shortestIndex] = nodes[index];
+          nodes[index] = shortestNode;
+        }
+        writer = shortestNode;
+      }
+    }
+    return nodes;
+  }
+} //end of ReplicationTargetChooser
+

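The private chooseTarget above leans on deliberate switch fall-through:
each case places one tier of replica (local node, then a remote rack,
then the local or a remote rack depending on where the first two landed,
then random) and only breaks once the request is satisfied. A standalone
sketch of that dispatch shape, with stub choosers in place of the
rack-aware ones (illustrative only):

  import java.util.ArrayList;
  import java.util.List;

  // Fall-through tiered placement, as in chooseTarget above.
  class PlacementSketch {
    static void choose(int numOfReplicas, List<String> results) {
      switch (results.size()) {
      case 0:
        results.add("local-node");
        if (--numOfReplicas == 0) break;
        // falls through to the next tier
      case 1:
        results.add("remote-rack-node");
        if (--numOfReplicas == 0) break;
        // falls through
      case 2:
        results.add("local-rack-node");
        if (--numOfReplicas == 0) break;
        // falls through
      default:
        for (; numOfReplicas > 0; numOfReplicas--) {
          results.add("random-node");
        }
      }
    }

    public static void main(String[] args) {
      List<String> r = new ArrayList<String>();
      choose(3, r);
      System.out.println(r); // [local-node, remote-rack-node, local-rack-node]
    }
  }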

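getPipeline orders the chosen targets with a greedy nearest-neighbor
pass: at each step, the node closest to the current end of the pipeline
is swapped into the next slot. A standalone sketch with an explicit
distance matrix in place of the clusterMap (distances are illustrative):

  // Greedy nearest-neighbor ordering, as in getPipeline above.
  class PipelineSketch {
    // dist[i][j] is the network distance between nodes i and j.
    static int[] order(int writer, int[] nodes, int[][] dist) {
      int current = writer;
      for (int index = 0; index < nodes.length; index++) {
        int best = index;
        for (int i = index; i < nodes.length; i++) {
          if (dist[current][nodes[i]] < dist[current][nodes[best]]) {
            best = i;
          }
        }
        int tmp = nodes[index];      // swap the closest node into place
        nodes[index] = nodes[best];
        nodes[best] = tmp;
        current = nodes[index];
      }
      return nodes;
    }

    public static void main(String[] args) {
      int[][] dist = { { 0, 2, 4, 6 }, { 2, 0, 2, 4 },
                       { 4, 2, 0, 2 }, { 6, 4, 2, 0 } };
      int[] pipeline = order(0, new int[] { 3, 1, 2 }, dist);
      for (int n : pipeline) {
        System.out.print(n + " "); // prints: 1 2 3 (nearest first)
      }
      System.out.println();
    }
  }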
