hadoop-hdfs-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1397387 [3/5] - in /hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project: hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs-httpfs/src/test/java/org/apache/hadoop/fs/http/server/ hadoop-hdfs/ hadoop-hdfs...
Date Fri, 12 Oct 2012 00:15:37 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Oct 12 00:15:22 2012
@@ -64,23 +64,20 @@ public class BlockPlacementPolicyWithNod
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(
-      DatanodeDescriptor localMachine,
-      HashMap<Node, Node> excludedNodes,
-      long blocksize,
-      int maxNodesPerRack,
-      List<DatanodeDescriptor> results)
+  protected DatanodeDescriptor chooseLocalNode(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
         throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes);
 
     // otherwise try local machine first
     Node oldNode = excludedNodes.put(localMachine, localMachine);
     if (oldNode == null) { // was not in the excluded list
       if (isGoodTarget(localMachine, blocksize,
-          maxNodesPerRack, false, results)) {
+          maxNodesPerRack, false, results, avoidStaleNodes)) {
         results.add(localMachine);
         // Nodes under same nodegroup should be excluded.
         addNodeGroupToExcludedNodes(excludedNodes,
@@ -92,13 +89,13 @@ public class BlockPlacementPolicyWithNod
     // try a node on local node group
     DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
     if (chosenNode != null) {
       return chosenNode;
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes);
   }
 
   @Override
@@ -119,17 +116,15 @@ public class BlockPlacementPolicyWithNod
   }
 
   @Override
-  protected DatanodeDescriptor chooseLocalRack(
-                                             DatanodeDescriptor localMachine,
-                                             HashMap<Node, Node> excludedNodes,
-                                             long blocksize,
-                                             int maxNodesPerRack,
-                                             List<DatanodeDescriptor> results)
-    throws NotEnoughReplicasException {
+  protected DatanodeDescriptor chooseLocalRack(DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results);
+                          blocksize, maxNodesPerRack, results, 
+                          avoidStaleNodes);
     }
 
     // choose one from the local rack, but off-nodegroup
@@ -137,7 +132,8 @@ public class BlockPlacementPolicyWithNod
       return chooseRandom(NetworkTopology.getFirstHalf(
                               localMachine.getNetworkLocation()),
                           excludedNodes, blocksize, 
-                          maxNodesPerRack, results);
+                          maxNodesPerRack, results, 
+                          avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -151,39 +147,39 @@ public class BlockPlacementPolicyWithNod
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(clusterMap.getRack(newLocal.getNetworkLocation()),
-                              excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(
+              clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
+              blocksize, maxNodesPerRack, results, avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-                              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-                            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
 
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
-          DatanodeDescriptor localMachine,
-          HashMap<Node, Node> excludedNodes,
-          long blocksize,
-          int maxReplicasPerRack,
-          List<DatanodeDescriptor> results)
-          throws NotEnoughReplicasException {
+      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes,
+      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
+      boolean avoidStaleNodes) throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
     // randomly choose one node from remote racks
     try {
-      chooseRandom(numOfReplicas, "~"+NetworkTopology.getFirstHalf(
-          localMachine.getNetworkLocation()),
-      excludedNodes, blocksize, maxReplicasPerRack, results);
+      chooseRandom(
+          numOfReplicas,
+          "~" + NetworkTopology.getFirstHalf(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxReplicasPerRack, results,
+          avoidStaleNodes);
     } catch (NotEnoughReplicasException e) {
-      chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas),
-      localMachine.getNetworkLocation(), excludedNodes, blocksize, 
-      maxReplicasPerRack, results);
+      chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
+          localMachine.getNetworkLocation(), excludedNodes, blocksize,
+          maxReplicasPerRack, results, avoidStaleNodes);
     }
   }
 
@@ -193,19 +189,22 @@ public class BlockPlacementPolicyWithNod
    * if still no such node is available, choose a random node in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNodeGroup(NetworkTopologyWithNodeGroup clusterMap,
-      DatanodeDescriptor localMachine, HashMap<Node, Node> excludedNodes, long blocksize, 
-      int maxNodesPerRack, List<DatanodeDescriptor> results) throws NotEnoughReplicasException {
+  private DatanodeDescriptor chooseLocalNodeGroup(
+      NetworkTopologyWithNodeGroup clusterMap, DatanodeDescriptor localMachine,
+      HashMap<Node, Node> excludedNodes, long blocksize, int maxNodesPerRack,
+      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
+      throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-      blocksize, maxNodesPerRack, results);
+      blocksize, maxNodesPerRack, results, avoidStaleNodes);
     }
 
     // choose one from the local node group
     try {
-      return chooseRandom(clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
-      excludedNodes, blocksize, maxNodesPerRack, results);
+      return chooseRandom(
+          clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
       DatanodeDescriptor newLocal=null;
@@ -219,17 +218,19 @@ public class BlockPlacementPolicyWithNod
       }
       if (newLocal != null) {
         try {
-          return chooseRandom(clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
-            excludedNodes, blocksize, maxNodesPerRack, results);
+          return chooseRandom(
+              clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
+              excludedNodes, blocksize, maxNodesPerRack, results,
+              avoidStaleNodes);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
-          return chooseRandom(NodeBase.ROOT, excludedNodes,
-              blocksize, maxNodesPerRack, results);
+          return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+              maxNodesPerRack, results, avoidStaleNodes);
         }
       } else {
         //otherwise randomly choose one from the network
-        return chooseRandom(NodeBase.ROOT, excludedNodes,
-            blocksize, maxNodesPerRack, results);
+        return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+            maxNodesPerRack, results, avoidStaleNodes);
       }
     }
   }
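
The hunks above thread a new avoidStaleNodes flag through chooseLocalNode, chooseLocalRack, chooseRemoteRack and chooseLocalNodeGroup. A minimal sketch of the idea, using illustrative types rather than the HDFS classes: a node whose last heartbeat is older than a configured stale interval is passed over as a write target whenever the flag is set, exactly like any other excluded target.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only -- these are not the HDFS classes.
class StaleAwareChooser {
  static class Node {
    final String name;
    volatile long lastHeartbeatMs;       // updated by a heartbeat handler
    Node(String name, long lastHeartbeatMs) {
      this.name = name;
      this.lastHeartbeatMs = lastHeartbeatMs;
    }
    boolean isStale(long staleIntervalMs) {
      return System.currentTimeMillis() - lastHeartbeatMs > staleIntervalMs;
    }
  }

  // Pick up to n targets, skipping stale nodes when asked to.
  static List<Node> chooseTargets(List<Node> candidates, int n,
      boolean avoidStaleNodes, long staleIntervalMs) {
    List<Node> chosen = new ArrayList<Node>();
    for (Node node : candidates) {
      if (chosen.size() == n) {
        break;
      }
      if (avoidStaleNodes && node.isStale(staleIntervalMs)) {
        continue;                        // treat stale nodes as excluded
      }
      chosen.add(node);
    }
    return chosen;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    List<Node> nodes = new ArrayList<Node>();
    nodes.add(new Node("fresh", now));
    nodes.add(new Node("stale", now - 60000));  // last beat a minute ago
    // With a 30 s stale interval, only "fresh" qualifies.
    System.out.println(chooseTargets(nodes, 2, true, 30000).size()); // 1
  }
}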

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Oct 12 00:15:22 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.net.InetAddresses;
 
 /**
@@ -88,8 +89,8 @@ public class DatanodeManager {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
-
   private final HeartbeatManager heartbeatManager;
+  private Daemon decommissionthread = null;
 
   /**
    * Stores the datanode -> block map.  
@@ -127,28 +128,33 @@ public class DatanodeManager {
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
   
+  /** Whether or not to check stale DataNodes for read/write */
+  private final boolean checkForStaleDataNodes;
+
+  /** The interval for judging stale DataNodes for read/write */
+  private final long staleInterval;
+  
+  /** Whether or not to avoid using stale DataNodes for writing */
+  private volatile boolean avoidStaleDataNodesForWrite;
+  
+  /** The number of stale DataNodes */
+  private volatile int numStaleNodes;
+  
   /**
    * Whether or not this cluster has ever consisted of more than 1 rack,
    * according to the NetworkTopology.
    */
   private boolean hasClusterEverBeenMultiRack = false;
   
-  /** Whether or not to check the stale datanodes */
-  private volatile boolean checkForStaleNodes;
-  /** The time interval for detecting stale datanodes */
-  private volatile long staleInterval;
-  
-  DatanodeManager(final BlockManager blockManager,
-      final Namesystem namesystem, final Configuration conf
-      ) throws IOException {
+  DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
+      final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
     Class<? extends NetworkTopology> networkTopologyClass =
         conf.getClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_IMPL_KEY,
             NetworkTopology.class, NetworkTopology.class);
-    networktopology = (NetworkTopology) ReflectionUtils.newInstance(
-        networkTopologyClass, conf);
+    networktopology = ReflectionUtils.newInstance(networkTopologyClass, conf);
 
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
@@ -181,25 +187,69 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, blockInvalidateLimit);
     LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
         + "=" + this.blockInvalidateLimit);
-    // set the value of stale interval based on configuration
-    this.checkForStaleNodes = conf.getBoolean(
+    
+    checkForStaleDataNodes = conf.getBoolean(
         DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
         DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
-    if (this.checkForStaleNodes) {
-      this.staleInterval = conf.getLong(
-          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
-          DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT);
-      if (this.staleInterval < DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT) {
-        LOG.warn("The given interval for marking stale datanode = "
-            + this.staleInterval + ", which is smaller than the default value "
-            + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT
-            + ".");
-      }
+
+    staleInterval = getStaleIntervalFromConf(conf, heartbeatExpireInterval);
+    avoidStaleDataNodesForWrite = getAvoidStaleForWriteFromConf(conf,
+        checkForStaleDataNodes);
+  }
+  
+  private static long getStaleIntervalFromConf(Configuration conf,
+      long heartbeatExpireInterval) {
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(staleInterval > 0,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY +
+        " = '" + staleInterval + "' is invalid. " +
+        "It should be a positive non-zero value.");
+    
+    final long heartbeatIntervalSeconds = conf.getLong(
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
+        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
+    // The stale interval value cannot be smaller than
+    // 3 times the heartbeat interval
+    final long minStaleInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT)
+        * heartbeatIntervalSeconds * 1000;
+    if (staleInterval < minStaleInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is less than "
+          + DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_MINIMUM_INTERVAL_DEFAULT
+          + " heartbeat intervals. This may cause too frequent changes of " 
+          + "stale states of DataNodes since a heartbeat msg may be missing " 
+          + "due to temporary short-term failures. Reset stale interval to " 
+          + minStaleInterval + ".");
+      staleInterval = minStaleInterval;
+    }
+    if (staleInterval > heartbeatExpireInterval) {
+      LOG.warn("The given interval for marking stale datanode = "
+          + staleInterval + ", which is larger than heartbeat expire interval "
+          + heartbeatExpireInterval + ".");
     }
+    return staleInterval;
   }
-
-  private Daemon decommissionthread = null;
-
+  
+  static boolean getAvoidStaleForWriteFromConf(Configuration conf,
+      boolean checkForStale) {
+    boolean avoid = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_DEFAULT);
+    boolean avoidStaleDataNodesForWrite = checkForStale && avoid;
+    if (!checkForStale && avoid) {
+      LOG.warn("Cannot set "
+          + DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY
+          + " as false while setting "
+          + DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_WRITE_KEY
+          + " as true.");
+    }
+    return avoidStaleDataNodesForWrite;
+  }
+  
   void activate(final Configuration conf) {
     final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
     this.decommissionthread = new Daemon(dm.new Monitor(
@@ -253,9 +303,10 @@ public class DatanodeManager {
         client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR + targethost);
     }
     
-    Comparator<DatanodeInfo> comparator = checkForStaleNodes ? 
-                    new DFSUtil.DecomStaleComparator(staleInterval) : 
-                    DFSUtil.DECOM_COMPARATOR;
+    Comparator<DatanodeInfo> comparator = checkForStaleDataNodes ? 
+        new DFSUtil.DecomStaleComparator(staleInterval) : 
+        DFSUtil.DECOM_COMPARATOR;
+        
     for (LocatedBlock b : locatedblocks) {
       networktopology.pseudoSortByDistance(client, b.getLocations());
       // Move decommissioned/stale datanodes to the bottom
@@ -612,7 +663,8 @@ public class DatanodeManager {
         + " storage " + nodeReg.getStorageID());
 
     DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
-    DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getXferAddr());
+    DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
+        nodeReg.getIpAddr(), nodeReg.getXferPort());
       
     if (nodeN != null && nodeN != nodeS) {
       NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
@@ -722,7 +774,7 @@ public class DatanodeManager {
    * 3. Added to exclude --> start decommission.
    * 4. Removed from exclude --> stop decommission.
    */
-  private void refreshDatanodes() throws IOException {
+  private void refreshDatanodes() {
     for(DatanodeDescriptor node : datanodeMap.values()) {
       // Check if not include.
       if (!inHostsList(node)) {
@@ -781,7 +833,61 @@ public class DatanodeManager {
       namesystem.readUnlock();
     }
   }
+  
+  /* Getter and Setter for stale DataNodes related attributes */
+  
+  /**
+   * @return whether or not to avoid writing to stale datanodes
+   */
+  public boolean isAvoidingStaleDataNodesForWrite() {
+    return avoidStaleDataNodesForWrite;
+  }
 
+  /**
+   * Set the value of {@link DatanodeManager#avoidStaleDataNodesForWrite}. 
+   * The HeartbeatManager disables avoidStaleDataNodesForWrite when more than
+   * half of the DataNodes are marked as stale.
+   * 
+   * @param avoidStaleDataNodesForWrite
+   *          The value to set to
+   *          {@link DatanodeManager#avoidStaleDataNodesForWrite}
+   */
+  void setAvoidStaleDataNodesForWrite(boolean avoidStaleDataNodesForWrite) {
+    this.avoidStaleDataNodesForWrite = avoidStaleDataNodesForWrite;
+  }
+
+  /**
+   * @return Whether or not to check stale DataNodes for R/W
+   */
+  boolean isCheckingForStaleDataNodes() {
+    return checkForStaleDataNodes;
+  }
+  
+  /**
+   * @return The time interval used to mark DataNodes as stale.
+   */
+  long getStaleInterval() {
+    return staleInterval;
+  }
+
+  /**
+   * Set the number of current stale DataNodes. The HeartbeatManager gets this
+   * number based on DataNodes' heartbeats.
+   * 
+   * @param numStaleNodes
+   *          The number of stale DataNodes to be set.
+   */
+  void setNumStaleNodes(int numStaleNodes) {
+    this.numStaleNodes = numStaleNodes;
+  }
+  
+  /**
+   * @return the current number of stale DataNodes (detected by
+   * HeartbeatManager). 
+   */
+  int getNumStaleNodes() {
+    return this.numStaleNodes;
+  }
 
   /** Fetch live and dead datanodes. */
   public void fetchDatanodes(final List<DatanodeDescriptor> live, 
@@ -960,7 +1066,7 @@ public class DatanodeManager {
     return nodes;
   }
   
-  private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
+  private void setDatanodeDead(DatanodeDescriptor node) {
     node.setLastUpdate(0);
   }
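
getStaleIntervalFromConf above validates the configured stale interval: it must be positive, it is raised to a floor of a few heartbeat intervals (three by default, per the comment) so that one missed heartbeat message cannot flap a node's stale state, and a warning is logged if it exceeds the heartbeat expiry interval, since a node would then be declared dead before it was ever considered stale. A standalone sketch of the same rule, with placeholder values rather than the real DFSConfigKeys defaults:

// Sketch of the clamping rule above; the constants are placeholders.
class StaleIntervalCheck {
  static long clampStaleInterval(long staleIntervalMs,
      long heartbeatIntervalMs, int minHeartbeats, long expireIntervalMs) {
    if (staleIntervalMs <= 0) {
      throw new IllegalArgumentException(
          "stale interval must be a positive non-zero value");
    }
    long floor = minHeartbeats * heartbeatIntervalMs;  // e.g. 3 heartbeats
    if (staleIntervalMs < floor) {
      // Too small: a single missed heartbeat would flap the stale state.
      System.err.println("WARN: stale interval " + staleIntervalMs
          + " ms < " + minHeartbeats + " heartbeats; resetting to " + floor);
      staleIntervalMs = floor;
    }
    if (staleIntervalMs > expireIntervalMs) {
      // Larger than the dead-node threshold: nodes would be declared
      // dead before ever being considered stale.
      System.err.println("WARN: stale interval " + staleIntervalMs
          + " ms > heartbeat expire interval " + expireIntervalMs + " ms");
    }
    return staleIntervalMs;
  }

  public static void main(String[] args) {
    // 3 s heartbeats, floor of 3 heartbeats, 10 min expiry (all hypothetical).
    System.out.println(clampStaleInterval(5000, 3000, 3, 600000)); // -> 9000
  }
}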
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Oct 12 00:15:22 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Manage the heartbeats received from datanodes.
  * The datanode list and statistics are synchronized
@@ -54,18 +56,48 @@ class HeartbeatManager implements Datano
   private final long heartbeatRecheckInterval;
   /** Heartbeat monitor thread */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
-
+  /**
+   * The end user's initial setting, indicating whether or not to avoid
+   * writing to stale datanodes.
+   */
+  private final boolean initialAvoidWriteStaleNodes;
+  /**
+   * When the ratio of stale datanodes reaches this number, stop avoiding 
+   * writing to stale datanodes, i.e., continue using stale nodes for writing.
+   */
+  private final float ratioUseStaleDataNodesForWrite;
+    
   final Namesystem namesystem;
   final BlockManager blockManager;
 
-  HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
-      final Configuration conf) {
-    this.heartbeatRecheckInterval = conf.getInt(
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
-        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
-
+  HeartbeatManager(final Namesystem namesystem,
+      final BlockManager blockManager, final Configuration conf) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
+    boolean checkStaleNodes = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CHECK_STALE_DATANODE_DEFAULT);
+    long recheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 min
+    long staleInterval = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);// 30s
+    this.initialAvoidWriteStaleNodes = DatanodeManager
+        .getAvoidStaleForWriteFromConf(conf, checkStaleNodes);
+    this.ratioUseStaleDataNodesForWrite = conf.getFloat(
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY,
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_DEFAULT);
+    Preconditions.checkArgument(
+        (ratioUseStaleDataNodesForWrite > 0 && 
+            ratioUseStaleDataNodesForWrite <= 1.0f),
+        DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
+        " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
+        "It should be a positive non-zero float value, not greater than 1.0f.");
+    
+    this.heartbeatRecheckInterval = (checkStaleNodes 
+        && initialAvoidWriteStaleNodes 
+        && staleInterval < recheckInterval) ? staleInterval : recheckInterval;
   }
 
   void activate(Configuration conf) {
@@ -210,16 +242,39 @@ class HeartbeatManager implements Datano
     if (namesystem.isInSafeMode()) {
       return;
     }
+    boolean checkStaleNodes = dm.isCheckingForStaleDataNodes();
     boolean allAlive = false;
     while (!allAlive) {
       // locate the first dead node.
       DatanodeID dead = null;
+      // check the number of stale nodes
+      int numOfStaleNodes = 0;
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
-          if (dm.isDatanodeDead(d)) {
+          if (dead == null && dm.isDatanodeDead(d)) {
             stats.incrExpiredHeartbeats();
             dead = d;
-            break;
+            if (!checkStaleNodes) {
+              break;
+            }
+          }
+          if (checkStaleNodes && 
+              d.isStale(dm.getStaleInterval())) {
+            numOfStaleNodes++;
+          }
+        }
+        
+        // Change whether to avoid using stale datanodes for writing
+        // based on proportion of stale datanodes
+        if (checkStaleNodes) {
+          dm.setNumStaleNodes(numOfStaleNodes);
+          if (numOfStaleNodes > 
+                datanodes.size() * ratioUseStaleDataNodesForWrite) {
+            dm.setAvoidStaleDataNodesForWrite(false);
+          } else {
+            if (this.initialAvoidWriteStaleNodes) {
+              dm.setAvoidStaleDataNodesForWrite(true);
+            }
           }
         }
       }
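
The heartbeatCheck changes above count stale nodes on every monitor pass and disable avoid-stale-for-write once the stale fraction exceeds ratioUseStaleDataNodesForWrite, re-enabling it (only if the operator enabled it initially) when the fraction drops back. Funnelling all writes onto a small non-stale remainder would overload it, hence the escape hatch. A condensed sketch of that hysteresis, assuming the caller supplies the stale count:

// Sketch of the ratio-based switch above; not the HDFS classes.
class StaleRatioPolicy {
  private final boolean initialAvoidStaleForWrite;  // operator's config choice
  private final float ratio;                        // e.g. 0.5f
  private volatile boolean avoidStaleForWrite;

  StaleRatioPolicy(boolean initialAvoid, float ratio) {
    if (ratio <= 0 || ratio > 1.0f) {
      throw new IllegalArgumentException("ratio must be in (0, 1]");
    }
    this.initialAvoidStaleForWrite = initialAvoid;
    this.avoidStaleForWrite = initialAvoid;
    this.ratio = ratio;
  }

  // Called once per heartbeat-monitor pass with the latest stale count.
  void update(int numStaleNodes, int numNodes) {
    if (numStaleNodes > numNodes * ratio) {
      // Too many stale nodes: funnelling writes onto the remainder
      // would overload it, so fall back to using everyone.
      avoidStaleForWrite = false;
    } else if (initialAvoidStaleForWrite) {
      avoidStaleForWrite = true;  // safe to avoid stale nodes again
    }
  }

  boolean avoidStaleForWrite() { return avoidStaleForWrite; }

  public static void main(String[] args) {
    StaleRatioPolicy p = new StaleRatioPolicy(true, 0.5f);
    p.update(3, 4);                                // 75% stale -> stop avoiding
    System.out.println(p.avoidStaleForWrite());    // false
    p.update(1, 4);                                // back under the ratio
    System.out.println(p.avoidStaleForWrite());    // true
  }
}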

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/Host2NodesMap.java Fri Oct 12 00:15:22 2012
@@ -159,6 +159,35 @@ class Host2NodesMap {
     }
   }
   
+  /**
+   * Find data node by its transfer address
+   *
+   * @return DatanodeDescriptor if found or null otherwise
+   */
+  public DatanodeDescriptor getDatanodeByXferAddr(String ipAddr,
+      int xferPort) {
+    if (ipAddr == null) {
+      return null;
+    }
+
+    hostmapLock.readLock().lock();
+    try {
+      DatanodeDescriptor[] nodes = map.get(ipAddr);
+      // no entry
+      if (nodes == null) {
+        return null;
+      }
+      for (DatanodeDescriptor containedNode : nodes) {
+        if (xferPort == containedNode.getXferPort()) {
+          return containedNode;
+        }
+      }
+      return null;
+    } finally {
+      hostmapLock.readLock().unlock();
+    }
+  }
+
   @Override
   public String toString() {
     final StringBuilder b = new StringBuilder(getClass().getSimpleName())
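
The registerDatanode hunk earlier in this message switches from getDatanodeByHost to this new (ip, xferPort) lookup. The motivation, sketched below with illustrative types: several datanodes can share one IP address (for example a multi-datanode test cluster on localhost), so a host-only lookup cannot tell them apart, while the transfer port can.

import java.util.HashMap;
import java.util.Map;

// Illustrative types, not the HDFS classes.
class XferAddrLookupDemo {
  static class Dn {
    final String ipAddr;
    final int xferPort;
    Dn(String ipAddr, int xferPort) {
      this.ipAddr = ipAddr;
      this.xferPort = xferPort;
    }
  }

  public static void main(String[] args) {
    Map<String, Dn[]> byIp = new HashMap<String, Dn[]>();
    byIp.put("127.0.0.1",
        new Dn[] { new Dn("127.0.0.1", 50010), new Dn("127.0.0.1", 50011) });

    Dn found = null;
    for (Dn d : byIp.get("127.0.0.1")) {
      if (d.xferPort == 50011) {         // the port disambiguates
        found = d;
        break;
      }
    }
    System.out.println(found.ipAddr + ":" + found.xferPort); // 127.0.0.1:50011
  }
}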

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java Fri Oct 12 00:15:22 2012
@@ -39,7 +39,8 @@ public final class HdfsServerConstants {
    */
   static public enum NodeType {
     NAME_NODE,
-    DATA_NODE;
+    DATA_NODE,
+    JOURNAL_NODE;
   }
 
   /** Startup options */

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Oct 12 00:15:22 2012
@@ -276,6 +276,9 @@ public class JspHelper {
         FIELD_PERCENT_REMAINING = 9,
         FIELD_ADMIN_STATE       = 10,
         FIELD_DECOMMISSIONED    = 11,
+        FIELD_BLOCKPOOL_USED    = 12,
+        FIELD_PERBLOCKPOOL_USED = 13,
+        FIELD_FAILED_VOLUMES    = 14,
         SORT_ORDER_ASC          = 1,
         SORT_ORDER_DSC          = 2;
 
@@ -303,6 +306,12 @@ public class JspHelper {
           sortField = FIELD_ADMIN_STATE;
         } else if (field.equals("decommissioned")) {
           sortField = FIELD_DECOMMISSIONED;
+        } else if (field.equals("bpused")) {
+          sortField = FIELD_BLOCKPOOL_USED;
+        } else if (field.equals("pcbpused")) {
+          sortField = FIELD_PERBLOCKPOOL_USED;
+        } else if (field.equals("volfails")) {
+          sortField = FIELD_FAILED_VOLUMES;
         } else {
           sortField = FIELD_NAME;
         }
@@ -361,6 +370,18 @@ public class JspHelper {
         case FIELD_NAME: 
           ret = d1.getHostName().compareTo(d2.getHostName());
           break;
+        case FIELD_BLOCKPOOL_USED:
+          dlong = d1.getBlockPoolUsed() - d2.getBlockPoolUsed();
+          ret = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);
+          break;
+        case FIELD_PERBLOCKPOOL_USED:
+          ddbl = d1.getBlockPoolUsedPercent() - d2.getBlockPoolUsedPercent();
+          ret = (ddbl < 0) ? -1 : ((ddbl > 0) ? 1 : 0);
+          break;
+        case FIELD_FAILED_VOLUMES:
+          int dint = d1.getVolumeFailures() - d2.getVolumeFailures();
+          ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0);
+          break;
         }
         return (sortOrder == SORT_ORDER_DSC) ? -ret : ret;
       }
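
The new sort cases spell out (d < 0) ? -1 : ((d > 0) ? 1 : 0) rather than casting the difference to int, because truncating a long delta can flip its sign for large byte counts. A self-contained demonstration of the failure mode:

class SafeCompareDemo {
  public static void main(String[] args) {
    long d1 = 1L;                  // bytes used by node 1
    long d2 = 1L << 32;            // ~4.3 GB used by node 2

    int broken = (int) (d1 - d2);  // low 32 bits of -4294967295 == 1
    long dlong = d1 - d2;
    int safe = (dlong < 0) ? -1 : ((dlong > 0) ? 1 : 0);

    System.out.println(broken);    // 1  -> claims d1 > d2 (wrong)
    System.out.println(safe);      // -1 -> d1 < d2 (right)
  }
}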

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Oct 12 00:15:22 2012
@@ -42,6 +42,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Preconditions;
+
 
 
 /**
@@ -76,7 +78,7 @@ public abstract class Storage extends St
   /** Layout versions of 0.20.203 release */
   public static final int[] LAYOUT_VERSIONS_203 = {-19, -31};
 
-  private   static final String STORAGE_FILE_LOCK     = "in_use.lock";
+  public    static final String STORAGE_FILE_LOCK     = "in_use.lock";
   protected static final String STORAGE_FILE_VERSION  = "VERSION";
   public    static final String STORAGE_DIR_CURRENT   = "current";
   public    static final String STORAGE_DIR_PREVIOUS  = "previous";
@@ -752,6 +754,15 @@ public abstract class Storage extends St
     return storageDirs.get(idx);
   }
   
+  /**
+   * @return the storage directory, with the precondition that this storage
+   * has exactly one storage directory
+   */
+  public StorageDirectory getSingularStorageDir() {
+    Preconditions.checkState(storageDirs.size() == 1);
+    return storageDirs.get(0);
+  }
+  
   protected void addStorageDir(StorageDirectory sd) {
     storageDirs.add(sd);
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 12 00:15:22 2012
@@ -251,7 +251,7 @@ public class DataNode extends Configured
   Daemon dataXceiverServer = null;
   ThreadGroup threadGroup = null;
   private DNConf dnConf;
-  private boolean heartbeatsDisabledForTests = false;
+  private volatile boolean heartbeatsDisabledForTests = false;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
   DataNodeMetrics metrics;
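
Making heartbeatsDisabledForTests volatile guarantees that a flag flipped by a test thread is promptly visible to the datanode's heartbeat loop. A sketch of the visibility issue the keyword addresses:

// Without volatile, the reader loop may never observe the writer's
// update; volatile guarantees cross-thread visibility of the field.
class VolatileFlagDemo {
  private static volatile boolean disabled = false;

  public static void main(String[] args) throws InterruptedException {
    Thread loop = new Thread(new Runnable() {
      public void run() {
        while (!disabled) {
          // ... send a heartbeat ...
        }
        System.out.println("heartbeats stopped");
      }
    });
    loop.start();
    Thread.sleep(100);
    disabled = true;   // promptly observed by the loop because of volatile
    loop.join();
  }
}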

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Fri Oct 12 00:15:22 2012
@@ -114,7 +114,7 @@ class EditLogBackupOutputStream extends 
   }
 
   @Override // EditLogOutputStream
-  protected void flushAndSync() throws IOException {
+  protected void flushAndSync(boolean durable) throws IOException {
     assert out.getLength() == 0 : "Output buffer is not empty";
     
     int numReadyTxns = doubleBuf.countReadyTxns();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Fri Oct 12 00:15:22 2012
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -367,30 +368,36 @@ public class EditLogFileInputStream exte
 
     @Override
     public InputStream getInputStream() throws IOException {
-      HttpURLConnection connection = (HttpURLConnection)
-          SecurityUtil.openSecureHttpConnection(url);
-      
-      if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
-        throw new HttpGetFailedException(
-            "Fetch of " + url +
-            " failed with status code " + connection.getResponseCode() +
-            "\nResponse message:\n" + connection.getResponseMessage(),
-            connection);
-      }
-
-      String contentLength = connection.getHeaderField(CONTENT_LENGTH);
-      if (contentLength != null) {
-        advertisedSize = Long.parseLong(contentLength);
-        if (advertisedSize <= 0) {
-          throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
-              contentLength);
-        }
-      } else {
-        throw new IOException(CONTENT_LENGTH + " header is not provided " +
-                              "by the server when trying to fetch " + url);
-      }
-
-      return connection.getInputStream();
+      return SecurityUtil.doAsCurrentUser(
+          new PrivilegedExceptionAction<InputStream>() {
+            @Override
+            public InputStream run() throws IOException {
+              HttpURLConnection connection = (HttpURLConnection)
+                  SecurityUtil.openSecureHttpConnection(url);
+              
+              if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                throw new HttpGetFailedException(
+                    "Fetch of " + url +
+                    " failed with status code " + connection.getResponseCode() +
+                    "\nResponse message:\n" + connection.getResponseMessage(),
+                    connection);
+              }
+        
+              String contentLength = connection.getHeaderField(CONTENT_LENGTH);
+              if (contentLength != null) {
+                advertisedSize = Long.parseLong(contentLength);
+                if (advertisedSize <= 0) {
+                  throw new IOException("Invalid " + CONTENT_LENGTH + " header: " +
+                      contentLength);
+                }
+              } else {
+                throw new IOException(CONTENT_LENGTH + " header is not provided " +
+                                      "by the server when trying to fetch " + url);
+              }
+        
+              return connection.getInputStream();
+            }
+          });
     }
 
     @Override
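
The getInputStream rewrite wraps the whole HTTP fetch in SecurityUtil.doAsCurrentUser so the connection is opened under the caller's login context (e.g. with Kerberos credentials) instead of whatever security context the calling thread happens to carry. A minimal sketch of the generic JAAS idiom involved; Hadoop's helper layers UGI credential handling on top of something like this:

import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;

class DoAsSketch {
  static <T> T runAs(Subject subject, PrivilegedExceptionAction<T> action)
      throws Exception {
    try {
      return Subject.doAs(subject, action);
    } catch (PrivilegedActionException e) {
      throw e.getException();            // unwrap the checked exception
    }
  }

  public static void main(String[] args) throws Exception {
    String result = runAs(new Subject(), new PrivilegedExceptionAction<String>() {
      public String run() throws Exception {
        // Anything done here (opening connections, reading streams) is
        // evaluated under the Subject's security context, not the caller's.
        return "fetched under the right Subject";
      }
    });
    System.out.println(result);
  }
}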

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Fri Oct 12 00:15:22 2012
@@ -176,7 +176,7 @@ public class EditLogFileOutputStream ext
    * accumulates new log records while readyBuffer will be flushed and synced.
    */
   @Override
-  public void flushAndSync() throws IOException {
+  public void flushAndSync(boolean durable) throws IOException {
     if (fp == null) {
       throw new IOException("Trying to use aborted output stream");
     }
@@ -186,7 +186,7 @@ public class EditLogFileOutputStream ext
     }
      preallocate(); // preallocate file if necessary
     doubleBuf.flushTo(fp);
-    if (!shouldSkipFsyncForTests) {
+    if (durable && !shouldSkipFsyncForTests) {
       fc.force(false); // metadata updates not needed
     }
   }
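
flushAndSync now takes a durable flag: the ready buffer is always written to the file channel, but the expensive fsync (fc.force) is only paid when the caller requests durability, letting callers batch several flushes per sync. A sketch of the idiom, with an illustrative file name:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

class DurableFlushSketch {
  static void flushAndSync(FileChannel fc, ByteBuffer buf, boolean durable)
      throws IOException {
    while (buf.hasRemaining()) {
      fc.write(buf);                     // data reaches the OS page cache
    }
    if (durable) {
      fc.force(false);                   // fsync data; false skips metadata
    }
  }

  public static void main(String[] args) throws IOException {
    RandomAccessFile raf = new RandomAccessFile("edits.tmp", "rw");
    try {
      FileChannel fc = raf.getChannel();
      flushAndSync(fc, ByteBuffer.wrap("txn-1\n".getBytes()), false); // fast
      flushAndSync(fc, ByteBuffer.wrap("txn-2\n".getBytes()), true);  // durable
    } finally {
      raf.close();
    }
  }
}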

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Fri Oct 12 00:15:22 2012
@@ -24,6 +24,7 @@ import static org.apache.hadoop.util.Tim
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.jasper.compiler.JspUtil;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
@@ -92,18 +93,24 @@ public abstract class EditLogOutputStrea
   /**
   * Flush and sync all data that is ready to be flushed 
    * {@link #setReadyToFlush()} into underlying persistent store.
+   * @param durable if true, the edits should be made truly durable before
+   * returning
    * @throws IOException
    */
-  abstract protected void flushAndSync() throws IOException;
+  abstract protected void flushAndSync(boolean durable) throws IOException;
 
   /**
    * Flush data to persistent store.
    * Collect sync metrics.
    */
   public void flush() throws IOException {
+    flush(true);
+  }
+  
+  public void flush(boolean durable) throws IOException {
     numSync++;
     long start = now();
-    flushAndSync();
+    flushAndSync(durable);
     long end = now();
     totalTimeSync += (end - start);
   }
@@ -132,4 +139,12 @@ public abstract class EditLogOutputStrea
   protected long getNumSync() {
     return numSync;
   }
+
+  /**
+   * @return a short HTML snippet suitable for describing the current
+   * status of the stream
+   */
+  public String generateHtmlReport() {
+    return JspUtil.escapeXml(this.toString());
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditsDoubleBuffer.java Fri Oct 12 00:15:22 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.Writer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -34,7 +35,8 @@ import com.google.common.base.Preconditi
  * to progress concurrently to flushes without allocating new buffers each
  * time.
  */
-class EditsDoubleBuffer {
+@InterfaceAudience.Private
+public class EditsDoubleBuffer {
 
   private TxnBuffer bufCurrent; // current buffer for writing
   private TxnBuffer bufReady; // buffer ready for flushing
@@ -51,11 +53,11 @@ class EditsDoubleBuffer {
     bufCurrent.writeOp(op);
   }
 
-  void writeRaw(byte[] bytes, int offset, int length) throws IOException {
+  public void writeRaw(byte[] bytes, int offset, int length) throws IOException {
     bufCurrent.write(bytes, offset, length);
   }
   
-  void close() throws IOException {
+  public void close() throws IOException {
     Preconditions.checkNotNull(bufCurrent);
     Preconditions.checkNotNull(bufReady);
 
@@ -69,7 +71,7 @@ class EditsDoubleBuffer {
     bufCurrent = bufReady = null;
   }
   
-  void setReadyToFlush() {
+  public void setReadyToFlush() {
     assert isFlushed() : "previous data not flushed yet";
     TxnBuffer tmp = bufReady;
     bufReady = bufCurrent;
@@ -80,12 +82,12 @@ class EditsDoubleBuffer {
    * Writes the content of the "ready" buffer to the given output stream,
    * and resets it. Does not swap any buffers.
    */
-  void flushTo(OutputStream out) throws IOException {
+  public void flushTo(OutputStream out) throws IOException {
     bufReady.writeTo(out); // write data to file
     bufReady.reset(); // erase all data in the buffer
   }
   
-  boolean shouldForceSync() {
+  public boolean shouldForceSync() {
     return bufCurrent.size() >= initBufferSize;
   }
 
@@ -120,6 +122,12 @@ class EditsDoubleBuffer {
     return bufReady.numTxns;
   }
 
+  /**
+   * @return the number of bytes that are ready to be flushed
+   */
+  public int countReadyBytes() {
+    return bufReady.size();
+  }
   
   private static class TxnBuffer extends DataOutputBuffer {
     long firstTxId;
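
EditsDoubleBuffer, now public for reuse, keeps two buffers: bufCurrent accumulates new edits while bufReady is flushed, and setReadyToFlush swaps them, so writers never block behind disk I/O. A condensed sketch of the pattern:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Condensed sketch of the double-buffer pattern, not the HDFS class.
class DoubleBufferSketch {
  private ByteArrayOutputStream current = new ByteArrayOutputStream();
  private ByteArrayOutputStream ready = new ByteArrayOutputStream();

  synchronized void write(byte[] b) {
    current.write(b, 0, b.length);       // never blocks on disk I/O
  }

  synchronized void setReadyToFlush() {
    assert ready.size() == 0 : "previous data not flushed yet";
    ByteArrayOutputStream tmp = ready;   // swap the two buffers
    ready = current;
    current = tmp;
  }

  void flushTo(OutputStream out) throws IOException {
    ready.writeTo(out);                  // may be slow; writers keep going
    ready.reset();
  }

  public static void main(String[] args) throws IOException {
    DoubleBufferSketch buf = new DoubleBufferSketch();
    buf.write("op1;".getBytes());
    buf.setReadyToFlush();
    buf.write("op2;".getBytes());        // accumulates during the flush
    buf.flushTo(System.out);             // prints: op1;
  }
}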

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Oct 12 00:15:22 2012
@@ -32,8 +32,16 @@ public interface FSClusterStats {
    * @return a count of the total number of block transfers and block
   *         writes that are currently occurring on the cluster.
    */
-
-  public int getTotalLoad() ;
+  public int getTotalLoad();
+  
+  /**
+   * Indicate whether or not the cluster is now avoiding the use of
+   * stale DataNodes for writing.
+   * 
+   * @return True if the cluster is currently avoiding using stale DataNodes 
+   *         for writing targets, and false otherwise.
+   */
+  public boolean isAvoidingStaleDataNodesForWrite();
 }
     
     

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 12 00:15:22 2012
@@ -1171,6 +1171,7 @@ public class FSEditLog implements LogsPu
       journalSet.recoverUnfinalizedSegments();
     } catch (IOException ex) {
       // All journals have failed, it is handled in logSync.
+      // TODO: are we sure this is OK?
     }
   }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 12 00:15:22 2012
@@ -4059,7 +4059,10 @@ public class FSNamesystem implements Nam
         return "Safe mode is OFF.";
       String leaveMsg = "";
       if (areResourcesLow()) {
-        leaveMsg = "Resources are low on NN. Safe mode must be turned off manually";
+        leaveMsg = "Resources are low on NN. " 
+        	+ "Please add or free up more resources then turn off safe mode manually.  "
+        	+ "NOTE:  If you turn off safe mode before adding resources, "
+        	+ "the NN will immediately return to safe mode.";
       } else {
         leaveMsg = "Safe mode will be turned off automatically";
       }
@@ -5536,4 +5539,10 @@ public class FSNamesystem implements Nam
   public void setNNResourceChecker(NameNodeResourceChecker nnResourceChecker) {
     this.nnResourceChecker = nnResourceChecker;
   }
+
+  @Override
+  public boolean isAvoidingStaleDataNodesForWrite() {
+    return this.blockManager.getDatanodeManager()
+        .isAvoidingStaleDataNodesForWrite();
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Oct 12 00:15:22 2012
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.ComparisonChain;
@@ -51,7 +53,8 @@ import com.google.common.collect.Compari
  * Note: this class is not thread-safe and should be externally
  * synchronized.
  */
-class FileJournalManager implements JournalManager {
+@InterfaceAudience.Private
+public class FileJournalManager implements JournalManager {
   private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
 
   private final StorageDirectory sd;
@@ -164,7 +167,7 @@ class FileJournalManager implements Jour
    * @return a list of remote edit logs
    * @throws IOException if edit logs cannot be listed.
    */
-  List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
+  public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@@ -182,6 +185,8 @@ class FileJournalManager implements Jour
       }
     }
     
+    Collections.sort(ret);
+    
     return ret;
   }
 
@@ -195,7 +200,7 @@ class FileJournalManager implements Jour
    * @throws IOException
    *           IOException thrown for invalid logDir
    */
-  static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
+  public static List<EditLogFile> matchEditLogs(File logDir) throws IOException {
     return matchEditLogs(FileUtil.listFiles(logDir));
   }
   
@@ -223,7 +228,7 @@ class FileJournalManager implements Jour
         try {
           long startTxId = Long.valueOf(inProgressEditsMatch.group(1));
           ret.add(
-              new EditLogFile(f, startTxId, startTxId, true));
+              new EditLogFile(f, startTxId, HdfsConstants.INVALID_TXID, true));
         } catch (NumberFormatException nfe) {
           LOG.error("In-progress edits file " + f + " has improperly " +
                     "formatted transaction ID");
@@ -237,15 +242,8 @@ class FileJournalManager implements Jour
   @Override
   synchronized public void selectInputStreams(
       Collection<EditLogInputStream> streams, long fromTxId,
-      boolean inProgressOk) {
-    List<EditLogFile> elfs;
-    try {
-      elfs = matchEditLogs(sd.getCurrentDir());
-    } catch (IOException e) {
-      LOG.error("error listing files in " + this + ". " +
-          "Skipping all edit logs in this directory.", e);
-      return;
-    }
+      boolean inProgressOk) throws IOException {
+    List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
     LOG.debug(this + ": selecting input streams starting at " + fromTxId + 
         (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +
         "from among " + elfs.size() + " candidate file(s)");
@@ -321,7 +319,7 @@ class FileJournalManager implements Jour
     }
   }
 
-  List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
+  public List<EditLogFile> getLogFiles(long fromTxId) throws IOException {
     File currentDir = sd.getCurrentDir();
     List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
     List<EditLogFile> logFiles = Lists.newArrayList();
@@ -337,6 +335,32 @@ class FileJournalManager implements Jour
 
     return logFiles;
   }
+  
+  public EditLogFile getLogFile(long startTxId) throws IOException {
+    return getLogFile(sd.getCurrentDir(), startTxId);
+  }
+  
+  public static EditLogFile getLogFile(File dir, long startTxId)
+      throws IOException {
+    List<EditLogFile> files = matchEditLogs(dir);
+    List<EditLogFile> ret = Lists.newLinkedList();
+    for (EditLogFile elf : files) {
+      if (elf.getFirstTxId() == startTxId) {
+        ret.add(elf);
+      }
+    }
+    
+    if (ret.isEmpty()) {
+      // no matches
+      return null;
+    } else if (ret.size() == 1) {
+      return ret.get(0);
+    } else {
+      throw new IllegalStateException("More than one log segment in " + 
+          dir + " starting at txid " + startTxId + ": " +
+          Joiner.on(", ").join(ret));
+    }
+  }
 
   @Override
   public String toString() {
@@ -346,7 +370,8 @@ class FileJournalManager implements Jour
   /**
    * Record of an edit log that has been located and had its filename parsed.
    */
-  static class EditLogFile {
+  @InterfaceAudience.Private
+  public static class EditLogFile {
     private File file;
     private final long firstTxId;
     private long lastTxId;
@@ -379,17 +404,20 @@ class FileJournalManager implements Jour
       assert (firstTxId > 0) || (firstTxId == HdfsConstants.INVALID_TXID);
       assert file != null;
       
+      Preconditions.checkArgument(!isInProgress ||
+          lastTxId == HdfsConstants.INVALID_TXID);
+      
       this.firstTxId = firstTxId;
       this.lastTxId = lastTxId;
       this.file = file;
       this.isInProgress = isInProgress;
     }
     
-    long getFirstTxId() {
+    public long getFirstTxId() {
       return firstTxId;
     }
     
-    long getLastTxId() {
+    public long getLastTxId() {
       return lastTxId;
     }
     
@@ -402,17 +430,17 @@ class FileJournalManager implements Jour
      * This will update the lastTxId of the EditLogFile or
      * mark it as corrupt if it is.
      */
-    void validateLog() throws IOException {
+    public void validateLog() throws IOException {
       EditLogValidation val = EditLogFileInputStream.validateEditLog(file);
       this.lastTxId = val.getEndTxId();
       this.hasCorruptHeader = val.hasCorruptHeader();
     }
 
-    boolean isInProgress() {
+    public boolean isInProgress() {
       return isInProgress;
     }
 
-    File getFile() {
+    public File getFile() {
       return file;
     }
     
@@ -425,7 +453,7 @@ class FileJournalManager implements Jour
       renameSelf(".corrupt");
     }
 
-    void moveAsideEmptyFile() throws IOException {
+    public void moveAsideEmptyFile() throws IOException {
       assert lastTxId == HdfsConstants.INVALID_TXID;
       renameSelf(".empty");
     }
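
The lookup made public above returns null when no segment starts at the requested txid and throws IllegalStateException if the directory somehow holds duplicates. A sketch of how an external caller might drive it; the class name, directory path, and txid are illustrative, not part of this patch:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
    import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;

    class EditLogLookup {  // hypothetical caller
      static void describeSegment(File editsDir, long startTxId) throws IOException {
        EditLogFile elf = FileJournalManager.getLogFile(editsDir, startTxId);
        if (elf == null) {
          System.out.println("no segment starts at txid " + startTxId);
        } else if (elf.isInProgress()) {
          elf.validateLog();  // scans the file to recover the real lastTxId
          System.out.println("in-progress segment " + elf.getFile()
              + " currently ends at txid " + elf.getLastTxId());
        } else {
          System.out.println("finalized segment ["
              + elf.getFirstTxId() + "," + elf.getLastTxId() + "]");
        }
      }
    }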

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Oct 12 00:15:22 2012
@@ -65,9 +65,11 @@ public interface JournalManager extends 
    * @param inProgressOk whether or not in-progress streams should be returned
    *
    * @return a list of streams
+   * @throws IOException if the underlying storage has an error or is otherwise
+   * inaccessible
    */
   void selectInputStreams(Collection<EditLogInputStream> streams,
-      long fromTxnId, boolean inProgressOk);
+      long fromTxnId, boolean inProgressOk) throws IOException;
 
   /**
    * Set the amount of memory that this stream should use to buffer edits
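
Since selectInputStreams may now throw, an aggregating caller that wants to tolerate one bad journal has to catch per manager, as JournalSet does in the next file. A minimal fragment of that pattern; jm, fromTxId, and LOG are assumed to exist in the caller:

    Collection<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
    try {
      jm.selectInputStreams(streams, fromTxId, true /* inProgressOk */);
    } catch (IOException ioe) {
      // One unreachable journal should not abort edit-log recovery.
      LOG.warn("Unable to select input streams from " + jm, ioe);
    }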

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalSet.java Fri Oct 12 00:15:22 2012
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.SortedSet;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -147,7 +148,7 @@ public class JournalSet implements Journ
       return journal;
     }
 
-    private boolean isDisabled() {
+    boolean isDisabled() {
       return disabled;
     }
 
@@ -165,8 +166,12 @@ public class JournalSet implements Journ
       return required;
     }
   }
-  
-  private List<JournalAndStream> journals = Lists.newArrayList();
+ 
+  // COW implementation is necessary since some users (eg the web ui) call
+  // getAllJournalStreams() and then iterate. Since this is rarely
+  // mutated, there is no performance concern.
+  private List<JournalAndStream> journals =
+      new CopyOnWriteArrayList<JournalSet.JournalAndStream>();
   final int minimumRedundantJournals;
   
   JournalSet(int minimumRedundantResources) {
@@ -242,8 +247,20 @@ public class JournalSet implements Journ
         LOG.info("Skipping jas " + jas + " since it's disabled");
         continue;
       }
-      jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+      try {
+        jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
+      } catch (IOException ioe) {
+        LOG.warn("Unable to determine input streams from " + jas.getManager() +
+            ". Skipping.", ioe);
+      }
     }
+    chainAndMakeRedundantStreams(streams, allStreams, fromTxId, inProgressOk);
+  }
+  
+  public static void chainAndMakeRedundantStreams(
+      Collection<EditLogInputStream> outStreams,
+      PriorityQueue<EditLogInputStream> allStreams,
+      long fromTxId, boolean inProgressOk) {
     // We want to group together all the streams that start on the same start
     // transaction ID.  To do this, we maintain an accumulator (acc) of all
     // the streams we've seen at a given start transaction ID.  When we see a
@@ -261,7 +278,7 @@ public class JournalSet implements Journ
         if (accFirstTxId == elis.getFirstTxId()) {
           acc.add(elis);
         } else if (accFirstTxId < elis.getFirstTxId()) {
-          streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+          outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
           acc.clear();
           acc.add(elis);
         } else if (accFirstTxId > elis.getFirstTxId()) {
@@ -272,7 +289,7 @@ public class JournalSet implements Journ
       }
     }
     if (!acc.isEmpty()) {
-      streams.add(new RedundantEditLogInputStream(acc, fromTxId));
+      outStreams.add(new RedundantEditLogInputStream(acc, fromTxId));
       acc.clear();
     }
   }
@@ -454,12 +471,12 @@ public class JournalSet implements Journ
     }
 
     @Override
-    protected void flushAndSync() throws IOException {
+    protected void flushAndSync(final boolean durable) throws IOException {
       mapJournalsAndReportErrors(new JournalClosure() {
         @Override
         public void apply(JournalAndStream jas) throws IOException {
           if (jas.isActive()) {
-            jas.getCurrentStream().flushAndSync();
+            jas.getCurrentStream().flushAndSync(durable);
           }
         }
       }, "flushAndSync");
@@ -512,7 +529,6 @@ public class JournalSet implements Journ
     }
   }
   
-  @VisibleForTesting
   List<JournalAndStream> getAllJournalStreams() {
     return journals;
   }
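
The switch to CopyOnWriteArrayList is the load-bearing change in this file: iterators see an immutable snapshot, so readers such as the web UI can walk the journal list while another thread disables or adds a journal. A self-contained illustration of that property, not taken from the patch:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;

    public class CowIterationDemo {
      public static void main(String[] args) {
        List<String> journals = new CopyOnWriteArrayList<String>();
        journals.add("file:/data/edits");
        for (String j : journals) {
          // Mutating during iteration is safe: the loop runs over a snapshot.
          // An ArrayList would throw ConcurrentModificationException here.
          journals.add("qjournal://host/mycluster");
        }
        System.out.println(journals);  // [file:/data/edits, qjournal://host/mycluster]
      }
    }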

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Oct 12 00:15:22 2012
@@ -722,6 +722,12 @@ public class NameNode {
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
     checkAllowFormat(conf);
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      InetSocketAddress socAddr = getAddress(conf);
+      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+          DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+    }
     
     Collection<URI> nameDirsToFormat = FSNamesystem.getNamespaceDirs(conf);
     List<URI> sharedDirs = FSNamesystem.getSharedEditsDirs(conf);
@@ -763,13 +769,13 @@ public class NameNode {
   }
   
   @VisibleForTesting
-  public static boolean initializeSharedEdits(Configuration conf) {
+  public static boolean initializeSharedEdits(Configuration conf) throws IOException {
     return initializeSharedEdits(conf, true);
   }
   
   @VisibleForTesting
   public static boolean initializeSharedEdits(Configuration conf,
-      boolean force) {
+      boolean force) throws IOException {
     return initializeSharedEdits(conf, force, false);
   }
 
@@ -783,7 +789,7 @@ public class NameNode {
    * @return true if the command aborts, false otherwise
    */
   private static boolean initializeSharedEdits(Configuration conf,
-      boolean force, boolean interactive) {
+      boolean force, boolean interactive) throws IOException {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
@@ -794,6 +800,12 @@ public class NameNode {
       return false;
     }
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      InetSocketAddress socAddr = getAddress(conf);
+      SecurityUtil.login(conf, DFS_NAMENODE_KEYTAB_FILE_KEY,
+          DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());
+    }
+
     NNStorage existingStorage = null;
     try {
       Configuration confWithoutShared = new Configuration(conf);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Fri Oct 12 00:15:22 2012
@@ -107,6 +107,10 @@ public class NameNodeHttpServer {
               DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
               SecurityUtil.getServerPrincipal(principalInConf,
                                               bindAddress.getHostName()));
+        } else if (UserGroupInformation.isSecurityEnabled()) {
+          LOG.error("WebHDFS and security are enabled, but configuration property '" +
+                    DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY +
+                    "' is not set.");
         }
         String httpKeytab = conf.get(
           DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY);
@@ -117,6 +121,10 @@ public class NameNodeHttpServer {
           params.put(
             DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
             httpKeytab);
+        } else if (UserGroupInformation.isSecurityEnabled()) {
+          LOG.error("WebHDFS and security are enabled, but configuration property '" +
+                    DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY +
+                    "' is not set.");
         }
         return params;
       }
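
The two error paths above name the exact keys a secure WebHDFS deployment must carry. Set programmatically they look roughly like this; the principal and keytab values are placeholders:

    Configuration conf = new HdfsConfiguration();
    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY,
        "HTTP/_HOST@EXAMPLE.COM");                          // placeholder principal
    conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
        "/etc/security/keytabs/spnego.service.keytab");     // placeholder path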

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeResourcePolicy.java Fri Oct 12 00:15:22 2012
@@ -41,6 +41,14 @@ final class NameNodeResourcePolicy {
   static boolean areResourcesAvailable(
       Collection<? extends CheckableNameNodeResource> resources,
       int minimumRedundantResources) {
+
+    // TODO: workaround:
+    // - during startup, if there are no edits dirs on disk, then there is
+    // a call to areResourcesAvailable() with no dirs at all, which was
+    // previously causing the NN to enter safemode
+    if (resources.isEmpty()) {
+      return true;
+    }
     
     int requiredResourceCount = 0;
     int redundantResourceCount = 0;
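
A same-package test could pin down the new empty-collection behavior; a hypothetical sketch (the class and method are package-private, so the test must live in org.apache.hadoop.hdfs.server.namenode):

    import static org.junit.Assert.assertTrue;
    import java.util.Collections;
    import org.junit.Test;

    public class TestEmptyResourcePolicy {  // hypothetical test class
      @Test
      public void emptyResourceSetIsConsideredAvailable() {
        // With no configured edits dirs at startup, the policy must not
        // push the NN into safe mode.
        assertTrue(NameNodeResourcePolicy.areResourcesAvailable(
            Collections.<CheckableNameNodeResource>emptyList(), 1));
      }
    }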

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Fri Oct 12 00:15:22 2012
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
@@ -60,6 +61,8 @@ import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.VersionInfo;
 import org.znerd.xmlenc.XMLOutputter;
 
+import com.google.common.base.Preconditions;
+
 class NamenodeJspHelper {
   static String getSafeModeText(FSNamesystem fsn) {
     if (!fsn.isInSafeMode())
@@ -212,6 +215,52 @@ class NamenodeJspHelper {
 
       out.print("</table></div>\n");
     }
+    
+    /**
+     * Generate an HTML report containing the current status of the HDFS
+     * journals.
+     */
+    void generateJournalReport(JspWriter out, NameNode nn,
+        HttpServletRequest request) throws IOException {
+      FSEditLog log = nn.getFSImage().getEditLog();
+      Preconditions.checkArgument(log != null, "no edit log set in %s", nn);
+      
+      out.println("<h3> " + nn.getRole() + " Journal Status: </h3>");
+
+      out.println("<b>Current transaction ID:</b> " +
+          nn.getFSImage().getLastAppliedOrWrittenTxId() + "<br/>");
+      
+      
+      boolean openForWrite = log.isOpenForWrite();
+      
+      out.println("<div class=\"dfstable\">");
+      out.println("<table class=\"storage\" title=\"NameNode Journals\">\n"
+              + "<thead><tr><td><b>Journal Manager</b></td><td><b>State</b></td></tr></thead>");
+      for (JournalAndStream jas : log.getJournals()) {
+        out.print("<tr>");
+        out.print("<td>" + jas.getManager());
+        if (jas.isRequired()) {
+          out.print(" [required]");
+        }
+        out.print("</td><td>");
+        
+        if (jas.isDisabled()) {
+          out.print("<span class=\"failed\">Failed</span>");
+        } else if (openForWrite) {
+          EditLogOutputStream elos = jas.getCurrentStream();
+          if (elos != null) {
+            out.println(elos.generateHtmlReport());
+          } else {
+            out.println("not currently writing");
+          }
+        } else {
+          out.println("open for read");
+        }
+        out.println("</td></tr>");
+      }
+      
+      out.println("</table></div>");
+    }
 
     void generateHealthReport(JspWriter out, NameNode nn,
         HttpServletRequest request) throws IOException {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Oct 12 00:15:22 2012
@@ -78,6 +78,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 /**********************************************************
@@ -122,6 +123,8 @@ public class SecondaryNameNode implement
   private CheckpointConf checkpointConf;
   private FSNamesystem namesystem;
 
+  private Thread checkpointThread;
+
 
   @Override
   public String toString() {
@@ -277,6 +280,15 @@ public class SecondaryNameNode implement
    */
   public void shutdown() {
     shouldRun = false;
+    if (checkpointThread != null) {
+      checkpointThread.interrupt();
+      try {
+        checkpointThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted waiting to join on checkpointer thread");
+        Thread.currentThread().interrupt(); // maintain status
+      }
+    }
     try {
       if (infoServer != null) infoServer.stop();
     } catch (Exception e) {
@@ -586,12 +598,20 @@ public class SecondaryNameNode implement
       terminate(ret);
     }
 
-    // Create a never-ending daemon
-    Daemon checkpointThread = new Daemon(secondary);
-    checkpointThread.start();
+    secondary.startCheckpointThread();
   }
   
   
+  public void startCheckpointThread() {
+    Preconditions.checkState(checkpointThread == null,
+        "Should not already have a thread");
+    Preconditions.checkState(shouldRun, "shouldRun should be true");
+    
+    checkpointThread = new Daemon(this);
+    checkpointThread.start();
+  }
+
+
   /**
    * Container for parsed command-line options.
    */
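
The shutdown change is the standard interrupt-then-bounded-join idiom, reduced here to its essentials (names hypothetical):

    static void stopWorker(Thread worker) {
      worker.interrupt();          // wake the thread out of sleep/IO waits
      try {
        worker.join(10000);        // bounded, so shutdown itself cannot hang
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve our own interrupt status
      }
    }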

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java Fri Oct 12 00:15:22 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 
 import org.apache.commons.logging.Log;
@@ -172,11 +173,20 @@ public class EditLogTailer {
     Preconditions.checkState(tailerThread == null ||
         !tailerThread.isAlive(),
         "Tailer thread should not be running once failover starts");
-    try {
-      doTailEdits();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
+    // Important to do tailing as the login user, in case the shared
+    // edits storage is implemented by a JournalManager that depends
+    // on security credentials to access the logs (eg QuorumJournalManager).
+    SecurityUtil.doAsLoginUser(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        try {
+          doTailEdits();
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+        return null;
+      }
+    });
   }
   
   @VisibleForTesting
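
SecurityUtil.doAsLoginUser runs the action under the NN's keytab-based login UGI rather than whatever context the caller happens to be in. The same idiom applies to any journal access that needs credentials; fetchRemoteLogs is a hypothetical call standing in for such an operation:

    // Hypothetical helper: run a credentialed fetch as the login (keytab) user.
    static long countRemoteLogs() throws IOException {
      return SecurityUtil.doAsLoginUser(
          new PrivilegedExceptionAction<Long>() {
            @Override
            public Long run() throws Exception {
              return fetchRemoteLogs();  // hypothetical call needing Kerberos creds
            }
          });
    }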

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLog.java Fri Oct 12 00:15:22 2012
@@ -17,18 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.protocol;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.io.Writable;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ComparisonChain;
 
-public class RemoteEditLog implements Writable, Comparable<RemoteEditLog> {
+public class RemoteEditLog implements Comparable<RemoteEditLog> {
   private long startTxId = HdfsConstants.INVALID_TXID;
   private long endTxId = HdfsConstants.INVALID_TXID;
+  private boolean isInProgress = false;
   
   public RemoteEditLog() {
   }
@@ -36,6 +33,13 @@ public class RemoteEditLog implements Wr
   public RemoteEditLog(long startTxId, long endTxId) {
     this.startTxId = startTxId;
     this.endTxId = endTxId;
+    this.isInProgress = (endTxId == HdfsConstants.INVALID_TXID);
+  }
+  
+  public RemoteEditLog(long startTxId, long endTxId, boolean inProgress) {
+    this.startTxId = startTxId;
+    this.endTxId = endTxId;
+    this.isInProgress = inProgress;
   }
 
   public long getStartTxId() {
@@ -45,22 +49,18 @@ public class RemoteEditLog implements Wr
   public long getEndTxId() {
     return endTxId;
   }
-    
-  @Override
-  public String toString() {
-    return "[" + startTxId + "," + endTxId + "]";
-  }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeLong(startTxId);
-    out.writeLong(endTxId);
+  public boolean isInProgress() {
+    return isInProgress;
   }
 
   @Override
-  public void readFields(DataInput in) throws IOException {
-    startTxId = in.readLong();
-    endTxId = in.readLong();
+  public String toString() {
+    if (!isInProgress) {
+      return "[" + startTxId + "," + endTxId + "]";
+    } else {
+      return "[" + startTxId + "-? (in-progress)]";
+    }
   }
   
   @Override
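
With the changes above, an INVALID_TXID end transaction marks a segment as in progress, and toString makes the distinction visible. An illustrative fragment, with expected output in the comments:

    RemoteEditLog finalized = new RemoteEditLog(1, 100);
    RemoteEditLog open = new RemoteEditLog(101, HdfsConstants.INVALID_TXID);
    System.out.println(finalized);  // [1,100]
    System.out.println(open);       // [101-? (in-progress)]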

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RemoteEditLogManifest.java Fri Oct 12 00:15:22 2012
@@ -40,8 +40,8 @@ public class RemoteEditLogManifest {
   
   
   /**
-   * Check that the logs are contiguous and non-overlapping
-   * sequences of transactions, in sorted order
+   * Check that the logs are non-overlapping sequences of transactions,
+   * in sorted order. They do not need to be contiguous.
    * @throws IllegalStateException if incorrect
    */
   private void checkState()  {
@@ -50,8 +50,10 @@ public class RemoteEditLogManifest {
     RemoteEditLog prev = null;
     for (RemoteEditLog log : logs) {
       if (prev != null) {
-        if (log.getStartTxId() != prev.getEndTxId() + 1) {
-          throw new IllegalStateException("Invalid log manifest:" + this);
+        if (log.getStartTxId() <= prev.getEndTxId()) {
+          throw new IllegalStateException(
+              "Invalid log manifest (log " + log + " overlaps " + prev + ")\n"
+              + this);
         }
       }
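
Concretely, a gap between segments now passes validation while an overlap still fails, assuming construction runs checkState as the class does today. An illustrative fragment using guava's ImmutableList and made-up txid ranges:

    // Accepted: sorted and non-overlapping, even with a gap at txids 101..199.
    new RemoteEditLogManifest(ImmutableList.of(
        new RemoteEditLog(1, 100), new RemoteEditLog(200, 300)));

    // Rejected: log [50,150] overlaps [1,100] -> IllegalStateException.
    new RemoteEditLogManifest(ImmutableList.of(
        new RemoteEditLog(1, 100), new RemoteEditLog(50, 150)));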
       

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java Fri Oct 12 00:15:22 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.tools;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -53,6 +54,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.util.StringUtils;
@@ -80,7 +82,7 @@ public class DFSAdmin extends FsShell {
       super(fs.getConf());
       if (!(fs instanceof DistributedFileSystem)) {
         throw new IllegalArgumentException("FileSystem " + fs.getUri() + 
-            " is not a distributed file system");
+            " is not an HDFS file system");
       }
       this.dfs = (DistributedFileSystem)fs;
     }
@@ -284,7 +286,7 @@ public class DFSAdmin extends FsShell {
     FileSystem fs = getFS();
     if (!(fs instanceof DistributedFileSystem)) {
       throw new IllegalArgumentException("FileSystem " + fs.getUri() + 
-      " is not a distributed file system");
+      " is not an HDFS file system");
     }
     return (DistributedFileSystem)fs;
   }
@@ -511,11 +513,17 @@ public class DFSAdmin extends FsShell {
    * @return an exit code indicating success or failure.
    * @throws IOException
    */
-  public int fetchImage(String[] argv, int idx) throws IOException {
-    String infoServer = DFSUtil.getInfoServer(
+  public int fetchImage(final String[] argv, final int idx) throws IOException {
+    final String infoServer = DFSUtil.getInfoServer(
         HAUtil.getAddressOfActive(getDFS()), getConf(), false);
-    TransferFsImage.downloadMostRecentImageToDirectory(infoServer,
-        new File(argv[idx]));
+    SecurityUtil.doAsCurrentUser(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        TransferFsImage.downloadMostRecentImageToDirectory(infoServer,
+            new File(argv[idx]));
+        return null;
+      }
+    });
     return 0;
   }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/BinaryEditsVisitor.java Fri Oct 12 00:15:22 2012
@@ -56,7 +56,7 @@ public class BinaryEditsVisitor implemen
   @Override
   public void close(Throwable error) throws IOException {
     elfos.setReadyToFlush();
-    elfos.flushAndSync();
+    elfos.flushAndSync(true);
     elfos.close();
   }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java Fri Oct 12 00:15:22 2012
@@ -57,7 +57,9 @@ public class PersistentLongFile {
   }
   
   public void set(long newVal) throws IOException {
-    writeFile(file, newVal);
+    if (value != newVal || !loaded) {
+      writeFile(file, newVal);
+    }
     value = newVal;
     loaded = true;
   }
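
The effect of the guard, in isolation (path and values illustrative):

    PersistentLongFile seenTxId = new PersistentLongFile(
        new File("/data/dfs/name/current/seen_txid"), 0);
    seenTxId.set(42);  // value differs (or file not yet loaded): writes to disk
    seenTxId.set(42);  // same value, already loaded: skipped, no disk write
    seenTxId.set(43);  // value changed: writes again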

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1390763-1397380
  Merged /hadoop/common/branches/HDFS-3077/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1363593-1396941

Modified: hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c?rev=1397387&r1=1397386&r2=1397387&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/exception.c Fri Oct 12 00:15:22 2012
@@ -67,6 +67,25 @@ static const struct ExceptionInfo gExcep
     
 };
 
+void getExceptionInfo(const char *excName, int noPrintFlags,
+                      int *excErrno, int *shouldPrint)
+{
+    int i;
+
+    for (i = 0; i < EXCEPTION_INFO_LEN; i++) {
+        if (strstr(gExceptionInfo[i].name, excName)) {
+            break;
+        }
+    }
+    if (i < EXCEPTION_INFO_LEN) {
+        *shouldPrint = !(gExceptionInfo[i].noPrintFlag & noPrintFlags);
+        *excErrno = gExceptionInfo[i].excErrno;
+    } else {
+        *shouldPrint = 1;
+        *excErrno = EINTERNAL;
+    }
+}
+
 int printExceptionAndFreeV(JNIEnv *env, jthrowable exc, int noPrintFlags,
         const char *fmt, va_list ap)
 {


