hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1149455 - in /hadoop/common/trunk/hdfs: ./ src/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/blockmana...
Date: Fri, 22 Jul 2011 04:20:22 GMT
Author: szetszwo
Date: Fri Jul 22 04:20:21 2011
New Revision: 1149455

URL: http://svn.apache.org/viewvc?rev=1149455&view=rev
Log:
HDFS-2167.  Move dnsToSwitchMapping and hostsReader from FSNamesystem to DatanodeManager.
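
The net effect of this refactoring is that FSNamesystem no longer owns the DNS-to-switch
mapping or the include/exclude hosts reader; both move into the DatanodeManager held by the
BlockManager, and FSNamesystem delegates to it. Condensed from the FSNamesystem hunk further
below, the refreshNodes path after this change looks as follows (only the delegation is shown):

    // Condensed from the FSNamesystem diff below: host-file handling is now
    // delegated to DatanodeManager instead of being done inline.
    public void refreshNodes(Configuration conf) throws IOException {
      checkSuperuserPrivilege();
      getBlockManager().getDatanodeManager().refreshHostsReader(conf);
      writeLock();
      try {
        getBlockManager().getDatanodeManager().refreshDatanodes();
      } finally {
        writeUnlock();
      }
    }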

Added:
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
Modified:
    hadoop/common/trunk/hdfs/CHANGES.txt
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
    hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
    hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java

Modified: hadoop/common/trunk/hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/CHANGES.txt?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hdfs/CHANGES.txt Fri Jul 22 04:20:21 2011
@@ -583,6 +583,9 @@ Trunk (unreleased changes)
     to DelegationTokenIdentifier.  (szetszwo)
 
     HDFS-1774. Small optimization to FSDataset. (Uma Maheswara Rao G via eli)
+
+    HDFS-2167.  Move dnsToSwitchMapping and hostsReader from FSNamesystem to
+    DatanodeManager.  (szetszwo)
     
   OPTIMIZATIONS
 

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Jul 22 04:20:21 2011
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
@@ -33,6 +35,7 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -106,10 +109,8 @@ public class BlockManager {
 
   private final DatanodeManager datanodeManager;
 
-  //
-  // Store blocks-->datanodedescriptor(s) map of corrupt replicas
-  //
-  private final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
+  /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
+  final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   //
   // Keeps a Collection for every named machine containing
@@ -136,34 +137,34 @@ public class BlockManager {
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
   private final PendingReplicationBlocks pendingReplications;
 
-  //  The maximum number of replicas allowed for a block
+  /** The maximum number of replicas allowed for a block */
   public final int maxReplication;
-  //  How many outgoing replication streams a given node should have at one time
+  /** The maximum number of outgoing replication streams
+   *  a given node should have at one time 
+   */
   public int maxReplicationStreams;
-  // Minimum copies needed or else write is disallowed
+  /** Minimum copies needed or else write is disallowed */
   public final int minReplication;
-  // Default number of replicas
+  /** Default number of replicas */
   public final int defaultReplication;
-  // How many entries are returned by getCorruptInodes()
+  /** The maximum number of entries returned by getCorruptInodes() */
   final int maxCorruptFilesReturned;
   
-  // variable to enable check for enough racks 
+  /** variable to enable check for enough racks */
   final boolean shouldCheckForEnoughRacks;
 
-  /**
-   * Last block index used for replication work.
-   */
+  /** Last block index used for replication work. */
   private int replIndex = 0;
 
-  // for block replicas placement
-  public final BlockPlacementPolicy replicator;
+  /** for block replicas placement */
+  private BlockPlacementPolicy blockplacement;
 
   public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
     namesystem = fsn;
-    datanodeManager = new DatanodeManager(fsn);
+    datanodeManager = new DatanodeManager(fsn, conf);
 
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
-    replicator = BlockPlacementPolicy.getInstance(
+    blockplacement = BlockPlacementPolicy.getInstance(
         conf, namesystem, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@@ -220,6 +221,19 @@ public class BlockManager {
     return datanodeManager;
   }
 
+  /** @return the BlockPlacementPolicy */
+  public BlockPlacementPolicy getBlockPlacementPolicy() {
+    return blockplacement;
+  }
+
+  /** Set BlockPlacementPolicy */
+  public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) {
+    if (newpolicy == null) {
+      throw new HadoopIllegalArgumentException("newpolicy == null");
+    }
+    this.blockplacement = newpolicy;
+  }
+
   public void metaSave(PrintWriter out) {
     //
     // Dump contents of neededReplication
@@ -551,7 +565,7 @@ public class BlockManager {
     }
   }
   
-  void removeFromInvalidates(String storageID, Block block) {
+  private void removeFromInvalidates(String storageID, Block block) {
     Collection<Block> v = recentInvalidateSets.get(storageID);
     if (v != null && v.remove(block)) {
       pendingDeletionBlocksCount--;
@@ -921,7 +935,7 @@ public class BlockManager {
     // It is costly to extract the filename for which chooseTargets is called,
     // so for now we pass in the Inode itself.
     DatanodeDescriptor targets[] = 
-                       replicator.chooseTarget(fileINode, additionalReplRequired,
+                       blockplacement.chooseTarget(fileINode, additionalReplRequired,
                        srcNode, containingNodes, block.getNumBytes());
     if(targets.length == 0)
       return false;
@@ -1021,7 +1035,7 @@ public class BlockManager {
       final HashMap<Node, Node> excludedNodes,
       final long blocksize) throws IOException {
     // choose targets for the new block to be allocated.
-    final DatanodeDescriptor targets[] = replicator.chooseTarget(
+    final DatanodeDescriptor targets[] = blockplacement.chooseTarget(
         src, numOfReplicas, client, excludedNodes, blocksize);
     if (targets.length < minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
@@ -1240,7 +1254,7 @@ public class BlockManager {
     }
   }
 
-  void reportDiff(DatanodeDescriptor dn, 
+  private void reportDiff(DatanodeDescriptor dn, 
       BlockListAsLongs newReport, 
       Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
       Collection<Block> toRemove,           // remove from DatanodeDescriptor
@@ -1670,7 +1684,7 @@ public class BlockManager {
       }
     }
     namesystem.chooseExcessReplicates(nonExcess, block, replication, 
-        addedNode, delNodeHint, replicator);
+        addedNode, delNodeHint, blockplacement);
   }
 
   public void addToExcessReplicate(DatanodeInfo dn, Block block) {
@@ -1694,7 +1708,7 @@ public class BlockManager {
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
+  private void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
           + block + " from " + node.getName());
@@ -1881,7 +1895,8 @@ public class BlockManager {
    * On stopping decommission, check if the node has excess replicas.
    * If there are any excess replicas, call processOverReplicatedBlock()
    */
-  public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) {
+  private void processOverReplicatedBlocksOnReCommission(
+      final DatanodeDescriptor srcNode) {
     final Iterator<? extends Block> it = srcNode.getBlockIterator();
     while(it.hasNext()) {
       final Block block = it.next();
@@ -1900,7 +1915,7 @@ public class BlockManager {
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
    */
-  public boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
+  boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
     boolean status = false;
     int underReplicatedBlocks = 0;
     int decommissionOnlyReplicas = 0;
@@ -2022,7 +2037,7 @@ public class BlockManager {
   }
 
   /** Remove a datanode from the invalidatesSet */
-  public void removeFromInvalidates(String storageID) {
+  private void removeFromInvalidates(String storageID) {
     Collection<Block> blocks = recentInvalidateSets.remove(storageID);
     if (blocks != null) {
       pendingDeletionBlocksCount -= blocks.size();
@@ -2086,28 +2101,6 @@ public class BlockManager {
       namesystem.writeUnlock();
     }
   }
-  
-  //Returns the number of racks over which a given block is replicated
-  //decommissioning/decommissioned nodes are not counted. corrupt replicas 
-  //are also ignored
-  public int getNumberOfRacks(Block b) {
-    HashSet<String> rackSet = new HashSet<String>(0);
-    Collection<DatanodeDescriptor> corruptNodes = 
-                                  corruptReplicas.getNodes(b);
-    for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); 
-         it.hasNext();) {
-      DatanodeDescriptor cur = it.next();
-      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
-        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
-          String rackName = cur.getNetworkLocation();
-          if (!rackSet.contains(rackName)) {
-            rackSet.add(rackName);
-          }
-        }
-      }
-    }
-    return rackSet.size();
-  }
 
   boolean blockHasEnoughRacks(Block b) {
     if (!this.shouldCheckForEnoughRacks) {
@@ -2209,4 +2202,50 @@ public class BlockManager {
     return neededReplications
         .iterator(UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
+
+  /**
+   * Change, if appropriate, the admin state of a datanode to 
+   * decommission completed. Return true if decommission is complete.
+   */
+  boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
+    // Check to see if all blocks in this decommissioned
+    // node have reached their target replication factor.
+    if (node.isDecommissionInProgress()) {
+      if (!isReplicationInProgress(node)) {
+        node.setDecommissioned();
+        LOG.info("Decommission complete for node " + node.getName());
+      }
+    }
+    return node.isDecommissioned();
+  }
+
+  /** Start decommissioning the specified datanode. */
+  void startDecommission(DatanodeDescriptor node) throws IOException {
+    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
+          node.numBlocks() +  " blocks.");
+      synchronized (namesystem.heartbeats) {
+        namesystem.updateStats(node, false);
+        node.startDecommission();
+        namesystem.updateStats(node, true);
+      }
+      node.decommissioningStatus.setStartTime(now());
+      
+      // all the blocks that reside on this node have to be replicated.
+      checkDecommissionStateInternal(node);
+    }
+  }
+
+  /** Stop decommissioning the specified datanodes. */
+  void stopDecommission(DatanodeDescriptor node) throws IOException {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stop Decommissioning node " + node.getName());
+      synchronized (namesystem.heartbeats) {
+        namesystem.updateStats(node, false);
+        node.stopDecommission();
+        namesystem.updateStats(node, true);
+      }
+      processOverReplicatedBlocksOnReCommission(node);
+    }
+  }
 }
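
The public final replicator field is replaced above by a private blockplacement field with a
getter/setter pair, so the placement policy can now be swapped at run time; setBlockPlacementPolicy
rejects null with a HadoopIllegalArgumentException. A hypothetical usage sketch of the new
accessors (MyPlacementPolicy is an illustrative BlockPlacementPolicy subclass, not part of this
commit):

    // Hypothetical usage of the new accessors; MyPlacementPolicy is an
    // illustrative BlockPlacementPolicy subclass, not added by this commit.
    BlockManager bm = namesystem.getBlockManager();
    BlockPlacementPolicy original = bm.getBlockPlacementPolicy();
    try {
      bm.setBlockPlacementPolicy(new MyPlacementPolicy());  // passing null would throw
                                                            // HadoopIllegalArgumentException
      // ... exercise replication work against the substituted policy ...
    } finally {
      bm.setBlockPlacementPolicy(original);  // restore the original policy
    }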

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 22 04:20:21 2011
@@ -18,8 +18,14 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,11 +34,23 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * Manage datanodes, include decommission and other activities.
@@ -49,9 +67,29 @@ public class DatanodeManager {
 
   /** Host names to datanode descriptors mapping. */
   private final Host2NodesMap host2DatanodeMap = new Host2NodesMap();
+
+  private final DNSToSwitchMapping dnsToSwitchMapping;
+
+  /** Read include/exclude files */
+  private final HostsFileReader hostsReader; 
   
-  DatanodeManager(final FSNamesystem namesystem) {
+  DatanodeManager(final FSNamesystem namesystem, final Configuration conf
+      ) throws IOException {
     this.namesystem = namesystem;
+    this.hostsReader = new HostsFileReader(
+        conf.get(DFSConfigKeys.DFS_HOSTS, ""),
+        conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+
+    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
+            ScriptBasedMapping.class, DNSToSwitchMapping.class), conf);
+    
+    // If the dns to switch mapping supports cache, resolve network
+    // locations of those hosts in the include list and store the mapping
+    // in the cache; so future calls to resolve will be fast.
+    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
+    }
   }
 
   private Daemon decommissionthread = null;
@@ -93,7 +131,7 @@ public class DatanodeManager {
   }
 
   /** Add a datanode. */
-  public void addDatanode(final DatanodeDescriptor node) {
+  private void addDatanode(final DatanodeDescriptor node) {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
@@ -112,7 +150,7 @@ public class DatanodeManager {
   }
 
   /** Physically remove node from datanodeMap. */
-  public void wipeDatanode(final DatanodeID node) throws IOException {
+  private void wipeDatanode(final DatanodeID node) throws IOException {
     final String key = node.getStorageID();
     synchronized (namesystem.datanodeMap) {
       host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
@@ -123,4 +161,380 @@ public class DatanodeManager {
           + " is removed from datanodeMap.");
     }
   }
+
+  /* Resolve a node's network location */
+  private void resolveNetworkLocation (DatanodeDescriptor node) {
+    List<String> names = new ArrayList<String>(1);
+    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
+      // get the node's IP address
+      names.add(node.getHost());
+    } else {
+      // get the node's host name
+      String hostName = node.getHostName();
+      int colon = hostName.indexOf(":");
+      hostName = (colon==-1)?hostName:hostName.substring(0,colon);
+      names.add(hostName);
+    }
+    
+    // resolve its network location
+    List<String> rName = dnsToSwitchMapping.resolve(names);
+    String networkLocation;
+    if (rName == null) {
+      LOG.error("The resolve call returned null! Using " + 
+          NetworkTopology.DEFAULT_RACK + " for host " + names);
+      networkLocation = NetworkTopology.DEFAULT_RACK;
+    } else {
+      networkLocation = rName.get(0);
+    }
+    node.setNetworkLocation(networkLocation);
+  }
+
+  private boolean inHostsList(DatanodeID node, String ipAddr) {
+     return checkInList(node, ipAddr, hostsReader.getHosts(), false);
+  }
+  
+  private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
+    return checkInList(node, ipAddr, hostsReader.getExcludedHosts(), true);
+  }
+
+  /**
+   * Remove an already decommissioned data node that is neither in the include
+   * nor the exclude hosts list from the list of live or dead nodes.  This is
+   * used to avoid displaying an already decommissioned data node to the
+   * operators.  The procedure for making an already decommissioned data node
+   * not be displayed is as follows:
+   * <ol>
+   *   <li>
+   *   Host must have been in the include hosts list and the include hosts list
+   *   must not be empty.
+   *   </li>
+   *   <li>
+   *   Host is decommissioned by remaining in the include hosts list and added
+   *   into the exclude hosts list. Name node is updated with the new
+   *   information by issuing the dfsadmin -refreshNodes command.
+   *   </li>
+   *   <li>
+   *   Host is removed from both include hosts and exclude hosts lists.  Name
+   *   node is updated with the new information by issuing the
+   *   dfsadmin -refreshNodes command.
+   *   </li>
+   * </ol>
+   *
+   * @param nodeList array list of live or dead nodes.
+   */
+  public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
+    // If the include list is empty, every node is welcome and it does not
+    // make sense to exclude any node from the cluster.  Therefore, nothing is removed.
+    if (hostsReader.getHosts().isEmpty()) {
+      return;
+    }
+    
+    for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
+      DatanodeDescriptor node = it.next();
+      if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
+          && node.isDecommissioned()) {
+        // The include list is not empty, this datanode appears in neither the
+        // include nor the exclude list, and it has been decommissioned.
+        // Remove it from the node list.
+        it.remove();
+      }
+    }
+  }
+
+  /**
+   * Check if the given node (by DatanodeID or ipAddress) is in the (include or
+   * exclude) list.  If ipAddress is null, check only based upon the given
+   * DatanodeID.  If ipAddress is not null, it should refer to the same host
+   * that the given DatanodeID refers to.
+   *
+   * @param node the host DatanodeID
+   * @param ipAddress if not null, should refer to the same host
+   *                  that the DatanodeID refers to
+   * @param hostsList the list of hosts in the include/exclude file
+   * @param isExcludeList true if this is the exclude list
+   * @return true if the node is in the list
+   */
+  private static boolean checkInList(final DatanodeID node,
+      final String ipAddress,
+      final Set<String> hostsList,
+      final boolean isExcludeList) {
+    final InetAddress iaddr;
+    if (ipAddress != null) {
+      try {
+        iaddr = InetAddress.getByName(ipAddress);
+      } catch (UnknownHostException e) {
+        LOG.warn("Unknown ip address: " + ipAddress, e);
+        return isExcludeList;
+      }
+    } else {
+      try {
+        iaddr = InetAddress.getByName(node.getHost());
+      } catch (UnknownHostException e) {
+        LOG.warn("Unknown host: " + node.getHost(), e);
+        return isExcludeList;
+      }
+    }
+
+    // if include list is empty, host is in include list
+    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
+      return true;
+    }
+    return // compare ipaddress(:port)
+    (hostsList.contains(iaddr.getHostAddress().toString()))
+        || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
+            + node.getPort()))
+        // compare hostname(:port)
+        || (hostsList.contains(iaddr.getHostName()))
+        || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
+        || ((node instanceof DatanodeInfo) && hostsList
+            .contains(((DatanodeInfo) node).getHostName()));
+  }
+
+  /**
+   * Decommission the node if it is in exclude list.
+   */
+  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
+    throws IOException {
+    // If the registered node is in exclude list, then decommission it
+    if (inExcludedHostsList(nodeReg, ipAddr)) {
+      namesystem.getBlockManager().startDecommission(nodeReg);
+    }
+  }
+
+  
+  /**
+   * Generate new storage ID.
+   * 
+   * @return unique storage ID
+   * 
+   * Note that collisions are still possible if somebody tries
+   * to bring in a data storage from a different cluster.
+   */
+  private String newStorageID() {
+    String newID = null;
+    while(newID == null) {
+      newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
+      if (namesystem.datanodeMap.get(newID) != null)
+        newID = null;
+    }
+    return newID;
+  }
+
+  public void registerDatanode(DatanodeRegistration nodeReg
+      ) throws IOException {
+    String dnAddress = Server.getRemoteAddress();
+    if (dnAddress == null) {
+      // Mostly called inside an RPC.
+      // But if not, use address passed by the data-node.
+      dnAddress = nodeReg.getHost();
+    }      
+
+    // Checks if the node is not on the hosts list.  If it is not, then
+    // it will be disallowed from registering. 
+    if (!inHostsList(nodeReg, dnAddress)) {
+      throw new DisallowedDatanodeException(nodeReg);
+    }
+
+    String hostName = nodeReg.getHost();
+      
+    // update the datanode's name with ip:port
+    DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
+                                      nodeReg.getStorageID(),
+                                      nodeReg.getInfoPort(),
+                                      nodeReg.getIpcPort());
+    nodeReg.updateRegInfo(dnReg);
+    nodeReg.exportedKeys = namesystem.getBlockKeys();
+      
+    NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+        + "node registration from " + nodeReg.getName()
+        + " storage " + nodeReg.getStorageID());
+
+    DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
+    DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
+      
+    if (nodeN != null && nodeN != nodeS) {
+      NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
+                        + "node from name: " + nodeN.getName());
+      // nodeN previously served a different data storage, 
+      // which is not served by anybody anymore.
+      namesystem.removeDatanode(nodeN);
+      // physically remove node from datanodeMap
+      wipeDatanode(nodeN);
+      nodeN = null;
+    }
+
+    if (nodeS != null) {
+      if (nodeN == nodeS) {
+        // The same datanode has been just restarted to serve the same data 
+        // storage. We do not need to remove old data blocks, the delta will
+        // be calculated on the next block report from the datanode
+        if(NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
+                                        + "node restarted.");
+        }
+      } else {
+        // nodeS is found
+        /* The registering datanode is a replacement node for the existing 
+          data storage, which from now on will be served by a new node.
+          If this message repeats, both nodes might have same storageID 
+          by (insanely rare) random chance. User needs to restart one of the
+          nodes with its data cleared (or the user can just remove the StorageID
+          value in the "VERSION" file under the data directory of the datanode,
+          but this might not work if the VERSION file format has changed).
+       */        
+        NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
+                                      + "node " + nodeS.getName()
+                                      + " is replaced by " + nodeReg.getName() + 
+                                      " with the same storageID " +
+                                      nodeReg.getStorageID());
+      }
+      // update cluster map
+      getNetworkTopology().remove(nodeS);
+      nodeS.updateRegInfo(nodeReg);
+      nodeS.setHostName(hostName);
+      nodeS.setDisallowed(false); // Node is in the include list
+      
+      // resolve network location
+      resolveNetworkLocation(nodeS);
+      getNetworkTopology().add(nodeS);
+        
+      // also treat the registration message as a heartbeat
+      synchronized(namesystem.heartbeats) {
+        if( !namesystem.heartbeats.contains(nodeS)) {
+          namesystem.heartbeats.add(nodeS);
+          //update its timestamp
+          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+          nodeS.isAlive = true;
+        }
+      }
+      checkDecommissioning(nodeS, dnAddress);
+      return;
+    } 
+
+    // this is a new datanode serving a new data storage
+    if (nodeReg.getStorageID().equals("")) {
+      // this data storage has never been registered
+      // it is either empty or was created by pre-storageID version of DFS
+      nodeReg.storageID = newStorageID();
+      if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+            "BLOCK* NameSystem.registerDatanode: "
+            + "new storageID " + nodeReg.getStorageID() + " assigned.");
+      }
+    }
+    // register new datanode
+    DatanodeDescriptor nodeDescr 
+      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
+    resolveNetworkLocation(nodeDescr);
+    addDatanode(nodeDescr);
+    checkDecommissioning(nodeDescr, dnAddress);
+    
+    // also treat the registration message as a heartbeat
+    synchronized(namesystem.heartbeats) {
+      namesystem.heartbeats.add(nodeDescr);
+      nodeDescr.isAlive = true;
+      // no need to update its timestamp
+      // because it is done when the descriptor is created
+    }
+  }
+
+  /** Reread include/exclude files. */
+  public void refreshHostsReader(Configuration conf) throws IOException {
+    // Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
+    // Update the file names and refresh internal includes and excludes list.
+    if (conf == null) {
+      conf = new HdfsConfiguration();
+    }
+    hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS, ""), 
+                                conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
+    hostsReader.refresh();
+  }
+  
+  /**
+   * Checks each registered datanode against the current hosts and exclude
+   * lists (reread via refreshHostsReader) and updates its state if it has
+   * changed:
+   * 1. Added to hosts  --> no further work needed here.
+   * 2. Removed from hosts --> mark AdminState as decommissioned.
+   * 3. Added to exclude --> start decommission.
+   * 4. Removed from exclude --> stop decommission.
+   */
+  public void refreshDatanodes() throws IOException {
+    for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
+      // Check if not include.
+      if (!inHostsList(node, null)) {
+        node.setDisallowed(true);  // case 2.
+      } else {
+        if (inExcludedHostsList(node, null)) {
+          namesystem.getBlockManager().startDecommission(node);   // case 3.
+        } else {
+          namesystem.getBlockManager().stopDecommission(node);   // case 4.
+        }
+      }
+    }
+  }
+
+  /** For generating datanode reports */
+  public List<DatanodeDescriptor> getDatanodeListForReport(
+      final DatanodeReportType type) {
+    boolean listLiveNodes = type == DatanodeReportType.ALL ||
+                            type == DatanodeReportType.LIVE;
+    boolean listDeadNodes = type == DatanodeReportType.ALL ||
+                            type == DatanodeReportType.DEAD;
+
+    HashMap<String, String> mustList = new HashMap<String, String>();
+
+    if (listDeadNodes) {
+      //first load all the nodes listed in include and exclude files.
+      Iterator<String> it = hostsReader.getHosts().iterator();
+      while (it.hasNext()) {
+        mustList.put(it.next(), "");
+      }
+      it = hostsReader.getExcludedHosts().iterator(); 
+      while (it.hasNext()) {
+        mustList.put(it.next(), "");
+      }
+    }
+
+    ArrayList<DatanodeDescriptor> nodes = null;
+    
+    synchronized (namesystem.datanodeMap) {
+      nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() + 
+                                                mustList.size());
+      Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
+      while (it.hasNext()) { 
+        DatanodeDescriptor dn = it.next();
+        boolean isDead = namesystem.isDatanodeDead(dn);
+        if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
+          nodes.add(dn);
+        }
+        // Remove any form of this datanode from the include/exclude lists.
+        try {
+          InetAddress inet = InetAddress.getByName(dn.getHost());
+          // compare hostname(:port)
+          mustList.remove(inet.getHostName());
+          mustList.remove(inet.getHostName()+":"+dn.getPort());
+          // compare ipaddress(:port)
+          mustList.remove(inet.getHostAddress().toString());
+          mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
+        } catch ( UnknownHostException e ) {
+          mustList.remove(dn.getName());
+          mustList.remove(dn.getHost());
+          LOG.warn(e);
+        }
+      }
+    }
+    
+    if (listDeadNodes) {
+      Iterator<String> it = mustList.keySet().iterator();
+      while (it.hasNext()) {
+        DatanodeDescriptor dn = 
+            new DatanodeDescriptor(new DatanodeID(it.next()));
+        dn.setLastUpdate(0);
+        nodes.add(dn);
+      }
+    }
+    return nodes;
+  }
 }
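
The include/exclude matching that used to live in FSNamesystem.checkInList is carried over
unchanged: an entry may name a host by IP address, IP:port, hostname, or hostname:port; an empty
include list admits every host; and an unresolvable host is treated as excluded but never as
included. A standalone sketch of those rules, simplified from the method above (class and method
names here are illustrative only, and the DatanodeInfo hostname comparison is omitted):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative sketch of the include/exclude matching rules in checkInList. */
    public class HostListMatchSketch {
      /** Returns true if the host (with the given port) matches an entry in the list. */
      static boolean matches(String host, int port, Set<String> hostsList,
          boolean isExcludeList) {
        final InetAddress iaddr;
        try {
          iaddr = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
          // Unresolvable hosts count as excluded but never as included.
          return isExcludeList;
        }
        // An empty include list admits every host.
        if (!isExcludeList && hostsList.isEmpty()) {
          return true;
        }
        return hostsList.contains(iaddr.getHostAddress())               // ip
            || hostsList.contains(iaddr.getHostAddress() + ":" + port)  // ip:port
            || hostsList.contains(iaddr.getHostName())                  // hostname
            || hostsList.contains(iaddr.getHostName() + ":" + port);    // hostname:port
      }

      public static void main(String[] args) {
        Set<String> include = new HashSet<String>(
            Arrays.asList("192.168.0.10", "192.168.0.11:50010"));
        System.out.println(matches("192.168.0.10", 50010, include, false)); // true  (ip)
        System.out.println(matches("192.168.0.11", 50010, include, false)); // true  (ip:port)
        System.out.println(matches("192.168.0.12", 50010, include, false)); // false (not listed)
      }
    }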

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Fri Jul 22 04:20:21 2011
@@ -35,9 +35,11 @@ class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
   private final FSNamesystem fsnamesystem;
+  private final BlockManager blockmanager;
 
   DecommissionManager(FSNamesystem namesystem) {
     this.fsnamesystem = namesystem;
+    this.blockmanager = fsnamesystem.getBlockManager();
   }
 
   /** Periodically check decommission status. */
@@ -88,7 +90,7 @@ class DecommissionManager {
 
         if (d.isDecommissionInProgress()) {
           try {
-            fsnamesystem.checkDecommissionStateInternal(d);
+            blockmanager.checkDecommissionStateInternal(d);
           } catch(Exception e) {
             LOG.warn("entry=" + entry, e);
           }
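
Taken together with the BlockManager hunks above, the whole decommission life cycle now lives in
the blockmanagement package: startDecommission marks the node and records a start time, this
monitor polls checkDecommissionStateInternal until replication catches up, and stopDecommission
re-admits the node and re-checks for over-replicated blocks. A condensed sequence sketch (locking,
datanode lookup, and error handling omitted; node is assumed to be a registered
DatanodeDescriptor):

    // Condensed life-cycle sketch; the BlockManager methods are the ones added
    // in the diff above, everything around them is omitted for brevity.
    blockManager.startDecommission(node);    // mark the node, record the start time,
                                             // and run an immediate state check
    // ... DecommissionManager.Monitor wakes up periodically ...
    boolean done = blockManager.checkDecommissionStateInternal(node);
    // done becomes true once isReplicationInProgress(node) finds nothing left
    // to re-replicate and the node has been marked as decommissioned.
    blockManager.stopDecommission(node);     // re-admit the node and clean up any
                                             // now over-replicated blocks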

Modified: hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 22 04:20:21 2011
@@ -31,7 +31,6 @@ import java.io.PrintWriter;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.net.URI;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -102,6 +101,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
@@ -133,11 +133,8 @@ import org.apache.hadoop.metrics2.annota
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
@@ -145,8 +142,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.HostsFileReader;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
@@ -313,10 +308,6 @@ public class FSNamesystem implements FSC
       ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
-    
-  private DNSToSwitchMapping dnsToSwitchMapping;
-
-  private HostsFileReader hostsReader; 
 
   private long maxFsObjects = 0;          // maximum number of fs objects
 
@@ -376,9 +367,6 @@ public class FSNamesystem implements FSC
       this.dir = new FSDirectory(fsImage, this, conf);
     }
     this.safeMode = new SafeModeInfo(conf);
-    this.hostsReader = new HostsFileReader(
-      conf.get(DFSConfigKeys.DFS_HOSTS,""),
-      conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE,""));
     if (isBlockTokenEnabled) {
       blockTokenSecretManager = new BlockTokenSecretManager(true,
           blockKeyUpdateInterval, blockTokenLifetime);
@@ -407,19 +395,6 @@ public class FSNamesystem implements FSC
     this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
     nnrmthread.start();
 
-    this.dnsToSwitchMapping = ReflectionUtils.newInstance(
-        conf.getClass(DFSConfigKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, 
-                      ScriptBasedMapping.class,
-            DNSToSwitchMapping.class), conf);
-    
-    /* If the dns to switch mapping supports cache, resolve network
-     * locations of those hosts in the include list, 
-     * and store the mapping in the cache; so future calls to resolve
-     * will be fast.
-     */
-    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
-      dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
-    }
     registerMXBean();
     DefaultMetricsSystem.instance().register(this);
   }
@@ -768,7 +743,7 @@ public class FSNamesystem implements FSC
    * 
    * @return current access keys
    */
-  ExportedBlockKeys getBlockKeys() {
+  public ExportedBlockKeys getBlockKeys() {
     return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
         : ExportedBlockKeys.DUMMY_KEYS;
   }
@@ -1836,8 +1811,8 @@ public class FSNamesystem implements FSC
     }
 
     // choose new datanodes.
-    final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
-        src, numAdditionalNodes, clientnode, chosen, true,
+    final DatanodeInfo[] targets = blockManager.getBlockPlacementPolicy(
+        ).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
         excludes, preferredblocksize);
     final LocatedBlock lb = new LocatedBlock(blk, targets);
     if (isBlockTokenEnabled) {
@@ -2780,162 +2755,12 @@ public class FSNamesystem implements FSC
       throws IOException {
     writeLock();
     try {
-      registerDatanodeInternal(nodeReg);
+      getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
+      checkSafeMode();
     } finally {
       writeUnlock();
     }
   }
-
-  /** @see #registerDatanode(DatanodeRegistration) */
-  public void registerDatanodeInternal(DatanodeRegistration nodeReg)
-      throws IOException {
-    assert hasWriteLock();
-    String dnAddress = Server.getRemoteAddress();
-    if (dnAddress == null) {
-      // Mostly called inside an RPC.
-      // But if not, use address passed by the data-node.
-      dnAddress = nodeReg.getHost();
-    }      
-
-    // check if the datanode is allowed to be connect to the namenode
-    if (!verifyNodeRegistration(nodeReg, dnAddress)) {
-      throw new DisallowedDatanodeException(nodeReg);
-    }
-
-    String hostName = nodeReg.getHost();
-      
-    // update the datanode's name with ip:port
-    DatanodeID dnReg = new DatanodeID(dnAddress + ":" + nodeReg.getPort(),
-                                      nodeReg.getStorageID(),
-                                      nodeReg.getInfoPort(),
-                                      nodeReg.getIpcPort());
-    nodeReg.updateRegInfo(dnReg);
-    nodeReg.exportedKeys = getBlockKeys();
-      
-    NameNode.stateChangeLog.info(
-                                 "BLOCK* NameSystem.registerDatanode: "
-                                 + "node registration from " + nodeReg.getName()
-                                 + " storage " + nodeReg.getStorageID());
-
-    DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN =
-        blockManager.getDatanodeManager().getDatanodeByHost(nodeReg.getName());
-      
-    if (nodeN != null && nodeN != nodeS) {
-      NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
-                        + "node from name: " + nodeN.getName());
-      // nodeN previously served a different data storage, 
-      // which is not served by anybody anymore.
-      removeDatanode(nodeN);
-      // physically remove node from datanodeMap
-      blockManager.getDatanodeManager().wipeDatanode(nodeN);
-      nodeN = null;
-    }
-
-    if (nodeS != null) {
-      if (nodeN == nodeS) {
-        // The same datanode has been just restarted to serve the same data 
-        // storage. We do not need to remove old data blocks, the delta will
-        // be calculated on the next block report from the datanode
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
-                                        + "node restarted.");
-        }
-      } else {
-        // nodeS is found
-        /* The registering datanode is a replacement node for the existing 
-          data storage, which from now on will be served by a new node.
-          If this message repeats, both nodes might have same storageID 
-          by (insanely rare) random chance. User needs to restart one of the
-          nodes with its data cleared (or user can just remove the StorageID
-          value in "VERSION" file under the data directory of the datanode,
-          but this is might not work if VERSION file format has changed 
-       */        
-        NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
-                                      + "node " + nodeS.getName()
-                                      + " is replaced by " + nodeReg.getName() + 
-                                      " with the same storageID " +
-                                      nodeReg.getStorageID());
-      }
-      // update cluster map
-      blockManager.getDatanodeManager().getNetworkTopology().remove(nodeS);
-      nodeS.updateRegInfo(nodeReg);
-      nodeS.setHostName(hostName);
-      nodeS.setDisallowed(false); // Node is in the include list
-      
-      // resolve network location
-      resolveNetworkLocation(nodeS);
-      blockManager.getDatanodeManager().getNetworkTopology().add(nodeS);
-        
-      // also treat the registration message as a heartbeat
-      synchronized(heartbeats) {
-        if( !heartbeats.contains(nodeS)) {
-          heartbeats.add(nodeS);
-          //update its timestamp
-          nodeS.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
-          nodeS.isAlive = true;
-        }
-      }
-      checkDecommissioning(nodeS, dnAddress);
-      return;
-    } 
-
-    // this is a new datanode serving a new data storage
-    if (nodeReg.getStorageID().equals("")) {
-      // this data storage has never been registered
-      // it is either empty or was created by pre-storageID version of DFS
-      nodeReg.storageID = newStorageID();
-      if(NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug(
-            "BLOCK* NameSystem.registerDatanode: "
-            + "new storageID " + nodeReg.getStorageID() + " assigned.");
-      }
-    }
-    // register new datanode
-    DatanodeDescriptor nodeDescr 
-      = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK, hostName);
-    resolveNetworkLocation(nodeDescr);
-    blockManager.getDatanodeManager().addDatanode(nodeDescr);
-    checkDecommissioning(nodeDescr, dnAddress);
-    
-    // also treat the registration message as a heartbeat
-    synchronized(heartbeats) {
-      heartbeats.add(nodeDescr);
-      nodeDescr.isAlive = true;
-      // no need to update its timestamp
-      // because its is done when the descriptor is created
-    }
-
-    checkSafeMode();
-  }
-    
-  /* Resolve a node's network location */
-  private void resolveNetworkLocation (DatanodeDescriptor node) {
-    assert hasWriteLock();
-    List<String> names = new ArrayList<String>(1);
-    if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
-      // get the node's IP address
-      names.add(node.getHost());
-    } else {
-      // get the node's host name
-      String hostName = node.getHostName();
-      int colon = hostName.indexOf(":");
-      hostName = (colon==-1)?hostName:hostName.substring(0,colon);
-      names.add(hostName);
-    }
-    
-    // resolve its network location
-    List<String> rName = dnsToSwitchMapping.resolve(names);
-    String networkLocation;
-    if (rName == null) {
-      LOG.error("The resolve call returned null! Using " + 
-          NetworkTopology.DEFAULT_RACK + " for host " + names);
-      networkLocation = NetworkTopology.DEFAULT_RACK;
-    } else {
-      networkLocation = rName.get(0);
-    }
-    node.setNetworkLocation(networkLocation);
-  }
   
   /**
    * Get registrationID for datanodes based on the namespaceID.
@@ -2946,26 +2771,8 @@ public class FSNamesystem implements FSC
   public String getRegistrationID() {
     return Storage.getRegistrationID(dir.fsImage.getStorage());
   }
-    
-  /**
-   * Generate new storage ID.
-   * 
-   * @return unique storage ID
-   * 
-   * Note: that collisions are still possible if somebody will try 
-   * to bring in a data storage from a different cluster.
-   */
-  private String newStorageID() {
-    String newID = null;
-    while(newID == null) {
-      newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
-      if (datanodeMap.get(newID) != null)
-        newID = null;
-    }
-    return newID;
-  }
-    
-  private boolean isDatanodeDead(DatanodeDescriptor node) {
+
+  public boolean isDatanodeDead(DatanodeDescriptor node) {
     return (node.getLastUpdate() <
             (now() - heartbeatExpireInterval));
   }
@@ -3078,7 +2885,7 @@ public class FSNamesystem implements FSC
     return null;
   }
 
-  private void updateStats(DatanodeDescriptor node, boolean isAdded) {
+  public void updateStats(DatanodeDescriptor node, boolean isAdded) {
     //
     // The statistics are protected by the heartbeat lock
     // For decommissioning/decommissioned nodes, only used capacity
@@ -3280,10 +3087,10 @@ public class FSNamesystem implements FSC
   /**
    * Remove a datanode descriptor.
    * @param nodeID datanode ID.
-   * @throws IOException
+   * @throws UnregisteredNodeException 
    */
-  public void removeDatanode(DatanodeID nodeID) 
-    throws IOException {
+  public void removeDatanode(final DatanodeID nodeID
+      ) throws UnregisteredNodeException {
     writeLock();
     try {
       DatanodeDescriptor nodeInfo = getDatanode(nodeID);
@@ -3664,83 +3471,23 @@ public class FSNamesystem implements FSC
   }
 
   int getNumberOfDatanodes(DatanodeReportType type) {
-    return getDatanodeListForReport(type).size(); 
-  }
-
-  private ArrayList<DatanodeDescriptor> getDatanodeListForReport(
-      DatanodeReportType type) {
     readLock();
-    try {    
-      boolean listLiveNodes = type == DatanodeReportType.ALL ||
-                              type == DatanodeReportType.LIVE;
-      boolean listDeadNodes = type == DatanodeReportType.ALL ||
-                              type == DatanodeReportType.DEAD;
-  
-      HashMap<String, String> mustList = new HashMap<String, String>();
-  
-      if (listDeadNodes) {
-        //first load all the nodes listed in include and exclude files.
-        Iterator<String> it = hostsReader.getHosts().iterator();
-        while (it.hasNext()) {
-          mustList.put(it.next(), "");
-        }
-        it = hostsReader.getExcludedHosts().iterator(); 
-        while (it.hasNext()) {
-          mustList.put(it.next(), "");
-        }
-      }
-
-      ArrayList<DatanodeDescriptor> nodes = null;
-      
-      synchronized (datanodeMap) {
-        nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() + 
-                                                  mustList.size());
-        Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-        while (it.hasNext()) { 
-          DatanodeDescriptor dn = it.next();
-          boolean isDead = isDatanodeDead(dn);
-          if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
-            nodes.add(dn);
-          }
-          //Remove any form of the this datanode in include/exclude lists.
-          try {
-            InetAddress inet = InetAddress.getByName(dn.getHost());
-            // compare hostname(:port)
-            mustList.remove(inet.getHostName());
-            mustList.remove(inet.getHostName()+":"+dn.getPort());
-            // compare ipaddress(:port)
-            mustList.remove(inet.getHostAddress().toString());
-            mustList.remove(inet.getHostAddress().toString()+ ":" +dn.getPort());
-          } catch ( UnknownHostException e ) {
-            mustList.remove(dn.getName());
-            mustList.remove(dn.getHost());
-            LOG.warn(e);
-          }
-        }
-      }
-      
-      if (listDeadNodes) {
-        Iterator<String> it = mustList.keySet().iterator();
-        while (it.hasNext()) {
-          DatanodeDescriptor dn = 
-              new DatanodeDescriptor(new DatanodeID(it.next()));
-          dn.setLastUpdate(0);
-          nodes.add(dn);
-        }
-      }
-      return nodes;
+    try {
+      return getBlockManager().getDatanodeManager().getDatanodeListForReport(
+          type).size(); 
     } finally {
       readUnlock();
     }
   }
 
-  public DatanodeInfo[] datanodeReport( DatanodeReportType type)
-      throws AccessControlException {
+  DatanodeInfo[] datanodeReport(final DatanodeReportType type
+      ) throws AccessControlException {
+    checkSuperuserPrivilege();
     readLock();
     try {
-      checkSuperuserPrivilege();
-  
-      ArrayList<DatanodeDescriptor> results = getDatanodeListForReport(type);
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();      
+      final List<DatanodeDescriptor> results = dm.getDatanodeListForReport(type);
+
       DatanodeInfo[] arr = new DatanodeInfo[results.size()];
       for (int i=0; i<arr.length; i++) {
         arr[i] = new DatanodeInfo(results.get(i));
@@ -3804,8 +3551,8 @@ public class FSNamesystem implements FSC
                                           ArrayList<DatanodeDescriptor> dead) {
     readLock();
     try {
-      ArrayList<DatanodeDescriptor> results = 
-                              getDatanodeListForReport(DatanodeReportType.ALL);    
+      final List<DatanodeDescriptor> results = getBlockManager(
+          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);    
       for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
         DatanodeDescriptor node = it.next();
         if (isDatanodeDead(node))
@@ -3836,44 +3583,6 @@ public class FSNamesystem implements FSC
     }
   }
 
-  /**
-   * Start decommissioning the specified datanode. 
-   */
-  private void startDecommission(DatanodeDescriptor node) 
-    throws IOException {
-    assert hasWriteLock();
-    if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning node " + node.getName() + " with " + 
-          node.numBlocks() +  " blocks.");
-      synchronized (heartbeats) {
-        updateStats(node, false);
-        node.startDecommission();
-        updateStats(node, true);
-      }
-      node.decommissioningStatus.setStartTime(now());
-      
-      // all the blocks that reside on this node have to be replicated.
-      checkDecommissionStateInternal(node);
-    }
-  }
-
-  /**
-   * Stop decommissioning the specified datanodes.
-   */
-  public void stopDecommission(DatanodeDescriptor node) 
-    throws IOException {
-    assert hasWriteLock();
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      LOG.info("Stop Decommissioning node " + node.getName());
-      synchronized (heartbeats) {
-        updateStats(node, false);
-        node.stopDecommission();
-        updateStats(node, true);
-      }
-      blockManager.processOverReplicatedBlocksOnReCommission(node);
-    }
-  }
-
   public Date getStartTime() {
     return new Date(systemStart); 
   }
@@ -3898,85 +3607,6 @@ public class FSNamesystem implements FSC
     return replication;
   }
     
-  /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed. Return true if decommission is complete.
-   */
-  public boolean checkDecommissionStateInternal(DatanodeDescriptor node) {
-    assert hasWriteLock();
-    //
-    // Check to see if all blocks in this decommissioned
-    // node has reached their target replication factor.
-    //
-    if (node.isDecommissionInProgress()) {
-      if (!blockManager.isReplicationInProgress(node)) {
-        node.setDecommissioned();
-        LOG.info("Decommission complete for node " + node.getName());
-      }
-    }
-    return node.isDecommissioned();
-  }
-
-  /** 
-   * Keeps track of which datanodes/ipaddress are allowed to connect to the namenode.
-   */
-  private boolean inHostsList(DatanodeID node, String ipAddr) {
-    Set<String> hostsList = hostsReader.getHosts();
-     return checkInList(node, ipAddr, hostsList, false);
-  }
-  
-  private boolean inExcludedHostsList(DatanodeID node, String ipAddr) {
-    Set<String> excludeList = hostsReader.getExcludedHosts();
-    return checkInList(node, ipAddr, excludeList, true);
-  }
-
-
-  /**
-   * Check if the given node (of DatanodeID or ipAddress) is in the (include or 
-   * exclude) list.  If ipAddress in null, check only based upon the given 
-   * DatanodeID.  If ipAddress is not null, the ipAddress should refers to the
-   * same host that given DatanodeID refers to.
-   * 
-   * @param node, DatanodeID, the host DatanodeID
-   * @param ipAddress, if not null, should refers to the same host
-   *                   that DatanodeID refers to
-   * @param hostsList, the list of hosts in the include/exclude file
-   * @param isExcludeList, boolean, true if this is the exclude list
-   * @return boolean, if in the list
-   */
-  private boolean checkInList(DatanodeID node, String ipAddress,
-      Set<String> hostsList, boolean isExcludeList) {
-    InetAddress iaddr = null;
-    try {
-      if (ipAddress != null) {
-        iaddr = InetAddress.getByName(ipAddress);
-      } else {
-        iaddr = InetAddress.getByName(node.getHost());
-      }
-    }catch (UnknownHostException e) {
-      LOG.warn("Unknown host in host list: "+ipAddress);
-      // can't resolve the host name.
-      if (isExcludeList){
-        return true;
-      } else {
-        return false;
-      }
-    }
-
-    // if include list is empty, host is in include list
-    if ( (!isExcludeList) && (hostsList.isEmpty()) ){
-      return true;
-    }
-    return // compare ipaddress(:port)
-    (hostsList.contains(iaddr.getHostAddress().toString()))
-        || (hostsList.contains(iaddr.getHostAddress().toString() + ":"
-            + node.getPort()))
-        // compare hostname(:port)
-        || (hostsList.contains(iaddr.getHostName()))
-        || (hostsList.contains(iaddr.getHostName() + ":" + node.getPort()))
-        || ((node instanceof DatanodeInfo) && hostsList
-            .contains(((DatanodeInfo) node).getHostName()));
-  }
   
   /**
    * Rereads the config to get hosts and exclude list file names.
@@ -3989,29 +3619,10 @@ public class FSNamesystem implements FSC
    */
   public void refreshNodes(Configuration conf) throws IOException {
     checkSuperuserPrivilege();
-    // Reread the config to get dfs.hosts and dfs.hosts.exclude filenames.
-    // Update the file names and refresh internal includes and excludes list
-    if (conf == null)
-      conf = new HdfsConfiguration();
-    hostsReader.updateFileNames(conf.get(DFSConfigKeys.DFS_HOSTS,""), 
-                                conf.get(DFSConfigKeys.DFS_HOSTS_EXCLUDE, ""));
-    hostsReader.refresh();
+    getBlockManager().getDatanodeManager().refreshHostsReader(conf);
     writeLock();
     try {
-      for (Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
-           it.hasNext();) {
-        DatanodeDescriptor node = it.next();
-        // Check if not include.
-        if (!inHostsList(node, null)) {
-          node.setDisallowed(true);  // case 2.
-        } else {
-          if (inExcludedHostsList(node, null)) {
-            startDecommission(node);   // case 3.
-          } else {
-            stopDecommission(node);   // case 4.
-          }
-        }
-      }
+      getBlockManager().getDatanodeManager().refreshDatanodes();
     } finally {
       writeUnlock();
     }
@@ -4021,27 +3632,7 @@ public class FSNamesystem implements FSC
     checkSuperuserPrivilege();
     getFSImage().finalizeUpgrade();
   }
-
-  /**
-   * Checks whether the node is on the hosts (include) list.  If it is not,
-   * it will be disallowed from registering. 
-   */
-  private boolean verifyNodeRegistration(DatanodeID nodeReg, String ipAddr) {
-    assert hasWriteLock();
-    return inHostsList(nodeReg, ipAddr);
-  }
     
-  /**
-   * Decommission the node if it is in the exclude list.
-   */
-  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) 
-    throws IOException {
-    assert hasWriteLock();
-    // If the registered node is in the exclude list, then decommission it.
-    if (inExcludedHostsList(nodeReg, ipAddr)) {
-      startDecommission(nodeReg);
-    }
-  }
     
   /**
    * Get data node by storage ID.
@@ -4050,7 +3641,8 @@ public class FSNamesystem implements FSC
    * @return DatanodeDescriptor or null if the node is not found.
    * @throws IOException
    */
-  public DatanodeDescriptor getDatanode(DatanodeID nodeID) throws IOException {
+  public DatanodeDescriptor getDatanode(DatanodeID nodeID
+      ) throws UnregisteredNodeException {
     assert hasReadOrWriteLock();
     UnregisteredNodeException e = null;
     DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
@@ -5411,8 +5003,8 @@ public class FSNamesystem implements FSC
     try {
       ArrayList<DatanodeDescriptor> decommissioningNodes = 
         new ArrayList<DatanodeDescriptor>();
-      ArrayList<DatanodeDescriptor> results = 
-        getDatanodeListForReport(DatanodeReportType.LIVE);
+      final List<DatanodeDescriptor> results = getBlockManager(
+          ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
       for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
         DatanodeDescriptor node = it.next();
         if (node.isDecommissionInProgress()) {
@@ -5824,49 +5416,8 @@ public class FSNamesystem implements FSC
   public BlockManager getBlockManager() {
     return blockManager;
   }
-
-  /**
-   * Remove an already decommissioned data node, which is neither in the include
-   * nor the exclude hosts lists, from the list of live or dead nodes.  This is
-   * used to avoid displaying an already decommissioned data node to the operators.
-   * The procedure for making an already decommissioned data node not be
-   * displayed is as follows:
-   * <ol>
-   *   <li> 
-   *   Host must have been in the include hosts list and the include hosts list
-   *   must not be empty.
-   *   </li>
-   *   <li>
-   *   Host is decommissioned by remaining in the include hosts list and added
-   *   into the exclude hosts list. Name node is updated with the new 
-   *   information by issuing the dfsadmin -refreshNodes command.
-   *   </li>
-   *   <li>
-   *   Host is removed from both the include and exclude hosts lists.  Name 
-   *   node is updated with the new information by issuing the dfsadmin
-   *   -refreshNodes command.
-   *   </li>
-   * </ol>
-   * 
-   * @param nodeList
-   *          the list of live or dead nodes
-   */
-  void removeDecomNodeFromList(ArrayList<DatanodeDescriptor> nodeList) {
-    // If the include list is empty, all nodes are allowed and it does not
-    // make sense to exclude any node from the cluster; therefore, remove nothing.
-    if (hostsReader.getHosts().isEmpty()) {
-      return;
-    }
-    
-    for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
-      DatanodeDescriptor node = it.next();
-      if ((!inHostsList(node, null)) && (!inExcludedHostsList(node, null))
-          && node.isDecommissioned()) {
-        // Include list is not empty, an existing datanode does not appear
-        // in both include or exclude lists and it has been decommissioned.
-        // Remove it from the node list.
-        it.remove();
-      }
-    }
+  
+  void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
+    getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
   }
 }
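
The DatanodeManager side of this move is not shown in this message.  As a rough,
non-authoritative sketch reconstructed from the loop deleted from refreshNodes()
above -- datanodeMap, inHostsList, inExcludedHostsList, startDecommission and
stopDecommission are assumed to have been carried over unchanged -- the delegated
refreshDatanodes() can be expected to look roughly like:

  /** Sketch only: rebuilt from the loop removed from FSNamesystem.refreshNodes() above. */
  void refreshDatanodes() throws IOException {
    for (DatanodeDescriptor node : datanodeMap.values()) {
      if (!inHostsList(node, null)) {
        node.setDisallowed(true);          // no longer in the include list
      } else if (inExcludedHostsList(node, null)) {
        startDecommission(node);           // newly excluded: begin decommission
      } else {
        stopDecommission(node);            // re-included: cancel decommission
      }
    }
  }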

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Fri Jul 22 04:20:21 2011
@@ -59,13 +59,13 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
@@ -282,7 +282,8 @@ public class DFSTestUtil {
 
     do {
       Thread.sleep(1000);
-      int []r = NameNodeAdapter.getReplicaInfo(cluster.getNameNode(), b.getLocalBlock());
+      int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(),
+          b.getLocalBlock());
       curRacks = r[0];
       curReplicas = r[1];
       curNeededReplicas = r[2];

Added: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1149455&view=auto
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (added)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Fri Jul 22 04:20:21 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+public class BlockManagerTestUtil {
+  /**
+   * @return a tuple of the replica state (number of racks, number of live
+   * replicas, and number of needed replicas) for the given block.
+   */
+  public static int[] getReplicaInfo(final FSNamesystem namesystem, final Block b) {
+    final BlockManager bm = namesystem.getBlockManager();
+    namesystem.readLock();
+    try {
+      return new int[]{getNumberOfRacks(bm, b),
+          bm.countNodes(b).liveReplicas(),
+          bm.neededReplications.contains(b) ? 1 : 0};
+    } finally {
+      namesystem.readUnlock();
+    }
+  }
+
+  /**
+   * @return the number of racks over which a given block is replicated;
+   * decommissioning/decommissioned nodes are not counted, and corrupt replicas 
+   * are also ignored.
+   */
+  private static int getNumberOfRacks(final BlockManager blockmanager,
+      final Block b) {
+    final Set<String> rackSet = new HashSet<String>(0);
+    final Collection<DatanodeDescriptor> corruptNodes = 
+        blockmanager.corruptReplicas.getNodes(b);
+    for (Iterator<DatanodeDescriptor> it = blockmanager.blocksMap.nodeIterator(b); 
+         it.hasNext();) {
+      DatanodeDescriptor cur = it.next();
+      if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if ((corruptNodes == null ) || !corruptNodes.contains(cur)) {
+          String rackName = cur.getNetworkLocation();
+          if (!rackSet.contains(rackName)) {
+            rackSet.add(rackName);
+          }
+        }
+      }
+    }
+    return rackSet.size();
+  }
+
+}
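
A typical caller polls the new helper until replication settles, as the DFSTestUtil
change above now does.  Below is a minimal usage sketch, not part of this commit:
the wrapper class, method name and loop condition are illustrative assumptions,
while the getReplicaInfo() call itself mirrors the modified DFSTestUtil code.

package org.apache.hadoop.hdfs;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;

/** Sketch: a test helper that waits until a block reaches the expected state. */
class ReplicaWaitHelper {
  static void waitForReplication(MiniDFSCluster cluster, ExtendedBlock b,
      int expectedRacks, int expectedReplicas) throws InterruptedException {
    int racks, replicas, needed;
    do {
      Thread.sleep(1000);
      // Same call the modified DFSTestUtil loop now makes.
      int[] r = BlockManagerTestUtil.getReplicaInfo(cluster.getNamesystem(),
          b.getLocalBlock());
      racks = r[0];
      replicas = r[1];
      needed = r[2];
      // No timeout here; a real test would bound the wait.
    } while (racks != expectedRacks || replicas != expectedReplicas || needed != 0);
  }
}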

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Fri Jul 22 04:20:21 2011
@@ -68,7 +68,7 @@ public class TestReplicationPolicy exten
       throw (RuntimeException)new RuntimeException().initCause(e);
     }
     final BlockManager bm = namenode.getNamesystem().getBlockManager();
-    replicator = bm.replicator;
+    replicator = bm.getBlockPlacementPolicy();
     cluster = bm.getDatanodeManager().getNetworkTopology();
     // construct network topology
     for(int i=0; i<NUM_OF_DATANODES; i++) {

Modified: hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=1149455&r1=1149454&r2=1149455&view=diff
==============================================================================
--- hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/common/trunk/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Jul 22 04:20:21 2011
@@ -60,21 +60,6 @@ public class NameNodeAdapter {
   public static Server getRpcServer(NameNode namenode) {
     return namenode.server;
   }
-
-  /**
-   * Return a tuple of the replica state (number of racks, number of live
-   * replicas, and number of needed replicas) for the given block.
-   * @param namenode to proxy the invocation to.
-   */
-  public static int[] getReplicaInfo(NameNode namenode, Block b) {
-    FSNamesystem ns = namenode.getNamesystem();
-    ns.readLock();
-    int[] r = {ns.blockManager.getNumberOfRacks(b),
-               ns.blockManager.countNodes(b).liveReplicas(),
-               ns.blockManager.neededReplications.contains(b) ? 1 : 0};
-    ns.readUnlock();
-    return r;
-  }
   
   public static String getLeaseHolderForPath(NameNode namenode, String path) {
     return namenode.getNamesystem().leaseManager.getLeaseByPath(path).getHolder();


