hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1399950 [10/27] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/ hadoop-hdfs-httpfs/src/main/java/org/apac...
Date Fri, 19 Oct 2012 02:28:07 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -42,7 +41,8 @@ import org.apache.hadoop.net.NodeBase;
 
 import com.google.common.annotations.VisibleForTesting;
 
-/** The class is responsible for choosing the desired number of targets
+/**
+ * The class is responsible for choosing the desired number of targets
  * for placing block replicas.
  * The replica placement strategy is that if the writer is on a datanode,
  * the 1st replica is placed on the local machine, 
@@ -52,15 +52,18 @@ import com.google.common.annotations.Vis
  */
 @InterfaceAudience.Private
 public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
+
   private static final String enableDebugLogging =
     "For more information, please enable DEBUG log level on "
-    + ((Log4JLogger)LOG).getLogger().getName();
+    + LOG.getClass().getName();
 
   protected boolean considerLoad; 
   private boolean preferLocalNode = true;
   protected NetworkTopology clusterMap;
   private FSClusterStats stats;
   protected long heartbeatInterval;   // interval for DataNode heartbeats
+  private long staleInterval;   // interval used to identify stale DataNodes
+  
   /**
    * A miss of that many heartbeats is tolerated for replica deletion policy.
    */
@@ -77,7 +80,8 @@ public class BlockPlacementPolicyDefault
   @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
                          NetworkTopology clusterMap) {
-    this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
+    this.considerLoad = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
     this.stats = stats;
     this.clusterMap = clusterMap;
     this.heartbeatInterval = conf.getLong(
@@ -86,6 +90,9 @@ public class BlockPlacementPolicyDefault
     this.tolerateHeartbeatMultiplier = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY,
         DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT);
+    this.staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
   }
 
   protected ThreadLocal<StringBuilder> threadLocalBuilder =
@@ -118,7 +125,6 @@ public class BlockPlacementPolicyDefault
         excludedNodes, blocksize);
   }
 
-
   /** This is the implementation. */
   DatanodeDescriptor[] chooseTarget(int numOfReplicas,
                                     DatanodeDescriptor writer,
@@ -155,8 +161,10 @@ public class BlockPlacementPolicyDefault
       writer=null;
     }
       
-    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer, 
-                                                excludedNodes, blocksize, maxNodesPerRack, results);
+    boolean avoidStaleNodes = (stats != null
+        && stats.isAvoidingStaleDataNodesForWrite());
+    DatanodeDescriptor localNode = chooseTarget(numOfReplicas, writer,
+        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (!returnChosenNodes) {  
       results.removeAll(chosenNodes);
     }
@@ -172,8 +180,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) {
-      
+                                          List<DatanodeDescriptor> results,
+                                          final boolean avoidStaleNodes) {
     if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
       return writer;
     }
@@ -184,18 +192,21 @@ public class BlockPlacementPolicyDefault
     if (writer == null && !newBlock) {
       writer = results.get(0);
     }
-      
+
+    // Keep a copy of original excludedNodes
+    final HashMap<Node, Node> oldExcludedNodes = avoidStaleNodes ? 
+        new HashMap<Node, Node>(excludedNodes) : null;
     try {
       if (numOfResults == 0) {
-        writer = chooseLocalNode(writer, excludedNodes, 
-                                 blocksize, maxNodesPerRack, results);
+        writer = chooseLocalNode(writer, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
       if (numOfResults <= 1) {
-        chooseRemoteRack(1, results.get(0), excludedNodes, 
-                         blocksize, maxNodesPerRack, results);
+        chooseRemoteRack(1, results.get(0), excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
         if (--numOfReplicas == 0) {
           return writer;
         }
@@ -203,24 +214,36 @@ public class BlockPlacementPolicyDefault
       if (numOfResults <= 2) {
         if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
           chooseRemoteRack(1, results.get(0), excludedNodes,
-                           blocksize, maxNodesPerRack, results);
+                           blocksize, maxNodesPerRack, 
+                           results, avoidStaleNodes);
         } else if (newBlock){
           chooseLocalRack(results.get(1), excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, avoidStaleNodes);
         } else {
-          chooseLocalRack(writer, excludedNodes, blocksize,
-                          maxNodesPerRack, results);
+          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
+              results, avoidStaleNodes);
         }
         if (--numOfReplicas == 0) {
           return writer;
         }
       }
-      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, 
-                   blocksize, maxNodesPerRack, results);
+      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       LOG.warn("Not able to place enough replicas, still in need of "
                + numOfReplicas + " to reach " + totalReplicasExpected + "\n"
                + e.getMessage());
+      if (avoidStaleNodes) {
+        // excludedNodes now contains the initial excludedNodes, any nodes that
+        // were chosen, and nodes that were tried but not chosen because they
+        // were stale, decommissioned, or rejected for any other reason.
+        // Retry with the original exclusions plus the chosen nodes, this
+        // time without avoiding stale nodes.
+        for (Node node : results) {
+          oldExcludedNodes.put(node, node);
+        }
+        return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
+            maxNodesPerRack, results, false);
+      }
     }
     return writer;
   }
@@ -235,26 +258,27 @@ public class BlockPlacementPolicyDefault
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+                                             List<DatanodeDescriptor> results,
+                                             boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     if (preferLocalNode) {
       // otherwise try local machine first
       Node oldNode = excludedNodes.put(localMachine, localMachine);
       if (oldNode == null) { // was not in the excluded list
-        if (isGoodTarget(localMachine, blocksize,
-                         maxNodesPerRack, false, results)) {
+        if (isGoodTarget(localMachine, blocksize, maxNodesPerRack, false,
+            results, avoidStaleNodes)) {
           results.add(localMachine);
           return localMachine;
         }
       } 
     }      
     // try a node on local rack
-    return chooseLocalRack(localMachine, excludedNodes, 
-                           blocksize, maxNodesPerRack, results);
+    return chooseLocalRack(localMachine, excludedNodes, blocksize,
+        maxNodesPerRack, results, avoidStaleNodes);
   }
     
   /* choose one node from the rack that <i>localMachine</i> is on.
@@ -269,19 +293,19 @@ public class BlockPlacementPolicyDefault
                                              HashMap<Node, Node> excludedNodes,
                                              long blocksize,
                                              int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
+                                             List<DatanodeDescriptor> results,
+                                             boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes);
     }
       
     // choose one from the local rack
     try {
-      return chooseRandom(
-                          localMachine.getNetworkLocation(),
-                          excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(localMachine.getNetworkLocation(), excludedNodes,
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -295,18 +319,17 @@ public class BlockPlacementPolicyDefault
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(
-                              newLocal.getNetworkLocation(),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(newLocal.getNetworkLocation(), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
@@ -322,17 +345,19 @@ public class BlockPlacementPolicyDefault
                                 HashMap<Node, Node> excludedNodes,
                                 long blocksize,
                                 int maxReplicasPerRack,
-                                List<DatanodeDescriptor> results)
+                                List<DatanodeDescriptor> results,
+                                boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+localMachine.getNetworkLocation(),
-                   excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
       chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
                    localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-                   maxReplicasPerRack, results);
+                   maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
@@ -344,7 +369,8 @@ public class BlockPlacementPolicyDefault
                                           HashMap<Node, Node> excludedNodes,
                                           long blocksize,
                                           int maxNodesPerRack,
-                                          List<DatanodeDescriptor> results) 
+                                          List<DatanodeDescriptor> results,
+                                          boolean avoidStaleNodes) 
     throws NotEnoughReplicasException {
     int numOfAvailableNodes =
       clusterMap.countNumOfAvailableNodes(nodes, excludedNodes.keySet());
@@ -362,7 +388,8 @@ public class BlockPlacementPolicyDefault
       Node oldNode = excludedNodes.put(chosenNode, chosenNode);
       if (oldNode == null) { // chosenNode was not in the excluded list
         numOfAvailableNodes--;
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, 
+                maxNodesPerRack, results, avoidStaleNodes)) {
           results.add(chosenNode);
           adjustExcludedNodes(excludedNodes, chosenNode);
           return chosenNode;
@@ -389,7 +416,8 @@ public class BlockPlacementPolicyDefault
                             HashMap<Node, Node> excludedNodes,
                             long blocksize,
                             int maxNodesPerRack,
-                            List<DatanodeDescriptor> results)
+                            List<DatanodeDescriptor> results,
+                            boolean avoidStaleNodes)
     throws NotEnoughReplicasException {
       
     int numOfAvailableNodes =
@@ -408,7 +436,8 @@ public class BlockPlacementPolicyDefault
       if (oldNode == null) {
         numOfAvailableNodes--;
 
-        if (isGoodTarget(chosenNode, blocksize, maxNodesPerRack, results)) {
+        if (isGoodTarget(chosenNode, blocksize, 
+              maxNodesPerRack, results, avoidStaleNodes)) {
           numOfReplicas--;
           results.add(chosenNode);
           adjustExcludedNodes(excludedNodes, chosenNode);
@@ -449,16 +478,34 @@ public class BlockPlacementPolicyDefault
    * does not have too much load, and the rack does not have too many nodes
    */
   private boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
-                               List<DatanodeDescriptor> results) {
-    return isGoodTarget(node, blockSize, maxTargetPerLoc,
-                        this.considerLoad, results);
+                               long blockSize, int maxTargetPerRack,
+                               List<DatanodeDescriptor> results, 
+                               boolean avoidStaleNodes) {
+    return isGoodTarget(node, blockSize, maxTargetPerRack, this.considerLoad,
+        results, avoidStaleNodes);
   }
-    
+  
+  /**
+   * Determine if a node is a good target. 
+   * 
+   * @param node The target node
+   * @param blockSize Size of block
+   * @param maxTargetPerRack Maximum number of targets per rack. The value of 
+   *                       this parameter depends on the number of racks in 
+   *                       the cluster and total number of replicas for a block
+   * @param considerLoad whether or not to consider load of the target node
+   * @param results A list containing currently chosen nodes. Used to check if 
+   *                too many nodes have been chosen in the target rack.
+   * @param avoidStaleNodes Whether or not to avoid choosing stale nodes
+   * @return Return true if <i>node</i> has enough space, 
+   *         does not have too much load, 
+   *         and the rack does not have too many nodes.
+   */
   protected boolean isGoodTarget(DatanodeDescriptor node,
-                               long blockSize, int maxTargetPerLoc,
+                               long blockSize, int maxTargetPerRack,
                                boolean considerLoad,
-                               List<DatanodeDescriptor> results) {
+                               List<DatanodeDescriptor> results,                           
+                               boolean avoidStaleNodes) {
     // check if the node is (being) decommissed
     if (node.isDecommissionInProgress() || node.isDecommissioned()) {
       if(LOG.isDebugEnabled()) {
@@ -469,6 +516,17 @@ public class BlockPlacementPolicyDefault
       return false;
     }
 
+    if (avoidStaleNodes) {
+      if (node.isStale(this.staleInterval)) {
+        if (LOG.isDebugEnabled()) {
+          threadLocalBuilder.get().append(node.toString()).append(": ")
+              .append("Node ").append(NodeBase.getPath(node))
+              .append(" is not chosen because the node is staled ");
+        }
+        return false;
+      }
+    }
+    
     long remaining = node.getRemaining() - 
                      (node.getBlocksScheduled() * blockSize); 
     // check the remaining capacity of the target machine
@@ -508,7 +566,7 @@ public class BlockPlacementPolicyDefault
         counter++;
       }
     }
-    if (counter>maxTargetPerLoc) {
+    if (counter>maxTargetPerRack) {
       if(LOG.isDebugEnabled()) {
         threadLocalBuilder.get().append(node.toString()).append(": ")
           .append("Node ").append(NodeBase.getPath(node))

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Oct 19 02:25:55 2012
@@ -52,6 +52,7 @@ public class BlockPlacementPolicyWithNod
   BlockPlacementPolicyWithNodeGroup() {
   }
 
+  @Override
   public void initialize(Configuration conf,  FSClusterStats stats,
           NetworkTopology clusterMap) {
     super.initialize(conf, stats, clusterMap);
@@ -63,23 +64,20 @@ public class BlockPlacementPolicyWithNod
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(
-      DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeDescriptor> results)
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
         throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
 
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
       if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results)) {
+          maxNodesPerRack, false, results, avoidStaleNodes)) {
         results.add(localMachine);
         // Nodes under same nodegroup should be excluded.
         addNodeGroupToExcludedNodes(excludedNodes,
@@ -91,18 +89,15 @@ public class BlockPlacementPolicyWithNod
     // try a node on local node group
     DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (chosenNode != null) {
       return chosenNode;
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   protected void adjustExcludedNodes(HashMap<Node, Node> excludedNodes,
       Node chosenNode) {
@@ -120,21 +115,16 @@ public class BlockPlacementPolicyWithNod
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+                          blocksize, maxNodesPerRack, results, 
+                          avoidStaleNodes);
     }
 
     // choose one from the local rack, but off-nodegroup
@@ -142,7 +132,8 @@ public class BlockPlacementPolicyWithNod
       return chooseRandom(NetworkTopology.getFirstHalf(
                               localMachine.getNetworkLocation()),
                           excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, 
+                          avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -156,42 +147,39 @@ public class BlockPlacementPolicyWithNod
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(
+              clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
-          DatanodeDescriptor localMachine,
-          HashMap<Node, Node> excludedNodes,
-          long blocksize,
-          int maxReplicasPerRack,
-          List<DatanodeDescriptor> results)
-          throws NotEnoughReplicasException {
+      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
-          localMachine.getNetworkLocation()),
-      excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(
+          numOfReplicas,
+          "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
-      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-      localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-      maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+          localMachine.getNetworkLocation(), excludedNodes, blocksize,
+          maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
@@ -201,19 +189,22 @@ public class BlockPlacementPolicyWithNod
    * if still no such node is available, choose a random node in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
-      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize, 
-      int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+  private DatanodeDescriptor chooseLocalNodeGroup(
+      NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-      blocksize, maxNodesPerRack, results);
+      blocksize, maxNodesPerRack, results, avoidStaleNodes);
     }
 
     // choose one from the local node group
     try {
-      return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
-      excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(
+          clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -227,17 +218,19 @@ public class BlockPlacementPolicyWithNod
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
-            excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(
+              clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+              excludedNodes, blocksize, maxNodesPerRack, results,
+              avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Fri Oct 19 02:25:55 2012
@@ -37,15 +37,18 @@ class BlocksMap {
       this.blockInfo = blkInfo;
     }
 
+    @Override
     public boolean hasNext() {
       return blockInfo != null && nextIdx < blockInfo.getCapacity()
               && blockInfo.getDatanode(nextIdx) != null;
     }
 
+    @Override
     public DatanodeDescriptor next() {
       return blockInfo.getDatanode(nextIdx++);
     }
 
+    @Override
     public void remove()  {
       throw new UnsupportedOperationException("Sorry. can't remove.");
     }
@@ -91,7 +94,7 @@ class BlocksMap {
   }
 
   void close() {
-    blocks = null;
+    // Empty blocks once GSet#clear is implemented (HDFS-3940)
   }
 
   BlockCollection getBlockCollection(Block b) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Oct 19 02:25:55 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
 
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
@@ -275,11 +276,11 @@ public class DatanodeDescriptor extends 
   }
 
   public void resetBlocks() {
-    this.capacity = 0;
-    this.remaining = 0;
-    this.blockPoolUsed = 0;
-    this.dfsUsed = 0;
-    this.xceiverCount = 0;
+    setCapacity(0);
+    setRemaining(0);
+    setBlockPoolUsed(0);
+    setDfsUsed(0);
+    setXceiverCount(0);
     this.blockList = null;
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
@@ -302,15 +303,15 @@ public class DatanodeDescriptor extends 
    */
   public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
       long blockPoolUsed, int xceiverCount, int volFailures) {
-    this.capacity = capacity;
-    this.dfsUsed = dfsUsed;
-    this.remaining = remaining;
-    this.blockPoolUsed = blockPoolUsed;
-    this.lastUpdate = System.currentTimeMillis();
-    this.xceiverCount = xceiverCount;
+    setCapacity(capacity);
+    setRemaining(remaining);
+    setBlockPoolUsed(blockPoolUsed);
+    setDfsUsed(dfsUsed);
+    setXceiverCount(xceiverCount);
+    setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
     this.heartbeatedSinceFailover = true;
-    rollBlocksScheduled(lastUpdate);
+    rollBlocksScheduled(getLastUpdate());
   }
 
   /**
@@ -325,16 +326,19 @@ public class DatanodeDescriptor extends 
       this.node = dn;
     }
 
+    @Override
     public boolean hasNext() {
       return current != null;
     }
 
+    @Override
     public BlockInfo next() {
       BlockInfo res = current;
       current = current.getNext(current.findDatanode(node));
       return res;
     }
 
+    @Override
     public void remove()  {
       throw new UnsupportedOperationException("Sorry. can't remove.");
     }
@@ -541,6 +545,7 @@ public class DatanodeDescriptor extends 
   /**
    * @param nodeReg DatanodeID to update registration for.
    */
+  @Override
   public void updateRegInfo(DatanodeID nodeReg) {
     super.updateRegInfo(nodeReg);
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -50,7 +51,6 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -73,8 +73,10 @@ import org.apache.hadoop.net.ScriptBased
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -87,8 +89,8 @@ public class DatanodeManager {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
-
   private final HeartbeatManager heartbeatManager;
+  private Daemon decommissionthread = null;
 
   /**
    * Stores the datanode -> block map.  
@@ -126,23 +128,33 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
+  /** Whether or not to check stale DataNodes for read/write */
+  private final boolean checkForStaleDataNodes;
+
+  /** The interval for judging stale DataNodes for read/write */
+  private final long staleInterval;
+  
+  /** Whether or not to avoid using stale DataNodes for writing */
+  private volatile boolean avoidStaleDataNodesForWrite;
+  
+  /** The number of stale DataNodes */
+  private volatile int numStaleNodes;
+  
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
    * according to the NetworkTopology.
    */
   private boolean hasClusterEverBeenMultiRack = false;
   
-  DatanodeManager(final BlockManager blockManager,
-      final Namesystem namesystem, final Configuration conf
-      ) throws IOException {
+  DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
+      final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
     Class<? extends NetworkTopology> networkTopologyClass =
         conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
             NetworkTopology.class, NetworkTopology.class);
-    networktopology = (NetworkTopology) ReflectionUtils.newInstance(
-        networkTopologyClass, conf);
+    networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
@@ -175,10 +187,69 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
     LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
         + "=" + this.blockInvalidateLimit);
+    
+    checkForStaleDataNodes = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+
+    staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+        checkForStaleDataNodes);
   }
-
-  private Daemon decommissionthread = null;
-
+  
+  private static long getStaleIntervalFromConf(Configuration conf,
+      long heartbeatExpireInterval) {
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(staleInterval > 0,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
+        " = '" + staleInterval + "' is invalid. " +
+        "It should be a positive non-zero value.");
+    
+    final long heartbeatIntervalSeconds = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    // The stale interval value cannot be smaller than 
+    // 3 times the heartbeat interval 
+    final long minStaleInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+        * heartbeatIntervalSeconds * 1000;
+    if (staleInterval < minStaleInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is less than "
+          + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+          + " heartbeat intervals. This may cause too frequent changes of " 
+          + "stale states of DataNodes since a heartbeat msg may be missing " 
+          + "due to temporary short-term failures. Reset stale interval to " 
+          + minStaleInterval + ".");
+      staleInterval = minStaleInterval;
+    }
+    if (staleInterval > heartbeatExpireInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is larger than heartbeat expire interval "
+          + heartbeatExpireInterval + ".");
+    }
+    return staleInterval;
+  }
+  
+  static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+      boolean checkForStale) {
+    boolean avoid = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+    boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+    if (!checkForStale && avoid) {
+      LOG.warn("Cannot set "
+          + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+          + " as false while setting "
+          + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+          + " as true.");
+    }
+    return avoidStaleDataNodesForWrite;
+  }
+  
   void activate(final Configuration conf) {
     final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
     this.decommissionthread = new Daemon(dm.new Monitor(
@@ -192,7 +263,13 @@ public class DatanodeManager {
   }
 
   void close() {
-    if (decommissionthread != null) decommissionthread.interrupt();
+    if (decommissionthread != null) {
+      decommissionthread.interrupt();
+      try {
+        decommissionthread.join(3000);
+      } catch (InterruptedException e) {
+      }
+    }
     heartbeatManager.close();
   }
 
@@ -225,14 +302,18 @@ public class DatanodeManager {
       if (rName != null)
         client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
     }
+    
+    Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ? 
+        new DFSUtil.DecomStaleComparator(staleInterval) : 
+        DFSUtil.DECOM_COMPARATOR;
+        
     for (LocatedBlock b : locatedblocks) {
       networktopology.pseudoSortByDistance(client, b.getLocations());
-      
-      // Move decommissioned datanodes to the bottom
-      Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
+      // Move decommissioned/stale datanodes to the bottom
+      Arrays.sort(b.getLocations(), comparator);
     }
   }
-
+  
   CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
       final String firstkey) {
     return new CyclicIteration<String, DatanodeDescriptor>(
@@ -338,7 +419,7 @@ public class DatanodeManager {
   /** Is the datanode dead? */
   boolean isDatanodeDead(DatanodeDescriptor node) {
     return (node.getLastUpdate() <
-            (Util.now() - heartbeatExpireInterval));
+            (Time.now() - heartbeatExpireInterval));
   }
 
   /** Add a datanode. */
@@ -486,7 +567,7 @@ public class DatanodeManager {
   /**
    * Decommission the node if it is in exclude list.
    */
-  private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) { 
+  private void checkDecommissioning(DatanodeDescriptor nodeReg) { 
     // If the registered node is in exclude list, then decommission it
     if (inExcludedHostsList(nodeReg)) {
       startDecommission(nodeReg);
@@ -582,7 +663,8 @@ public class DatanodeManager {
         + " storage " + nodeReg.getStorageID());
 
     DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());
+    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+        nodeReg.getIpAddr(), nodeReg.getXferPort());
       
     if (nodeN != null && nodeN != nodeS) {
       NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
@@ -631,7 +713,7 @@ public class DatanodeManager {
         
       // also treat the registration message as a heartbeat
       heartbeatManager.register(nodeS);
-      checkDecommissioning(nodeS, dnAddress);
+      checkDecommissioning(nodeS);
       return;
     } 
 
@@ -651,7 +733,7 @@ public class DatanodeManager {
       = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
     resolveNetworkLocation(nodeDescr);
     addDatanode(nodeDescr);
-    checkDecommissioning(nodeDescr, dnAddress);
+    checkDecommissioning(nodeDescr);
     
     // also treat the registration message as a heartbeat
     // no need to update its timestamp
@@ -692,7 +774,7 @@ public class DatanodeManager {
    * 3. Added to exclude --> start decommission.
    * 4. Removed from exclude --> stop decommission.
    */
-  private void refreshDatanodes() throws IOException {
+  private void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node)) {
@@ -751,7 +833,61 @@ public class DatanodeManager {
       namesystem.readUnlock();
     }
   }
+  
+  /* Getter and Setter for stale DataNodes related attributes */
+  
+  /**
+   * @return whether or not to avoid writing to stale datanodes
+   */
+  public boolean isAvoidingStaleDataNodesForWrite() {
+    return avoidStaleDataNodesForWrite;
+  }
+
+  /**
+   * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}. 
+   * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
+   * half of the DataNodes are marked as stale.
+   * 
+   * @param avoidStaleDataNodesForWrite
+   *          The value to set to
+   *          {@link DatanodeManager#avoidStaleDataNodesForWrite}
+   */
+  void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+    this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+  }
+
+  /**
+   * @return Whether or not to check stale DataNodes for R/W
+   */
+  boolean isCheckingForStaleDataNodes() {
+    return checkForStaleDataNodes;
+  }
+  
+  /**
+   * @return The time interval used to mark DataNodes as stale.
+   */
+  long getStaleInterval() {
+    return staleInterval;
+  }
 
+  /**
+   * Set the number of current stale DataNodes. The HeartbeatManager gets this
+   * number based on DataNodes' heartbeats.
+   * 
+   * @param numStaleNodes
+   *          The number of stale DataNodes to be set.
+   */
+  void setNumStaleNodes(int numStaleNodes) {
+    this.numStaleNodes = numStaleNodes;
+  }
+  
+  /**
+   * @return the current number of stale DataNodes (detected by
+   * HeartbeatManager). 
+   */
+  public int getNumStaleNodes() {
+    return this.numStaleNodes;
+  }
 
   /** Fetch live and dead datanodes. */
   public void fetchDatanodes(final List<DatanodeDescriptor> live, 
@@ -930,7 +1066,7 @@ public class DatanodeManager {
     return nodes;
   }
   
-  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+  private void setDatanodeDead(DatanodeDescriptor node) {
     node.setLastUpdate(0);
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Oct 19 02:25:55 2012
@@ -26,9 +26,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Manage the heartbeats received from datanodes.
@@ -54,18 +56,48 @@ class HeartbeatManager implements Datano
   private final long heartbeatRecheckInterval;
   /** Heartbeat monitor thread */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
-
+  /**
+   * The initial user-configured setting indicating whether or not to avoid
+   * writing to stale datanodes.
+   */
+  private final boolean initialAvoidWriteStaleNodes;
+  /**
+   * When the ratio of stale datanodes reaches this number, stop avoiding 
+   * writing to stale datanodes, i.e., continue using stale nodes for writing.
+   */
+  private final float ratioUseStaleDataNodesForWrite;
+    
   final Namesystem namesystem;
   final BlockManager blockManager;
 
-  HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
-      final Configuration conf) {
-    this.heartbeatRecheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-
+  HeartbeatManager(final Namesystem namesystem,
+      final BlockManager blockManager, final Configuration conf) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
+    boolean checkStaleNodes = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+    long recheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+    this.initialAvoidWriteStaleNodes = DatanodeManager
+        .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
+    this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+    Preconditions.checkArgument(
+        (ratioUseStaleDataNodesForWrite > 0 && 
+            ratioUseStaleDataNodesForWrite <= 1.0f),
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+        " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+        "It should be a positive non-zero float value, not greater than 1.0f.");
+    
+    this.heartbeatRecheckInterval = (checkStaleNodes 
+        && initialAvoidWriteStaleNodes 
+        && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
   }
 
   void activate(Configuration conf) {
@@ -74,6 +106,11 @@ class HeartbeatManager implements Datano
 
   void close() {
     heartbeatThread.interrupt();
+    try {
+      // This will have no effect if the thread hasn't yet been started.
+      heartbeatThread.join(3000);
+    } catch (InterruptedException e) {
+    }
   }
   
   synchronized int getLiveDatanodeCount() {
@@ -205,16 +242,39 @@ class HeartbeatManager implements Datano
     if (namesystem.isInSafeMode()) {
       return;
     }
+    boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
     boolean allAlive = false;
     while (!allAlive) {
       // locate the first dead node.
       DatanodeID dead = null;
+      // check the number of stale nodes
+      int numOfStaleNodes = 0;
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
-          if (dm.isDatanodeDead(d)) {
+          if (dead == null && dm.isDatanodeDead(d)) {
             stats.incrExpiredHeartbeats();
             dead = d;
-            break;
+            if (!checkStaleNodes) {
+              break;
+            }
+          }
+          if (checkStaleNodes && 
+              d.isStale(dm.getStaleInterval())) {
+            numOfStaleNodes++;
+          }
+        }
+        
+        // Change whether to avoid using stale datanodes for writing
+        // based on proportion of stale datanodes
+        if (checkStaleNodes) {
+          dm.setNumStaleNodes(numOfStaleNodes);
+          if (numOfStaleNodes > 
+                datanodes.size() * ratioUseStaleDataNodesForWrite) {
+            dm.setAvoidStaleDataNodesForWrite(false);
+          } else {
+            if (this.initialAvoidWriteStaleNodes) {
+              dm.setAvoidStaleDataNodesForWrite(true);
+            }
           }
         }
       }
@@ -223,10 +283,10 @@ class HeartbeatManager implements Datano
       if (!allAlive) {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
-        if (namesystem.isInSafeMode()) {
-          return;
-        }
         try {
+          if (namesystem.isInSafeMode()) {
+            return;
+          }
           synchronized(this) {
             dm.removeDeadDatanode(dead);
           }
@@ -247,7 +307,7 @@ class HeartbeatManager implements Datano
     public void run() {
       while(namesystem.isRunning()) {
         try {
-          final long now = Util.now();
+          final long now = Time.now();
           if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
             heartbeatCheck();
             lastHeartbeatCheck = now;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Fri Oct 19 02:25:55 2012
@@ -159,6 +159,35 @@ class Host2NodesMap {
     }
   }
   
+  /**
+   * Find data node by its transfer address
+   *
+   * @return DatanodeDescriptor if found or null otherwise
+   */
+  public DatanodeDescriptor getDatanodeByXferAddr(String ipAddr,
+      int xferPort) {
+    if (ipAddr==null) {
+      return null;
+    }
+
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(ipAddr);
+      // no entry
+      if (nodes== null) {
+        return null;
+      }
+      for(DatanodeDescriptor containedNode:nodes) {
+        if (xferPort == containedNode.getXferPort()) {
+          return containedNode;
+        }
+      }
+      return null;
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Oct 19 02:25:55 2012
@@ -134,26 +134,7 @@ class InvalidateBlocks {
     return new ArrayList<String>(node2blocks.keySet());
   }
 
-  /** Invalidate work for the storage. */
-  int invalidateWork(final String storageId) {
-    final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
-    if (dn == null) {
-      remove(storageId);
-      return 0;
-    }
-    final List<Block> toInvalidate = invalidateWork(storageId, dn);
-    if (toInvalidate == null) {
-      return 0;
-    }
-
-    if (NameNode.stateChangeLog.isInfoEnabled()) {
-      NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
-          + ": ask " + dn + " to delete " + toInvalidate);
-    }
-    return toInvalidate.size();
-  }
-
-  private synchronized List<Block> invalidateWork(
+  synchronized List<Block> invalidateWork(
       final String storageId, final DatanodeDescriptor dn) {
     final LightWeightHashSet<Block> set = node2blocks.get(storageId);
     if (set == null) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Fri Oct 19 02:25:55 2012
@@ -22,11 +22,7 @@ import java.util.Map;
 import java.util.Queue;
 
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Oct 19 02:25:55 2012
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.server.common.Util.now;
+import static org.apache.hadoop.util.Time.now;
 
 import java.io.PrintWriter;
 import java.sql.Time;
@@ -192,6 +192,7 @@ class PendingReplicationBlocks {
    * their replication request.
    */
   class PendingReplicationMonitor implements Runnable {
+    @Override
     public void run() {
       while (fsRunning) {
         long period = Math.min(defaultRecheckInterval, timeout);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Fri Oct 19 02:25:55 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 
 /****************************************************************
@@ -35,7 +37,7 @@ public class GenerationStamp implements 
    */
   public static final long GRANDFATHER_GENERATION_STAMP = 0;
 
-  private volatile long genstamp;
+  private AtomicLong genstamp = new AtomicLong();
 
   /**
    * Create a new instance, initialized to FIRST_VALID_STAMP.
@@ -48,35 +50,36 @@ public class GenerationStamp implements 
    * Create a new instance, initialized to the specified value.
    */
   GenerationStamp(long stamp) {
-    this.genstamp = stamp;
+    genstamp.set(stamp);
   }
 
   /**
    * Returns the current generation stamp
    */
   public long getStamp() {
-    return this.genstamp;
+    return genstamp.get();
   }
 
   /**
    * Sets the current generation stamp
    */
   public void setStamp(long stamp) {
-    this.genstamp = stamp;
+    genstamp.set(stamp);
   }
 
   /**
    * First increments the counter and then returns the stamp 
    */
-  public synchronized long nextStamp() {
-    this.genstamp++;
-    return this.genstamp;
+  public long nextStamp() {
+    return genstamp.incrementAndGet();
   }
 
   @Override // Comparable
   public int compareTo(GenerationStamp that) {
-    return this.genstamp < that.genstamp ? -1 :
-           this.genstamp > that.genstamp ? 1 : 0;
+    long stamp1 = this.genstamp.get();
+    long stamp2 = that.genstamp.get();
+    return stamp1 < stamp2 ? -1 :
+           stamp1 > stamp2 ? 1 : 0;
   }
 
   @Override // Object
@@ -89,6 +92,7 @@ public class GenerationStamp implements 
 
   @Override // Object
   public int hashCode() {
-    return (int) (genstamp^(genstamp>>>32));
+    long stamp = genstamp.get();
+    return (int) (stamp^(stamp>>>32));
   }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Oct 19 02:25:55 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
    */
   static public enum NodeType {
     NAME_NODE,
-    DATA_NODE;
+    DATA_NODE,
+    JOURNAL_NODE;
   }
 
   /** Startup options */
@@ -60,7 +61,7 @@ public final class HdfsServerConstants {
     FORCE("-force"),
     NONINTERACTIVE("-nonInteractive");
     
-    private String name = null;
+    private final String name;
     
     // Used only with format and upgrade options
     private String clusterId = null;
@@ -141,6 +142,7 @@ public final class HdfsServerConstants {
     private String description = null;
     private NamenodeRole(String arg) {this.description = arg;}
   
+    @Override
     public String toString() {
       return description;
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Oct 19 02:25:55 2012
@@ -44,12 +44,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -59,14 +59,12 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.DoAsParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.http.HtmlQuoting;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.util.KerberosName;
-import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
@@ -111,6 +109,7 @@ public class JspHelper {
   // compare two records based on their frequency
   private static class NodeRecordComparator implements Comparator<NodeRecord> {
 
+    @Override
     public int compare(NodeRecord o1, NodeRecord o2) {
       if (o1.frequency < o2.frequency) {
         return -1;
@@ -194,7 +193,8 @@ public class JspHelper {
   public static void streamBlockInAscii(InetSocketAddress addr, String poolId,
       long blockId, Token<BlockTokenIdentifier> blockToken, long genStamp,
       long blockSize, long offsetIntoBlock, long chunkSizeToView,
-      JspWriter out, Configuration conf) throws IOException {
+      JspWriter out, Configuration conf, DataEncryptionKey encryptionKey)
+          throws IOException {
     if (chunkSizeToView == 0) return;
     Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket();
     s.connect(addr, HdfsServerConstants.READ_TIMEOUT);
@@ -207,7 +207,7 @@ public class JspHelper {
     BlockReader blockReader = BlockReaderFactory.newBlockReader(
         conf, s, file,
         new ExtendedBlock(poolId, blockId, 0, genStamp), blockToken,
-        offsetIntoBlock, amtToRead);
+        offsetIntoBlock, amtToRead, encryptionKey);
         
     byte[] buf = new byte[(int)amtToRead];
     int readOffset = 0;
@@ -276,6 +276,9 @@ public class JspHelper {
         FIELD_PERCENT_REMAINING = 9,
         FIELD_ADMIN_STATE       = 10,
         FIELD_DECOMMISSIONED    = 11,
+        FIELD_BLOCKPOOL_USED    = 12,
+        FIELD_PERBLOCKPOOL_USED = 13,
+        FIELD_FAILED_VOLUMES    = 14,
         SORT_ORDER_ASC          = 1,
         SORT_ORDER_DSC          = 2;
 
@@ -303,6 +306,12 @@ public class JspHelper {
           sortField = FIELD_ADMIN_STATE;
         } else if (field.equals("decommissioned")) {
           sortField = FIELD_DECOMMISSIONED;
+        } else if (field.equals("bpused")) {
+          sortField = FIELD_BLOCKPOOL_USED;
+        } else if (field.equals("pcbpused")) {
+          sortField = FIELD_PERBLOCKPOOL_USED;
+        } else if (field.equals("volfails")) {
+          sortField = FIELD_FAILED_VOLUMES;
         } else {
           sortField = FIELD_NAME;
         }
@@ -314,6 +323,7 @@ public class JspHelper {
         }
       }
 
+      @Override
       public int compare(DatanodeDescriptor d1,
                          DatanodeDescriptor d2) {
         int ret = 0;
@@ -360,6 +370,18 @@ public class JspHelper {
         case FIELD_NAME: 
           ret = d1.getHostName().compareTo(d2.getHostName());
           break;
+        case FIELD_BLOCKPOOL_USED:
+          dlong = d1.getBlockPoolUsed() - d2.getBlockPoolUsed();
+          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
+          break;
+        case FIELD_PERBLOCKPOOL_USED:
+          ddbl = d1.getBlockPoolUsedPercent() - d2.getBlockPoolUsedPercent();
+          ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
+          break;
+        case FIELD_FAILED_VOLUMES:
+          int dint = d1.getVolumeFailures() - d2.getVolumeFailures();
+          ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0);
+          break;
         }
         return (sortOrder == SORT_ORDER_DSC) ? -ret : ret;
       }
@@ -485,12 +507,17 @@ public class JspHelper {
    */
   public static UserGroupInformation getDefaultWebUser(Configuration conf
                                                        ) throws IOException {
+    return UserGroupInformation.createRemoteUser(getDefaultWebUserName(conf));
+  }
+
+  private static String getDefaultWebUserName(Configuration conf
+      ) throws IOException {
     String user = conf.get(
         HADOOP_HTTP_STATIC_USER, DEFAULT_HADOOP_HTTP_STATIC_USER);
     if (user == null || user.length() == 0) {
       throw new IOException("Cannot determine UGI from request or conf");
     }
-    return UserGroupInformation.createRemoteUser(user);
+    return user;
   }
 
   private static InetSocketAddress getNNServiceAddress(ServletContext context,
@@ -536,65 +563,45 @@ public class JspHelper {
       HttpServletRequest request, Configuration conf,
       final AuthenticationMethod secureAuthMethod,
       final boolean tryUgiParameter) throws IOException {
-    final UserGroupInformation ugi;
+    UserGroupInformation ugi = null;
     final String usernameFromQuery = getUsernameFromQuery(request, tryUgiParameter);
     final String doAsUserFromQuery = request.getParameter(DoAsParam.NAME);
-
-    if(UserGroupInformation.isSecurityEnabled()) {
-      final String remoteUser = request.getRemoteUser();
-      String tokenString = request.getParameter(DELEGATION_PARAMETER_NAME);
+    final String remoteUser;
+   
+    if (UserGroupInformation.isSecurityEnabled()) {
+      remoteUser = request.getRemoteUser();
+      final String tokenString = request.getParameter(DELEGATION_PARAMETER_NAME);
       if (tokenString != null) {
-        Token<DelegationTokenIdentifier> token = 
-          new Token<DelegationTokenIdentifier>();
-        token.decodeFromUrlString(tokenString);
-        InetSocketAddress serviceAddress = getNNServiceAddress(context, request);
-        if (serviceAddress != null) {
-          SecurityUtil.setTokenService(token, serviceAddress);
-          token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
-        }
-        ByteArrayInputStream buf = new ByteArrayInputStream(token
-            .getIdentifier());
-        DataInputStream in = new DataInputStream(buf);
-        DelegationTokenIdentifier id = new DelegationTokenIdentifier();
-        id.readFields(in);
-        if (context != null) {
-          final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
-          if (nn != null) {
-            // Verify the token.
-            nn.getNamesystem().verifyToken(id, token.getPassword());
-          }
-        }
-        ugi = id.getUser();
-        if (ugi.getRealUser() == null) {
-          //non-proxy case
-          checkUsername(ugi.getShortUserName(), usernameFromQuery);
-          checkUsername(null, doAsUserFromQuery);
-        } else {
-          //proxy case
-          checkUsername(ugi.getRealUser().getShortUserName(), usernameFromQuery);
-          checkUsername(ugi.getShortUserName(), doAsUserFromQuery);
-          ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
-        }
-        ugi.addToken(token);
-        ugi.setAuthenticationMethod(AuthenticationMethod.TOKEN);
-      } else {
-        if(remoteUser == null) {
-          throw new IOException("Security enabled but user not " +
-                                "authenticated by filter");
-        }
-        final UserGroupInformation realUgi = UserGroupInformation.createRemoteUser(remoteUser);
-        checkUsername(realUgi.getShortUserName(), usernameFromQuery);
+        // Token-based connections need only verify the effective user, and
+        // disallow proxying to a different user.  Proxy authorization checks
+        // are not required since the checks apply to issuing a token.
+        ugi = getTokenUGI(context, request, tokenString, conf);
+        checkUsername(ugi.getShortUserName(), usernameFromQuery);
+        checkUsername(ugi.getShortUserName(), doAsUserFromQuery);
+      } else if (remoteUser == null) {
+        throw new IOException(
+            "Security enabled but user not authenticated by filter");
+      }
+    } else {
+      // Security's not on, pull from url or use default web user
+      remoteUser = (usernameFromQuery == null)
+          ? getDefaultWebUserName(conf) // not specified in request
+          : usernameFromQuery;
+    }
+
+    if (ugi == null) { // security is off, or there's no token
+      ugi = UserGroupInformation.createRemoteUser(remoteUser);
+      checkUsername(ugi.getShortUserName(), usernameFromQuery);
+      if (UserGroupInformation.isSecurityEnabled()) {
         // This is not necessarily true, could have been auth'ed by user-facing
         // filter
-        realUgi.setAuthenticationMethod(secureAuthMethod);
-        ugi = initUGI(realUgi, doAsUserFromQuery, request, true, conf);
+        ugi.setAuthenticationMethod(secureAuthMethod);
+      }
+      if (doAsUserFromQuery != null) {
+        // create and attempt to authorize a proxy user
+        ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, ugi);
+        ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
       }
-    } else { // Security's not on, pull from url
-      final UserGroupInformation realUgi = usernameFromQuery == null?
-          getDefaultWebUser(conf) // not specified in request
-          : UserGroupInformation.createRemoteUser(usernameFromQuery);
-      realUgi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
-      ugi = initUGI(realUgi, doAsUserFromQuery, request, false, conf);
     }
     
     if(LOG.isDebugEnabled())
@@ -602,21 +609,34 @@ public class JspHelper {
     return ugi;
   }
 
-  private static UserGroupInformation initUGI(final UserGroupInformation realUgi,
-      final String doAsUserFromQuery, final HttpServletRequest request,
-      final boolean isSecurityEnabled, final Configuration conf
-      ) throws AuthorizationException {
-    final UserGroupInformation ugi;
-    if (doAsUserFromQuery == null) {
-      //non-proxy case
-      ugi = realUgi;
-    } else {
-      //proxy case
-      ugi = UserGroupInformation.createProxyUser(doAsUserFromQuery, realUgi);
-      ugi.setAuthenticationMethod(
-          isSecurityEnabled? AuthenticationMethod.PROXY: AuthenticationMethod.SIMPLE);
-      ProxyUsers.authorize(ugi, request.getRemoteAddr(), conf);
+  private static UserGroupInformation getTokenUGI(ServletContext context,
+                                                  HttpServletRequest request,
+                                                  String tokenString,
+                                                  Configuration conf)
+                                                      throws IOException {
+    final Token<DelegationTokenIdentifier> token =
+        new Token<DelegationTokenIdentifier>();
+    token.decodeFromUrlString(tokenString);
+    InetSocketAddress serviceAddress = getNNServiceAddress(context, request);
+    if (serviceAddress != null) {
+      SecurityUtil.setTokenService(token, serviceAddress);
+      token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    }
+
+    ByteArrayInputStream buf =
+        new ByteArrayInputStream(token.getIdentifier());
+    DataInputStream in = new DataInputStream(buf);
+    DelegationTokenIdentifier id = new DelegationTokenIdentifier();
+    id.readFields(in);
+    if (context != null) {
+      final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
+      if (nn != null) {
+        // Verify the token.
+        nn.getNamesystem().verifyToken(id, token.getPassword());
+      }
     }
+    UserGroupInformation ugi = id.getUser();
+    ugi.addToken(token);
     return ugi;
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Oct 19 02:25:55 2012
@@ -39,8 +39,11 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Preconditions;
+
 
 
 /**
@@ -75,7 +78,7 @@ public abstract class Storage extends St
   /** Layout versions of 0.20.203 release */
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
-  private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
+  public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
   public    static final String STORAGE_DIR_PREVIOUS  = "previous";
@@ -85,6 +88,12 @@ public abstract class Storage extends St
   public    static final String STORAGE_TMP_LAST_CKPT = "lastcheckpoint.tmp";
   public    static final String STORAGE_PREVIOUS_CKPT = "previous.checkpoint";
   
+  /**
+   * The blocksBeingWritten directory which was used in some 1.x and earlier
+   * releases.
+   */
+  public static final String STORAGE_1_BBW = "blocksBeingWritten";
+  
   public enum StorageState {
     NON_EXISTENT,
     NOT_FORMATTED,
@@ -123,6 +132,7 @@ public abstract class Storage extends St
       this.prevIndex = 0;
     }
     
+    @Override
     public boolean hasNext() {
       if (storageDirs.isEmpty() || nextIndex >= storageDirs.size())
         return false;
@@ -138,6 +148,7 @@ public abstract class Storage extends St
       return true;
     }
     
+    @Override
     public StorageDirectory next() {
       StorageDirectory sd = getStorageDir(nextIndex);
       prevIndex = nextIndex;
@@ -152,6 +163,7 @@ public abstract class Storage extends St
       return sd;
     }
     
+    @Override
     public void remove() {
       nextIndex = prevIndex; // restore previous state
       storageDirs.remove(prevIndex); // remove last returned element
@@ -216,7 +228,7 @@ public abstract class Storage extends St
    * One of the storage directories.
    */
   @InterfaceAudience.Private
-  public static class StorageDirectory {
+  public static class StorageDirectory implements FormatConfirmable {
     final File root;              // root directory
     final boolean useLock;        // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
@@ -571,6 +583,32 @@ public abstract class Storage extends St
         throw new IOException("Unexpected FS state: " + curState);
       }
     }
+    
+    /**
+     * @return true if the storage directory should prompt the user prior
+     * to formatting (i.e. if the directory appears to contain some data)
+     * @throws IOException if the SD cannot be accessed due to an IO error
+     */
+    @Override
+    public boolean hasSomeData() throws IOException {
+      // It's alright for a dir not to exist, or to exist (properly accessible)
+      // and be completely empty.
+      if (!root.exists()) return false;
+      
+      if (!root.isDirectory()) {
+        // a file where you expect a directory should not cause silent
+        // formatting
+        return true;
+      }
+      
+      if (FileUtil.listFiles(root).length == 0) {
+        // Empty dir can format without prompt.
+        return false;
+      }
+      
+      return true;
+    }
+
 
     /**
      * Lock storage to provide exclusive access.
@@ -716,6 +754,15 @@ public abstract class Storage extends St
     return storageDirs.get(idx);
   }
   
+  /**
+   * @return the storage directory, with the precondition that this storage
+   * has exactly one storage directory
+   */
+  public StorageDirectory getSingularStorageDir() {
+    Preconditions.checkState(storageDirs.size() == 1);
+    return storageDirs.get(0);
+  }
+  
   protected void addStorageDir(StorageDirectory sd) {
     storageDirs.add(sd);
   }
@@ -765,6 +812,69 @@ public abstract class Storage extends St
   }
   
   /**
+   * Iterate over each of the {@link FormatConfirmable} objects,
+   * potentially checking with the user whether it should be formatted.
+   * 
+   * If running in interactive mode, will prompt the user for each
+   * directory to allow them to format anyway. Otherwise, returns
+   * false, unless 'force' is specified.
+   * 
+   * @param force format regardless of whether dirs exist
+   * @param interactive prompt the user when a dir exists
+   * @return true if formatting should proceed
+   * @throws IOException if some storage cannot be accessed
+   */
+  public static boolean confirmFormat(
+      Iterable<? extends FormatConfirmable> items,
+      boolean force, boolean interactive) throws IOException {
+    for (FormatConfirmable item : items) {
+      if (!item.hasSomeData())
+        continue;
+      if (force) { // Don't confirm, always format.
+        System.err.println(
+            "Data exists in " + item + ". Formatting anyway.");
+        continue;
+      }
+      if (!interactive) { // Don't ask - always don't format
+        System.err.println(
+            "Running in non-interactive mode, and data appears to exist in " +
+            item + ". Not formatting.");
+        return false;
+      }
+      if (!ToolRunner.confirmPrompt("Re-format filesystem in " + item + " ?")) {
+        System.err.println("Format aborted in " + item);
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  /**
+   * Interface for classes which need to have the user confirm their
+   * formatting during NameNode -format and other similar operations.
+   * 
+   * This is currently a storage directory or journal manager.
+   */
+  @InterfaceAudience.Private
+  public interface FormatConfirmable {
+    /**
+     * @return true if the storage seems to have some valid data in it,
+     * and the user should be required to confirm the format. Otherwise,
+     * false.
+     * @throws IOException if the storage cannot be accessed at all.
+     */
+    public boolean hasSomeData() throws IOException;
+    
+    /**
+     * @return a string representation of the formattable item, suitable
+     * for display to the user inside a prompt
+     */
+    @Override
+    public String toString();
+  }
+  
+  /**
    * Get common storage fields.
    * Should be overloaded if additional fields need to be get.
    * 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Fri Oct 19 02:25:55 2012
@@ -78,6 +78,7 @@ public class StorageInfo {
     cTime = from.cTime;
   }
   
+  @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("lv=").append(layoutVersion).append(";cid=").append(clusterID)

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/UpgradeStatusReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/UpgradeStatusReport.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/UpgradeStatusReport.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/UpgradeStatusReport.java Fri Oct 19 02:25:55 2012
@@ -82,6 +82,7 @@ public class UpgradeStatusReport {
   /**
    * Print basic upgradeStatus details.
    */
+  @Override
   public String toString() {
     return getStatusText(false);
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java Fri Oct 19 02:25:55 2012
@@ -34,29 +34,6 @@ public final class Util {
   private final static Log LOG = LogFactory.getLog(Util.class.getName());
 
   /**
-   * Current system time.  Do not use this to calculate a duration or interval
-   * to sleep, because it will be broken by settimeofday.  Instead, use
-   * monotonicNow.
-   * @return current time in msec.
-   */
-  public static long now() {
-    return System.currentTimeMillis();
-  }
-  
-  /**
-   * Current time from some arbitrary time base in the past, counting in
-   * milliseconds, and not affected by settimeofday or similar system clock
-   * changes.  This is appropriate to use when computing how much longer to
-   * wait for an interval to expire.
-   * @return a monotonic clock that counts in milliseconds.
-   */
-  public static long monotonicNow() {
-    final long NANOSECONDS_PER_MILLISECOND = 1000000;
-
-    return System.nanoTime() / NANOSECONDS_PER_MILLISECOND;
-  }
-
-  /**
    * Interprets the passed string as a URI. In case of error it 
    * assumes the specified string is a file.
    *



Mime
View raw message